I’m stuck on Stage “Handle APIVersions requests”
The code works fine on my local machine but timed out on the tester.
I noticed that there are no more logs after “accepted new connection”.
However, the code should be able to print the parsed requests and so on, like this
accepted new connection
Request of size 39: [0, 0, 0, 35, 0, 18, 0, 4, 71, 93, 121, 204, 0, 9, 107, 97, 102, 107, 97, 45, 99, 108, 105, 0, 10, 107, 97, 102, 107, 97, 45, 99, 108, 105, 4, 48, 46, 49, 0]
Request { message_size: 35, header: RequestHeaderV2 { request_api_key: ApiVersions, request_api_version: 4, correlation_id: 1197308364, client_id: "kafka-cli" }, body: Empty }
Response { header: ResponseHeaderV0 { correlation_id: 1197308364 }, body: APIVersionsResponseBodyV4(APIVersionsResponseBodyV4 { error_code: NoError, api_keys: [APIKey { api_key: 18, min_version: 0, max_version: 4 }], throttle_time_ms: 420 }) }
[0, 0, 0, 19, 71, 93, 121, 204, 0, 0, 2, 0, 18, 0, 0, 0, 4, 0, 0, 0, 1, 164, 0]
which means the code is (probably) stuck at reading the request to the buffer
let mut request_buf= Vec::new();
let n = stream.read_to_end(&mut request_buf).unwrap();
println!("Request of size {n}: {:?}", request_buf);
I do not know why it timed out since the read_to_end
API has been stable since Rust 1.0.0, and no error has been shown when running the test locally.
Has anyone encountered the same issue?
Here are my logs:
remote: [tester::#NH4] Running tests for Stage #NH4 (Concurrent Clients - Serial requests)
remote: [tester::#NH4] $ ./your_program.sh /tmp/server.properties
remote: [tester::#NH4] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [tester::#NH4] Connection to broker at localhost:9092 successful
remote: [tester::#NH4] Sending request 1 of 3: "ApiVersions" (version: 4) request (Correlation id: 1304410696)
remote: [tester::#NH4] Hexdump of sent "ApiVersions" request:
remote: [tester::#NH4] Idx | Hex | ASCII
remote: [tester::#NH4] -----+-------------------------------------------------+-----------------
remote: [tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 4d bf ba 48 00 09 6b 61 | ...#....M..H..ka
remote: [tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
remote: [tester::#NH4] 0020 | 6c 69 04 30 2e 31 00 | li.0.1.
remote: [tester::#NH4]
remote: [your_program] accepted new connection
remote: [tester::#NH4] timed out, test exceeded 10 seconds
remote: [tester::#NH4] Test failed
remote: [tester::#NH4] Terminating program
remote: [tester::#NH4] Program terminated successfully
And here’s a snippet of my code:
println!("accepted new connection");
let mut request_buf= Vec::new();
let n = stream.read_to_end(&mut request_buf).unwrap();
println!("Request of size {n}: {:?}", request_buf);
let mut cur = Cursor::new(&*request_buf);
let request = Request::parse(&mut cur).unwrap();
println!("{:?}", request);
let response_header = ResponseHeaderV0 {
correlation_id: request.header.correlation_id,
};
let response_body = match request.header.request_api_key {
RequestApiKey::Produce => ResponseBody::Empty,
RequestApiKey::Fetch => ResponseBody::Empty,
RequestApiKey::ApiVersions => {
let error_code = if request.header.request_api_version < 0 || request.header.request_api_version > 4 {
ErrorCode::UnsupportedVersion
} else {
ErrorCode::NoError
};
let api_versions = APIKey {
api_key: RequestApiKey::ApiVersions as i16,
min_version: 0,
max_version: 4,
};
let api_keys = vec![api_versions];
ResponseBody::APIVersionsResponseBodyV4(
APIVersionsResponseBodyV4 {
error_code,
api_keys,
throttle_time_ms: 420
}
)
}
};
let response = Response {
header: response_header,
body: response_body
};
println!("{:?}", response);
let mut response_buf = Vec::new();
response.write(&mut response_buf);
println!("{:?}", response_buf);
stream.write(&*response_buf).expect("TODO: panic message");
}