Stuck on Stage #NC5 with Rust

I’m stuck on Stage #NC5 with Rust.

The tester's behavior is confusing. If I write all zeros as the first four bytes to the TcpStream, the tester does not read the subsequent bytes and outputs “Expected INT32 length to be 4 bytes, got 0 bytes”. However, if the first four bytes are not all zeros, the tester does not read any bytes and directly outputs “error reading from connection: unexpected EOF”. The same thing happens when I run the Code Examples that previously passed. I wonder if this is a bug.

My code is available here: flyingsand/codecrafters-kafka-rust

Here are my logs:

When the first four bytes are all zeros:

remote: [tester::#NC5] Running tests for Stage #NC5 (Parse API Version)
remote: [tester::#NC5] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NC5] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NC5] Connection to broker at localhost:9092 successful
remote: [tester::#NC5] [Encoder] - ApiVersionsRequest
remote: [tester::#NC5] [Encoder]   - Header
remote: [tester::#NC5] [Encoder]     - APIKey (18)
remote: [tester::#NC5] [Encoder]     - APIVersion (1080)
remote: [tester::#NC5] [Encoder]     - CorrelationID (1667803575)
remote: [tester::#NC5] [Encoder]     - ClientID (kafka-tester)
remote: [tester::#NC5] [Encoder]   - Body
remote: [tester::#NC5] [Encoder]     - ClientSoftwareName (kafka-cli)
remote: [tester::#NC5] [Encoder]     - ClientSoftwareVersion (0.1)
remote: [tester::#NC5] Sending "ApiVersions" request
remote: [tester::#NC5] Hexdump of sent "ApiVersions" request: 
remote: [tester::#NC5] Idx  | Hex                                             | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 26 00 12 04 38 63 68 a9 b7 00 0c 6b 61 | ...&...8ch....ka
remote: [tester::#NC5] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 0a 6b 61 66 6b | fka-tester..kafk
remote: [tester::#NC5] 0020 | 61 2d 63 6c 69 04 30 2e 31 00                   | a-cli.0.1.
remote: [tester::#NC5] 
remote: [tester::#NC5] Hexdump of received "ApiVersions" response: 
remote: [tester::#NC5] Idx  | Hex                                             | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 00                                     | ....
remote: [tester::#NC5] 
remote: [tester::#NC5] [Decoder] - ApiVersionsResponse
remote: [tester::#NC5] [Decoder]   - Header
remote: [tester::#NC5] [Decoder]     X CorrelationID (decode error)
remote: [tester::#NC5] Received bytes:
remote: [tester::#NC5] Hex (bytes 0--1)                                | ASCII
remote: [tester::#NC5] ------------------------------------------------+------------------
remote: [tester::#NC5] 
remote: [tester::#NC5]  ^                                                ^
remote: [your_program] accepted new connection
remote: [tester::#NC5] Expected INT32 length to be 4 bytes, got 0 bytes
remote: [tester::#NC5] Test failed
remote: [tester::#NC5] Terminating program
remote: [tester::#NC5] Program terminated successfully

When the first four bytes are not all zeros:

remote: [tester::#NC5] Running tests for Stage #NC5 (Parse API Version)
remote: [tester::#NC5] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NC5] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NC5] Connection to broker at localhost:9092 successful
remote: [tester::#NC5] [Encoder] - ApiVersionsRequest
remote: [tester::#NC5] [Encoder]   - Header
remote: [tester::#NC5] [Encoder]     - APIKey (18)
remote: [tester::#NC5] [Encoder]     - APIVersion (1564)
remote: [tester::#NC5] [Encoder]     - CorrelationID (350636842)
remote: [tester::#NC5] [Encoder]     - ClientID (kafka-tester)
remote: [tester::#NC5] [Encoder]   - Body
remote: [tester::#NC5] [Encoder]     - ClientSoftwareName (kafka-cli)
remote: [tester::#NC5] [Encoder]     - ClientSoftwareVersion (0.1)
remote: [tester::#NC5] Sending "ApiVersions" request
remote: [tester::#NC5] Hexdump of sent "ApiVersions" request: 
remote: [tester::#NC5] Idx  | Hex                                             | ASCII
remote: [tester::#NC5] -----+-------------------------------------------------+-----------------
remote: [tester::#NC5] 0000 | 00 00 00 26 00 12 06 1c 14 e6 4b 2a 00 0c 6b 61 | ...&......K*..ka
remote: [tester::#NC5] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 0a 6b 61 66 6b | fka-tester..kafk
remote: [tester::#NC5] 0020 | 61 2d 63 6c 69 04 30 2e 31 00                   | a-cli.0.1.
remote: [tester::#NC5] 
remote: [your_program] accepted new connection
remote: [tester::#NC5] error reading from connection: unexpected EOF
remote: [tester::#NC5] Test failed
remote: [tester::#NC5] Terminating program
remote: [tester::#NC5] Program terminated successfully
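As a sanity check on the request side, the first 12 bytes of the first hexdump above can be decoded by hand. The sketch below is only an illustration: `RequestHeader` and `decode_header` are hypothetical names that do not come from the repository, and it assumes every field is big-endian, as the tester's own Encoder output suggests.

```rust
/// Fields decoded from the start of a request frame (hypothetical helper type).
#[derive(Debug, PartialEq)]
struct RequestHeader {
    message_size: u32,
    api_key: u16,
    api_version: u16,
    correlation_id: u32,
}

/// Decodes the 4-byte length prefix and the first three header fields.
/// Returns None if fewer than 12 bytes are available.
fn decode_header(frame: &[u8]) -> Option<RequestHeader> {
    if frame.len() < 12 {
        return None;
    }
    Some(RequestHeader {
        message_size: u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]),
        api_key: u16::from_be_bytes([frame[4], frame[5]]),
        api_version: u16::from_be_bytes([frame[6], frame[7]]),
        correlation_id: u32::from_be_bytes([frame[8], frame[9], frame[10], frame[11]]),
    })
}

fn main() {
    // First 12 bytes of the request hexdump from the all-zeros run.
    let frame = [
        0x00, 0x00, 0x00, 0x26, 0x00, 0x12, 0x04, 0x38, 0x63, 0x68, 0xa9, 0xb7,
    ];
    let header = decode_header(&frame).expect("frame too short");
    println!("{:?}", header);
}
```

The decoded values (size 38, API key 18, version 1080, correlation ID 1667803575) match what the tester's Encoder lines report, and 38 is exactly the 42-byte dump minus the 4-byte prefix, so the request itself looks well-formed.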

And here’s a snippet of my code:

//main.rs
use std::io::Write;
use std::net::TcpListener;

// Assumes protocol.rs sits next to main.rs.
mod protocol;
use protocol::{InputProtocol, OutputProtocol};

fn main() -> std::io::Result<()> {
    println!("Logs from your program will appear here!");

    let listener = TcpListener::bind("127.0.0.1:9092")?;

    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                // Parse the request header, then reply with the same correlation ID.
                let input_protocol = InputProtocol::parse_from_stream(&mut stream)?;
                let output_protocol = OutputProtocol::build_from_input_protocol(&input_protocol);

                output_protocol.write_protocol(&mut stream)?;
                stream.flush()?;
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
    Ok(())
}

//protocol.rs
use std::io::{self, Read, Write};
use std::net::TcpStream;

#[derive(Debug)]
pub struct InputProtocol{
    pub message_size: u32,
    pub request_api_key: u16,
    pub request_api_version: u16,
    pub correlation_id: u32
}
#[derive(Debug)]
pub struct OutputProtocol {
    pub message_size: u32,
    pub correlation_id: u32,
    pub error_code: u16
}
impl InputProtocol {
    pub fn parse_from_stream(stream:&mut TcpStream) -> io::Result<Self> {
        let mut bytes = [0u8;4];
        stream.read_exact(&mut bytes)?;
        let len = u32::from_be_bytes(bytes);
        let mut buffer = vec![0u8;len as usize];
        stream.read_exact(&mut buffer)?;

        let bytes = &buffer[0..2];
        let api_key = u16::from_be_bytes(bytes.try_into().unwrap());
        let bytes = &buffer[2..4];
        let api_version = u16::from_be_bytes(bytes.try_into().unwrap());
        let bytes = &buffer[4..8];
        let correlation_id = u32::from_be_bytes(bytes.try_into().unwrap());
        Ok(
            InputProtocol {
                message_size: len,
                request_api_key: api_key,
                request_api_version: api_version,
                correlation_id
            }
        )
        
    }
}

impl OutputProtocol {
    pub fn build_from_input_protocol(input_protocol: & InputProtocol) -> Self{
        let correlation_id = input_protocol.correlation_id;
        let error_code = Self::build_error_code(input_protocol);
        let message_size = 0; 
        OutputProtocol {
            message_size,
            correlation_id,
            error_code
        }
    }
    
    fn build_error_code(input_protocol: & InputProtocol) -> u16 {
        if input_protocol.request_api_version <=4 {
            0
        }
        else{
            35
        }
    }
    
    pub fn write_protocol(&self, stream:&mut TcpStream) -> io::Result<()>{
        // Write message size
        stream.flush()?;
        stream.write_all(&self.message_size.to_be_bytes())?;
        // Write correlation ID
        stream.write_all(&self.correlation_id.to_be_bytes())?;
        // Write error code
        stream.write_all(&self.error_code.to_be_bytes())?;
        stream.flush()?;
        Ok(())
    }
}
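For comparison, in the Kafka wire protocol the `message_size` field counts the bytes that follow the 4-byte prefix. The “got 0 bytes” message in the first log is at least consistent with the tester honoring a prefix of 0, although the thread does not say what the tester actually does with this field. The sketch below shows one way to compute the prefix from the body instead of hard-coding it. `encode_response` is a hypothetical helper, not part of the repository, and it uses `i16` for the error code because the protocol defines that field as INT16.

```rust
/// Builds a length-prefixed response frame:
/// message_size (4 bytes) + correlation_id (4 bytes) + error_code (2 bytes).
/// Hypothetical helper for illustration only.
fn encode_response(correlation_id: u32, error_code: i16) -> Vec<u8> {
    // Assemble the body first so its length is known.
    let mut body = Vec::with_capacity(6);
    body.extend_from_slice(&correlation_id.to_be_bytes());
    body.extend_from_slice(&error_code.to_be_bytes());

    // The prefix counts only the bytes after it, not itself.
    let mut frame = Vec::with_capacity(4 + body.len());
    frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
    frame.extend_from_slice(&body);
    frame
}

fn main() {
    // Correlation ID from the first log; 35 is the error code the snippet
    // above returns for API versions greater than 4.
    let frame = encode_response(1667803575, 35);
    println!("{:02x?}", frame);
}
```

Building the whole frame in memory also means it can go out in a single `write_all` call, so the reader never sees the prefix without the body behind it.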

Hey @flyingsand, thanks for highlighting the issue!

We’re currently looking into it and will update you as soon as we have more details.

I didn’t modify the code just now, but the test suddenly passed. So I’m quite sure this is a bug.


We’ve shipped a fix in this PR. Let us know if the issue comes up again!
