Replication (#YG4) connection refused sometimes

I’m stuck on Command Processing (stage 13).

I was getting this error constantly with the old version of my code, which received commands from the master, decoded them, processed them, and then went back to listening for more commands.

Logs

[replication-13] dial tcp [::1]:6380: connect: connection refused
[replication-13] Test failed (try setting 'debug: true' in your codecrafters.yml to see more details)

I assumed this was because the master was sending new commands before I had finished processing the previous one, so the connection was refused. So I moved the processing into a separate thread for each command. Now it passes about 80% of the time, but it still fails sometimes, which is very annoying.

What is the correct way to fix this?

And here are some code snippets:

Main function:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default=DEFAULT_PORT)
    parser.add_argument("--replicaof", nargs=2)

    args = parser.parse_args()
    port = int(args.port)
    is_master = args.replicaof is None

    store = dict()
    replicas = set()

    if not is_master:
        master_host = args.replicaof[0]
        master_port = int(args.replicaof[1])

        conn = socket.create_connection((master_host, master_port))
        connect_to_master_handshake(conn, port)

        master_thread = threading.Thread(
            target=respond, args=(conn, store, set(), is_master)
        )
        master_thread.start()

    server_socket = socket.create_server(("localhost", port), reuse_port=True)

    while True:
        client_socket, _ = server_socket.accept()

        t = threading.Thread(
            target=respond,
            args=(client_socket, store, replicas, is_master),
        )
        t.start()

Respond function:

def respond(
    conn: socket.socket,
    store: dict,
    replicas: set[socket.socket],
    is_master: bool = True,
):
    while True:
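        data = conn.recv(DEFAULT_RECV_BUFFER).decode()
        if not data:
            # recv() returns empty bytes once the peer has closed the
            # connection, so stop instead of spinning on a dead socket.
            break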

        commands = decode_redis_protocol(data)
        for params in commands:
            threading.Thread(
                target=process,
                args=(
                    params,
                    conn,
                    store,
                    replicas,
                    is_master,
                ),
            ).start()

@warpftl I think that overall this approach of using a separate thread for the replication logic makes sense.

The issue here might be that the connect & handshake steps are still run synchronously and not in the thread:

conn = socket.create_connection((master_host, master_port))
connect_to_master_handshake(conn, port)

Could you try moving these into the thread too? That way there are no “blocking” actions before you get to creating server_socket and accepting connections.
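For reference, here is a minimal sketch of that suggestion, not a drop-in fix. The names `start_replica` and `replicate_from_master` are hypothetical, and the `handshake` and `respond` parameters stand in for your own `connect_to_master_handshake` and `respond` functions. The idea is that the listening socket is created first, and the connect, handshake, and command loop all run in the background thread:

```python
import socket
import threading


def replicate_from_master(master_host, master_port, port, store, handshake, respond):
    """Connect to the master, do the handshake, then process its commands.

    Everything here runs in the background thread, so none of it delays
    the main thread from accepting connections.
    """
    conn = socket.create_connection((master_host, master_port))
    handshake(conn, port)
    # Same arguments as in the original main(): an empty replica set,
    # and is_master=False because this process is the replica.
    respond(conn, store, set(), False)


def start_replica(master_host, master_port, port, store, handshake, respond):
    """Open the listening socket first, then start replication in a thread."""
    # Bind and listen before any blocking work so the port is already open
    # when a client tries to connect.
    server_socket = socket.create_server(("localhost", port))

    threading.Thread(
        target=replicate_from_master,
        args=(master_host, master_port, port, store, handshake, respond),
        daemon=True,
    ).start()

    return server_socket
```

In `main()`, the replica branch would then call something like `start_replica(master_host, master_port, port, store, connect_to_master_handshake, respond)` and run the existing `accept()` loop on the returned socket.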

Note: I’ve updated the title of this post to include the stage ID (#YG4). You can learn about the stages rename here: Upcoming change: Stages overhaul