Stuck on Stage #NA2

I’m stuck on Stage #na2.

I’ve tried many iterations of the WAIT code. It currently handles some of the requests correctly, but it always gets one of the tests wrong and ends up returning 0 instead of the expected number of ACKs. I’m not sure what I’m missing, but I assume something is off with my timing or ACK counting.

Here are my logs:

[tester::#NA2] Running tests for Stage #NA2 (Replication - WAIT with multiple commands)
...
[tester::#TU8] [handshake] replica-6: Received bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#TU8] [handshake] Received RDB file
[tester::#TU8] [test] client: $ redis-cli WAIT 3 500
[tester::#TU8] [test] client: Sent bytes: "*3\r\n$4\r\nWAIT\r\n$1\r\n3\r\n$3\r\n500\r\n"
[your_program] Handling process request - For Master
[your_program] Received request: *3
[your_program] $4
[your_program] WAIT
[your_program] $1
[your_program] 3
[your_program] $3
[your_program] 500
[your_program] 
[your_program] Received List Request: *3
[your_program] $4
[your_program] WAIT
[your_program] $1
[your_program] 3
[your_program] $3
[your_program] 500
[your_program] 
[your_program] Characters as integer: 3
[your_program] Handling WAIT request
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Waiting for response...
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program] 
[your_program] Waiting for response...
[your_program] Waiting for response...
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Finished waiting for acks: :0
[your_program] 
[your_program] Sending message to server: :0
[your_program] 
[your_program] Sent WAIT response to client
[your_program] Finished processing list request
[tester::#TU8] [test] client: Received bytes: ":0\r\n"
[tester::#TU8] [test] client: Received RESP integer: 0
[your_program] Connection closed
[tester::#TU8] Expected 6, got 0
[tester::#TU8] Test failed
[tester::#TU8] Terminating program
[tester::#TU8] Program terminated successfully

And here’s a snippet of my code:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, mpsc};
use tokio::net::TcpStream;
use tokio::time::timeout;
use futures::future::join_all;

use crate::SharedState;
use crate::helpers::helpers::{get_next_element, send_message_to_server};

pub async fn handle_wait_request<'a>(
    stream: Arc<Mutex<TcpStream>>,
    lines: &'a mut std::str::Lines<'a>,
    state: &'a Arc<Mutex<SharedState>>
) -> () {
    println!("Handling WAIT request");

    let min_number_of_acks = get_next_element(lines).parse::<usize>().unwrap_or(0);
    let time_limit_millisecs = get_next_element(lines).parse::<u64>().unwrap_or(0);
    let timeout_duration = Duration::from_millis(time_limit_millisecs);

    let repl_ack_msg = "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n".to_string();
    
    let (tx, mut rx) = mpsc::channel(32);
    let total_number_of_replicas = {
        let state_guard = state.lock().await;
        state_guard.replicas.len()
    };
    
    let ack_futures = {
        let state_guard = state.lock().await;
        state_guard.replicas.iter().map(|(_id, replica)| {
            let replica_stream = replica.stream.clone();
            let msg = repl_ack_msg.clone();
            let tx = tx.clone();
            
            tokio::spawn(async move {
                if let Err(err) = process_acks(replica_stream, &msg, &tx, time_limit_millisecs).await {
                    eprintln!("Failed to process acks: {}", err);
                }
            })
        }).collect::<Vec<_>>()
    };

    let ack_task = tokio::spawn(async move {
        join_all(ack_futures).await;
    });

    let mut number_of_acks = 0;
    let start_time = tokio::time::Instant::now();
    
    while number_of_acks < min_number_of_acks && number_of_acks < total_number_of_replicas {
        if start_time.elapsed() >= timeout_duration {
            break;
        }

        match timeout(Duration::from_millis(100), rx.recv()).await {
            Ok(Some(_)) => {
                number_of_acks += 1;
                println!("Received ack: {}/{}", number_of_acks, min_number_of_acks);
            },
            Ok(None) => break, // Channel closed
            Err(_) => continue, // Short timeout, continue waiting
        }
    }

    // Cancel the ack task if it's still running
    ack_task.abort();

    let wait_response = format!(":{}\r\n", number_of_acks);

    println!("Finished waiting for acks: {}", wait_response);
    let mut stream_lock = stream.lock().await;
    send_message_to_server(&mut stream_lock, &wait_response, false, 999999).await.unwrap();

    println!("Sent WAIT response to client");
}

async fn process_acks(
    stream: Arc<Mutex<TcpStream>>,
    msg: &str,
    tx: &mpsc::Sender<()>,
    time_limit_millisecs: u64
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    println!("Sending REPLCONF GETACK message to replica");
    let mut stream_lock = stream.lock().await;
    if stream_lock.peer_addr().is_ok() {
        let replica_timeout = Duration::from_millis(time_limit_millisecs);
        match timeout(
            replica_timeout, 
            send_message_to_server(&mut stream_lock, msg, true, time_limit_millisecs)
        ).await {
            Ok(Ok(response)) => {
                println!("Received response from replica: {}", response);
                if response.starts_with("*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n") {
                    tx.send(()).await?;
                    println!("Sent ACK to main task");
                }
                Ok(())
            },
            Ok(Err(err)) => {
                println!("Error sending REPLCONF GETACK message to replica: {}", err);
                Ok(()) 
            },
            Err(_) => {
                println!("Timeout while waiting for replica response");
                Ok(()) // We don't count this as an ack, but we don't want to stop the whole process
            }   
        }
    } else {
        Err("Failed to send REPLCONF GETACK message to replica".into())
    }
}

Any help or pointers would be greatly appreciated, including improvements to the general code if it doesn’t match best practices or is hard to read. I’ve been banging my head against the wall on this one for a couple of weeks. THANKS!

@alex-evans Apologies if I missed it in your code, but I believe you also need to take replication offsets into account.

Refer to the official Redis doc:

In the specific case of the implementation of WAIT, Redis remembers, for each client, the replication offset of the produced replication stream when a given write command was executed in the context of a given client. When WAIT is called Redis checks if the specified number of replicas already acknowledged this offset or a greater one.
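To make the offset check concrete, here is a minimal sketch of how it could look. `ReplicaProgress`, `count_in_sync`, and `immediate_wait_reply` are hypothetical names, not part of the code posted above, and the early return when every replica is already in sync is a simplification. The idea is that if nothing was propagated before the WAIT, the target offset is 0, every connected replica already satisfies it, and no GETACK round trip is needed.

```rust
/// Hypothetical per-replica bookkeeping; the field name is illustrative.
#[derive(Debug, Clone, Copy)]
pub struct ReplicaProgress {
    /// Last offset this replica reported via `REPLCONF ACK <offset>`
    /// (0 if it has not reported anything yet).
    pub acked_offset: u64,
}

/// Number of replicas that have acknowledged `target_offset` or a greater one.
pub fn count_in_sync(replicas: &[ReplicaProgress], target_offset: u64) -> usize {
    replicas
        .iter()
        .filter(|r| r.acked_offset >= target_offset)
        .count()
}

/// Returns `Some(count)` when WAIT can be answered without sending GETACK:
/// either enough replicas are already in sync, or all of them are, so
/// there is nobody left to wait for (a simplifying assumption).
/// Returns `None` when the caller still has to ask replicas and wait.
pub fn immediate_wait_reply(
    replicas: &[ReplicaProgress],
    target_offset: u64,
    min_acks: usize,
) -> Option<usize> {
    let in_sync = count_in_sync(replicas, target_offset);
    if in_sync >= min_acks || in_sync == replicas.len() {
        Some(in_sync)
    } else {
        None
    }
}
```

With six freshly connected replicas and a target offset of 0, `immediate_wait_reply(&replicas, 0, 3)` yields `Some(6)`, which is the value the failing test in the log expects. Only when it returns `None` would the handler go on to send `REPLCONF GETACK *` and wait.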

Thanks @andy1li, I’ll dig into it. I would have thought that mishandling the offsets would give me too many ACKs, but I’m getting 0 on this specific test. It really feels like I’m not handling the timeouts correctly, though I’ve been tweaking those for a while. Very likely I just have something dumb somewhere in all of this. I had forgotten about the offsets, though, so it could still be that I’m not handling them properly either, as you suggested. I appreciate the response and I’ll keep looking. Thanks!
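On the timing side, one way to keep the logic easy to reason about is to compute a single deadline for the whole WAIT and let every receive use whatever time is left, instead of combining per-replica read timeouts with a polling interval. The sketch below is an illustration under assumptions: `collect_acks` is a hypothetical helper, and it uses the standard library's blocking channel to stay dependency-free. In the async code above, `tokio::time::timeout_at` with one shared deadline would play the same role.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Counts acks arriving on `acks` until `needed` have arrived or `limit`
/// has elapsed, whichever comes first. Each `()` on the channel stands for
/// one replica acknowledging the required offset.
pub fn collect_acks(acks: &Receiver<()>, needed: usize, limit: Duration) -> usize {
    // One deadline for the whole WAIT, shared by every receive below.
    let deadline = Instant::now() + limit;
    let mut received = 0;

    while received < needed {
        // Time left until the deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match acks.recv_timeout(remaining) {
            Ok(()) => received += 1,
            // Deadline reached, or every sender is gone: report what we have.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    received
}
```

Because the remaining time shrinks on every iteration, the total wait cannot exceed `limit` no matter how many replicas answer late, and acks that do arrive in time are never discarded by a shorter inner timeout.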


Closing this thread due to inactivity. If you still need assistance, feel free to reopen or start a new discussion!