Command Processing #yg4: Program has terminated but the port is still hogged!

I have provided my code and the terminal output below. I don't understand why the port is still in use even after I have passed the test case.

[tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
[tester::#YG4] Master is running on port 6379
[tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[your_program] ['--port', '6380', '--replicaof', 'localhost 6379']
[your_program] {'dir': '/tmp', 'dbfilename': 'dump.rdb', 'port': 6380, 'replicaof': 'localhost 6379', 'master_replid': '8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb', 'master_repl_offset': '0'}
[your_program] Server listening on port 6380
[your_program] Connected to master at localhost:6379
[your_program] Sending command: b'*1\r\n$4\r\nPING\r\n'
[tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PING"]
[tester::#YG4] [handshake] Received ["PING"]
[tester::#YG4] [handshake] master: Sent "PONG"
[tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[your_program] Sending command: b'*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n'
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
[your_program] Sending command: b'*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n'
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
[your_program] Sending command: b'*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n'
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YG4] [handshake] Sending RDB file...
[tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YG4] [handshake] Sent RDB file.
[tester::#YG4] [propagation] master: > SET foo 123
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
[tester::#YG4] [propagation] master: > SET bar 456
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
[tester::#YG4] [propagation] master: > SET baz 789
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
[tester::#YG4] [test] Getting key foo
[tester::#YG4] [test] client: $ redis-cli GET foo
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
[your_program] Accepted connection from ('127.0.0.1', 39974)
[your_program] Received data from master: b'+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\x08\xbce\xfa\x08used-mem\xc2\xb0\xc4\x10\x00\xfa\x08aof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Received FULLRESYNC: b'+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0'
[your_program] Received RDB data of length 88
[your_program] Buffer after RDB: b'\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n'
[your_program] SET command processed. Key: b'foo', Value: b'123'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None)}
[your_program] Remaining buffer after processing: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n'
[your_program] SET command processed. Key: b'bar', Value: b'456'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None), b'bar': KeyValue(value=b'456', expiry=None)}
[your_program] Remaining buffer after processing: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] SET command processed. Key: b'baz', Value: b'789'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None), b'bar': KeyValue(value=b'456', expiry=None), b'baz': KeyValue(value=b'789', expiry=None)}
[your_program] Remaining buffer after processing: b''
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'foo', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n123\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "123"
[tester::#YG4] [test] Received "123"
[tester::#YG4] [test] Getting key bar
[tester::#YG4] [test] client: > GET bar
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'bar', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n456\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "456"
[tester::#YG4] [test] Received "456"
[tester::#YG4] [test] Getting key baz
[tester::#YG4] [test] client: > GET baz
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbaz\r\n"
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nbaz\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'baz', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n789\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "789"
[tester::#YG4] [test] Received "789"
[tester::#YG4] Test passed.
[tester::#YG4] Terminating program
[tester::#YG4] Program terminated successfully

[tester::#HD5] Running tests for Stage #HD5 (Replication - Multi Replica Command Propagation)
[tester::#HD5] $ ./your_program.sh --port 6379
[your_program] ['--port', '6379']
[your_program] {'dir': '/tmp', 'dbfilename': 'dump.rdb', 'port': 6379, 'replicaof': '', 'master_replid': '8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb', 'master_repl_offset': '0'}
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Traceback (most recent call last):
[your_program]   File "<frozen runpy>", line 198, in _run_module_as_main
[your_program]   File "<frozen runpy>", line 88, in _run_code
[your_program]   File "/app/app/main.py", line 441, in <module>
[your_program]     asyncio.run(main())
[your_program]   File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
[your_program]     return runner.run(main)
[your_program]            ^^^^^^^^^^^^^^^^
[your_program]   File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
[your_program]     return self._loop.run_until_complete(task)
[your_program]            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[your_program]   File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
[your_program]     return future.result()
[your_program]            ^^^^^^^^^^^^^^^
[your_program]   File "/app/app/main.py", line 418, in main
[your_program]     server_socket.bind(("localhost", int(config["port"])))
[your_program] OSError: [Errno 98] Address in use
[tester::#HD5] dial tcp [::1]:6379: connect: connection refused
[tester::#HD5] Test failed
[tester::#HD5] Terminating program
[tester::#HD5] Program terminated successfully

My code:

import socket
import sys
import os
import struct
from collections import namedtuple
import time
from typing import Optional, Tuple
import asyncio
import signal

KeyValue = namedtuple('KeyValue', ['value', 'expiry'])
save_commands = []
data_dict = {}
config = {
    "dir": "/tmp",
    "dbfilename": "dump.rdb",
    "port": "6379",
    "replicaof": "",
    "master_replid": "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb",
    "master_repl_offset": "0",
}

db_info = {
    "index": 0,
    "hash_table_size": 0,
    "expires_size": 0
}
server_socket = None

replica_connections = []
# Constants
PING = b"PING"
PONG = b"+PONG\r\n"
ECHO = b"ECHO"
SET = b"SET"
GET = b"GET"
CONFIG = b"CONFIG"
KEYS = b"KEYS"
INFO = b"INFO"
REPLCONF = b"REPLCONF"
PSYNC = b"PSYNC"

# def signal_handler(sig, frame):
#     print("Shutting down gracefully...")
#     global server_socket
#     print(server_socket)
#     if server_socket:
#         server_socket.close()
#     sys.exit(0)


# signal.signal(signal.SIGINT, signal_handler)
# signal.signal(signal.SIGTERM, signal_handler)

def read_length(f) -> Optional[int]:
    byte = f.read(1)
    if not byte:
        return None
    
    first_byte = ord(byte)
    if (first_byte >> 6) == 0:
        return first_byte & 0x3F
    elif (first_byte >> 6) == 1:
        next_byte = ord(f.read(1))
        return ((first_byte & 0x3F) << 8) | next_byte
    elif (first_byte >> 6) == 2:
        return struct.unpack('>I', f.read(4))[0]
    else:
        return None

def read_string(f) -> Optional[bytes]:
    length = read_length(f)
    if length is None:
        return None
    return f.read(length)

def load_rdb() -> None:
    global data_dict, db_info
    rdb_path = os.path.join(config["dir"], config["dbfilename"])
    if not os.path.exists(rdb_path):
        return

    with open(rdb_path, 'rb') as f:
        header = f.read(9)
        if header != b'REDIS0011':
            print("Invalid RDB file format")
            return

        while True:
            byte = f.read(1)
            if byte == b'\xfe':
                break
            elif byte == b'\xff':
                return
            elif byte == b'\xfa':
                read_string(f)
                read_string(f)

        db_info["index"] = read_length(f)
        f.read(1)
        db_info["hash_table_size"] = read_length(f)
        db_info["expires_size"] = read_length(f)

        while True:
            byte = f.read(1)
            if byte == b'\xff':
                break

            expiry = None
            if byte in [b'\xfd', b'\xfc']:
                expiry = struct.unpack('<Q', f.read(8))[0] if byte == b'\xfc' else struct.unpack('<I', f.read(4))[0] * 1000
                byte = f.read(1)
            
            if byte != b'\x00':
                print(f"Unsupported value type: {byte}")
                continue

            key = read_string(f)
            value = read_string(f)
            if key and value:
                data_dict[key] = KeyValue(value, expiry)
    print(f"Loaded {len(data_dict)} key(s)")

def parse_cli_arguments() -> None:
    global config
    args = sys.argv[1:]
    print(args)
    for i in range(len(args)):
        if args[i] == '--dir' and i + 1 < len(args):
            config["dir"] = args[i + 1]
        elif args[i] == '--dbfilename' and i + 1 < len(args):
            config["dbfilename"] = args[i + 1]
        elif args[i] == '--port' and i + 1 < len(args):
            config["port"] = int(args[i + 1])
        elif args[i] == '--replicaof' and i + 1 < len(args):
            config["replicaof"] = args[i + 1]
        elif '--replicaof' not in args:
            config["replicaof"] = ""
    print(config)

def remove_expired_keys() -> int:
    current_time = int(time.time() * 1000)
    expired_keys = [key for key, kv in data_dict.items() if kv.expiry and kv.expiry <= current_time]
    for key in expired_keys:
        del data_dict[key]
    return len(expired_keys)

def connect_to_master(host: str, port: int) -> socket.socket:
    master_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    master_socket.connect((host, port))
    print(f"Connected to master at {host}:{port}")

    send_command(master_socket, b"*1\r\n$4\r\nPING\r\n")
    expect_response(master_socket, b'+PONG\r\n', "PONG")

    replica_port = config["port"]
    replconf_1 = f'*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n${len(str(replica_port))}\r\n{replica_port}\r\n'.encode()
    send_command(master_socket, replconf_1)
    expect_response(master_socket, b'+OK\r\n', "first REPLCONF")

    replconf_2 = b'*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n'
    send_command(master_socket, replconf_2)
    expect_response(master_socket, b'+OK\r\n', "second REPLCONF")

    # Final handshake step: PSYNC (not a REPLCONF), so name it accordingly.
    psync = b'*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n'
    send_command(master_socket, psync)
    return master_socket

def send_command(sock: socket.socket, command: bytes) -> None:
    print(f"Sending command: {command}")
    sock.sendall(command)

def expect_response(sock: socket.socket, expected: bytes, context: str) -> None:
    response = sock.recv(1024)
    assert response == expected, f"Received unexpected response from master during {context}"

async def listen_to_master(master_socket):
    buffer = b""
    while True:
        try:
            data = await asyncio.to_thread(master_socket.recv, 1024)
            if not data:
                print("Connection to master closed.")
                break
            buffer += data
            print(f"Received data from master: {buffer}")
            
            while buffer:
                if buffer.startswith(b'+FULLRESYNC'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx != -1:
                        print(f"Received FULLRESYNC: {buffer[:end_idx]}")
                        buffer = buffer[end_idx+2:]
                elif buffer.startswith(b'$'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx != -1:
                        length = int(buffer[1:end_idx])
                        if len(buffer) >= end_idx + 2 + length + 2:
                            rdb_data = buffer[end_idx+2:end_idx+2+length]
                            print(f"Received RDB data of length {length}")
                            buffer = buffer[end_idx+2+length+2:]
                            print(f"Buffer after RDB: {buffer}")
                            # Add this line to handle the first SET command
                            buffer = b'*3\r\n' + buffer.lstrip(b'\r\n')
                        else:
                            break
                elif buffer.startswith(b'*'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx != -1:
                        num_args = int(buffer[1:end_idx])
                        command_end = buffer.find(b'\r\n', end_idx+2)
                        for _ in range(2 * num_args - 1):
                            command_end = buffer.find(b'\r\n', command_end+2)
                            if command_end == -1:
                                break
                        if command_end != -1:
                            command = buffer[:command_end+2]
                            print(f"Processing SET command: {command}")
                            await handle_propagated_command(command)
                            buffer = buffer[command_end+2:]
                            print(f"Remaining buffer after processing: {buffer}")
                        else:
                            break
                else:
                    print(f"Unexpected data, trying to find next command: {buffer}")
                    start_idx = buffer.find(b'*')
                    if start_idx != -1:
                        buffer = buffer[start_idx:]
                        print(f"Adjusted buffer to start of next command: {buffer}")
                    else:
                        print(f"No more commands found in buffer: {buffer}")
                        buffer = b""
        except Exception as e:
            print(f"Error while listening to master: {e}")
            break

async def handle_propagated_command(command):
    print(f"Handling propagated command: {command}")
    parts = command.split(b'\r\n')
    if len(parts) >= 7 and parts[2] == b'SET':
        key = parts[4]
        value = parts[6]
        data_dict[key] = KeyValue(value, None)
        print(f"SET command processed. Key: {key}, Value: {value}")
        print(f"Updated data_dict: {data_dict}")

def handle_client(client_socket: socket.socket) -> None:
    global replica_connections
    is_replica = False

    while True:
        removed = remove_expired_keys()
        if removed > 0:
            print(f"Removed {removed} expired key(s)")

        data = client_socket.recv(1024)
        if not data:
            break
        print("Received {!r}".format(data.decode()))
        arr_size, *arr = data.split(b"\r\n")
        print(f"Arr size: {arr_size}")
        print(f"Arr content: {arr}")
        
        if len(arr) < 2:
            continue

        command = arr[1].upper()

        if command == PING:
            send_response(client_socket, PONG)
        elif command == ECHO:
            send_response(client_socket, arr[2] + b"\r\n" + arr[3] + b"\r\n")
        elif command == SET:
            print(f"Handling SET command: {arr}")
            handle_set_command(arr, client_socket)
            if not is_replica:
                propagate_command(arr)
        elif command == GET:
            handle_get_command(arr, client_socket)
        elif command == CONFIG:
            handle_config_command(arr, client_socket)
        elif command == KEYS:
            handle_keys_command(arr, client_socket)
        elif command == INFO:
            handle_info_command(client_socket)
        elif command == REPLCONF:
            send_response(client_socket, b"+OK\r\n")
            if not is_replica:
                is_replica = True
                replica_connections.append(client_socket)
        elif command == PSYNC:
            handle_psync_command(arr, client_socket)
            if not is_replica:
                is_replica = True
                replica_connections.append(client_socket)

    client_socket.close()
    if is_replica:
        replica_connections.remove(client_socket)
        print("Replica disconnected")

# def handle_propagated_command(arr: list) -> None:
#     print(f"Handling propagated command: {arr}")
#     command = arr[1].upper()
#     if command == SET:
#         key = arr[3]
#         value = arr[5]
#         exp_time = None
#         if len(arr) > 9 and arr[7].upper() == b"PX":
#             exp_time = int(arr[9])
        
#         expiry = int(time.time() * 1000) + exp_time if exp_time is not None else None
#         data_dict[key] = KeyValue(value, expiry)
#         print(f"Propagated SET applied. Dict content: {data_dict}")

def propagate_command(arr: list) -> None:
    global replica_connections
    command = b"*" + str(len(arr) // 2).encode() + b"\r\n"
    for i in range(1, len(arr), 2):
        command += b"$" + str(len(arr[i])).encode() + b"\r\n" + arr[i] + b"\r\n"
    
    disconnected_replicas = []
    for replica_socket in replica_connections:
        try:
            print(f"Sending command to replica: {command}")
            send_response(replica_socket, command)
        except ConnectionError:
            disconnected_replicas.append(replica_socket)
    
    # Remove disconnected replicas
    for replica in disconnected_replicas:
        replica_connections.remove(replica)

def send_response(sock: socket.socket, response: bytes) -> None:
    sock.sendall(response)

def handle_set_command(arr: list, sock: Optional[socket.socket]) -> None:
    key = arr[3]
    value = arr[5]
    exp_time = None 
    if len(arr) > 9 and arr[7].upper() == b"PX":
        exp_time = int(arr[9])

    expiry = int(time.time() * 1000) + exp_time if exp_time is not None else None
    data_dict[key] = KeyValue(value, expiry)
    print(f"Dict content: {data_dict}")
    if sock:
        send_response(sock, b"+OK\r\n")

def handle_get_command(arr: list, sock: socket.socket) -> None:
    key = arr[3]
    if key in data_dict:
        kv = data_dict[key]
        if kv.expiry and kv.expiry <= int(time.time() * 1000):
            del data_dict[key]
            response = b"$-1\r\n"
        else:
            response = f"${len(kv.value)}\r\n{kv.value.decode()}\r\n".encode()
    else:
        response = b"$-1\r\n"
    send_response(sock, response)

def handle_config_command(arr: list, sock: socket.socket) -> None:
    if len(arr) > 3 and arr[3].upper() == b"GET":
        param = arr[5].decode()
        if param in config:
            value = config[param]
            response = f"*2\r\n${len(param)}\r\n{param}\r\n${len(value)}\r\n{value}\r\n".encode()
        else:
            response = b"$-1\r\n"
        send_response(sock, response)
    else:
        send_response(sock, b"-ERR Unknown subcommand or wrong number of arguments for 'CONFIG'\r\n")

def handle_keys_command(arr: list, sock: socket.socket) -> None:
    pattern = arr[3]
    if pattern == b"*":
        keys_list = list(data_dict.keys())
        response_parts = [f"*{len(keys_list)}\r\n".encode()]
        for key in keys_list:
            response_parts.append(f"${len(key)}\r\n{key.decode()}\r\n".encode())
        response = b"".join(response_parts)
    else:
        response = b"*0\r\n"
    send_response(sock, response)

def handle_info_command(sock: socket.socket) -> None:
    if config["replicaof"] == "":
        info_content = "role:master"
        master_replid = config["master_replid"]
        master_repl_offset = config["master_repl_offset"]
        info_content += f"\nmaster_replid:{master_replid}"
        info_content += f"\nmaster_repl_offset:{master_repl_offset}"
        response = f"${len(info_content)}\r\n{info_content}\r\n".encode()
    else:
        info_content = "role:slave"
        response = f"${len(info_content)}\r\n{info_content}\r\n".encode()
    send_response(sock, response)

def handle_psync_command(arr: list, sock: socket.socket) -> None:
    if arr[3] == b"?" and arr[5] == b"-1":
        response = f"+FULLRESYNC {config['master_replid']} {config['master_repl_offset']}\r\n".encode()
        send_response(sock, response)
        rdb_hex = "524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2"
        rdb_content = bytes.fromhex(rdb_hex)
        rdb_data = f"${len(rdb_content)}\r\n".encode()
        send_response(sock, rdb_data + rdb_content)
    else:
        send_response(sock, b"*0\r\n")

async def main() -> None:
    global replica_connections, server_socket
    parse_cli_arguments()
    load_rdb()
    replica_connections = []
    max_retries = 5
    retry_delay = 1

    for attempt in range(max_retries):
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # SO_REUSEADDR only takes effect if it is set before bind().
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("localhost", int(config["port"])))
            server_socket.listen(5)
            print(f"Server listening on port {config['port']}")
            break
        except OSError as e:
            if e.errno == 98 and attempt < max_retries - 1:  # Address already in use
                print(f"Port {config['port']} is busy, retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                raise
    if config["replicaof"]:
        master_host, master_port = config["replicaof"].split()
        master_socket = connect_to_master(master_host, int(master_port))
        asyncio.create_task(listen_to_master(master_socket))

    while True:
        client_socket, addr = await asyncio.to_thread(server_socket.accept)
        print(f"Accepted connection from {addr}")
        asyncio.create_task(handle_client_async(client_socket))

async def handle_client_async(client_socket: socket.socket) -> None:
    await asyncio.to_thread(handle_client, client_socket)

if __name__ == "__main__":
    asyncio.run(main())
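
A side note on `listen_to_master` above: the tester log shows the RDB file arriving as `$88\r\n` followed by 88 bytes, with the first `*3\r\n...` command starting immediately afterwards, so there is no CRLF after the RDB payload. Skipping two extra bytes at that point eats the `*3` of the first command, which is presumably why the code has to re-add it. Below is a minimal sketch of length-based parsing, assuming the whole stream is already buffered in memory. `read_line`, `read_rdb`, and `read_command` are hypothetical helper names, not part of the code above.

```python
from typing import List, Optional, Tuple


def read_line(buf: bytes, pos: int) -> Optional[Tuple[bytes, int]]:
    # Return (line without CRLF, position after CRLF), or None if incomplete.
    end = buf.find(b"\r\n", pos)
    if end == -1:
        return None
    return buf[pos:end], end + 2


def read_rdb(buf: bytes, pos: int) -> Optional[Tuple[bytes, int]]:
    # RDB transfer: "$<len>" CRLF <payload>, with NO trailing CRLF.
    # Sketch only: returns None both for incomplete and for unexpected data.
    header = read_line(buf, pos)
    if header is None or not header[0].startswith(b"$"):
        return None
    line, pos = header
    length = int(line[1:])
    if len(buf) < pos + length:
        return None
    return buf[pos:pos + length], pos + length


def read_command(buf: bytes, pos: int) -> Optional[Tuple[List[bytes], int]]:
    # One RESP array of bulk strings, consumed using the declared lengths
    # rather than by splitting on CRLF.
    header = read_line(buf, pos)
    if header is None or not header[0].startswith(b"*"):
        return None
    line, pos = header
    args: List[bytes] = []
    for _ in range(int(line[1:])):
        size = read_line(buf, pos)
        if size is None:
            return None
        size_line, pos = size
        n = int(size_line[1:])
        if len(buf) < pos + n + 2:
            return None
        args.append(buf[pos:pos + n])
        pos += n + 2  # skip the payload and its trailing CRLF
    return args, pos


# Example stream: status line, a fake 5-byte "RDB", then one command.
stream = b"+FULLRESYNC <replid> 0\r\n$5\r\nHELLO*1\r\n$4\r\nPING\r\n"
_, pos = read_line(stream, 0)
rdb, pos = read_rdb(stream, pos)
command, pos = read_command(stream, pos)
print(rdb, command)
```

Each helper returns `None` when the buffer does not yet hold a complete item, so a caller can keep the unread bytes and retry after the next `recv`.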

@ShaSam2104 I tried running your code against the previous stages, but it’s no longer passing the first stage.

Suggestions:

  1. Use our CLI to test against previous stages by running:
     codecrafters test --previous
  2. Focus on fixing the early stages first, as later stages depend on them.
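
For the original "Address in use" symptom, a quick local check of whether something is still bound to the port can help separate a leftover process from a bug in the server itself. This is a minimal sketch. `can_bind` is a hypothetical helper, not part of the CodeCrafters tooling, and it assumes an IPv4 loopback address.

```python
import socket


def can_bind(port: int, host: str = "127.0.0.1") -> bool:
    # Try to bind a throwaway TCP socket; failure means the port is taken.
    # SO_REUSEADDR is deliberately not set, so this is a strict check.
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind((host, port))
        return True
    except OSError:
        return False
    finally:
        probe.close()
```

Calling `can_bind(6379)` right after a test run gives a rough indication of whether an earlier server process is still holding the port.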

Hey, thanks a lot for the reply. Your comment made me realise that one of the earliest tests was failing because of a bug I had introduced. I've figured it out and it's working now. Thanks a lot!
