Can't send synchronization commands to Redis replica

I’m stuck on Stage #ZN8.

I’ve tried to store the stream the replica opened during the handshake and reuse it later to send commands, but I keep getting an error that says nothing was received.

Here are my logs:

[tester::#ZN8] replica: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[your_program] Handshake stream: Mutex { data: PollEvented { io: Some(TcpStream { addr: 127.0.0.1:6379, peer: 127.0.0.1:43046, fd: 11 }) } }
[tester::#ZN8] replica: Received bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#ZN8] Received RDB file
[tester::#ZN8] client: $ redis-cli SET foo 123
[tester::#ZN8] client: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
[your_program] Propagating changes
[your_program] Serialized command: *3
[your_program] $3
[your_program] SET
[your_program] $3
[your_program] foo
[your_program] $3
[your_program] 123
[tester::#ZN8] Received: "" (no content received)
[tester::#ZN8]            ^ error
[tester::#ZN8] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#ZN8] Test failed
[tester::#ZN8] Terminating program
[your_program] Writing to PollEvented { io: Some(TcpStream { addr: 127.0.0.1:6379, peer: 127.0.0.1:43046, fd: 11 }) }
[tester::#ZN8] Program terminated successfully

I don’t get why the "Writing to" log statement gets printed after the error. Is it failing to execute in time or something? Note that the stream is the same during the handshake and when I attempt to send the command to the replica.

And here’s a snippet of my code:

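// Imports assumed from the logs (PollEvented suggests Tokio); adjust to your crate.
use std::io::Write; // for std::io::stdout().flush()
use std::sync::Arc;
use tokio::io::AsyncWriteExt; // for write_all() and flush() on the stream
use tokio::net::TcpStream;
use tokio::sync::Mutex;

pub async fn propagate_command_to_replica(stream: Arc<Mutex<TcpStream>>, command: &Command) {
    let serialized_command = serialize_command(command);
    println!("Serialized command: {}", serialized_command);
    std::io::stdout().flush().unwrap();
    // Waits here until no other task holds the lock on the stream.
    let mut stream = stream.lock().await;
    println!("Writing to {:?}", stream);
    if let Err(e) = stream.write_all(serialized_command.as_bytes()).await {
        println!("Failed to write to stream: {}", e);
    }
    if let Err(e) = stream.flush().await {
        println!("Failed to flush stream: {}", e);
    }
    std::io::stdout().flush().unwrap();
}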

My master calls this function whenever it handles a write command, and I know it gets called at least once before the error (though it may not run to completion).

The listening-port argument is also set to 6380, which differs from the port of the stream the handshake is performed on (43046 in the logs above). But when I try to connect to port 6380 and store that connection for later use, I get the following:
Failed to connect to replica at 127.0.0.1:6380: Connection refused (os error 111)

I’d really appreciate any help. Thanks!

I don’t get why that log statement gets printed after the error. Is it failing to execute in time or something? Note how the stream is the same during the handshake and when I attempt to send the command to the replica

I haven’t taken a closer look at this yet, but just wanted to share: typically when you see a log after the “test failed” part, it’s because your program was blocked on something, and the SIGTERM signal we sent caused that lock to be released.

In this case, I’m guessing the blocking behaviour could be from stream.lock().await? This can be confirmed by adding logs around it (acquiring lock, acquired lock).
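To make the "acquiring lock, acquired lock" suggestion concrete, here is a minimal sketch. It is a std-only analogue that uses `std::sync::Mutex` and threads rather than the Tokio mutex from the question, and the names `lock_with_logging` and `measure_contention` are hypothetical helpers made up for illustration. The idea carries over: if "acquiring" is logged but "acquired" only shows up much later (or never), something else is holding the lock.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: logs before and after taking the lock and
/// returns how long the caller had to wait for it.
fn lock_with_logging<T>(label: &str, lock: &Mutex<T>) -> Duration {
    let start = Instant::now();
    eprintln!("[{}] acquiring lock", label);
    let _guard = lock.lock().unwrap();
    let waited = start.elapsed();
    eprintln!("[{}] acquired lock after {:?}", label, waited);
    waited
}

/// Simulates one thread holding the lock for `hold` (standing in for a
/// read loop) while the caller tries to take the same lock.
fn measure_contention(hold: Duration) -> Duration {
    let shared = Arc::new(Mutex::new(()));
    let (ready_tx, ready_rx) = mpsc::channel();

    let holder = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let _guard = shared.lock().unwrap();
            ready_tx.send(()).unwrap(); // signal that the lock is now held
            thread::sleep(hold);
            // _guard is dropped here, releasing the lock
        })
    };

    // Only try to lock once the holder definitely owns the lock.
    ready_rx.recv().unwrap();
    let waited = lock_with_logging("propagate", &shared);
    holder.join().unwrap();
    waited
}

fn main() {
    let waited = measure_contention(Duration::from_millis(200));
    println!("waited {:?} for the lock", waited);
}
```

In the async code from the question, the equivalent would be one log line directly before `stream.lock().await` and one directly after it.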

Thanks, that really helped. It turns out I was listening on the replica stream in an infinite loop, and in Rust that read loop has to hold the lock on the shared stream, so the propagation code could never acquire it. Once I made sure I stopped listening on replica streams after the initial handshake, the problem went away.
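For anyone who does need to keep reading from the replica connection after the handshake, one possible alternative to the fix above is to give the reader its own handle to the socket, so that only writers go through the mutex. The sketch below is a std-only analogue built on `std::net::TcpStream::try_clone`, not the code from this thread, and `propagate_with_separate_reader` is a hypothetical name. With Tokio, `TcpStream::into_split` serves a similar purpose.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;

/// Connects a fake replica to a fake master on an ephemeral port and
/// returns the bytes the replica received after `payload` was propagated.
fn propagate_with_separate_reader(payload: &[u8]) -> Vec<u8> {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // Fake replica: connects, reads exactly payload.len() bytes, then closes.
    let len = payload.len();
    let replica = thread::spawn(move || {
        let mut conn = TcpStream::connect(addr).unwrap();
        let mut buf = vec![0u8; len];
        conn.read_exact(&mut buf).unwrap();
        buf
    });

    let (stream, _) = listener.accept().unwrap();
    // The reader gets its own handle to the same socket, so it never
    // needs to touch the mutex that guards the write side.
    let mut read_half = stream.try_clone().unwrap();
    let write_half = Arc::new(Mutex::new(stream));

    // Read loop: blocks in read() until the replica closes the connection.
    let reader = thread::spawn(move || {
        let mut buf = [0u8; 512];
        while let Ok(n) = read_half.read(&mut buf) {
            if n == 0 {
                break; // EOF: the replica hung up
            }
        }
    });

    // The writer can take the lock even though the read loop is running.
    {
        let mut writer = write_half.lock().unwrap();
        writer.write_all(payload).unwrap();
        writer.flush().unwrap();
    }

    let received = replica.join().unwrap();
    reader.join().unwrap(); // exits once the replica side has closed
    received
}

fn main() {
    let cmd = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n";
    let got = propagate_with_separate_reader(cmd);
    assert_eq!(got.as_slice(), &cmd[..]);
    println!("replica received {} bytes", got.len());
}
```

The trade-off is an extra handle per replica in exchange for a read loop that cannot starve the write path.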
