Thanks for your response, @rohitpaulk.
Here are the logs from the CodeCrafters test:
remote: [replication-11] client: $ redis-cli SET baz 789
remote: [replication-11] client: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
remote: [replication-11] client: Received bytes: “$2\r\nOK\r\n”
remote: [replication-11] client: Received RESP value: “OK”
remote: [replication-11] Received “OK”
remote: [replication-11] Sent 3 SET commands to master successfully.
remote: [replication-11] replica: Expecting “SET foo 123” to be propagated
remote: [replication-11] replica: Received bytes: “$2\r\nOK\r\n”
remote: [replication-11] replica: Received RESP value: “OK”
remote: [replication-11] Expected array type, got BULK_STRING
remote: [replication-11] Test failed
remote: [replication-11] Terminating program
remote: [replication-11] Program terminated successfully
In terms of my code structure:
// redis struct
/// State owned by one cache server: the stored entries, its metadata, and the
/// replication-related TCP connections.
pub struct RedisCache {
/// Stored entries, keyed by `String`; each value is a triple of `CacheData`.
/// `my_server` moves this map into an `Arc<Mutex<..>>` shared by all client threads.
data_structure: HashMap<String, (CacheData, CacheData, CacheData)>,
/// String-to-string metadata; also shared with every client thread by `my_server`.
info: HashMap<String, String>,
/// Connections to replicas. `my_server` shares this list with the client threads,
/// and `send_rdb_file` appends a stream handle to it after sending the RDB file.
replicas: Vec<TcpStream>, // can be empty
/// Connection to the master. `my_server` sets it to `Some(..)` after it
/// successfully connects to the master URL it was given.
to_master: Option<TcpStream>
impl RedisCache {
// also contains other functions
/// Creates a TcpListener on the url provided. It handles multiple connections
/// using multi-threading.
/// The current implementation spawns a new thread for every client that connects
/// to the server. This can be replaced by a `threadpool` or
/// concurrent programming.
///
/// The function accepts a connection with a client, spawns a new thread to process the
/// client's requests using the `connection_handler` helper method.
pub fn my_server(mut self, url: String, master_url: &Option<String>) {
let listener = TcpListener::bind(url.as_str()).unwrap();
println!("my_server: TcpListener: {:?}", listener);
// connect with the master --------------------------------------- START
match master_url {
Some(val) => { // means this cache is a replica
let tcp_to_master_result = TcpStream::connect(val);
match tcp_to_master_result {
Ok(mut connection) => {
println!("new: Successfully connected to master cache at:{val}");
let _ = RedisCache::handshake_handler(&mut connection);
println!("new: tcp_to_master: {:?}", connection);
self.to_master = Some(connection);
},
Err(e) => {println!("fn new: Error connecting with master cache. Error: {e:?}")}
}
},
None => {println!("my_server: cache is master!")}
}
// connect with the master --------------------------------------- END
// our cache/db mutex
let cache_mutex = Mutex::new(self.data_structure);
let info_mutex = Mutex::new(self.info);
let replicas_mutex = Mutex::new(self.replicas);
// arc so we can pass the db to multiple threads
let shared_cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>> = Arc::new(cache_mutex);
let shared_info: Arc<Mutex<HashMap<String, String>>> = Arc::new(info_mutex);
let shared_replicas: Arc<Mutex<Vec<TcpStream>>> = Arc::new(replicas_mutex);
// to handle multiple client requests
for stream in listener.incoming() {
let shared_cache_clone = shared_cache.clone();
let shared_info_clone = shared_info.clone();
let shared_replicas_clone = shared_replicas.clone();
match stream {
Ok(mut client_stream) => {
println!("my_server: accepted new connection: {:?}", client_stream);
std::thread::spawn(move|| {
// an infinite loop to keep the connection with the client alive
// will implement a timeout in a future update
loop {
// RedisCache::connection_handler(&mut _stream, shared_cache_clone.clone());
let result =
RedisCache::connection_handler(&mut client_stream, shared_cache_clone.clone(), shared_info_clone.clone(), shared_replicas_clone.clone());
if result.is_err() {
break;
} else {
// self.replicas.push(client_stream);
}
}
});
}
Err(e) => {
println!("Connection failed. stream error: {}", e);
}
}
}
}
}
/// Reads one incoming message from the client and passes it to `message_handler`,
/// where the bulk of command processing happens.
///
/// Returns whatever `message_handler` returns. Returns `Err(())` when the read fails,
/// when the peer has closed the connection (a read of zero bytes), or when the
/// received bytes are not valid UTF-8. The serving loop in `my_server` stops on `Err`.
fn connection_handler(_stream: &mut TcpStream,
    cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>>,
    info_arc: Arc<Mutex<HashMap<String, String>>>,
    replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
    println!("connection_handler...");
    // NOTE(review): a single read into a 512-byte buffer assumes each message arrives
    // whole in one read and fits in the buffer — confirm this holds for all clients.
    let mut buf = [0; 512];
    let bytes_read = match _stream.read(&mut buf) {
        // `read` returning 0 into a non-empty buffer means the peer closed the
        // connection. Without this check the caller's loop would keep calling us
        // on a dead socket.
        Ok(0) => {
            println!("connection_handler: connection closed by peer");
            return Err(());
        }
        Ok(n) => n,
        Err(_) => {
            println!("connection_handler: Error when reading to string");
            return Err(());
        }
    };
    // Only the first `bytes_read` bytes were written; the rest of `buf` is still the
    // zero fill and must not become part of the message.
    match std::str::from_utf8(&buf[..bytes_read]) {
        Ok(val) => RedisCache::message_handler(val, _stream, cache, info_arc, replicas),
        Err(e) => {
            println!("Error: {e:}. Unable to convert buf to string");
            Err(())
        }
    }
}
/// Processes one message received from a client.
///
/// The message is broken down into a command and its params by the
/// `message_search_regex` helper function (params are optional: commands like PING
/// have none). The command is then matched and handed to the helper function that
/// handles it.
fn message_handler(message: &str, stream: &mut TcpStream,
cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>>,
info_arc: Arc<Mutex<HashMap<String, String>>>,
replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
/* Body omitted from this excerpt.
This function passes the stream and data to the relevant
helper functions that handle the specific request.
E.g. `get_hander`, `set_with_expiry_handler`, `set_without_expiry_handler` etc.
If the command is the second repl conf, it calls the
`second_repl_conf_handler` function, which contains the
`send_rdb_file` helper function used to send the
RDB file from the master to the replica(s). */
}
/// The master cache sends the RDB file to a replica and registers that replica.
///
/// The empty RDB file (hex from `create_empty_rdb_file`, decoded to bytes) is written
/// to `stream` as `$<len>\r\n` followed by the raw bytes. A handle to the same
/// connection is then pushed onto `replicas`.
///
/// Returns `Ok(true)` once the file has been written. Returns `Err(())` if the hex
/// cannot be decoded or the write fails; in both cases the replica is not registered.
/// A failure to duplicate the stream handle is only logged and still yields `Ok(true)`.
///
/// Panics if the `replicas` mutex is poisoned.
fn send_rdb_file(stream: &mut TcpStream, replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
    println!("send_rdb_file to stream: {stream:?}");
    let empty_rdb_hex = RedisCache::create_empty_rdb_file();
    let empty_rdb_vec = match hex::decode(empty_rdb_hex) {
        Ok(bytes) => bytes,
        Err(e) => {
            println!("send_rdb_file: unable to decode empty rdb hex. Error: {e:?}");
            return Err(());
        }
    };

    // Length prefix followed by the raw file contents.
    let rdb_file_prefix = format!("${}\r\n", empty_rdb_vec.len());
    let mut full_message: Vec<u8> =
        Vec::with_capacity(rdb_file_prefix.len() + empty_rdb_vec.len());
    full_message.extend_from_slice(rdb_file_prefix.as_bytes());
    full_message.extend_from_slice(&empty_rdb_vec);

    // A replica that never received the file must not be registered, so a failed
    // write is reported instead of being ignored.
    if let Err(e) = stream.write_all(&full_message) {
        println!("send_rdb_file: unable to send rdb file. Error: {e:?}");
        return Err(());
    }
    println!("send_rdb_file: empty rdb file sent");
    println!("send_rdb_file pre-clone stream: local_addr: {:?}, peer_addr: {:?}", stream.local_addr(), stream.peer_addr());

    // `try_clone` does not open a new connection. It returns a second, independently
    // owned handle to the same underlying socket, so anything written through the
    // stored handle travels over the connection that was used for the handshake.
    // NOTE(review): if the clone looks different in the debug output, compare
    // `peer_addr()` of both handles — presumably only the file descriptor differs.
    match stream.try_clone() {
        Ok(replica_stream) => {
            let mut replicas_guard = replicas.lock().unwrap();
            replicas_guard.push(replica_stream);
            println!("send_rdb_file: TcpStream copied. replicas: {replicas_guard:?}");
            // When SET or DEL commands are sent to the master, `replicas` can be
            // iterated to forward those commands to the replicas as well.
        }
        Err(e) => println!("send_rdb_file: unable to copy TcpStream. Error: {e:?}"),
    }
    Ok(true)
}
}
Please let me know if there’s anything I should clarify. Thanks!