I’m stuck on Command Processing (stage 13).
I was getting this error constantly with the old version of my code, which received commands from the master, decoded them, processed them, and then went back to listening for more commands.
Logs
[replication-13] dial tcp [::1]:6380: connect: connection refused
[replication-13] Test failed (try setting 'debug: true' in your codecrafters.yml to see more details)
I assumed the master was sending new commands before I had finished processing the previous one, and that the connection was being refused as a result. So I moved the processing of each command into its own thread. Now it passes about 80% of the time, but it still fails intermittently, which is very annoying.
What is the correct way to fix this?
Here are some code snippets:
Main function:
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default=DEFAULT_PORT)
    parser.add_argument("--replicaof", nargs=2)
    args = parser.parse_args()
    port = int(args.port)
    is_master = args.replicaof is None
    store = dict()
    replicas = set()
    if not is_master:
        master_host = args.replicaof[0]
        master_port = int(args.replicaof[1])
        conn = socket.create_connection((master_host, master_port))
        connect_to_master_handshake(conn, port)
        master_thread = threading.Thread(
            target=respond, args=(conn, store, set(), is_master)
        )
        master_thread.start()
    server_socket = socket.create_server(("localhost", port), reuse_port=True)
    while True:
        client_socket, _ = server_socket.accept()
        t = threading.Thread(
            target=respond,
            args=(client_socket, store, replicas, is_master),
        )
        t.start()
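One thing worth noting about the main function above: on a replica, the listening socket is only created after `connect_to_master_handshake` returns, so anything that dials the replica's port before then could see "connection refused". The sketch below is only an illustration of that ordering, not a confirmed diagnosis. `open_listener` is a hypothetical helper name, and port 0 is used purely so the demo can run on any machine.

```python
import socket


def open_listener(port: int) -> socket.socket:
    # Bind and listen immediately. Once the socket is listening, the kernel
    # queues incoming connections in the backlog even before accept() runs.
    return socket.create_server(("localhost", port))


# Demo: port 0 asks the OS for any free port (assumption for this sketch only).
listener = open_listener(0)
port = listener.getsockname()[1]

# ... a blocking handshake with the master could run here ...

# A client can already connect, although accept() has not been called yet.
probe = socket.create_connection(("localhost", port), timeout=2)
connected = True
probe.close()
listener.close()
```

If this is the cause, creating the server socket before the `if not is_master:` block would be the corresponding change in `main`.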
Respond function:
def respond(
    conn: socket.socket,
    store: dict,
    replicas: set[socket.socket],
    is_master: bool = True,
):
    while True:
        data = conn.recv(DEFAULT_RECV_BUFFER).decode()
        if not data:
            continue
        commands = decode_redis_protocol(data)
        for params in commands:
            threading.Thread(
                target=process,
                args=(
                    params,
                    conn,
                    store,
                    replicas,
                    is_master,
                ),
            ).start()
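For comparison, here is a minimal sketch of an alternative to one thread per command: keep a byte buffer per connection, extract only the complete commands from it, and process them in order on the connection's own thread. It assumes commands arrive as RESP arrays of bulk strings. `parse_commands` and `_parse_one` are hypothetical helpers, not the `decode_redis_protocol` used above, and the sketch is not a full RESP parser.

```python
def _parse_one(buffer: bytes):
    """Parse one RESP array of bulk strings; return (parts, consumed) or None if incomplete."""
    header_end = buffer.find(b"\r\n")
    if header_end == -1:
        return None
    count = int(buffer[1:header_end])  # skip the leading '*'
    pos = header_end + 2
    parts = []
    for _ in range(count):
        length_end = buffer.find(b"\r\n", pos)
        if length_end == -1:
            return None
        length = int(buffer[pos + 1:length_end])  # skip the leading '$'
        start = length_end + 2
        end = start + length
        if len(buffer) < end + 2:  # payload or trailing CRLF not fully received
            return None
        parts.append(buffer[start:end])
        pos = end + 2
    return parts, pos


def parse_commands(buffer: bytes) -> tuple[list[list[bytes]], bytes]:
    """Return (complete commands in arrival order, leftover bytes)."""
    commands = []
    while buffer.startswith(b"*"):
        parsed = _parse_one(buffer)
        if parsed is None:
            break  # wait for more data from recv()
        command, consumed = parsed
        commands.append(command)
        buffer = buffer[consumed:]
    return commands, buffer


# Demo: one complete SET followed by a second SET that was cut off mid-stream.
data = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$1\r\n1\r\n*3\r\n$3\r\nSET\r\n$3\r\nba"
commands, rest = parse_commands(data)
```

In `respond`, this would mean appending each `recv` result to the buffer, breaking out of the loop when `recv` returns empty bytes (the peer closed the connection), and calling `process` directly for each parsed command so that writes from the master are applied in the order they were sent.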