Issues with Command Propagation in Testing

I’m stuck on Stage 19 #YG4 redis

I’ve tried to make a client to test and the command propagation is working.

Here are my logs:

remote: [tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
remote: [tester::#YG4] Master is running on port 6379
remote: [tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
remote: [tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["PING"]
remote: [tester::#YG4] [handshake] Received ["PING"]
remote: [tester::#YG4] [handshake] master: Sent "PONG"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] [handshake] master: Sent "OK"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] [handshake] master: Sent "OK"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
remote: [tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
remote: [tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
remote: [tester::#YG4] [handshake] Sending RDB file...
remote: [tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
remote: [tester::#YG4] [handshake] Sent RDB file.
remote: [tester::#YG4] [propagation] master: > SET foo 123
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
remote: [tester::#YG4] [propagation] master: > SET bar 456
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
remote: [tester::#YG4] [propagation] master: > SET baz 789
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
remote: [tester::#YG4] [test] Getting key foo
remote: [tester::#YG4] [test] client: $ redis-cli GET foo
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] SET > foo > 123
remote: [your_program] GET > foo
remote: [tester::#YG4] [test] client: Received bytes: "$3\r\n123\r\n"
remote: [tester::#YG4] [test] client: Received RESP bulk string: "123"
remote: [tester::#YG4] [test] Received "123"
remote: [tester::#YG4] [test] Getting key bar
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (1/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (2/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] Retrying... (3/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (4/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (5/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] Expected simple string or bulk string, got NIL
remote: [tester::#YG4] Test failed
remote: [tester::#YG4] Terminating program
remote: [tester::#YG4] Program terminated successfully

And here’s a snippet of my code:

    public static void Main(string[] args)
    {

        parseArgs(args);

        Server server = new Server(serverRole, ip, port);
        ServerListener listener = new ServerListener(server);
        
    }

internal class Server
{
    public readonly string serverRole;
    public readonly IPAddress ip;
    public readonly int port;
    public readonly TcpListener connection;
    public readonly DataBank dataBank;


    public Server(string serverRole, IPAddress ip, int port)
    {
        this.serverRole = serverRole;
        this.ip = ip;
        this.port = port;
        this.dataBank = new DataBank();

        TcpListener tcpServer = new TcpListener(ip, port);
        tcpServer.Start();

        connection = tcpServer;
    }
}

internal class ServerListener
{
    public static List<Socket> listeningSockets = new List<Socket>();

    Server server;
    CommandManager cmdManager = new CommandManager();

    public ServerListener(Server server)
    {
        this.server = server;

        startListening();
    }

    public void startListening()
    {
        while (true)
        {

            lookForNewClient();

            foreach (var socket in listeningSockets)
            {
                if (socket.Connected)
                {
                    cmdManager.handle(socket, server.dataBank);
                }
            }
        }
    }

    public void lookForNewClient()
    {
        try
        {
            listeningSockets.Add(server.connection.AcceptSocket());
        }
        catch (Exception e)
        {

        }
    }
}

internal class CommandManager
{
    private static List<ICommand> commands = new List<ICommand>();        
    public CommandManager(){

        commands.Add(new Echo());
        commands.Add(new Ping());
        commands.Add(new Set());
        commands.Add(new Get());
        commands.Add(new Info());
        commands.Add(new ReplConf());
        commands.Add(new Psync());
}

    internal async void handle(Socket clientSocket, DataBank dataBank) {

    while (clientSocket.Connected)
        {

            var buffer = new byte[1024];
        await clientSocket.ReceiveAsync(buffer);

            string cmd = System.Text.Encoding.Default.GetString(buffer);

        if (cmd.StartsWith("+") || cmd.StartsWith("$"))
        {
            continue;
        }
            string[] atoms = RespParser.PARSE(cmd).ToArray();
            string cmdName = atoms[0];
            string[] cmdArgs = new string[atoms.Length - 1];
            Console.WriteLine(string.Join(" > ", atoms));
            Array.Copy(atoms, 1, cmdArgs, 0, cmdArgs.Length);

        foreach (ICommand command in commands)
        {
            if (command.getName().Equals(cmdName.ToUpper()) || command.getAlies().Contains(cmdName))
            {
                command.handle(new CommandContext(dataBank,cmdArgs,clientSocket));
                break;
            }
        }
        }

    }
}

internal class Set : ICommand
{
    public List<string> getAlies()
    {
        return new List<string>() { getName() };
    }

    public string getHelp()
    {
        return getName();
    }

    public string getName()
    {
        return "SET";
    }

    public void handle(CommandContext context)
    {

        Socket socket = context.socket;
        DataBank db = context.dataBank;
        string[] args = context.args;

        switch (args.Length){

            case 2:

                db.setData(args[0], args[1]);
                
                foreach(SlaveServer slave in SlaveServer.Slaves){
                    Socket slaveSocket = slave.socket;

                    slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1] }));
                }

                if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket))) {
                    Console.WriteLine("+OK");
                    socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));
                }

                return;

            case 4:

                switch (args[2].ToUpper()){

                    case "PX":

                        if (int.TryParse(args[3], out int ms)){

                            db.setData(args[0], args[1], ms);


                            foreach (SlaveServer slave in SlaveServer.Slaves)
                            {
                                Socket slaveSocket = slave.socket;

                                slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1], "PX", args[3] }));
                            }

                            if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket)))
                            {
                                socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));
                            }

                            return;
                        }

                        break;

                    case "EX":

                        if (int.TryParse(args[3], out int s)){

                            db.setData(args[0], args[1], s*1000);

                            foreach (SlaveServer slave in SlaveServer.Slaves)
                            {
                                Socket slaveSocket = slave.socket;

                                slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1], "EX", args[3] }));
                            }

                            if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket)))
                                socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));

                            return;
                        }

                        break;

                    default:

                        break;
                }

                socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
                return;

            default:

                socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
                return;
        }
    }
}

internal class Get : ICommand
{
    public List<string> getAlies()
    {
       return new List<string>() { getName() };
    }

    public string getHelp()
    {
        return getName();
    }

    public string getName()
    {
        return "GET";
    }

    public void handle(CommandContext context)
    {

        DataBank db = context.dataBank;
        string[] args = context.args;
        Socket socket = context.socket;
        string data = db.getData(args[0]);


        if (data != null)
        {
            socket.SendAsync(RespEncoder.ENCODE_BULK_STRING_AS_BYTEARRAY(data));
            return;
        }

        socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
    }
}

Most likely, your assumption on how/where the master is sending the data is incorrect (and it does not conform to how you are sending data in your master/replica setting - the tests are not spawning your program as both master and replica).

I would recommend you log how you receive data, which should correspond to the logs from the test. If you’re missing the data in the logs, you’re ignoring the data transfer somewhere.

Hints:

  1. Master is sending the data over the handshake socket, not over the main socket for incoming connections.
  2. Master is sending the data in no particular order. You can receive the data right after the rdb_file, or on next read.
  3. Master is sending the data in no particular length. You can read 3 times to receive all data, or just read once to receive all three commands in one string.

Good luck :slight_smile:

1 Like

can there be a scenario where the rdb file goes along with psync_response command because when i am receiving the psync response i am getting the rdb file along with it
PSYNC successful. Replication ID: 75cd7bc10c49047e0d163660f3b90625b1af31dc, Offset: 0, Remaining Data: $88\r\nREDI
and i am parsing the rdb file there itself and the commands are following on over the same read ?
case parse_psync_response(data) do
{:ok, repl_id, offset, remaining_data} →
Logger.info(“PSYNC successful. Replication ID: #{repl_id}, Offset: #{offset}, Remaining Data: #{remaining_data}”)

        if remaining_data != "" do
          case parse_data(remaining_data, 0) do
            {:rdb_complete, rdb_data} ->
              Logger.info("Received complete RDB file of size #{byte_size(rdb_data)} bytes")
              {:commands, commands} ->
                Enum.each(commands, fn command ->
                  Logger.info("Executing command: #{inspect(command)}")
                  execute_replica_command(socket, command)
                end)
              {:error, reason} ->
                Logger.error("Failed to receive RDB file: #{inspect(reason)}")
              end
        end

in most of the cases i am getting the data like this and when it’s not coming, i am handling the case like this ?
defp receive_data(socket, buffer, expected_length) do
Logger.debug(“Receiving data. Buffer size: #{byte_size(buffer)}, Expected length: #{expected_length}”)
case :gen_tcp.recv(socket, 0, 5000) do
{:ok, data} →
Logger.debug(“Received raw bytes: #{inspect(data, limit: :infinity, binaries: :as_binaries)}”)
Logger.debug(“Received chunk of size: #{byte_size(data)} bytes”)
Logger.debug(“Received data (as string): #{inspect(data, charlists: :as_lists)}”)
Logger.info(“Received data: #{inspect(String.slice(data, 0, 100))}…”)
new_buffer = buffer <> data
Logger.debug(“New buffer size: #{byte_size(new_buffer)} bytes”)
case parse_data(new_buffer, expected_length) do
{:continue, remaining, new_expected_length} →
Logger.info(“Incomplete data, continuing to receive. New expected length: #{new_expected_length}”)
receive_data(socket, remaining, new_expected_length)
{:rdb_complete, rdb_data} →
Logger.info(“Received complete RDB file of size #{byte_size(rdb_data)} bytes”)
{:ok, :rdb_complete, rdb_data}
{:commands, commands} →
Logger.info(“Received complete command: #{inspect(commands)}”)
{:ok, :commands, commands}
end
{:error, reason} →
Logger.error(“Error receiving data: #{inspect(reason)}”)
{:error, reason}
end
end
now i am performing another read , now the rdb is parsing and commands are not comming over sometimes, and i am now getting it when you are mentioning over those three points, so do i have to prepare for 3 reads on beforehand when processing the data, then will the tests are going to pass ?

1 Like

Yep! This is something you’ll need to handle.

Even if a client does multiple TCP writes, when the other end performs a TCP read, it might get all those writes in a single read.

1 Like

so basically i have to handle for the following cases ? just tell me if i am wrong ?

  1. when the whole data comes in single read (psync_response, rdb_file and the three commands)
  2. when the rdb comes after the psync reponse, so you have to perform the read in order to receive that rdb file and commands may also come along with it or may not
  3. when the rdb comes and it process, you perfrom another read to process subsequent commands which can processed in a single read or another case where i have to read over 3 times, since threee commands are goingto place ? am i right in thinking these cases?
1 Like

@ProgMastermind correct, those are all valid cases. In terms of structuring your code, the clean way to handle this is: never read from the TCP socket unless you know when to stop reading. You should be always either read (a) till a delimiter, like \r\n or (b) a fixed number of bytes.

As an example, when trying to decode the following:

+OK\r\n.

  • You should first read a fixed byte to get the +.
  • You should then read until you encounter \r\n.
3 Likes

Thanks for the explanation @rohitpaulk , i have tried this approach, it’s working now, i have run test 5 times, now it’s running 5/5 times , Issue solved.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.