Replication Stage #YD3 Missing GETACK

Hi there, I’ve been stuck for a while on stage 15 (ACKs with commands) of the Redis challenge. The problem I’m facing is that the replica does not seem to receive the REPLCONF GETACK * command from the master.

I found a thread on a similar issue: https://forum.codecrafters.io/t/replication-stage-14-slave-receive-nothing-on-occasion/197/4. In that thread, the resolution was that a TCP read can contain multiple commands. I’ve already handled that in my code, so it does not seem to be the fix in my case.
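
For readers unfamiliar with that fix, here is a minimal sketch of splitting one read into several commands. It is an illustration only, not the parser used in the program below, and `split_commands` is a hypothetical name. It assumes the buffer holds RESP arrays of bulk strings and stops at the first incomplete or malformed element.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not from the original program): splits a buffer that
// holds zero or more RESP arrays of bulk strings into separate commands.
std::vector<std::vector<std::string>> split_commands(const std::string &data) {
    std::vector<std::vector<std::string>> commands;
    std::size_t pos = 0;

    // Reads "<prefix><digits>\r\n" at pos; on success stores the number in
    // `value` and advances pos past the "\r\n".
    auto read_header = [&](char prefix, std::size_t &value) {
        if (pos >= data.size() || data[pos] != prefix) return false;
        std::size_t end = data.find("\r\n", pos);
        if (end == std::string::npos || end == pos + 1) return false;
        value = 0;
        for (std::size_t i = pos + 1; i < end; ++i) {
            if (data[i] < '0' || data[i] > '9') return false;
            value = value * 10 + static_cast<std::size_t>(data[i] - '0');
        }
        pos = end + 2;
        return true;
    };

    while (pos < data.size()) {
        std::size_t count = 0;
        if (!read_header('*', count)) break;

        std::vector<std::string> command;
        bool complete = true;
        for (std::size_t i = 0; i < count; ++i) {
            std::size_t len = 0;
            // Each element needs its header, `len` payload bytes, and "\r\n".
            if (!read_header('$', len) || data.size() - pos < len + 2) {
                complete = false;
                break;
            }
            command.push_back(data.substr(pos, len));
            pos += len + 2;
        }
        if (!complete) break;  // drop the partial command; more bytes are needed
        commands.push_back(command);
    }
    return commands;
}
```

Because every element is read by its declared length rather than by searching for delimiters, a second command that arrives in the same read is picked up on the next loop iteration.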

What could be the issue?

Here are the logs from my program:

[replication-15] Running tests for Replication > Stage #15: ACKs with commands
[replication-15] Master is running on port 6379
[replication-15] $ ./spawn_redis_server.sh --port 6380 --replicaof "localhost 6379"
[replication-15] master: Waiting for replica to initiate handshake with "PING" command
[replication-15] master: Received bytes: "*1\r\n$4\r\nping\r\n"
[replication-15] master: Received RESP value: ["ping"]
[replication-15] Received ["ping"]
[replication-15] master: Sent "PONG"
[replication-15] master: Sent bytes: "+PONG\r\n"
[replication-15] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[your_program] Logs from your program will appear here!
[your_program] Waiting for a client to connect...
[your_program] ===================================================
[your_program] Port 6380, message received from 4: +PONG\r\n
[your_program] Parsed 1 commands
[your_program] Command 0, has 7 bytes: PONG  
[your_program] Processing command: PONG  
[replication-15] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[replication-15] master: Received RESP value: ["REPLCONF", "listening-port", "6380"]
[replication-15] Received ["REPLCONF", "listening-port", "6380"]
[replication-15] master: Sent "OK"
[replication-15] master: Sent bytes: "+OK\r\n"
[replication-15] master: Waiting for replica to send "REPLCONF capa" command
[your_program] Handling case 8 handshake part 2.1: REPLCONF 1
[your_program] ===================================================
[your_program] Port 6380, message received from 4: +OK\r\n
[your_program] Parsed 1 commands
[your_program] Command 0, has 5 bytes: OK  
[your_program] Processing command: OK  
[replication-15] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[replication-15] master: Received RESP value: ["REPLCONF", "capa", "psync2"]
[replication-15] Received ["REPLCONF", "capa", "psync2"]
[replication-15] master: Sent "OK"
[replication-15] master: Sent bytes: "+OK\r\n"
[replication-15] master: Waiting for replica to send "PSYNC" command
[your_program] Handling case 9 handshake part 2.2: REPLCONF 2
[your_program] ===================================================
[your_program] Port 6380, message received from 4: +OK\r\n
[your_program] Parsed 1 commands
[your_program] Command 0, has 5 bytes: OK  
[your_program] Processing command: OK  
[replication-15] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[replication-15] master: Received RESP value: ["PSYNC", "?", "-1"]
[replication-15] Received ["PSYNC", "?", "-1"]
[replication-15] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[replication-15] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[replication-15] Sending RDB file...
[replication-15] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[replication-15] Sent RDB file.
[replication-15] master: $ redis-cli REPLCONF GETACK *
[replication-15] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
[your_program] Handling case 10 handshake part 3: Replica sends PSYNC 1
[your_program] ===================================================
[your_program] Port 6380, message received from 4: +FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n
[your_program] Parsed 1 commands
[your_program] Command 0, has 56 bytes: FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0  
[your_program] Processing command: FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0  
[your_program] Skipped FULLRESYNC
[your_program] ===================================================
[your_program] Port 6380, message received from 4: $88\r\nREDIS0011�\tredis-ver7.2.0�\nredis-bits�@�ctime�m\b�e�\bused-mem°�
[your_program] Parsed 1 commands
[your_program] Command 0, has 72 bytes: $88 REDIS0011�\tredis-ver7.2.0�\nredis-bits�@�ctime�m\b�e�\bused-mem°� 
[your_program] Processing command: $88 REDIS0011�       redis-ver7.2.0�
[your_program] redis-bits�@�ctime��eused-mem°� 
[your_program] Skipped empty RDB
[your_program] ===================================================
[replication-15] Received: "" (no content received)
[replication-15]            ^ error
[replication-15] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[replication-15] Test failed
[replication-15] Terminating program
[replication-15] Program terminated successfully

And here’s a snippet of my server code (only a small portion of the handler is shown):


int handle_client(int client_socket, ServerInfo &server_info, TimeStampedStringMap &store) {
    std::cout << "===================================================" << std::endl;
    char buffer[1024] = {};
    int recv_bytes = recv(client_socket, buffer, sizeof(buffer), 0);

    if (recv_bytes < 0) {
        std::cout << "Error receiving bytes\n";
        return 1;
    } else if (recv_bytes == 0) {
        std::cout << "Client disconnected\n";
        return 1;
    }

    std::string msg(buffer);
    std::cout << "Port " << server_info.port << ", message received from " << client_socket << ": ";
    write_string(msg);
    std::cout << '\n';

    if (msg == null_bulk_string) return 0;

    std::vector<std::pair<std::vector<std::string>, int>> commands = parse_message(msg);
    for (auto [command, num_bytes] : commands) {
        std::cout << "Processing command: ";
        for (auto s : command) std::cout << s << ' ';
        std::cout << std::endl;

        std::string keyword = command[0];
        std::transform(keyword.begin(), keyword.end(), keyword.begin(), toupper);

        // Ignores RDB for now
        if (keyword[0] == '$') {
            std::cout << "Skipped empty RDB" << std::endl;
            continue;
        } else if (keyword.find("FULLRESYNC") != std::string::npos) {
            std::cout << "Skipped FULLRESYNC" << std::endl;
            continue;
        }

        if (keyword == "PING") {
            std::cout << "Handling case 1 PING\n";
            ping_command(server_info, client_socket);
        } else if (keyword == "ECHO") {
            std::cout << "Handling case 2 ECHO\n";
            echo_command(command, client_socket);
        } else if (keyword == "SET") {
            std::cout << "Handling case 3 SET\n";
            for (int replica_fd : server_info.replica_connections) {
                propagate_command(msg, replica_fd);
            }
            set_command(command, client_socket, store, server_info);
        } else if (keyword == "GET") {
            std::cout << "Handling case 4 GET\n";
            get_command(command, client_socket, store);
        } else if (keyword == "INFO") {
            std::cout << "Handling case 5 INFO\n";
            info_command(server_info, client_socket);
        } else if (keyword == "REPLCONF") {
            std::cout << "Handling case 6 REPLCONF\n";
            replconf_command(server_info, client_socket);
        } else if (keyword == "PSYNC") {
            std::cout << "Handling case 7 master receives PSYNC\n";
            psync_command(command, server_info, client_socket);
        } else if (keyword == "PONG" && server_info.replication_stage == 1) {
            std::cout << "Handling case 8 handshake part 2.1: REPLCONF 1\n";
            replconf_listening(server_info);
        } else if (keyword == "OK" && server_info.replication_stage == 2) {
            std::cout << "Handling case 9 handshake part 2.2: REPLCONF 2\n";
            replconf_capa(server_info);
        } else if (keyword == "OK" && server_info.replication_stage == 3) {
            std::cout << "Handling case 10 handshake part 3: Replica sends PSYNC 1\n";
            psync(server_info);
        } else {
            std::cout << "Handling else case: Do nothing\n";
            if (client_socket != server_info.master_fd) {
                reply_null(client_socket);
            }
        }

        if (client_socket == server_info.master_fd && server_info.replication_stage >= 4) {
            server_info.master_repl_offset += num_bytes;
        }
    }

    return 0;
}

int handshake_master(ServerInfo &server_info) {
    // Handshake 1a: PING master
    std::string master_host = server_info.master_host;
    if (master_host == "localhost") master_host = "127.0.0.1";
    int master_port = server_info.master_port;
    int master_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (master_fd == -1) {
        std::cerr << "Failed to create master server socket\n";
        return 1;
    }

    struct sockaddr_in master_addr;
    master_addr.sin_family = AF_INET;
    master_addr.sin_addr.s_addr = inet_addr(master_host.c_str());
    master_addr.sin_port = htons(master_port);
    if (connect(master_fd, (struct sockaddr *)&master_addr, sizeof(master_addr)) != 0) {
        std::cerr << "Failed to connect to master port " << master_host << ':' << master_port << '\n';
        return 1;
    }
    server_info.master_fd = master_fd;

    std::vector<std::string> arr = {"ping"};
    std::string message = encode_array(arr);
    if (send(master_fd, message.c_str(), message.size(), 0) < 0) {
        std::cerr << "Failed to ping master\n";
        return 1;
    }
    server_info.replication_stage++;
    return 0;
}

void replconf_listening(ServerInfo &server_info) {
    std::string message = encode_array({"REPLCONF", "listening-port", std::to_string(server_info.port)});
    send(server_info.master_fd, message.c_str(), message.size(), 0);
    server_info.replication_stage++;
}

void replconf_capa(ServerInfo &server_info) {
    std::string message = encode_array({"REPLCONF", "capa", "psync2"});
    send(server_info.master_fd, message.c_str(), message.size(), 0);
    server_info.replication_stage++;
}

void psync(ServerInfo &server_info) {
    std::string message = encode_array({"PSYNC", "?", "-1"});
    send(server_info.master_fd, message.c_str(), message.size(), 0);
    server_info.replication_stage++;
}

For anyone facing this issue in the future: I resolved it by reading the incoming TCP data from the master byte by byte instead of up to 1024 bytes at a time. That seems to fix the issue, but I’m not exactly sure why the previous implementation was wrong, since I wasn’t clearing the bytes before printing them.

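    // Receive FULLRESYNC: discard bytes up to and including the terminating '\n'
    do {
        if (recv(master_fd, buf, 1, 0) <= 0) break;  // error or connection closed
    } while (buf[0] != '\n');

    // Receive empty RDB: first parse the "$<size>\r\n" header
    bool size_found = false;
    int size = 0;
    while (!size_found) {
        if (recv(master_fd, buf, 1, 0) <= 0) break;
        if (buf[0] == '$' || buf[0] == '\r')
            continue;
        else if (buf[0] == '\n')
            size_found = true;
        else
            size = size * 10 + (buf[0] - '0');
    }

    // Then discard exactly `size` payload bytes (the RDB has no trailing "\r\n")
    int recv_bytes = 0;
    while (recv_bytes < size) {
        if (recv(master_fd, buf, 1, 0) <= 0) break;
        recv_bytes++;
    }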
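
A likely explanation for why the original handler failed, offered as a reading of the code and logs above rather than a confirmed diagnosis: `std::string msg(buffer);` treats the buffer as a C string and stops at the first NUL byte. The RDB payload in the master's log contains `\x00` bytes, and the replica's log shows the message ending right after `used-mem°` with only 72 bytes. Anything after that NUL in the same read, possibly including REPLCONF GETACK *, would then be dropped. The sketch below shows the difference; `message_as_c_string`, `message_with_length`, and `bytes_after_rdb` are hypothetical names, not functions from the original program.

```cpp
#include <cstddef>
#include <string>

// Builds the message the way the original handler does: the constructor
// treats `buffer` as a C string and stops at the first '\0'.
std::string message_as_c_string(const char *buffer) {
    return std::string(buffer);
}

// Length-aware alternative: keeps exactly `recv_bytes` bytes, NULs included.
std::string message_with_length(const char *buffer, std::size_t recv_bytes) {
    return std::string(buffer, recv_bytes);
}

// Hypothetical helper: `data` is assumed to start with an RDB transfer of the
// form "$<len>\r\n<len bytes>" (no trailing "\r\n"). Returns the bytes that
// follow the payload, or an empty string if the header is malformed or the
// payload is incomplete.
std::string bytes_after_rdb(const std::string &data) {
    if (data.empty() || data[0] != '$') return "";
    std::size_t header_end = data.find("\r\n");
    if (header_end == std::string::npos || header_end == 1) return "";

    std::size_t len = 0;
    for (std::size_t i = 1; i < header_end; ++i) {
        if (data[i] < '0' || data[i] > '9') return "";
        len = len * 10 + static_cast<std::size_t>(data[i] - '0');
    }

    std::size_t payload_start = header_end + 2;
    if (data.size() - payload_start < len) return "";  // payload not complete
    return data.substr(payload_start + len);
}
```

With a buffer such as `"$5\r\nAB\0CD"` followed by a GETACK command, the C-string version keeps only the bytes before the NUL, so the payload looks incomplete and the command is never seen. The length-aware version keeps every received byte, and `bytes_after_rdb` returns the GETACK command intact. Reading byte by byte avoids the problem for the same reason: the bytes never pass through a NUL-terminated string.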

Note: I’ve updated the title of this post to include the stage ID (#YD3). You can learn about the stages rename here: Upcoming change: Stages overhaul.