I have included my code and the terminal output below. I don't understand why the port is still in use even after the previous test case has passed.
[tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
[tester::#YG4] Master is running on port 6379
[tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[your_program] ['--port', '6380', '--replicaof', 'localhost 6379']
[your_program] {'dir': '/tmp', 'dbfilename': 'dump.rdb', 'port': 6380, 'replicaof': 'localhost 6379', 'master_replid': '8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb', 'master_repl_offset': '0'}
[your_program] Server listening on port 6380
[your_program] Connected to master at localhost:6379
[your_program] Sending command: b'*1\r\n$4\r\nPING\r\n'
[tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PING"]
[tester::#YG4] [handshake] Received ["PING"]
[tester::#YG4] [handshake] master: Sent "PONG"
[tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[your_program] Sending command: b'*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n'
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
[your_program] Sending command: b'*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n'
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
[your_program] Sending command: b'*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n'
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YG4] [handshake] Sending RDB file...
[tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YG4] [handshake] Sent RDB file.
[tester::#YG4] [propagation] master: > SET foo 123
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
[tester::#YG4] [propagation] master: > SET bar 456
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
[tester::#YG4] [propagation] master: > SET baz 789
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
[tester::#YG4] [test] Getting key foo
[tester::#YG4] [test] client: $ redis-cli GET foo
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
[your_program] Accepted connection from ('127.0.0.1', 39974)
[your_program] Received data from master: b'+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\x08\xbce\xfa\x08used-mem\xc2\xb0\xc4\x10\x00\xfa\x08aof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Received FULLRESYNC: b'+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0'
[your_program] Received RDB data of length 88
[your_program] Buffer after RDB: b'\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n'
[your_program] SET command processed. Key: b'foo', Value: b'123'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None)}
[your_program] Remaining buffer after processing: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n'
[your_program] SET command processed. Key: b'bar', Value: b'456'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None), b'bar': KeyValue(value=b'456', expiry=None)}
[your_program] Remaining buffer after processing: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Processing SET command: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] Handling propagated command: b'*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n'
[your_program] SET command processed. Key: b'baz', Value: b'789'
[your_program] Updated data_dict: {b'foo': KeyValue(value=b'123', expiry=None), b'bar': KeyValue(value=b'456', expiry=None), b'baz': KeyValue(value=b'789', expiry=None)}
[your_program] Remaining buffer after processing: b''
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'foo', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n123\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "123"
[tester::#YG4] [test] Received "123"
[tester::#YG4] [test] Getting key bar
[tester::#YG4] [test] client: > GET bar
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'bar', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n456\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "456"
[tester::#YG4] [test] Received "456"
[tester::#YG4] [test] Getting key baz
[tester::#YG4] [test] client: > GET baz
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbaz\r\n"
[your_program] Received '*2\r\n$3\r\nGET\r\n$3\r\nbaz\r\n'
[your_program] Arr size: b'*2'
[your_program] Arr content: [b'$3', b'GET', b'$3', b'baz', b'']
[tester::#YG4] [test] client: Received bytes: "$3\r\n789\r\n"
[tester::#YG4] [test] client: Received RESP bulk string: "789"
[tester::#YG4] [test] Received "789"
[tester::#YG4] Test passed.
[tester::#YG4] Terminating program
[tester::#YG4] Program terminated successfully
[tester::#HD5] Running tests for Stage #HD5 (Replication - Multi Replica Command Propagation)
[tester::#HD5] $ ./your_program.sh --port 6379
[your_program] ['--port', '6379']
[your_program] {'dir': '/tmp', 'dbfilename': 'dump.rdb', 'port': 6379, 'replicaof': '', 'master_replid': '8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb', 'master_repl_offset': '0'}
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Port 6379 is busy, retrying in 1 seconds...
[your_program] Traceback (most recent call last):
[your_program] File "<frozen runpy>", line 198, in _run_module_as_main
[your_program] File "<frozen runpy>", line 88, in _run_code
[your_program] File "/app/app/main.py", line 441, in <module>
[your_program] asyncio.run(main())
[your_program] File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
[your_program] return runner.run(main)
[your_program] ^^^^^^^^^^^^^^^^
[your_program] File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
[your_program] return self._loop.run_until_complete(task)
[your_program] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[your_program] File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
[your_program] return future.result()
[your_program] ^^^^^^^^^^^^^^^
[your_program] File "/app/app/main.py", line 418, in main
[your_program] server_socket.bind(("localhost", int(config["port"])))
[your_program] OSError: [Errno 98] Address in use
[tester::#HD5] dial tcp [::1]:6379: connect: connection refused
[tester::#HD5] Test failed
[tester::#HD5] Terminating program
[tester::#HD5] Program terminated successfully
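In the trace above, the bytes the replica receives after PSYNC are the +FULLRESYNC line, then the RDB payload ($88 followed by 88 bytes, with no trailing \r\n), then the propagated SET commands back to back. Below is a minimal sketch of splitting such a buffer. It assumes that any top-level $ frame on this connection is the RDB transfer. The function name split_replication_stream is hypothetical and is not taken from the code below.

```python
from typing import List, Tuple


def split_replication_stream(buf: bytes) -> Tuple[List[List[bytes]], bytes]:
    """Return (commands, leftover) for bytes received from the master.

    commands is a list of argument lists, one per complete RESP array.
    leftover holds trailing bytes that do not yet form a complete frame.
    """
    commands: List[List[bytes]] = []
    pos = 0
    while pos < len(buf):
        line_end = buf.find(b"\r\n", pos)
        if line_end == -1:
            break  # header line incomplete, wait for more data
        header = buf[pos:line_end]
        if header.startswith(b"+"):
            # Simple string, e.g. +FULLRESYNC <replid> <offset>
            pos = line_end + 2
        elif header.startswith(b"$"):
            # Assumption: a top-level bulk frame is the RDB transfer,
            # which has no \r\n after the payload.
            end = line_end + 2 + int(header[1:])
            if end > len(buf):
                break  # payload incomplete
            pos = end
        elif header.startswith(b"*"):
            # RESP array of bulk strings (a propagated command)
            count = int(header[1:])
            cursor = line_end + 2
            args: List[bytes] = []
            complete = True
            for _ in range(count):
                len_end = buf.find(b"\r\n", cursor)
                if len_end == -1:
                    complete = False
                    break
                size = int(buf[cursor + 1:len_end])
                start = len_end + 2
                if start + size + 2 > len(buf):
                    complete = False
                    break
                args.append(buf[start:start + size])
                cursor = start + size + 2
            if not complete:
                break  # keep the partial command for the next recv
            commands.append(args)
            pos = cursor
        else:
            raise ValueError(f"unexpected data at offset {pos}")
    return commands, buf[pos:]
```

The leftover bytes would be prepended to the next chunk read from the socket, so a command that is split across two recv calls is still parsed once it is complete.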
My code:
import socket
import sys
import os
import struct
from collections import namedtuple
import time
from typing import Optional, Tuple
import asyncio
KeyValue = namedtuple('KeyValue', ['value', 'expiry'])
import signal
save_commands = []
data_dict = {}
config = {
    "dir": "/tmp",
    "dbfilename": "dump.rdb",
    "port": "6379",
    "replicaof": "",
    "master_replid": "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb",
    "master_repl_offset": "0",
}
db_info = {
    "index": 0,
    "hash_table_size": 0,
    "expires_size": 0
}
server_socket = None
replica_connections = []
# Constants
PING = b"PING"
PONG = b"+PONG\r\n"
ECHO = b"ECHO"
SET = b"SET"
GET = b"GET"
CONFIG = b"CONFIG"
KEYS = b"KEYS"
INFO = b"INFO"
REPLCONF = b"REPLCONF"
PSYNC = b"PSYNC"
# def signal_handler(sig, frame):
# print("Shutting down gracefully...")
# global server_socket
# print(server_socket)
# if server_socket:
# server_socket.close()
# sys.exit(0)
# signal.signal(signal.SIGINT, signal_handler)
# signal.signal(signal.SIGTERM, signal_handler)
def read_length(f) -> Optional[int]:
    byte = f.read(1)
    if not byte:
        return None
    first_byte = ord(byte)
    if (first_byte >> 6) == 0:
        return first_byte & 0x3F
    elif (first_byte >> 6) == 1:
        next_byte = ord(f.read(1))
        return ((first_byte & 0x3F) << 8) | next_byte
    elif (first_byte >> 6) == 2:
        return struct.unpack('>I', f.read(4))[0]
    else:
        return None
def read_string(f) -> Optional[bytes]:
    length = read_length(f)
    if length is None:
        return None
    return f.read(length)
def load_rdb() -> None:
    global data_dict, db_info
    rdb_path = os.path.join(config["dir"], config["dbfilename"])
    if not os.path.exists(rdb_path):
        return
    with open(rdb_path, 'rb') as f:
        header = f.read(9)
        if header != b'REDIS0011':
            print("Invalid RDB file format")
            return
        while True:
            byte = f.read(1)
            if byte == b'\xfe':
                break
            elif byte == b'\xff':
                return
            elif byte == b'\xfa':
                read_string(f)
                read_string(f)
        db_info["index"] = read_length(f)
        f.read(1)
        db_info["hash_table_size"] = read_length(f)
        db_info["expires_size"] = read_length(f)
        while True:
            byte = f.read(1)
            if byte == b'\xff':
                break
            expiry = None
            if byte in [b'\xfd', b'\xfc']:
                expiry = struct.unpack('<Q', f.read(8))[0] if byte == b'\xfc' else struct.unpack('<I', f.read(4))[0] * 1000
                byte = f.read(1)
            if byte != b'\x00':
                print(f"Unsupported value type: {byte}")
                continue
            key = read_string(f)
            value = read_string(f)
            if key and value:
                data_dict[key] = KeyValue(value, expiry)
    print(f"Loaded {len(data_dict)} key(s)")
def parse_cli_arguments() -> None:
    global config
    args = sys.argv[1:]
    print(args)
    for i in range(len(args)):
        if args[i] == '--dir' and i + 1 < len(args):
            config["dir"] = args[i + 1]
        elif args[i] == '--dbfilename' and i + 1 < len(args):
            config["dbfilename"] = args[i + 1]
        elif args[i] == '--port' and i + 1 < len(args):
            config["port"] = int(args[i + 1])
        elif args[i] == '--replicaof' and i + 1 < len(args):
            config["replicaof"] = args[i + 1]
        elif '--replicaof' not in args:
            config["replicaof"] = ""
    print(config)
def remove_expired_keys() -> int:
    current_time = int(time.time() * 1000)
    expired_keys = [key for key, kv in data_dict.items() if kv.expiry and kv.expiry <= current_time]
    for key in expired_keys:
        del data_dict[key]
    return len(expired_keys)
def connect_to_master(host: str, port: int) -> socket.socket:
    master_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    master_socket.connect((host, port))
    print(f"Connected to master at {host}:{port}")
    send_command(master_socket, b"*1\r\n$4\r\nPING\r\n")
    expect_response(master_socket, b'+PONG\r\n', "PONG")
    replica_port = config["port"]
    replconf_1 = f'*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n${len(str(replica_port))}\r\n{replica_port}\r\n'.encode()
    send_command(master_socket, replconf_1)
    expect_response(master_socket, b'+OK\r\n', "first REPLCONF")
    replconf_2 = b'*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n'
    send_command(master_socket, replconf_2)
    expect_response(master_socket, b'+OK\r\n', "second REPLCONF")
    replconf_3 = f'*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n'.encode()
    send_command(master_socket, replconf_3)
    return master_socket
def send_command(sock: socket.socket, command: bytes) -> None:
    print(f"Sending command: {command}")
    sock.sendall(command)
def expect_response(sock: socket.socket, expected: bytes, context: str) -> None:
    response = sock.recv(1024)
    assert response == expected, f"Received unexpected response from master during {context}"
async def listen_to_master(master_socket):
    buffer = b""
    while True:
        try:
            data = await asyncio.to_thread(master_socket.recv, 1024)
            if not data:
                print("Connection to master closed.")
                break
            buffer += data
            print(f"Received data from master: {buffer}")
            while buffer:
                if buffer.startswith(b'+FULLRESYNC'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx == -1:
                        break  # line incomplete, wait for more data
                    print(f"Received FULLRESYNC: {buffer[:end_idx]}")
                    buffer = buffer[end_idx+2:]
                elif buffer.startswith(b'$'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx == -1:
                        break  # length line incomplete, wait for more data
                    length = int(buffer[1:end_idx])
                    # The RDB payload arrives as $<length>\r\n<bytes> with no
                    # trailing \r\n, so consume exactly `length` bytes. This
                    # leaves the first propagated command intact.
                    if len(buffer) < end_idx + 2 + length:
                        break
                    rdb_data = buffer[end_idx+2:end_idx+2+length]
                    print(f"Received RDB data of length {length}")
                    buffer = buffer[end_idx+2+length:]
                    print(f"Buffer after RDB: {buffer}")
                elif buffer.startswith(b'*'):
                    end_idx = buffer.find(b'\r\n')
                    if end_idx == -1:
                        break  # array header incomplete, wait for more data
                    num_args = int(buffer[1:end_idx])
                    # Each argument takes two lines: $<len> and the value
                    command_end = end_idx
                    for _ in range(2 * num_args):
                        command_end = buffer.find(b'\r\n', command_end+2)
                        if command_end == -1:
                            break
                    if command_end == -1:
                        break  # command incomplete, wait for more data
                    command = buffer[:command_end+2]
                    print(f"Processing SET command: {command}")
                    await handle_propagated_command(command)
                    buffer = buffer[command_end+2:]
                    print(f"Remaining buffer after processing: {buffer}")
                else:
                    print(f"Unexpected data, trying to find next command: {buffer}")
                    start_idx = buffer.find(b'*')
                    if start_idx != -1:
                        buffer = buffer[start_idx:]
                        print(f"Adjusted buffer to start of next command: {buffer}")
                    else:
                        print(f"No more commands found in buffer: {buffer}")
                        buffer = b""
        except Exception as e:
            print(f"Error while listening to master: {e}")
            break
async def handle_propagated_command(command):
    print(f"Handling propagated command: {command}")
    parts = command.split(b'\r\n')
    if len(parts) >= 7 and parts[2] == b'SET':
        key = parts[4]
        value = parts[6]
        data_dict[key] = KeyValue(value, None)
        print(f"SET command processed. Key: {key}, Value: {value}")
        print(f"Updated data_dict: {data_dict}")
def handle_client(client_socket: socket.socket) -> None:
    global replica_connections
    is_replica = False
    while True:
        removed = remove_expired_keys()
        if removed > 0:
            print(f"Removed {removed} expired key(s)")
        data = client_socket.recv(1024)
        if not data:
            break
        print("Received {!r}".format(data.decode()))
        arr_size, *arr = data.split(b"\r\n")
        print(f"Arr size: {arr_size}")
        print(f"Arr content: {arr}")
        if len(arr) < 2:
            continue
        command = arr[1].upper()
        if command == PING:
            send_response(client_socket, PONG)
        elif command == ECHO:
            send_response(client_socket, arr[2] + b"\r\n" + arr[3] + b"\r\n")
        elif command == SET:
            print(f"Handling SET command: {arr}")
            handle_set_command(arr, client_socket)
            if not is_replica:
                propagate_command(arr)
        elif command == GET:
            handle_get_command(arr, client_socket)
        elif command == CONFIG:
            handle_config_command(arr, client_socket)
        elif command == KEYS:
            handle_keys_command(arr, client_socket)
        elif command == INFO:
            handle_info_command(client_socket)
        elif command == REPLCONF:
            send_response(client_socket, b"+OK\r\n")
            if not is_replica:
                is_replica = True
                replica_connections.append(client_socket)
        elif command == PSYNC:
            handle_psync_command(arr, client_socket)
            if not is_replica:
                is_replica = True
                replica_connections.append(client_socket)
    client_socket.close()
    if is_replica:
        replica_connections.remove(client_socket)
        print("Replica disconnected")
# def handle_propagated_command(arr: list) -> None:
# print(f"Handling propagated command: {arr}")
# command = arr[1].upper()
# if command == SET:
# key = arr[3]
# value = arr[5]
# exp_time = None
# if len(arr) > 9 and arr[7].upper() == b"PX":
# exp_time = int(arr[9])
# expiry = int(time.time() * 1000) + exp_time if exp_time is not None else None
# data_dict[key] = KeyValue(value, expiry)
# print(f"Propagated SET applied. Dict content: {data_dict}")
def propagate_command(arr: list) -> None:
    global replica_connections
    command = b"*" + str(len(arr) // 2).encode() + b"\r\n"
    for i in range(1, len(arr), 2):
        command += b"$" + str(len(arr[i])).encode() + b"\r\n" + arr[i] + b"\r\n"
    disconnected_replicas = []
    for replica_socket in replica_connections:
        try:
            print(f"Sending command to replica: {command}")
            send_response(replica_socket, command)
        except ConnectionError:
            disconnected_replicas.append(replica_socket)
    # Remove disconnected replicas
    for replica in disconnected_replicas:
        replica_connections.remove(replica)
def send_response(sock: socket.socket, response: bytes) -> None:
    sock.sendall(response)
def handle_set_command(arr: list, sock: Optional[socket.socket]) -> None:
    key = arr[3]
    value = arr[5]
    exp_time = None
    if len(arr) > 9 and arr[7].upper() == b"PX":
        exp_time = int(arr[9])
    expiry = int(time.time() * 1000) + exp_time if exp_time is not None else None
    data_dict[key] = KeyValue(value, expiry)
    print(f"Dict content: {data_dict}")
    if sock:
        send_response(sock, b"+OK\r\n")
def handle_get_command(arr: list, sock: socket.socket) -> None:
    key = arr[3]
    if key in data_dict:
        kv = data_dict[key]
        if kv.expiry and kv.expiry <= int(time.time() * 1000):
            del data_dict[key]
            response = b"$-1\r\n"
        else:
            response = f"${len(kv.value)}\r\n{kv.value.decode()}\r\n".encode()
    else:
        response = b"$-1\r\n"
    send_response(sock, response)
def handle_config_command(arr: list, sock: socket.socket) -> None:
    if len(arr) > 3 and arr[3].upper() == b"GET":
        param = arr[5].decode()
        if param in config:
            value = config[param]
            response = f"*2\r\n${len(param)}\r\n{param}\r\n${len(value)}\r\n{value}\r\n".encode()
        else:
            response = b"$-1\r\n"
        send_response(sock, response)
    else:
        send_response(sock, b"-ERR Unknown subcommand or wrong number of arguments for 'CONFIG'\r\n")
def handle_keys_command(arr: list, sock: socket.socket) -> None:
    pattern = arr[3]
    if pattern == b"*":
        keys_list = list(data_dict.keys())
        response_parts = [f"*{len(keys_list)}\r\n".encode()]
        for key in keys_list:
            response_parts.append(f"${len(key)}\r\n{key.decode()}\r\n".encode())
        response = b"".join(response_parts)
    else:
        response = b"*0\r\n"
    send_response(sock, response)
def handle_info_command(sock: socket.socket) -> None:
    if config["replicaof"] == "":
        info_content = "role:master"
        master_replid = config["master_replid"]
        master_repl_offset = config["master_repl_offset"]
        info_content += f"\nmaster_replid:{master_replid}"
        info_content += f"\nmaster_repl_offset:{master_repl_offset}"
        response = f"${len(info_content)}\r\n{info_content}\r\n".encode()
    else:
        info_content = "role:slave"
        response = f"${len(info_content)}\r\n{info_content}\r\n".encode()
    send_response(sock, response)
def handle_psync_command(arr: list, sock: socket.socket) -> None:
    if arr[3] == b"?" and arr[5] == b"-1":
        response = f"+FULLRESYNC {config['master_replid']} {config['master_repl_offset']}\r\n".encode()
        send_response(sock, response)
        rdb_hex = "524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2"
        rdb_content = bytes.fromhex(rdb_hex)
        rdb_data = f"${len(rdb_content)}\r\n".encode()
        send_response(sock, rdb_data + rdb_content)
    else:
        send_response(sock, b"*0\r\n")
async def main() -> None:
    global replica_connections, server_socket
    parse_cli_arguments()
    load_rdb()
    replica_connections = []
    max_retries = 5
    retry_delay = 1
    for attempt in range(max_retries):
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # SO_REUSEADDR must be set before bind(); setting it afterwards
            # does not help this bind succeed.
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("localhost", int(config["port"])))
            server_socket.listen(5)
            print(f"Server listening on port {config['port']}")
            break
        except OSError as e:
            if server_socket:
                server_socket.close()
            if e.errno == 98 and attempt < max_retries - 1:  # Address already in use
                print(f"Port {config['port']} is busy, retrying in {retry_delay} seconds...")
                # asyncio.sleep keeps the event loop responsive while waiting
                await asyncio.sleep(retry_delay)
            else:
                raise
    if config["replicaof"]:
        master_host, master_port = config["replicaof"].split()
        master_socket = connect_to_master(master_host, int(master_port))
        asyncio.create_task(listen_to_master(master_socket))
    while True:
        client_socket, addr = await asyncio.to_thread(server_socket.accept)
        print(f"Accepted connection from {addr}")
        asyncio.create_task(handle_client_async(client_socket))
async def handle_client_async(client_socket: socket.socket) -> None:
    await asyncio.to_thread(handle_client, client_socket)
if __name__ == "__main__":
    asyncio.run(main())
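
The handlers above build RESP frames by hand in several places (propagate_command, handle_get_command, handle_keys_command). A minimal sketch of one shared encoder for arrays of bulk strings follows. The name encode_resp_array is hypothetical and is not part of the program above.

```python
from typing import Iterable


def encode_resp_array(parts: Iterable[bytes]) -> bytes:
    """Encode byte strings as a RESP array of bulk strings."""
    items = list(parts)
    # Array header: *<number of elements>\r\n
    out = [b"*%d\r\n" % len(items)]
    for item in items:
        # Bulk string: $<length>\r\n<bytes>\r\n
        out.append(b"$%d\r\n%s\r\n" % (len(item), item))
    return b"".join(out)
```

For example, encode_resp_array([b"SET", b"foo", b"123"]) produces the same bytes that the tester's master sends for SET foo 123 in the log.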