Issues with Socket connection after handling initial commands

Sending RDB file...
[tester::#YD3] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YD3] [handshake] Sent RDB file.
[tester::#YD3] [test] master: $ redis-cli REPLCONF GETACK *
[tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
[your_program] 
[your_program] 17:52:21.680 [debug] Received data chunk: <<43, 70, 85, 76, 76, 82, 69, 83, 89, 78, 67, 32, 55, 53, 99, 100, 55, 98, 99, 49, 48, 99, 52, 57, 48, 52, 55, 101, 48, 100, 49, 54, 51, 54, 54, 48, 102, 51, 98, 57, 48, 54, 50, 53, 98, 49, 97, 102, 51, 49, ...>>, bytes: 186
[your_program] 
[your_program] 17:52:21.680 [debug] Received PSYNC response: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0", bytes read: 56
[your_program] 
[your_program] 17:52:21.680 [info] PSYNC successful. Replication ID: 75cd7bc10c49047e0d163660f3b90625b1af31dc, Offset: 0
[your_program] 
[your_program] 17:52:21.680 [debug] Received data chunk: <<36, 56, 56, 13, 10, 82, 69, 68, 73, 83, 48, 48, 49, 49, 250, 9, 114, 101, 100, 105, 115, 45, 118, 101, 114, 5, 55, 46, 50, 46, 48, 250, 10, 114, 101, 100, 105, 115, 45, 98, 105, 116, 115, 192, 64, 250, 5, 99, 116, 105, ...>>, bytes: 130
[your_program] 
[your_program] 17:52:21.680 [debug] Reading exact 88 bytes
[your_program] 
[your_program] 17:52:21.681 [debug] Read exact data: <<82, 69, 68, 73, 83, 48, 48, 49, 49, 250, 9, 114, 101, 100, 105, 115, 45, 118, 101, 114, 5, 55, 46, 50, 46, 48, 250, 10, 114, 101, 100, 105, 115, 45, 98, 105, 116, 115, 192, 64, 250, 5, 99, 116, 105, 109, 101, 194, 109, 8, ...>>
[your_program] 
[your_program] 17:52:21.681 [info] Received complete RDB file of size 88 bytes
[your_program] 
[your_program] 17:52:21.681 [debug] Starting to read command
[your_program] 
[your_program] 17:52:21.681 [debug] Received data chunk: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n", bytes: 37
[your_program] 
[your_program] 17:52:21.683 [debug] Received data chunk: "$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n", bytes: 33
[your_program] 
[your_program] 17:52:21.683 [debug] Reading exact 8 bytes
[your_program] 
[your_program] 17:52:21.684 [debug] Read exact data: "REPLCONF"
[your_program] 
[your_program] 17:52:21.684 [debug] Received data chunk: "\r\n$6\r\nGETACK\r\n$1\r\n*\r\n", bytes: 21
[your_program] 
[your_program] 17:52:21.684 [debug] Argument total bytes: 14
[your_program] 
[your_program] 17:52:21.684 [debug] Received data chunk: "$6\r\nGETACK\r\n$1\r\n*\r\n", bytes: 19
[your_program] 
[your_program] 17:52:21.684 [debug] Reading exact 6 bytes
[your_program] 
[your_program] 17:52:21.685 [debug] Read exact data: "GETACK"
[your_program] 
[your_program] 17:52:21.685 [debug] Received data chunk: "\r\n$1\r\n*\r\n", bytes: 9
[your_program] 
[your_program] 17:52:21.685 [debug] Argument total bytes: 12
[your_program] 
[your_program] 17:52:21.685 [debug] Received data chunk: "$1\r\n*\r\n", bytes: 7
[your_program] 
[your_program] 17:52:21.685 [debug] Reading exact 1 bytes
[your_program] 
[your_program] 17:52:21.685 [debug] Read exact data: "*"
[your_program] 
[your_program] 17:52:21.685 [debug] Received data chunk: "\r\n", bytes: 2
[your_program] 
[your_program] 17:52:21.685 [debug] Argument total bytes: 7
[your_program] 
[your_program] 17:52:21.685 [debug] Finished reading all arguments. Total bytes: 37
[your_program] 
[your_program] 17:52:21.688 [info] Received command: ["REPLCONF", "GETACK", "*"], total bytes: 37
[your_program] 
[your_program] 17:52:21.688 [info] Executing REPLCONF GETACK. Current offset: 0
[your_program] 
[your_program] 17:52:21.688 [info] Sent REPLCONF ACK response: "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$1\r\n0\r\n"
[your_program] 
[your_program] 17:52:21.688 [debug] Starting to read command
[your_program] 
[your_program] 17:52:21.689 [error] Error receiving data: :emsgsize
[your_program] 
[your_program] 17:52:21.689 [error] Error reading command header: :emsgsize
[your_program] 
[your_program] 17:52:21.689 [error] Error reading command: :emsgsize
[your_program] 
[your_program] 17:52:21.689 [info] Handshake completed successfully
[tester::#YD3] [test] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$1\r\n0\r\n"
[tester::#YD3] [test] master: Received RESP array: ["REPLCONF", "ACK", "0"]
[tester::#YD3] [test] Received ["REPLCONF", "ACK", "0"]
[tester::#YD3] [propagation] master: > PING
[tester::#YD3] [propagation] master: Sent bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YD3] [test] master: > REPLCONF GETACK *
[tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
[tester::#YD3] write tcp 127.0.0.1:6379->127.0.0.1:36854: write: broken pipe
[tester::#YD3] Test failed
[tester::#YD3] Terminating program
[your_program] 
[your_program] 17:52:21.691 [notice] SIGTERM received - shutting down
[your_program] 
[tester::#YD3] Program terminated successfully

I am processing commands received over a socket, reading either up to a delimiter or a fixed number of bytes. As the logs show, the first commands are processed successfully.
However, after handling the first REPLCONF command, I get an :emsgsize (EMSGSIZE) error on the next read from the socket. This is puzzling, because I am simply trying to read again after processing the first command.
The error prevents subsequent commands such as PING from being handled, and when the master sends another REPLCONF, its write fails with a broken pipe, so the connection appears to be closed by then.
I have researched EMSGSIZE and understand what it means in general, but I cannot pinpoint why it occurs in my case.
Could you suggest why this error might be happening and how I can resolve it?

@rohitpaulk, this is the code responsible for reading data from the connection.

  defp handle_commands(socket) do
    case read_command(socket) do
      {:ok, command, command_bytes} ->
        Logger.info("Received command: #{inspect(command)}, total bytes: #{command_bytes}")
        execute_replica_command(socket, command, command_bytes)
        handle_commands(socket)
      {:error, :closed} ->
        Logger.info("Connection closed")
        :ok
      {:error, :timeout} ->
        handle_commands(socket)
      {:error, reason} ->
        Logger.error("Error reading command: #{inspect(reason)}")
        {:error, reason}
    end
  end


  defp read_command(socket) do
    Logger.debug("Starting to read command")
    case read_until_delimiter(socket, "\r\n") do
      {:ok, "*" <> num_args_str, first_line_bytes} ->
        num_args = String.to_integer(num_args_str)
        # Logger.debug("Command header: *#{num_args}, bytes: #{first_line_bytes}")
        read_command_args(socket, num_args, [], first_line_bytes)
      {:error, :timeout} ->
        {:error, :timeout}
      {:error, reason} ->
        Logger.error("Error reading command header: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp read_command_args(_socket, 0, acc, total_bytes) do
    Logger.debug("Finished reading all arguments. Total bytes: #{total_bytes}")
    {:ok, Enum.reverse(acc), total_bytes}
  end
  defp read_command_args(socket, num_args, acc, total_bytes) do
    # Logger.debug("Reading argument #{length(acc) + 1} of #{num_args + length(acc)}")
    case read_until_delimiter(socket, "\r\n") do
      {:ok, "$" <> len_str, bytes_read} ->
        len = String.to_integer(len_str)
        # Logger.debug("Argument length indicator: $#{len}, bytes: #{bytes_read}")
        case read_exact(socket, len) do
          {:ok, arg} ->
            # Logger.debug("Read argument: #{inspect(arg)}, bytes: #{len}")
            case read_until_delimiter(socket, "\r\n") do
              {:ok, "", more_bytes} ->
                new_total = total_bytes + bytes_read + len + more_bytes
                Logger.debug("Argument total bytes: #{new_total - total_bytes}")
                read_command_args(socket, num_args - 1, [arg | acc], new_total)
              {:error, reason} ->
                Logger.error("Error reading argument delimiter: #{inspect(reason)}")
                {:error, reason}
            end
          {:error, reason} ->
            Logger.error("Error reading argument content: #{inspect(reason)}")
            {:error, reason}
        end
      {:error, reason} ->
        Logger.error("Error reading argument length: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp read_until_delimiter(socket, delimiter) do
    # Logger.debug("Reading until delimiter: #{inspect(delimiter)}")
    read_until_delimiter(socket, delimiter, "", 0)
  end

  defp read_until_delimiter(socket, delimiter, acc, bytes_read) do
    case :gen_tcp.recv(socket, 0, 5000) do
      {:ok, data} ->
        Logger.debug("Received data chunk: #{inspect(data)}, bytes: #{byte_size(data)}")
        new_acc = acc <> data
        new_bytes_read = bytes_read + byte_size(data)
        case String.split(new_acc, delimiter, parts: 2) do
          [result, rest] ->
            # Logger.debug("Found delimiter. Result: #{inspect(result)}, remaining: #{byte_size(rest)} bytes")
            :gen_tcp.unrecv(socket, rest)
            {:ok, result, new_bytes_read - byte_size(rest)}
          [_] ->
            Logger.debug("Delimiter not found, continuing to read")
            read_until_delimiter(socket, delimiter, new_acc, new_bytes_read)
        end
      {:error, reason} ->
        Logger.error("Error receiving data: #{inspect(reason)}")
        {:error, reason}
    end
  end

  defp read_exact(socket, length) do
    Logger.debug("Reading exact #{length} bytes")
    case :gen_tcp.recv(socket, length) do
      {:ok, data} ->
        Logger.debug("Read exact data: #{inspect(data)}")
        {:ok, data}
      {:error, reason} ->
        Logger.error("Error reading exact bytes: #{inspect(reason)}")
        {:error, reason}
    end
  end
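
For comparison, here is a minimal sketch of an alternative that avoids `:gen_tcp.unrecv/2` by keeping leftover bytes in an in-memory buffer and parsing commands from it. In the log above, the failing read comes right after a call that would push back an empty remainder; whether that is the actual cause of `:emsgsize` is unverified. The module name `RespBuffer` and its functions are hypothetical, not part of the original code, and the sketch assumes a passive-mode socket opened with the `:binary` option.

```elixir
defmodule RespBuffer do
  @moduledoc """
  Hypothetical helper (not from the original post): parses one RESP array of
  bulk strings from a binary buffer instead of pushing bytes back onto the socket.
  """

  # Returns {:ok, args, consumed_bytes, rest} when a full command is buffered,
  # :incomplete when more bytes are needed, or {:error, :invalid_resp}.
  def parse_command(buffer) when is_binary(buffer) do
    with {:ok, "*" <> count, rest} <- take_line(buffer),
         {n, ""} when n >= 0 <- Integer.parse(count),
         {:ok, args, rest} <- take_args(rest, n, []) do
      {:ok, args, byte_size(buffer) - byte_size(rest), rest}
    else
      :incomplete -> :incomplete
      _other -> {:error, :invalid_resp}
    end
  end

  # Reads from the socket only when the buffer does not yet hold a full command.
  # Assumes a passive socket in binary mode; `buffer` carries leftovers between calls.
  def next_command(socket, buffer \\ "") do
    case parse_command(buffer) do
      :incomplete ->
        case :gen_tcp.recv(socket, 0) do
          {:ok, data} -> next_command(socket, buffer <> data)
          {:error, reason} -> {:error, reason}
        end

      result ->
        result
    end
  end

  # Splits off one CRLF-terminated line.
  defp take_line(buffer) do
    case :binary.split(buffer, "\r\n") do
      [line, rest] -> {:ok, line, rest}
      [_no_delimiter] -> :incomplete
    end
  end

  defp take_args(rest, 0, acc), do: {:ok, Enum.reverse(acc), rest}

  defp take_args(buffer, n, acc) do
    with {:ok, "$" <> len_str, rest} <- take_line(buffer),
         {len, ""} when len >= 0 <- Integer.parse(len_str) do
      case rest do
        # Bulk string payload followed by its trailing CRLF.
        <<arg::binary-size(len), "\r\n", tail::binary>> ->
          take_args(tail, n - 1, [arg | acc])

        _ when byte_size(rest) < len + 2 ->
          :incomplete

        _ ->
          {:error, :invalid_resp}
      end
    else
      :incomplete -> :incomplete
      _other -> {:error, :invalid_resp}
    end
  end
end
```

With this shape, the caller would thread `rest` into the next call (for example `RespBuffer.next_command(socket, rest)`), and `consumed_bytes` gives the per-command byte count that the original code tracks as `command_bytes`.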

I believe I have resolved the issue, but the solution feels like a hack. From what I see in the tests, the master sends the PSYNC response, then the RDB file, and then the propagated commands. The errors I was hitting came from reading continuously from the connection across all of these: the PSYNC response, the RDB file, and the commands that follow.

To address this, I changed my approach. Since I know the PSYNC response takes 56 bytes and the RDB transfer takes 93 bytes (the 5-byte "$88\r\n" header plus the 88-byte payload shown in the logs above), I now read exactly that many bytes for each response and only then start listening for commands. With this change everything works without errors.

However, I'm not sure whether this is a suitable or optimal approach. Could you give me some advice on this, @rohitpaulk?
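
As a rough illustration of avoiding the hardcoded 56 and 93 byte counts: in the logs above, the PSYNC reply is a single CRLF-terminated line and the RDB transfer is prefixed with its own length ("$88\r\n"), with no trailing CRLF after the payload. The sketch below derives both sizes from the data itself. The module `HandshakeBuffer` and its function names are hypothetical, and it works on an already-received binary rather than on the socket.

```elixir
defmodule HandshakeBuffer do
  # Hypothetical helper (not from the original post).

  # Parses "+FULLRESYNC <replid> <offset>\r\n" from the front of the buffer.
  # Returns {:ok, repl_id, offset, rest}, :incomplete, or an error tuple.
  def parse_fullresync(buffer) when is_binary(buffer) do
    case :binary.split(buffer, "\r\n") do
      [line, rest] ->
        with ["+FULLRESYNC", repl_id, offset_str] <- :binary.split(line, " ", [:global]),
             {offset, ""} <- Integer.parse(offset_str) do
          {:ok, repl_id, offset, rest}
        else
          _other -> {:error, :invalid_fullresync}
        end

      [_no_delimiter] ->
        :incomplete
    end
  end

  # Parses "$<len>\r\n<len bytes of RDB>" from the front of the buffer.
  # Unlike a regular bulk string, the payload is not followed by CRLF here
  # (matching the bytes the tester logs show).
  def parse_rdb(buffer) when is_binary(buffer) do
    with ["$" <> len_str, rest] <- :binary.split(buffer, "\r\n"),
         {len, ""} when len >= 0 <- Integer.parse(len_str) do
      case rest do
        <<rdb::binary-size(len), tail::binary>> -> {:ok, rdb, tail}
        _too_short -> :incomplete
      end
    else
      [_no_delimiter] -> :incomplete
      _other -> {:error, :invalid_rdb_header}
    end
  end
end
```

Whatever is left in `tail` after `parse_rdb/1` is the start of the propagated command stream, so it can be handed to the command parser instead of being discarded or re-read from the socket.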
