Hello all,
I am currently working on Stage 5
of the challenge. I refactored the code to be able to parse incoming messages.
I converted my tokio::net::TcpStream
to a tokio::io::BufStream<tokio::net::TcpStream>
so that I can read bytes until \r\n
as and when needed.
The output of my println!(..)
is as follows for running redis-cli echo "hello world"
accepted new connection
Received '[42, 50, 13, 10]' in the buffer.
Received '[36, 52, 13, 10]' in the buffer.
Bulk String Buf Len = '4'.
Received '[101, 99, 104, 111, 13, 10]' in the bulk buffer.
Decoded the value at 0 as BulkString("echo").
Received '[36, 49, 49, 13, 10]' in the buffer.
Bulk String Buf Len = '11'.
Received '[104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 13, 10]' in the bulk buffer.
Decoded the value at 1 as BulkString("hello world").
In echo.
Generated response is Ok("$11\r\nhello world\r\n") of length 18.
Written 18 bytes.
accepted new connection
The write says that is has written all the 18 bytes to the stream but the redis-cli
terminal does not receive the output.
How do I resolve the writing issue here ?
I am assuming it has to do with the BufStream<TcpStream>
which reads well but does not write.
Thanks in advance.
Below is my main.rs
file:
mod resp;
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};
use crate::resp::{decode::decode, encode::encode, value::Value};
async fn handle_client(stream: &mut BufStream<TcpStream>) {
loop {
println!("accepted new connection");
let resp_value = match decode(stream).await {
Ok(resp_value) => resp_value,
Err(error) => panic!("{}", error.to_string()),
};
let response = match resp_value {
Value::Array(commands) => match &commands[0] {
Value::BulkString(input_command) => match input_command.to_lowercase().as_str() {
"ping" => {
println!("In ping.");
if commands.len() > 2 {
todo!()
}
if commands.len() == 1 {
Some(encode(&Value::SimpleString("PONG".to_string())))
} else {
Some(encode(&commands[1]))
}
}
"echo" => {
println!("In echo.");
if commands.len() != 2 {
todo!()
}
Some(encode(&commands[1]))
}
_ => None,
},
other => panic!(
"Expected a RESP Bulk String inside the array but received {:?}.",
other
),
},
_ => panic!("Expected a RESP Array of Bulk Strings."),
};
match response {
Some(response) => {
let num_bytes = response.len();
println!(
"Generated response is {:?} of length {:?}.",
String::from_utf8(response.clone()),
num_bytes
);
let mut num_written: usize = 0;
while num_written < num_bytes {
let n = match stream.write(response.as_slice()).await {
Ok(n) => n,
Err(error) => panic!("{}", error.to_string()),
};
println!("Written {:?} bytes.", n);
num_written += n;
}
}
None => {}
}
}
}
#[tokio::main]
async fn main() {
let listener = match TcpListener::bind("127.0.0.1:6379").await {
Ok(listener) => listener,
Err(error) => panic!("{}", error.to_string()),
};
loop {
let (mut stream, _socket_addr) = match listener.accept().await {
Ok((stream, _socket_addr)) => (BufStream::new(stream), _socket_addr),
Err(error) => panic!("{}", error.to_string()),
};
tokio::spawn(async move { handle_client(&mut stream).await });
}
}