Time out in server

I’m stuck on the stage “Handle APIVersions requests”.

The code works fine on my local machine but times out on the tester.
I noticed that there are no more logs after “accepted new connection”.
However, the code should print the parsed request and so on, like this:

accepted new connection
Request of size 39: [0, 0, 0, 35, 0, 18, 0, 4, 71, 93, 121, 204, 0, 9, 107, 97, 102, 107, 97, 45, 99, 108, 105, 0, 10, 107, 97, 102, 107, 97, 45, 99, 108, 105, 4, 48, 46, 49, 0]
Request { message_size: 35, header: RequestHeaderV2 { request_api_key: ApiVersions, request_api_version: 4, correlation_id: 1197308364, client_id: "kafka-cli" }, body: Empty }
Response { header: ResponseHeaderV0 { correlation_id: 1197308364 }, body: APIVersionsResponseBodyV4(APIVersionsResponseBodyV4 { error_code: NoError, api_keys: [APIKey { api_key: 18, min_version: 0, max_version: 4 }], throttle_time_ms: 420 }) }
[0, 0, 0, 19, 71, 93, 121, 204, 0, 0, 2, 0, 18, 0, 0, 0, 4, 0, 0, 0, 1, 164, 0]

This suggests that the code is (probably) stuck reading the request into the buffer:

                let mut request_buf= Vec::new();
                let n = stream.read_to_end(&mut request_buf).unwrap();
                println!("Request of size {n}: {:?}", request_buf);

I do not know why it times out: the read_to_end API has been stable since Rust 1.0.0, and no error shows up when I run the test locally.
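
One possible explanation, which this thread does not confirm: `read_to_end` only returns once the peer closes the connection (EOF), so it keeps blocking for as long as the client holds the socket open while waiting for a response. Kafka requests start with a 4-byte big-endian `message_size`, so a reader can take exactly one message instead. A minimal sketch, where `read_message` is a hypothetical helper and the `Cursor` stands in for the `TcpStream`:

```rust
use std::io::{self, Cursor, Read};

/// Reads one length-prefixed message: a 4-byte big-endian size, then that many bytes.
/// `read_message` is a hypothetical helper, not part of the code in this thread.
/// The returned buffer includes the 4-byte size prefix.
fn read_message<R: Read>(stream: &mut R) -> io::Result<Vec<u8>> {
    let mut size_buf = [0u8; 4];
    stream.read_exact(&mut size_buf)?;
    let size = i32::from_be_bytes(size_buf);
    if size < 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "negative message size",
        ));
    }
    let mut message = vec![0u8; 4 + size as usize];
    message[..4].copy_from_slice(&size_buf);
    // Read exactly `size` more bytes; this does not wait for EOF.
    stream.read_exact(&mut message[4..])?;
    Ok(message)
}

fn main() -> io::Result<()> {
    // A 3-byte message followed by the first bytes of a second one.
    let mut stream = Cursor::new(vec![0u8, 0, 0, 3, 1, 2, 3, 0, 0]);
    let request = read_message(&mut stream)?;
    println!("Request of size {}: {:?}", request.len(), request);
    // Only the first message was consumed; the rest stays in the stream.
    assert_eq!(request, vec![0, 0, 0, 3, 1, 2, 3]);
    assert_eq!(stream.position(), 7);
    Ok(())
}
```

The helper returns the size prefix together with the payload, so a parser that expects `message_size` first should still see the same bytes as before.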

Has anyone encountered the same issue?

Here are my logs:

remote: [tester::#NH4] Running tests for Stage #NH4 (Concurrent Clients - Serial requests)
remote: [tester::#NH4] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NH4] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NH4] Connection to broker at localhost:9092 successful
remote: [tester::#NH4] Sending request 1 of 3: "ApiVersions" (version: 4) request (Correlation id: 1304410696)
remote: [tester::#NH4] Hexdump of sent "ApiVersions" request: 
remote: [tester::#NH4] Idx  | Hex                                             | ASCII
remote: [tester::#NH4] -----+-------------------------------------------------+-----------------
remote: [tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 4d bf ba 48 00 09 6b 61 | ...#....M..H..ka
remote: [tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
remote: [tester::#NH4] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
remote: [tester::#NH4] 
remote: [your_program] accepted new connection
remote: [tester::#NH4] timed out, test exceeded 10 seconds
remote: [tester::#NH4] Test failed
remote: [tester::#NH4] Terminating program
remote: [tester::#NH4] Program terminated successfully
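
For reference, the request in the hexdump above can be decoded by hand. The sketch below assumes the request header v2 layout (`message_size`, `request_api_key`, `request_api_version`, `correlation_id`, a length-prefixed `client_id`, then TAG_BUFFER). `Header`, `parse_header`, `be_i16` and `be_i32` are hypothetical names, not taken from the repository.

```rust
use std::convert::{TryFrom, TryInto};

/// Hypothetical struct mirroring the header fields printed in this thread.
#[derive(Debug, PartialEq)]
struct Header {
    message_size: i32,
    api_key: i16,
    api_version: i16,
    correlation_id: i32,
    client_id: String,
    tagged_field_count: u8,
}

fn be_i16(buf: &[u8], at: usize) -> Option<i16> {
    let bytes: [u8; 2] = buf.get(at..at + 2)?.try_into().ok()?;
    Some(i16::from_be_bytes(bytes))
}

fn be_i32(buf: &[u8], at: usize) -> Option<i32> {
    let bytes: [u8; 4] = buf.get(at..at + 4)?.try_into().ok()?;
    Some(i32::from_be_bytes(bytes))
}

/// Returns None if the buffer is too short or the client id is not UTF-8.
fn parse_header(buf: &[u8]) -> Option<Header> {
    let client_id_len = be_i16(buf, 12)?;
    // A length of -1 means a null string; this sketch treats it as empty.
    let len = usize::try_from(client_id_len).unwrap_or(0);
    let client_id = String::from_utf8(buf.get(14..14 + len)?.to_vec()).ok()?;
    Some(Header {
        message_size: be_i32(buf, 0)?,
        api_key: be_i16(buf, 4)?,
        api_version: be_i16(buf, 6)?,
        correlation_id: be_i32(buf, 8)?,
        client_id,
        // Assumption: the count fits in one byte (it is 0 in this request).
        tagged_field_count: *buf.get(14 + len)?,
    })
}

fn main() {
    // The 39 bytes from the tester's hexdump.
    let bytes: [u8; 39] = [
        0x00, 0x00, 0x00, 0x23, 0x00, 0x12, 0x00, 0x04, 0x4d, 0xbf, 0xba, 0x48, 0x00,
        0x09, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x63, 0x6c, 0x69, 0x00, 0x0a, 0x6b,
        0x61, 0x66, 0x6b, 0x61, 0x2d, 0x63, 0x6c, 0x69, 0x04, 0x30, 0x2e, 0x31, 0x00,
    ];
    let header = parse_header(&bytes).expect("header should parse");
    println!("{:?}", header);
    // Matches the correlation id the tester logged for this request.
    assert_eq!(header.correlation_id, 1304410696);
}
```

Note that the header's TAG_BUFFER here is the single `00` byte right after `kafka-cli`.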

And here’s a snippet of my code:

                println!("accepted new connection");
                let mut request_buf= Vec::new();
                let n = stream.read_to_end(&mut request_buf).unwrap();
                println!("Request of size {n}: {:?}", request_buf);
                let mut cur = Cursor::new(&*request_buf);
                let request = Request::parse(&mut cur).unwrap();
                println!("{:?}", request);
                let response_header = ResponseHeaderV0 {
                    correlation_id: request.header.correlation_id,
                };
                let response_body = match request.header.request_api_key {
                    RequestApiKey::Produce => ResponseBody::Empty,
                    RequestApiKey::Fetch => ResponseBody::Empty,
                    RequestApiKey::ApiVersions => {
                        let error_code = if request.header.request_api_version < 0 || request.header.request_api_version > 4 {
                            ErrorCode::UnsupportedVersion
                        } else {
                            ErrorCode::NoError
                        };
                        let api_versions = APIKey {
                            api_key: RequestApiKey::ApiVersions as i16,
                            min_version: 0,
                            max_version: 4,
                        };
                        let api_keys = vec![api_versions];
                        ResponseBody::APIVersionsResponseBodyV4(
                            APIVersionsResponseBodyV4 {
                                error_code,
                                api_keys,
                                throttle_time_ms: 420
                            }
                        )
                    }
                };

                let response = Response {
                    header: response_header,
                    body: response_body
                };

                println!("{:?}", response);

                let mut response_buf = Vec::new();
                response.write(&mut response_buf);
                println!("{:?}", response_buf);
                stream.write_all(&response_buf).expect("failed to write response");
            }

Hey @JensLiu, could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

@andy1li Sure, I’ve uploaded it to GitHub. Here’s the link: JensLiu/JensLiu-codecrafters-kafka-rust
Thank you so much for helping out :smiling_face_with_three_hearts:

@JensLiu Sorry for the delayed response!

There might be multiple issues at play. The first thing I noticed is that TAG_BUFFER is taking up 4 bytes, whereas it should be only 1 byte.
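
To illustrate the 1-byte TAG_BUFFER, here is a rough sketch of an ApiVersions v4 response encoder. An empty tagged-field section is written as a single `0` byte. `encode_api_versions_response` is a hypothetical helper, and the single hard-coded API key entry (key 18, versions 0 to 4) is taken from the local output earlier in the thread, not from the repository.

```rust
/// Hypothetical helper: encodes an ApiVersions v4 response with one API key entry.
fn encode_api_versions_response(
    correlation_id: i32,
    error_code: i16,
    throttle_time_ms: i32,
) -> Vec<u8> {
    // An empty tagged-field section is one byte: a count of zero.
    const TAG_BUFFER: u8 = 0;

    let mut body = Vec::new();
    body.extend_from_slice(&correlation_id.to_be_bytes()); // response header v0
    body.extend_from_slice(&error_code.to_be_bytes());
    body.push(2); // compact array length: element count + 1
    body.extend_from_slice(&18i16.to_be_bytes()); // api_key: ApiVersions
    body.extend_from_slice(&0i16.to_be_bytes()); // min_version
    body.extend_from_slice(&4i16.to_be_bytes()); // max_version
    body.push(TAG_BUFFER); // tagged fields of the API key entry
    body.extend_from_slice(&throttle_time_ms.to_be_bytes());
    body.push(TAG_BUFFER); // tagged fields of the response body

    // message_size counts everything after the 4-byte size field itself.
    let mut out = Vec::with_capacity(4 + body.len());
    out.extend_from_slice(&(body.len() as i32).to_be_bytes());
    out.extend_from_slice(&body);
    out
}

fn main() {
    let bytes = encode_api_versions_response(1197308364, 0, 420);
    println!("{:?}", bytes);
    // Same 23 bytes as the local response printed earlier in the thread.
    assert_eq!(
        bytes,
        vec![0, 0, 0, 19, 71, 93, 121, 204, 0, 0, 2, 0, 18, 0, 0, 0, 4, 0, 0, 0, 1, 164, 0]
    );
}
```

With a 4-byte TAG_BUFFER, each of the two tagged-field sections would add 3 extra bytes, so the fields that follow would be read at the wrong offsets.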

Let me know if you need help with other issues as well!

@andy1li Problem solved after changing the size of TAG_BUFFER! Thank you so much for the assistance!! :partying_face:
