Rust - Redis #ZN8: Single-replica propagation

Hello!

I’m having issues propagating commands from the master to the replicas. I’m not sure how to reuse the same connection that was used for the handshake.

The process is easier on the replica side, since we know the connection will be with the master.

However, I found it challenging to do this from the master’s side. My current implementation tries to capture the connection that is used to send the RDB file. Unfortunately, I’m unable to clone the connection without creating a new one on a different socket. To reuse the same connection for other operations, I’m using &mut for my TcpStream, and I can’t store that without running into errors from the Rust borrow checker.
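For what it’s worth, std’s TcpStream::try_clone does not open a new connection: the standard library documents it as returning a new, independently owned handle to the same underlying socket, so both handles read and write the same stream of data. Because the clone is owned, it can be pushed into a shared Vec<TcpStream> without storing any &mut borrow. A minimal sketch, assuming the replica list lives in an Arc<Mutex<Vec<TcpStream>>> as in RedisCache; register_replica is a hypothetical helper, and the 127.0.0.1 listener and connecting socket are local stand-ins for the master and a replica:

```rust
use std::io::{Read, Result, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};

/// Hypothetical helper: store an owned clone of the handshake connection in the
/// shared replica list. `try_clone` duplicates the handle to the same socket; it
/// does not open a new connection, and since the clone is owned, nothing borrowed
/// has to be stored.
fn register_replica(stream: &TcpStream, replicas: &Arc<Mutex<Vec<TcpStream>>>) -> Result<()> {
    let handle = stream.try_clone()?;
    replicas.lock().unwrap().push(handle);
    Ok(())
}

fn main() -> Result<()> {
    // Local stand-ins: the listener plays the master, the connecting socket a replica.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut replica_side = TcpStream::connect(listener.local_addr()?)?;
    let (master_side, _) = listener.accept()?;

    let replicas = Arc::new(Mutex::new(Vec::new()));
    register_replica(&master_side, &replicas)?;

    let mut guard = replicas.lock().unwrap();
    // The stored clone reports the same addresses as the original handle...
    assert_eq!(guard[0].local_addr()?, master_side.local_addr()?);
    assert_eq!(guard[0].peer_addr()?, replica_side.local_addr()?);

    // ...and bytes written through it arrive on the replica's end of the same connection.
    guard[0].write_all(b"+PONG\r\n")?;
    let mut buf = [0u8; 7];
    replica_side.read_exact(&mut buf)?;
    assert_eq!(&buf, b"+PONG\r\n");
    Ok(())
}
```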

My handshakes work fine, since they’re driven by the replicas and the master simply responds to their requests.

I’d greatly appreciate anyone’s thoughts on this.

Thanks!

To reuse the same connection for other operations, I’m using &mut for my TcpStream and I’m not able to store that without running into errors with the Rust borrow-checker.

@NatnaelGebrekidane could you share a code snippet and where in that code snippet you’re seeing the error?

Thanks for your response, @rohitpaulk.
Here are the logs from the CodeCrafters test:

remote: [replication-11] client: $ redis-cli SET baz 789
remote: [replication-11] client: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
remote: [replication-11] client: Received bytes: "$2\r\nOK\r\n"
remote: [replication-11] client: Received RESP value: "OK"
remote: [replication-11] Received "OK"
remote: [replication-11] Sent 3 SET commands to master successfully.
remote: [replication-11] replica: Expecting "SET foo 123" to be propagated
remote: [replication-11] replica: Received bytes: "$2\r\nOK\r\n"
remote: [replication-11] replica: Received RESP value: "OK"
remote: [replication-11] Expected array type, got BULK_STRING
remote: [replication-11] Test failed
remote: [replication-11] Terminating program
remote: [replication-11] Program terminated successfully
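The last lines of the log are the key part: the replica expected SET foo 123 to arrive as a RESP array (the same shape as the bytes the client sent for SET baz 789 above), but the first thing it read on the replication connection was $2\r\nOK\r\n, which is a bulk string. A minimal sketch of the expected encoding, using a hypothetical encode_command helper that is not part of the original code:

```rust
/// Hypothetical helper: encode a command as a RESP array of bulk strings,
/// the form a replica expects a propagated command (e.g. `SET foo 123`) to take.
fn encode_command(parts: &[&str]) -> String {
    // Array header: `*<number of elements>\r\n`
    let mut out = format!("*{}\r\n", parts.len());
    for part in parts {
        // Each element is a bulk string: `$<byte length>\r\n<bytes>\r\n`
        out.push_str(&format!("${}\r\n{}\r\n", part.len(), part));
    }
    out
}

fn main() {
    let propagated = encode_command(&["SET", "foo", "123"]);
    assert_eq!(propagated, "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n");
    // A reply like "$2\r\nOK\r\n" starts with '$' (bulk string), not '*' (array),
    // which is what "Expected array type, got BULK_STRING" is complaining about.
    assert!(propagated.starts_with('*'));
    println!("{:?}", propagated);
}
```

In practice the master usually doesn’t need to re-encode anything: the bytes it read from the client are already in this format.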

In terms of my code structure:

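// imports used by the snippets below (CacheData is defined elsewhere in my code)
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};

// redis struct
pub struct RedisCache {
    data_structure: HashMap<String, (CacheData, CacheData, CacheData)>,
    info: HashMap<String, String>,
    replicas: Vec<TcpStream>, // can be empty
    to_master: Option<TcpStream>,
}
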
impl RedisCache { 
// also contains other functions

    /// Creates a TcpListener on the URL provided. It handles multiple connections
    /// using multi-threading.
    /// The current implementation spawns a new thread for every client that connects
    /// to the server. This can be replaced by a `threadpool` or
    /// concurrent programming.
    ///
    /// The function accepts a connection from a client and spawns a new thread that processes
    /// the client's requests using the `connection_handler` helper method.
    pub fn my_server(mut self, url: String, master_url: &Option<String>) {
        let listener = TcpListener::bind(url.as_str()).unwrap();
        println!("my_server: TcpListener: {:?}", listener);

        // connect with the master --------------------------------------- START
        match master_url {
            Some(val) => { // means this cache is a replica
                let tcp_to_master_result = TcpStream::connect(val);
                match tcp_to_master_result {
                    Ok(mut connection) => {

                        println!("new: Successfully connected to master cache at:{val}");
                        let _ = RedisCache::handshake_handler(&mut connection);
                        println!("new: tcp_to_master: {:?}", connection);

                        self.to_master = Some(connection);
                    },
                    Err(e) => {println!("fn new: Error connecting with master cache. Error: {e:?}")}
                }
            },
            None => {println!("my_server: cache is master!")}
        }
        // connect with the master --------------------------------------- END

        // our cache/db mutex
        let cache_mutex = Mutex::new(self.data_structure);
        let info_mutex = Mutex::new(self.info);
        let replicas_mutex = Mutex::new(self.replicas);


        // arc so we can pass the db to multiple threads
        let shared_cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>> = Arc::new(cache_mutex);
        let shared_info: Arc<Mutex<HashMap<String, String>>> = Arc::new(info_mutex);
        let shared_replicas: Arc<Mutex<Vec<TcpStream>>> = Arc::new(replicas_mutex);


        // to handle multiple client requests
        for stream in listener.incoming() {
            let shared_cache_clone = shared_cache.clone();
            let shared_info_clone = shared_info.clone();
            let shared_replicas_clone = shared_replicas.clone();


            match stream {
                Ok(mut client_stream) => {
                    println!("my_server: accepted new connection: {:?}", client_stream);

                    std::thread::spawn(move|| {
                        // an infinite loop to keep the connection with the client alive
                        // will implement a timeout in a future update
                        loop {
                            // RedisCache::connection_handler(&mut _stream, shared_cache_clone.clone());
                            let result =
                                RedisCache::connection_handler(&mut client_stream, shared_cache_clone.clone(), shared_info_clone.clone(), shared_replicas_clone.clone());

                            if result.is_err() {
                                break;
                            } else {
                                // self.replicas.push(client_stream);
                            }
                        }

                    });
                }
                Err(e) => {
                    println!("Connection failed. stream error: {}", e);
                }
            }
        }
    }



    /// Reads the incoming message and passes it to `message_handler`, where the bulk of
    /// command processing happens.
    fn connection_handler(stream: &mut TcpStream,
                          cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>>,
                          info_arc: Arc<Mutex<HashMap<String, String>>>,
                          replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
        println!("connection_handler...");
        let mut buf = [0; 512];
        match stream.read(&mut buf) {
            // 0 bytes read means the peer closed the connection. Returning Err lets the
            // caller's loop break instead of spinning on empty reads forever.
            Ok(0) => println!("connection_handler: connection closed by peer"),
            Ok(bytes_read) => {
                // Convert the slice of bytes to &str. We use `bytes_read` to know how many
                // bytes we received and wrote to buf. This is crucial because buf has size 512:
                // if we get fewer than 512 bytes, we have to exclude the trailing 0s
                // (buf was initialized with 512 zeros, and we don't need the extra ones).
                match std::str::from_utf8(&buf[..bytes_read]) {
                    Ok(val) => return RedisCache::message_handler(val, stream, cache, info_arc, replicas),
                    Err(e) => println!("Error: {e}. Unable to convert buf to string"),
                }
            }
            Err(e) => println!("connection_handler: Error when reading from stream: {e}"),
        }
        Err(())
    }


    /// Takes in the message we've received from the client and processes it using
    /// the `message_search_regex` helper function, which breaks the incoming message down into a
    /// command and params (if applicable, because commands like PING don't have parameters).
    ///
    /// It matches the command (which it gets from `message_search_regex`) and uses helper
    /// functions that handle the respective commands.
    fn message_handler(message: &str, stream: &mut TcpStream,
                       cache: Arc<Mutex<HashMap<String, (CacheData, CacheData, CacheData)>>>,
                       info_arc: Arc<Mutex<HashMap<String, String>>>,
                       replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
        // This function passes the stream and data to the relevant
        // helper functions that handle the specific request,
        // e.g. `get_hander`, `set_with_expiry_handler`, `set_without_expiry_handler` etc.
        // If the command is the second REPLCONF, it calls the
        // `second_repl_conf_handler` function, which contains the
        // `send_rdb_file` helper function used to send the
        // RDB file from the master to the replica(s).
    }


    /// The master cache sends the RDB file to replicas.
    fn send_rdb_file(stream: &mut TcpStream, replicas: Arc<Mutex<Vec<TcpStream>>>) -> Result<bool, ()> {
        // send RDB file  ---------------------------------------------------------- START
        println!("send_rdb_file to stream: {stream:?}");
        let empty_rdb_hex = RedisCache::create_empty_rdb_file();
        let empty_rdb_to_hex_result = hex::decode(empty_rdb_hex.clone());

        match empty_rdb_to_hex_result {
            Ok(empty_rdb_vec) => {
                // format value
                let rdb_file_prefix = format!("${}\r\n", empty_rdb_vec.len());
                let rdb_file_prefix_bytes = rdb_file_prefix.as_bytes();

                // concatenate the prefix with the content (the content being the RDB file)
                let mut full_message: Vec<u8> = Vec::with_capacity(rdb_file_prefix_bytes.len() + empty_rdb_vec.len());
                full_message.extend_from_slice(rdb_file_prefix_bytes);
                full_message.extend_from_slice(&empty_rdb_vec);
                if let Err(e) = stream.write_all(&full_message) {
                    println!("send_rdb_file: failed to send the RDB file. Error: {e:?}");
                    return Err(());
                }
                println!("send_rdb_file: empty rdb file sent");
                println!("send_rdb_file pre-clone stream: local_addr: {:?}, peer_addr: {:?}", stream.local_addr(), stream.peer_addr());

                // ALERT ISSUE: since this function receives a `&mut TcpStream`, I attempted to
                // convert that into a `TcpStream`, which seems to be initiating a
                // new connection on a new port. That isn't what I want, since
                // command propagation should be done over the same connection
                // used for the handshake.

                let stream_copy = stream.try_clone();
                match stream_copy {
                    Ok(stream) => {
                        let mut replicas_guard = replicas.lock().unwrap();
                        replicas_guard.push(stream);
                        println!("send_rdb_file: TcpStream copied. replicas: {replicas_guard:?}");
                        // when SET or DEL commands are sent to the master, I can iterate over `replicas` and send the commands to the replicas as well.
                    },
                    Err(e) => {println!("send_rdb_file: unable to copy TcpStream. Error: {e:?}")}
                }
                Ok(true)
                // send RDB file  ---------------------------------------------------------- END
            },
            Err(e) => {
                println!("send_rdb_file: unable to decode the RDB hex string. Error: {e:?}");
                Err(())
            }
        }
    }

}
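The framing that send_rdb_file builds by hand can also be isolated in a small pure function, which is easier to unit-test than a socket write. A sketch, assuming a hypothetical rdb_payload helper (not in the original code): the prefix is $<length>\r\n like a bulk string, but no trailing \r\n follows the file contents.

```rust
/// Hypothetical helper mirroring the framing built in `send_rdb_file`:
/// `$<length>\r\n` followed by the raw RDB bytes, with NO trailing `\r\n`
/// (unlike a regular bulk string).
fn rdb_payload(rdb_bytes: &[u8]) -> Vec<u8> {
    let mut payload = format!("${}\r\n", rdb_bytes.len()).into_bytes();
    payload.extend_from_slice(rdb_bytes);
    payload
}

fn main() {
    // Placeholder bytes only; this is not a valid RDB file.
    let fake_rdb = b"REDIS0011";
    let payload = rdb_payload(fake_rdb);
    assert_eq!(payload, b"$9\r\nREDIS0011".to_vec());
    assert!(!payload.ends_with(b"\r\n"));
    println!("{:?}", String::from_utf8_lossy(&payload));
}
```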

Please let me know if there’s anything I should clarify. Thanks!

@NatnaelGebrekidane I’ll see if we can improve the expectation logs here.

This is where the bug is:

The expected value to be propagated is the SET command itself (i.e. something like *3\r\n$3\r\nSET\r\n...). It looks like the value being propagated instead is the response to the command ("OK").
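Building on that, here is a minimal sketch of the propagation step. It assumes the master still has the raw request it read from the client (for example val in connection_handler, passed as val.as_bytes()) and keeps replica handles in an Arc<Mutex<Vec<TcpStream>>>. propagate is a hypothetical helper: the reply goes only to the client’s stream, while the unchanged request bytes are forwarded to each replica.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};

/// Hypothetical helper: forward the exact bytes received from the client (the
/// RESP-encoded command) to every registered replica. The reply to the client is
/// NOT sent here. Replicas whose write fails are removed from the list.
fn propagate(replicas: &Arc<Mutex<Vec<TcpStream>>>, raw_command: &[u8]) {
    let mut guard = replicas.lock().unwrap();
    guard.retain_mut(|replica| replica.write_all(raw_command).is_ok());
}

fn main() {
    // Local stand-ins: a listener playing the master and one connected "replica".
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let mut replica_side = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
    let (master_side, _) = listener.accept().unwrap();
    let replicas = Arc::new(Mutex::new(vec![master_side]));

    // The request for `SET foo 123`, exactly as a client would send it.
    let request: &[u8] = b"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n";
    propagate(&replicas, request);

    // The replica reads the command itself, not an "OK" reply.
    let mut received = vec![0u8; request.len()];
    replica_side.read_exact(&mut received).unwrap();
    assert_eq!(received, request);
    println!("replica received {:?}", String::from_utf8_lossy(&received));
}
```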


Note: I’ve updated the title of this post to include the stage ID (#ZN8). You can learn about the stages rename here: Upcoming change: Stages overhaul.