Stage #QQ0 Stuck with sending bytes to TcpStream

Hello all,

I am currently working on Stage 5 of the challenge. I refactored the code to be able to parse incoming messages.

I converted my tokio::net::TcpStream to a tokio::io::BufStream<tokio::net::TcpStream> so that I can read bytes until \r\n as and when needed.
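
For readers following along, here is a rough sketch of the "read until \r\n" idea for a RESP bulk string. It uses the synchronous std::io::BufRead so that it runs without extra crates; tokio's AsyncBufReadExt::read_until has the same shape with .await added. The helper names (invalid, read_line, read_bulk_string) are made up for illustration and are not the resp::decode module from this post.

```rust
use std::io::{self, BufRead, Cursor};

/// Hypothetical helper: wrap any error or message as an InvalidData io::Error.
fn invalid<E: Into<Box<dyn std::error::Error + Send + Sync>>>(e: E) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, e)
}

/// Hypothetical helper: read one CRLF-terminated line, returned without the CRLF.
fn read_line<R: BufRead>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut line = Vec::new();
    reader.read_until(b'\n', &mut line)?;
    if !line.ends_with(b"\r\n") {
        return Err(invalid("line is not terminated by CRLF"));
    }
    line.truncate(line.len() - 2);
    Ok(line)
}

/// Hypothetical helper: decode "$<len>\r\n<payload>\r\n" into a String.
fn read_bulk_string<R: BufRead>(reader: &mut R) -> io::Result<String> {
    let header = read_line(reader)?;
    let digits = match header.split_first() {
        Some((&b'$', rest)) => rest,
        _ => return Err(invalid("expected '$' prefix")),
    };
    let len = std::str::from_utf8(digits)
        .map_err(invalid)?
        .parse::<usize>()
        .map_err(invalid)?;

    // Read exactly the payload plus its trailing CRLF.
    let mut body = vec![0u8; len + 2];
    reader.read_exact(&mut body)?;
    if !body.ends_with(b"\r\n") {
        return Err(invalid("payload is not terminated by CRLF"));
    }
    body.truncate(len);
    String::from_utf8(body).map_err(invalid)
}

fn main() -> io::Result<()> {
    // The two bulk strings that redis-cli sends for: echo "hello world"
    let mut input = Cursor::new(b"$4\r\necho\r\n$11\r\nhello world\r\n".to_vec());
    println!("{:?}", read_bulk_string(&mut input)?);
    println!("{:?}", read_bulk_string(&mut input)?);
    Ok(())
}
```

Reading the payload with read_exact instead of a second read_until keeps the decoder correct even if the payload itself contains a \n byte.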

Running redis-cli echo "hello world" produces the following println!(..) output:

accepted new connection
Received '[42, 50, 13, 10]' in the buffer.
Received '[36, 52, 13, 10]' in the buffer.
Bulk String Buf Len = '4'.
Received '[101, 99, 104, 111, 13, 10]' in the bulk buffer.
Decoded the value at 0 as BulkString("echo").
Received '[36, 49, 49, 13, 10]' in the buffer.
Bulk String Buf Len = '11'.
Received '[104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 13, 10]' in the bulk buffer.
Decoded the value at 1 as BulkString("hello world").
In echo.
Generated response is Ok("$11\r\nhello world\r\n") of length 18.
Written 18 bytes.
accepted new connection

The write reports that it has written all 18 bytes to the stream, but the redis-cli terminal does not receive the output.

How do I resolve the writing issue here?
I assume it has to do with the BufStream<TcpStream>, which reads fine but does not seem to write.

Thanks in advance.


Below is my main.rs file:

mod resp;

use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};

use crate::resp::{decode::decode, encode::encode, value::Value};

async fn handle_client(stream: &mut BufStream<TcpStream>) {
    loop {
        println!("accepted new connection");

        let resp_value = match decode(stream).await {
            Ok(resp_value) => resp_value,
            Err(error) => panic!("{}", error.to_string()),
        };

        let response = match resp_value {
            Value::Array(commands) => match &commands[0] {
                Value::BulkString(input_command) => match input_command.to_lowercase().as_str() {
                    "ping" => {
                        println!("In ping.");
                        if commands.len() > 2 {
                            todo!()
                        }
                        if commands.len() == 1 {
                            Some(encode(&Value::SimpleString("PONG".to_string())))
                        } else {
                            Some(encode(&commands[1]))
                        }
                    }
                    "echo" => {
                        println!("In echo.");
                        if commands.len() != 2 {
                            todo!()
                        }
                        Some(encode(&commands[1]))
                    }
                    _ => None,
                },
                other => panic!(
                    "Expected a RESP Bulk String inside the array but received {:?}.",
                    other
                ),
            },
            _ => panic!("Expected a RESP Array of Bulk Strings."),
        };

        match response {
            Some(response) => {
                let num_bytes = response.len();
                println!(
                    "Generated response is {:?} of length {:?}.",
                    String::from_utf8(response.clone()),
                    num_bytes
                );
                let mut num_written: usize = 0;

                while num_written < num_bytes {
                    let n = match stream.write(response.as_slice()).await {
                        Ok(n) => n,
                        Err(error) => panic!("{}", error.to_string()),
                    };
                    println!("Written {:?} bytes.", n);
                    num_written += n;
                }
            }
            None => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let listener = match TcpListener::bind("127.0.0.1:6379").await {
        Ok(listener) => listener,
        Err(error) => panic!("{}", error.to_string()),
    };

    loop {
        let (mut stream, _socket_addr) = match listener.accept().await {
            Ok((stream, _socket_addr)) => (BufStream::new(stream), _socket_addr),
            Err(error) => panic!("{}", error.to_string()),
        };

        tokio::spawn(async move { handle_client(&mut stream).await });
    }
}

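I found the solution: I had to flush after my write_all call. BufStream buffers writes internally, so a small response stays in its buffer and never reaches the socket until the stream is flushed.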

        match response {
            Some(response) => {
                let num_bytes = response.len();
                println!(
                    "Generated response is {:?} of length {:?}.",
                    String::from_utf8(response.clone()),
                    num_bytes
                );

                match stream.write_all(response.as_slice()).await {
                    Ok(()) => match stream.flush().await {
                        Ok(()) => {}
                        Err(error) => panic!("{}", error.to_string()),
                    },
                    Err(error) => panic!("{}", error.to_string()),
                };
            }
            None => {}
        }

This was it and it works now.
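
To see why the flush matters, here is a minimal sketch using the synchronous std::io::BufWriter as a stand-in. The assumption is that tokio's BufStream buffers writes in the same way; a Vec<u8> plays the role of the socket, and the function name is made up for illustration.

```rust
use std::io::{self, BufWriter, Write};

/// Returns how many bytes had reached the inner sink before and after flush().
fn bytes_before_and_after_flush(payload: &[u8]) -> io::Result<(usize, usize)> {
    // The Vec<u8> stands in for the TCP socket.
    let mut writer = BufWriter::new(Vec::<u8>::new());

    // write_all succeeds, but a small payload only lands in the internal buffer.
    writer.write_all(payload)?;
    let before = writer.get_ref().len();

    // flush pushes the buffered bytes through to the inner sink.
    writer.flush()?;
    let after = writer.get_ref().len();

    Ok((before, after))
}

fn main() -> io::Result<()> {
    let (before, after) = bytes_before_and_after_flush(b"$11\r\nhello world\r\n")?;
    println!("before flush: {}, after flush: {}", before, after);
    Ok(())
}
```

With the 18-byte response from this thread, nothing reaches the sink until flush is called, which matches the "Written 18 bytes." log line appearing while redis-cli received nothing.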

Ah, nice find! For others reading this, also remember to use write_all instead of write.
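
As a rough illustration of the write vs write_all point: write may accept only part of the buffer, and the loop in the original handle_client passes the whole response.as_slice() on every iteration, so a short write would resend from the start. The sketch below uses the synchronous std::io::Write trait and a made-up TrickleWriter that accepts a few bytes per call; it is a hypothetical stand-in, not how a real socket is guaranteed to behave.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `chunk` bytes per write() call.
struct TrickleWriter {
    sink: Vec<u8>,
    chunk: usize,
}

impl Write for TrickleWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.chunk);
        self.sink.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// A single write() call: only the first `chunk` bytes are delivered.
fn send_with_single_write(payload: &[u8], chunk: usize) -> io::Result<Vec<u8>> {
    let mut writer = TrickleWriter { sink: Vec::new(), chunk };
    let _accepted = writer.write(payload)?;
    Ok(writer.sink)
}

/// write_all() keeps calling write() on the remaining bytes until all are sent.
fn send_with_write_all(payload: &[u8], chunk: usize) -> io::Result<Vec<u8>> {
    let mut writer = TrickleWriter { sink: Vec::new(), chunk };
    writer.write_all(payload)?;
    Ok(writer.sink)
}

fn main() -> io::Result<()> {
    let payload = b"$11\r\nhello world\r\n";
    println!("write:     {} bytes delivered", send_with_single_write(payload, 4)?.len());
    println!("write_all: {} bytes delivered", send_with_write_all(payload, 4)?.len());
    Ok(())
}
```

A hand-written loop would need to pass &response[num_written..] on each iteration to get the same result, which is exactly what write_all does for you.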

Note: I’ve updated the title of this post to include the stage ID (#QQ0). You can learn about the stages rename here: Upcoming change: Stages overhaul