I’m stuck on Rust Redis Replication stage 18.
I’ve tried using channels to communicate thinking problem was because I was using Arc<Mutex<>> everywhere.
When I spin up my own implementation and multiple slaves, My code is more is less getting response that I expect.
Here are my logs:
[replication-18] replica-3: Received bytes: “$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2”
[replication-18] Received RDB file
[replication-18] client: $ redis-cli SET foo 123
[replication-18] client: Sent bytes: “*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n”
[your_program] 2024-04-11T17:54:38.543031Z DEBUG ThreadId(01) src/server/mod.rs:24: Got a request from: 127.0.0.1:48580
[replication-18] client: Received bytes: “+OK\r\n”
[replication-18] client: Received RESP value: “OK”
[replication-18] Received “OK”
[replication-18] client: $ redis-cli WAIT 1 500
[your_program] 2024-04-11T17:54:38.543359Z INFO ThreadId(02) src/database/mod.rs:79: Setting key: foo with value: 123
[replication-18] client: Sent bytes: “*3\r\n$4\r\nWAIT\r\n$1\r\n1\r\n$3\r\n500\r\n”
[replication-18] Testing Replica : 1
[replication-18] replica-1: Received bytes: “*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n”
[replication-18] replica-1: Received RESP value: [“SET”, “foo”, “123”]
[replication-18] Received [“SET”, “foo”, “123”]
[your_program] 2024-04-11T17:54:38.545308Z DEBUG ThreadId(02) src/cmd_processor/server_cmd_processor.rs:137: (inside of wait): Emitting ReplicationEvent::GetAck
[your_program] 2024-04-11T17:54:38.545454Z DEBUG ThreadId(02) src/replication/mod.rs:85: Sending GET ACK TO slave
[your_program] 2024-04-11T17:54:38.545478Z DEBUG ThreadId(02) src/replication/mod.rs:88: Writing to one slave
[your_program] 2024-04-11T17:54:38.545609Z DEBUG ThreadId(02) READING ACK FROM CLIENT: src/replication/mod.rs:92: Creating bufferred reader
[your_program] 2024-04-11T17:54:39.046217Z DEBUG ThreadId(02) READING ACK FROM CLIENT: src/cmd_processor/server_cmd_processor.rs:159: TIMEOUT or error
[replication-18] Received: “” (no content received)
[replication-18] ^ error
[replication-18] Error: Expected start of a new RESP value (either +, -, :, $ or *)
[replication-18] Test failed
[replication-18] Terminating program
include relevant logs here
And here’s a snippet of my code:
let mut acks_received = 0;
let req = RESPType::Array(vec![
RESPType::BulkString("REPLCONF".to_string()),
RESPType::BulkString("GETACK".to_string()),
RESPType::BulkString("*".to_string()),
]);
for (_, streams) in &mut streams_map {
let (reader, mut writer) = streams.split();
debug!("Sending GET ACK TO slave");
let _ = writer.write_all(&req.as_bytes()).await;
writer.flush().await.unwrap();
debug!("Writing to one slave");
let span =
tracing::span!(tracing::Level::DEBUG, "READING ACK FROM CLIENT");
let _guard = span.enter();
debug!("Creating bufferred reader");
let mut reader = BufReader::new(reader);
let resp_type = RESPType::parse(&mut reader).await.unwrap();
debug!("RESP from slave - {:?}", resp_type);
acks_received += 1;
debug!(
"Acks received {:?} -- min_acks -- {}",
acks_received, min_ack
);
if acks_received >= min_ack {
break;
}
}
debug!("Respoding to the onshot channel");
let _ = resp.send(acks_received);
NOTE: I am not rust pro, so it could be that I am doing something very wrong and I can’t figure it out