Question about #PV1: Handle APIVersions requests in Rust

I keep getting a client connection reset error. Here is the code:

#![allow(unused_imports)]
use std::{
    io::{Read, Write},
    net::TcpListener,
};

fn main() {
    println!("Logs from your program will appear here!");
    let listener = TcpListener::bind("127.0.0.1:9092").unwrap();

    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                println!("accepted new connection");
                let mut ms = [0u8; 4];
                let _ = stream.read_exact(&mut ms).unwrap();
                let len = i32::from_be_bytes(ms);
                let mut buff = vec![0; (len - 4) as usize];
                stream.read_exact(&mut buff).unwrap();

                let _api = i16::from_be_bytes([buff[0], buff[1]]);
                let _req_ver = i16::from_be_bytes([buff[2], buff[3]]);

                let cid = [buff[4], buff[5], buff[6], buff[7]];
                let error_code = 0i16.to_be_bytes();
                let key_count = 1i32.to_be_bytes();
                let api_key =
                    [18i16.to_be_bytes(), 0i16.to_be_bytes(), 4i16.to_be_bytes()].concat(); //is min 4 or zero?
                let mess_len = cid.len()
                    + error_code.len()
                    + key_count.len()
                    + api_key.len()
                    + size_of::<i32>();

                let mut resp = Vec::with_capacity(mess_len);

                resp.extend((mess_len as i32).to_be_bytes());
                resp.extend(cid);
                resp.extend(error_code);
                resp.extend(key_count);
                resp.extend(api_key);
                println!("{:?}", resp);
                let _ = stream.write_all(&resp);

                let res = stream.flush();

                println!("res {:?}", res);
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
}

Looking at the hexdump of the input and my output, I can’t see any issues. So is it connection handling?

[your_program] accepted new connection
[tester::#PV1] Sending "ApiVersions" (version: 4) request (Correlation id: 1163421261)
[tester::#PV1] Hexdump of sent "ApiVersions" request:
[tester::#PV1] Idx  | Hex                                             | ASCII
[tester::#PV1] -----+-------------------------------------------------+-----------------
[tester::#PV1] 0000 | 00 00 00 23 00 12 00 04 45 58 66 4d 00 09 6b 61 | ...#....EXfM..ka
[tester::#PV1] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#PV1] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
[tester::#PV1]
[your_program] [0, 0, 0, 20, 69, 88, 102, 77, 0, 0, 0, 0, 0, 1, 0, 18, 0, 0, 0, 4]
[your_program] res Ok(())
[tester::#PV1] error reading from connection: read tcp 127.0.0.1:48012->127.0.0.1:9092: read: connection reset by peer
[tester::#PV1] Test failed

Do I need something for the tag_buffer?

Cheers!
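
On the tag_buffer question: the passing #PV1 response further down this thread decodes to an error_code, a compact array of API keys (each entry followed by an empty TAG_BUFFER), throttle_time_ms, and a final TAG_BUFFER. Note also that the response printed above declares a length of 20 while only 16 bytes follow the prefix, because the length calculation adds the 4 bytes of the length field itself. Below is a minimal sketch of building those bytes, assuming the v4 layout shown in that decoder output. The function names `put_unsigned_varint` and `api_versions_response` are hypothetical, not part of the original code.

```rust
/// Append an unsigned varint: 7 bits per byte, low bits first,
/// with the high bit set on every byte except the last.
fn put_unsigned_varint(out: &mut Vec<u8>, mut value: u32) {
    while value >= 0x80 {
        out.push(((value as u8) & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Hypothetical helper: build a length-prefixed ApiVersions (v4) response.
/// The field order follows the tester's decoder output in this thread.
fn api_versions_response(correlation_id: i32, error_code: i16, throttle_time_ms: i32) -> Vec<u8> {
    // (api_key, min_version, max_version): only ApiVersions itself is advertised.
    let api_keys: [(i16, i16, i16); 1] = [(18, 0, 4)];

    let mut body = Vec::new();
    body.extend(correlation_id.to_be_bytes()); // response header
    body.extend(error_code.to_be_bytes());
    // Compact array: the length is encoded as N + 1.
    put_unsigned_varint(&mut body, api_keys.len() as u32 + 1);
    for (key, min, max) in api_keys {
        body.extend(key.to_be_bytes());
        body.extend(min.to_be_bytes());
        body.extend(max.to_be_bytes());
        body.push(0); // empty TAG_BUFFER for this entry
    }
    body.extend(throttle_time_ms.to_be_bytes());
    body.push(0); // empty TAG_BUFFER for the response body

    // The size prefix counts only the bytes that follow it.
    let mut framed = Vec::with_capacity(4 + body.len());
    framed.extend((body.len() as i32).to_be_bytes());
    framed.extend(body);
    framed
}
```

Calling `api_versions_response(1463317036, 0, 420)` should reproduce the 23 bytes in the tester's hexdump of the received response, with a size prefix of 0x13 (19).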

Hi @russelldb, could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

Hi,
Thanks for the reply. The repo is russelldb/codecrafters-kafka-rust on GitHub. My previous step also failed repeatedly with connection reset, and that was when I discovered that I need to read all the bytes from the stream before writing the response. Is this the same thing?
Cheers
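
On reading all the bytes: in the request hexdump above, the size prefix is 0x23 (35) and exactly 35 bytes follow it, so the prefix does not count itself. The posted code reads `len - 4` bytes, which would leave 4 bytes unread when the stream is dropped. Closing a socket with unread data is a common cause of a reset on the peer, though that is an assumption here and not something the logs confirm. A minimal sketch of reading one complete frame, with `read_frame` as a hypothetical helper name:

```rust
use std::io::{self, Read};

/// Hypothetical helper: read one length-prefixed request in full.
/// The 4-byte big-endian prefix gives the number of bytes that follow it.
fn read_frame<R: Read>(stream: &mut R) -> io::Result<Vec<u8>> {
    let mut size = [0u8; 4];
    stream.read_exact(&mut size)?;

    let len = i32::from_be_bytes(size);
    if len < 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "negative message size",
        ));
    }

    // Read exactly `len` bytes, not `len - 4`: the prefix excludes itself.
    let mut frame = vec![0u8; len as usize];
    stream.read_exact(&mut frame)?;
    Ok(frame)
}
```

With the frame in hand, the API key is at bytes 0..2, the API version at 2..4, and the correlation ID at 4..8, as in the original code. Because the function is generic over `Read`, it accepts a `TcpStream` as well as an in-memory cursor for testing.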

Thanks for sharing the link! I’ll take a look and get back to you by the end of the week.

Initiating test run...

⚡ This is a turbo test run. https://codecrafters.io/turbo

Running tests. Logs should appear shortly...

[compile]    Compiling codecrafters-kafka v0.1.0 (/app)
[compile]     Finished `release` profile [optimized] target(s) in 0.95s
[compile] Moved ./.codecrafters/run.sh → ./your_program.sh
[compile] Compilation successful.

Debug = true

[tester::#PV1] Running tests for Stage #PV1 (Handle APIVersions requests)
[tester::#PV1] $ ./your_program.sh /tmp/server.properties
[tester::#PV1] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[your_program] accepted new connection
[tester::#PV1] Connection to broker at localhost:9092 successful
[tester::#PV1] Sending "ApiVersions" (version: 4) request (Correlation id: 1463317036)
[tester::#PV1] Hexdump of sent "ApiVersions" request:
[tester::#PV1] Idx  | Hex                                             | ASCII
[tester::#PV1] -----+-------------------------------------------------+-----------------
[tester::#PV1] 0000 | 00 00 00 23 00 12 00 04 57 38 72 2c 00 09 6b 61 | ...#....W8r,..ka
[tester::#PV1] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#PV1] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
[tester::#PV1]
[tester::#PV1] Hexdump of received "ApiVersions" response:
[tester::#PV1] Idx  | Hex                                             | ASCII
[tester::#PV1] -----+-------------------------------------------------+-----------------
[tester::#PV1] 0000 | 00 00 00 13 57 38 72 2c 00 00 02 00 12 00 00 00 | ....W8r,........
[tester::#PV1] 0010 | 04 00 00 00 01 a4 00                            | .......
[tester::#PV1]
[tester::#PV1] [Decoder] - .ResponseHeader
[tester::#PV1] [Decoder]   - .correlation_id (1463317036)
[tester::#PV1] [Decoder] - .ResponseBody
[tester::#PV1] [Decoder]   - .error_code (0)
[tester::#PV1] [Decoder]   - .num_api_keys (1)
[tester::#PV1] [Decoder]   - .ApiKeys[0]
[tester::#PV1] [Decoder]     - .api_key (18)
[tester::#PV1] [Decoder]     - .min_version (0)
[tester::#PV1] [Decoder]     - .max_version (4)
[tester::#PV1] [Decoder]     - .TAG_BUFFER
[tester::#PV1] [Decoder]   - .throttle_time_ms (420)
[tester::#PV1] [Decoder]   - .TAG_BUFFER
[tester::#PV1] ✓ Correlation ID: 1463317036
[tester::#PV1] ✓ Error code: 0 (NO_ERROR)
[tester::#PV1] ✓ API keys array is non-empty
[tester::#PV1] ✓ API version 4 is supported for API_VERSIONS
[tester::#PV1] Test passed.
[tester::#PV1] Terminating program
[your_program] res Ok(())
[tester::#PV1] Program terminated successfully

[tester::#NC5] Running tests for Stage #NC5 (Parse API Version)
[tester::#NC5] $ ./your_program.sh /tmp/server.properties
[tester::#NC5] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[your_program] accepted new connection
[tester::#NC5] Connection to broker at localhost:9092 successful
[tester::#NC5] Sending "ApiVersions" (version: -27508) request (Correlation id: 179874620)
[tester::#NC5] Hexdump of sent "ApiVersions" request:
[tester::#NC5] Idx  | Hex                                             | ASCII
[tester::#NC5] -----+-------------------------------------------------+-----------------
[tester::#NC5] 0000 | 00 00 00 23 00 12 94 8c 0a b8 ab 3c 00 09 6b 61 | ...#.......<..ka
[tester::#NC5] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#NC5] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
[tester::#NC5]
[your_program] res Ok(())
[tester::#NC5] error reading from connection: read tcp 127.0.0.1:43650->127.0.0.1:9092: read: connection reset by peer
[tester::#NC5] Test failed
[tester::#NC5] Terminating program
[tester::#NC5] Program terminated successfully

View our article on debugging test failures: https://codecrafters.io/debug

When I run the Codecrafters CLI locally I get the above output, saying #PV1 passed but #NC5 failed. What is #NC5? I’d love to move on to the next phase of the challenge, but despite it looking like I passed and followed the (woeful) instructions, it is a fail. Can I see the test code, please?

Hi @russelldb, #NC5 is a previous stage. Our tester automatically runs tests for earlier stages to ensure there are no regressions in your solution.

I tried running your code against the previous stages, but it’s actually no longer passing the second stage #NV3 (Send Correlation ID).

Suggestions:

  1. Use our CLI to test against previous stages by running:
     codecrafters test --previous
  2. Focus on fixing the early stages first, as later stages depend on them.
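
Since earlier stages are re-run, one handler has to satisfy all of them. In the #NC5 (Parse API Version) log above, the tester sends version -27508. A minimal sketch of choosing the error code from the request version follows, assuming that stage expects Kafka's UNSUPPORTED_VERSION code (35) for versions outside the 0 to 4 range advertised in the passing #PV1 response. The helper name `error_code_for` is hypothetical, and this is separate from the connection reset shown in that log.

```rust
/// Kafka protocol error codes used in this sketch.
const NO_ERROR: i16 = 0;
const UNSUPPORTED_VERSION: i16 = 35;

/// Hypothetical helper: pick the error code for an ApiVersions request.
/// Assumes versions 0 through 4 are supported, matching the min_version
/// and max_version fields in the passing response.
fn error_code_for(request_api_version: i16) -> i16 {
    if (0..=4).contains(&request_api_version) {
        NO_ERROR
    } else {
        UNSUPPORTED_VERSION
    }
}
```

The result would then be written into the error_code field of the response in place of a hard-coded 0.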

Sure, you can find our test cases here:

Ah! Nice. Thanks, I didn’t realise. Yes, I hacked and slashed to get to the point of understanding the current stage failure, and I didn’t think about prior stages.

But again, this makes no sense. If the #NV3 (Send Correlation ID) test is failing, it is because I don’t send 7 as the ID. Do you mean that part of the challenge is to keep passing test #NV3 while actually implementing the protocol?

I’m not entirely sure either. Feel free to share your updated code, and we can debug it together!

Closing this thread due to inactivity. If you still need assistance, feel free to reopen or start a new discussion!
