I’m stuck on Stage #NC5 in Rust.
The tester’s behavior confuses me. If I write all zeros as the first four bytes to the TcpStream, the tester does not read the subsequent bytes and reports “Expected INT32 length to be 4 bytes, got 0 bytes”. If the first four bytes are not all zeros, the tester reads nothing at all and reports “error reading from connection: unexpected EOF”. The same thing happens when I run the Code Examples that are marked as passing. Is this a bug in the tester?
My code is available here: flyingsand/codecrafters-kafka-rust
Here are my logs:
When the first four bytes are all zeros:
remote: [tester::#NC5] Running tests for Stage #NC5 (Parse API Version)
remote: [tester::#NC5] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NC5] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NC5] Connection to broker at localhost:9092 successful
remote: [tester::#NC5] [Encoder] - ApiVersionsRequest
remote: [tester::#NC5] [Encoder] - Header
remote: [tester::#NC5] [Encoder] - APIKey (18)
remote: [tester::#NC5] [Encoder] - APIVersion (1080)
remote: [tester::#NC5] [Encoder] - CorrelationID (1667803575)
remote: [tester::#NC5] [Encoder] - ClientID (kafka-tester)
remote: [tester::#NC5] [Encoder] - Body
remote: [tester::#NC5] [Encoder] - ClientSoftwareName (kafka-cli)
remote: [tester::#NC5] [Encoder] - ClientSoftwareVersion (0.1)
remote: [tester::#NC5] Sending "ApiVersions" request
remote: [tester::#NC5] Hexdump of sent "ApiVersions" request:
remote: [tester::#NC5] Idx | Hex | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 26 00 12 04 38 63 68 a9 b7 00 0c 6b 61 | ...&...8ch....ka
remote: [tester::#NC5] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 0a 6b 61 66 6b | fka-tester..kafk
remote: [tester::#NC5] 0020 | 61 2d 63 6c 69 04 30 2e 31 00 | a-cli.0.1.
remote: [tester::#NC5]
remote: [tester::#NC5] Hexdump of received "ApiVersions" response:
remote: [tester::#NC5] Idx | Hex | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 00 | ....
remote: [tester::#NC5]
remote: [tester::#NC5] [Decoder] - ApiVersionsResponse
remote: [tester::#NC5] [Decoder] - Header
remote: [tester::#NC5] [Decoder] X CorrelationID (decode error)
remote: [tester::#NC5] Received bytes:
remote: [tester::#NC5] Hex (bytes 0--1) | ASCII
remote: [tester::#NC5] ------------------------------------------------+------------------
remote: [tester::#NC5]
remote: [tester::#NC5] ^ ^
remote: [your_program] accepted new connection
remote: [tester::#NC5] Expected INT32 length to be 4 bytes, got 0 bytes
remote: [tester::#NC5] Test failed
remote: [tester::#NC5] Terminating program
remote: [tester::#NC5] Program terminated successfully
When the first four bytes are not all zeros:
remote: [tester::#NC5] Running tests for Stage #NC5 (Parse API Version)
remote: [tester::#NC5] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NC5] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NC5] Connection to broker at localhost:9092 successful
remote: [tester::#NC5] [Encoder] - ApiVersionsRequest
remote: [tester::#NC5] [Encoder] - Header
remote: [tester::#NC5] [Encoder] - APIKey (18)
remote: [tester::#NC5] [Encoder] - APIVersion (1564)
remote: [tester::#NC5] [Encoder] - CorrelationID (350636842)
remote: [tester::#NC5] [Encoder] - ClientID (kafka-tester)
remote: [tester::#NC5] [Encoder] - Body
remote: [tester::#NC5] [Encoder] - ClientSoftwareName (kafka-cli)
remote: [tester::#NC5] [Encoder] - ClientSoftwareVersion (0.1)
remote: [tester::#NC5] Sending "ApiVersions" request
remote: [tester::#NC5] Hexdump of sent "ApiVersions" request:
remote: [tester::#NC5] Idx | Hex | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 26 00 12 06 1c 14 e6 4b 2a 00 0c 6b 61 | ...&......K*..ka
remote: [tester::#NC5] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 0a 6b 61 66 6b | fka-tester..kafk
remote: [tester::#NC5] 0020 | 61 2d 63 6c 69 04 30 2e 31 00 | a-cli.0.1.
remote: [tester::#NC5]
remote: [your_program] accepted new connection
remote: [tester::#NC5] error reading from connection: unexpected EOF
remote: [tester::#NC5] Test failed
remote: [tester::#NC5] Terminating program
remote: [tester::#NC5] Program terminated successfully
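To double-check that I am reading the request correctly, here is a small sketch that decodes the first 12 bytes of the request hexdump from the first log. The names `RequestPrefix` and `decode_prefix` are made up for this example and are not in my repository, and the signed `i16`/`i32` field types are my reading of the Kafka protocol types.

```rust
/// Fixed-size start of a request: size prefix, API key, API version, correlation ID.
/// (Hypothetical type for illustration only.)
#[derive(Debug, PartialEq)]
struct RequestPrefix {
    message_size: u32,
    api_key: i16,
    api_version: i16,
    correlation_id: i32,
}

/// Decodes the first 12 bytes (big-endian); returns None if the slice is too short.
fn decode_prefix(b: &[u8]) -> Option<RequestPrefix> {
    if b.len() < 12 {
        return None;
    }
    Some(RequestPrefix {
        message_size: u32::from_be_bytes([b[0], b[1], b[2], b[3]]),
        api_key: i16::from_be_bytes([b[4], b[5]]),
        api_version: i16::from_be_bytes([b[6], b[7]]),
        correlation_id: i32::from_be_bytes([b[8], b[9], b[10], b[11]]),
    })
}

fn main() {
    // First 12 bytes of the "sent ApiVersions request" hexdump in the first log.
    let sent = [
        0x00, 0x00, 0x00, 0x26, 0x00, 0x12, 0x04, 0x38, 0x63, 0x68, 0xa9, 0xb7,
    ];
    let prefix = decode_prefix(&sent).expect("12 bytes available");
    println!("{:?}", prefix);
}
```

The decoded values agree with the tester’s [Encoder] lines (APIKey 18, APIVersion 1080, CorrelationID 1667803575), and the size of 38 equals the number of bytes that follow the prefix in the hexdump, so the request side looks fine to me.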
And here’s a snippet of my code:
// main.rs
mod protocol;

use std::io::Write;
use std::net::TcpListener;

use protocol::{InputProtocol, OutputProtocol};

fn main() -> std::io::Result<()> {
    println!("Logs from your program will appear here!");

    let listener = TcpListener::bind("127.0.0.1:9092")?;
    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                // Read one request, answer it, then drop (close) the connection.
                let input_protocol = InputProtocol::parse_from_stream(&mut stream)?;
                let output_protocol = OutputProtocol::build_from_input_protocol(&input_protocol);
                output_protocol.write_protocol(&mut stream)?;
                stream.flush()?;
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
    Ok(())
}
// protocol.rs
use std::io::{self, Read, Write};
use std::net::TcpStream;

/// Fields parsed from the start of a request.
#[derive(Debug)]
pub struct InputProtocol {
    pub message_size: u32,
    pub request_api_key: u16,
    pub request_api_version: u16,
    pub correlation_id: u32,
}

/// Fields written back as the response.
#[derive(Debug)]
pub struct OutputProtocol {
    pub message_size: u32,
    pub correlation_id: u32,
    pub error_code: u16,
}

impl InputProtocol {
    pub fn parse_from_stream(stream: &mut TcpStream) -> io::Result<Self> {
        // The 4-byte big-endian size prefix says how many bytes follow.
        let mut bytes = [0u8; 4];
        stream.read_exact(&mut bytes)?;
        let len = u32::from_be_bytes(bytes);

        // Read the rest of the request in one go.
        let mut buffer = vec![0u8; len as usize];
        stream.read_exact(&mut buffer)?;
        if buffer.len() < 8 {
            // Avoid panicking on the slices below if the request is too short.
            return Err(io::Error::new(io::ErrorKind::InvalidData, "request header too short"));
        }

        let api_key = u16::from_be_bytes(buffer[0..2].try_into().unwrap());
        let api_version = u16::from_be_bytes(buffer[2..4].try_into().unwrap());
        let correlation_id = u32::from_be_bytes(buffer[4..8].try_into().unwrap());

        Ok(InputProtocol {
            message_size: len,
            request_api_key: api_key,
            request_api_version: api_version,
            correlation_id,
        })
    }
}

impl OutputProtocol {
    pub fn build_from_input_protocol(input_protocol: &InputProtocol) -> Self {
        let correlation_id = input_protocol.correlation_id;
        let error_code = Self::build_error_code(input_protocol);
        // Hard-coded to 0: this is the "all zeros" case from the first log.
        let message_size = 0;
        OutputProtocol {
            message_size,
            correlation_id,
            error_code,
        }
    }

    /// Returns 0 for API versions 0 through 4, otherwise error code 35.
    fn build_error_code(input_protocol: &InputProtocol) -> u16 {
        if input_protocol.request_api_version <= 4 {
            0
        } else {
            35
        }
    }

    pub fn write_protocol(&self, stream: &mut TcpStream) -> io::Result<()> {
        // Write message size
        stream.write_all(&self.message_size.to_be_bytes())?;
        // Write correlation ID
        stream.write_all(&self.correlation_id.to_be_bytes())?;
        // Write error code
        stream.write_all(&self.error_code.to_be_bytes())?;
        stream.flush()?;
        Ok(())
    }
}
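For comparison, here is a minimal sketch of how I understand the response framing, assuming the 4-byte size prefix must equal the number of bytes that follow it (correlation ID plus error code, so 6 here). I have not confirmed that this is what the tester expects, but it would be consistent with both failures above. `encode_response` and `write_response` are hypothetical helpers, not part of my repository.

```rust
use std::io::{self, Write};

/// Builds a length-prefixed response. The size prefix counts only the bytes
/// after it (assumption: 4-byte correlation ID + 2-byte error code).
fn encode_response(correlation_id: u32, error_code: u16) -> Vec<u8> {
    let mut body = Vec::with_capacity(6);
    body.extend_from_slice(&correlation_id.to_be_bytes());
    body.extend_from_slice(&error_code.to_be_bytes());

    let mut frame = Vec::with_capacity(4 + body.len());
    frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
    frame.extend_from_slice(&body);
    frame
}

/// Writes the whole frame with a single write_all call.
/// Generic over Write, so it accepts a TcpStream or an in-memory Vec<u8>.
fn write_response<W: Write>(out: &mut W, correlation_id: u32, error_code: u16) -> io::Result<()> {
    out.write_all(&encode_response(correlation_id, error_code))?;
    out.flush()
}

fn main() -> io::Result<()> {
    // Use an in-memory buffer in place of the TcpStream to inspect the bytes.
    let mut sink: Vec<u8> = Vec::new();
    write_response(&mut sink, 1667803575, 35)?;
    println!("{:02x?}", sink);
    Ok(())
}
```

Computing the size from the body length, rather than hard-coding it, keeps the prefix correct if more fields are added to the response later.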