I’m stuck on Stage #na2.
I’ve tried many iterations of the WAIT code. It currently handles some of the requests correctly, but it always gets one of the tests wrong and ends up returning 0 instead of the expected number of ACKs. I’m not sure what I’m missing, but I assume something is off with my timing / ACK counting.
Here are my logs:
[tester::#NA2] Running tests for Stage #NA2 (Replication - WAIT with multiple commands)
...
[tester::#TU8] [handshake] replica-6: Received bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#TU8] [handshake] Received RDB file
[tester::#TU8] [test] client: $ redis-cli WAIT 3 500
[tester::#TU8] [test] client: Sent bytes: "*3\r\n$4\r\nWAIT\r\n$1\r\n3\r\n$3\r\n500\r\n"
[your_program] Handling process request - For Master
[your_program] Received request: *3
[your_program] $4
[your_program] WAIT
[your_program] $1
[your_program] 3
[your_program] $3
[your_program] 500
[your_program]
[your_program] Received List Request: *3
[your_program] $4
[your_program] WAIT
[your_program] $1
[your_program] 3
[your_program] $3
[your_program] 500
[your_program]
[your_program] Characters as integer: 3
[your_program] Handling WAIT request
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Waiting for response...
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Waiting for response...
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Sending REPLCONF GETACK message to replica
[your_program] Sending message to server: *3
[your_program] $8
[your_program] REPLCONF
[your_program] $6
[your_program] GETACK
[your_program] $1
[your_program] *
[your_program]
[your_program] Waiting for response...
[your_program] Waiting for response...
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Error sending REPLCONF GETACK message to replica: Timeout while reading from stream
[your_program] Finished waiting for acks: :0
[your_program]
[your_program] Sending message to server: :0
[your_program]
[your_program] Sent WAIT response to client
[your_program] Finished processing list request
[tester::#TU8] [test] client: Received bytes: ":0\r\n"
[tester::#TU8] [test] client: Received RESP integer: 0
[your_program] Connection closed
[tester::#TU8] Expected 6, got 0
[tester::#TU8] Test failed
[tester::#TU8] Terminating program
[tester::#TU8] Program terminated successfully
And here’s a snippet of my code:
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, mpsc};
use tokio::net::TcpStream;
use tokio::time::timeout;
use futures::future::join_all;
use crate::SharedState;
use crate::helpers::helpers::{get_next_element, send_message_to_server};
/// Handles a `WAIT <numreplicas> <timeout-ms>` request on the master.
///
/// Reads the two arguments from `lines`, asks every replica currently stored in
/// `state.replicas` for an acknowledgement (`REPLCONF GETACK *`), counts the
/// acknowledgements reported by the per-replica tasks, and writes the count back
/// to the client on `stream` as a RESP integer (`:<n>\r\n`).
///
/// Counting stops as soon as one of the following holds:
/// * the requested number of acknowledgements has been reached,
/// * every replica has acknowledged,
/// * every per-replica task has finished (the channel closes), or
/// * the client-supplied timeout has elapsed.
///
/// Arguments that fail to parse are treated as `0`.
///
/// NOTE(review): a timeout of `0` makes this function answer right away; Redis
/// documents `0` as "block forever" — confirm which behaviour is wanted.
///
/// NOTE(review): this function only counts replicas that answer `GETACK`. Redis
/// defines WAIT in terms of replicas that have acknowledged the writes issued
/// before it, so when nothing has been propagated yet the expected answer is
/// presumably the number of connected replicas without any `GETACK` round trip.
/// Implementing that needs a replication offset tracked in `SharedState`, which
/// is not visible from here — verify against the rest of the project.
pub async fn handle_wait_request<'a>(
    stream: Arc<Mutex<TcpStream>>,
    lines: &'a mut std::str::Lines<'a>,
    state: &'a Arc<Mutex<SharedState>>
) -> () {
    println!("Handling WAIT request");
    let min_number_of_acks = get_next_element(lines).parse::<usize>().unwrap_or(0);
    let time_limit_millisecs = get_next_element(lines).parse::<u64>().unwrap_or(0);
    let timeout_duration = Duration::from_millis(time_limit_millisecs);
    let repl_ack_msg = "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n".to_string();
    let (tx, mut rx) = mpsc::channel(32);

    // Take the state lock once so the replica count and the set of spawned
    // tasks always describe the same snapshot of `replicas`.
    let ack_tasks = {
        let state_guard = state.lock().await;
        state_guard.replicas.iter().map(|(_id, replica)| {
            let replica_stream = replica.stream.clone();
            let msg = repl_ack_msg.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                if let Err(err) = process_acks(replica_stream, &msg, &tx, time_limit_millisecs).await {
                    eprintln!("Failed to process acks: {}", err);
                }
            })
        }).collect::<Vec<_>>()
    };
    let total_number_of_replicas = ack_tasks.len();

    // Only the spawned tasks may keep the channel open. Dropping our own sender
    // lets `recv()` return `None` once every task is done, so we stop waiting
    // early instead of idling until the deadline.
    drop(tx);

    let deadline = tokio::time::Instant::now() + timeout_duration;
    let mut number_of_acks = 0;
    while number_of_acks < min_number_of_acks && number_of_acks < total_number_of_replicas {
        match tokio::time::timeout_at(deadline, rx.recv()).await {
            Ok(Some(_)) => {
                number_of_acks += 1;
                println!("Received ack: {}/{}", number_of_acks, min_number_of_acks);
            },
            Ok(None) => break, // Every replica task has finished
            Err(_) => break,   // WAIT deadline reached
        }
    }

    // Abort the per-replica tasks themselves. Dropping a `JoinHandle` only
    // detaches its task, which would leave it running and holding the replica
    // stream lock after WAIT has already been answered.
    for task in &ack_tasks {
        task.abort();
    }

    let wait_response = format!(":{}\r\n", number_of_acks);
    println!("Finished waiting for acks: {}", wait_response);
    let mut stream_lock = stream.lock().await;
    // A client that disconnected while we were waiting must not panic the handler.
    if let Err(err) = send_message_to_server(&mut stream_lock, &wait_response, false, 999999).await {
        eprintln!("Failed to send WAIT response to client: {}", err);
        return;
    }
    println!("Sent WAIT response to client");
}
async fn process_acks(
stream: Arc<Mutex<TcpStream>>,
msg: &str,
tx: &mpsc::Sender<()>,
time_limit_millisecs: u64
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Sending REPLCONF GETACK message to replica");
let mut stream_lock = stream.lock().await;
if stream_lock.peer_addr().is_ok() {
let replica_timeout = Duration::from_millis(time_limit_millisecs);
match timeout(
replica_timeout,
send_message_to_server(&mut stream_lock, msg, true, time_limit_millisecs)
).await {
Ok(Ok(response)) => {
println!("Received response from replica: {}", response);
if response.starts_with("*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n") {
tx.send(()).await?;
println!("Sent ACK to main task");
}
Ok(())
},
Ok(Err(err)) => {
println!("Error sending REPLCONF GETACK message to replica: {}", err);
Ok(())
},
Err(_) => {
println!("Timeout while waiting for replica response");
Ok(()) // We don't count this as an ack, but we don't want to stop the whole process
}
}
} else {
Err("Failed to send REPLCONF GETACK message to replica".into())
}
}
Any help or pointers would be greatly appreciated (including any improvements to the general code where it doesn’t match best practices or isn’t easy to read). I’ve been banging my head against the wall for a couple of weeks on this one. THANKS!