I’m stuck on Stage #nh4
I have been trying to handle multiple requests from the same client.
So far, the first test always passes, but the second test consistently fails. After examining the logs, it always appears that the first request gets lost, and I mean always. This prompted me to think that the reason for consistently losing the first request in the second test might not be due to my code – bold claim, I know, but I’ve tried everything I could think of and I am losing it –
Here are the logs:
[tester::#NH4] Running tests for Stage #NH4 (Concurrent Clients - Serial requests)
[tester::#NH4] $ ./your_program.sh /tmp/server.properties
[tester::#NH4] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[tester::#NH4] Connection to broker at localhost:9092 successful
[tester::#NH4] Sending request 1 of 3: "ApiVersions" (version: 4) request (Correlation id: 1775586861)
[tester::#NH4] Hexdump of sent "ApiVersions" request:
[tester::#NH4] Idx | Hex | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 69 d5 4e 2d 00 09 6b 61 | ...#....i.N-..ka
[tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#NH4] 0020 | 6c 69 04 30 2e 31 00 | li.0.1.
[tester::#NH4]
[your_program] SERVER: new connection have been made : ===========================
[your_program] NEW REQUEST : request's correlation is :1775586861
[tester::#NH4] Hexdump of received "ApiVersions" response:
[tester::#NH4] Idx | Hex | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 69 d5 4e 2d 00 00 02 00 12 00 03 00 04 00 00 00 | i.N-............
[tester::#NH4] 0010 | 00 00 00 | ...
[tester::#NH4]
[tester::#NH4] [Decoder] - .ResponseHeader
[tester::#NH4] [Decoder] - .correlation_id (1775586861)
[tester::#NH4] [Decoder] - .ResponseBody
[tester::#NH4] [Decoder] - .error_code (0)
[tester::#NH4] [Decoder] - .num_api_keys (1)
[tester::#NH4] [Decoder] - .ApiKeys[0]
[tester::#NH4] [Decoder] - .api_key (18)
[tester::#NH4] [Decoder] - .min_version (3)
[tester::#NH4] [Decoder] - .max_version (4)
[tester::#NH4] [Decoder] - .TAG_BUFFER
[tester::#NH4] [Decoder] - .throttle_time_ms (0)
[tester::#NH4] [Decoder] - .TAG_BUFFER
[tester::#NH4] ✓ Correlation ID: 1775586861
[tester::#NH4] ✓ Error code: 0 (NO_ERROR)
[tester::#NH4] ✓ API keys array is non-empty
[tester::#NH4] ✓ API version 4 is supported for API_VERSIONS
[tester::#NH4] ✓ Test 1 of 3: Passed
[tester::#NH4] Sending request 2 of 3: "ApiVersions" (version: 4) request (Correlation id: 2086313682)
[tester::#NH4] Hexdump of sent "ApiVersions" request:
[tester::#NH4] Idx | Hex | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 7c 5a 9e d2 00 09 6b 61 | ...#....|Z....ka
[tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#NH4] 0020 | 6c 69 04 30 2e 31 00 | li.0.1.
[tester::#NH4]
[your_program] NEW REQUEST : request's correlation is :1668049152
[tester::#NH4] Hexdump of received "ApiVersions" response:
[tester::#NH4] Idx | Hex | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 63 6c 69 00 00 23 02 00 12 00 03 00 04 00 00 00 | cli..#..........
[tester::#NH4] 0010 | 00 00 00 | ...
[tester::#NH4]
[tester::#NH4] [Decoder] - .ResponseHeader
[tester::#NH4] [Decoder] - .correlation_id (1668049152)
[tester::#NH4] [Decoder] - .ResponseBody
[tester::#NH4] [Decoder] - .error_code (35)
[tester::#NH4] [Decoder] - .num_api_keys (1)
[tester::#NH4] [Decoder] - .ApiKeys[0]
[tester::#NH4] [Decoder] - .api_key (18)
[tester::#NH4] [Decoder] - .min_version (3)
[tester::#NH4] [Decoder] - .max_version (4)
[tester::#NH4] [Decoder] - .TAG_BUFFER
[tester::#NH4] [Decoder] - .throttle_time_ms (0)
[tester::#NH4] [Decoder] - .TAG_BUFFER
[tester::#NH4] Expected Correlation ID to be 2086313682, got 1668049152
[tester::#NH4] Test failed
[tester::#NH4] Terminating program
[your_program] NEW REQUEST : request's correlation is :1818821680
[your_program] NEW REQUEST : request's correlation is :301991036
[your_program] SERVER, error : java.net.SocketException: Broken pipe
[tester::#NH4] Program terminated successfully
the program only recieves/skips to the sconde request - as you can see the .correlation_id recieved by the client matchs what the server prints out as supposedly the first request.
And here’s a snippet of my code:
// the server class is a thread, inside the run function :
try{
Socket client = server.accept() ;
System.out.println("SERVER: new connection have been made : ===========================");
while(client.getInputStream()!=null){
Request request = new Request() ;
request.readRequestFromStream(client.getInputStream());
Response response = Response.fromRequest(request) ;
int responseLength = response.encodeResponse().length ;
client.getOutputStream().write(PrimitiveOperations.fromIntToByteArray(responseLength));
client.getOutputStream().write(response.encodeResponse());
client.getOutputStream().flush();
}
}catch (IOException e){
System.out.println("SERVER, error : " + e.toString());
}
// The read from Stream inside the request :
public void readRequestFromStream(InputStream rawRequest){
try{
this.length = fromByteArrayToInt(rawRequest.readNBytes(4)) ;
this.header.setApikey(fromByteArrayToShort(rawRequest.readNBytes(2)));
this.header.setApiVersion(fromByteArrayToShort(rawRequest.readNBytes(2)));
this.header.setCorrelationId(fromByteArrayToInt(rawRequest.readNBytes(4)));
} catch (IOException e){
System.out.println(" Request Service : a problem occurred constructing the request : "+ e.toString());
}
}
// here is the from request in the Response:
public static Response fromRequest(Request req){
Response output = new Response() ;
output.getHeader().setCorrelationId(req.getHeader().getCorrelationId());
if(req.getHeader().getApiVersion()<=-1 ||req.getHeader().getApiVersion()>=5) {
output.getBody().setError_code( (short) 35 );
}
else{
output.getBody().setError_code((short)0);
}
output.getBody().setArrayLength(2); // logic of all of this should change to handle an array of apiKeys *low priority for now.
output.getBody().setApiKey(req.getHeader().getApikey());
output.getBody().setMinVersion((short)3);
output.getBody().setMaxVersion((short)4);
output.getBody().setArray_tagged_fields((byte)0);
output.getBody().setThrottle_time(0);
output.getBody().setOuter_tagged_fields((byte)0);
return output ;
}
// encode Response :
public byte[] encodeResponse(){
ByteBuffer buffer = ByteBuffer.allocate(1024)
.putInt(header.getCorrelationId())
.putShort(body.getError_code())
.put((byte) body.getArrayLength())
.putShort((short)18)
.putShort(body.getMinVersion())
.putShort(body.getMaxVersion())
.put(body.getArray_tagged_fields())
.putInt(body.getThrottle_time())
.put(body.getOuter_tagged_fields())
.flip() ;
byte[] resp = new byte[buffer.remaining()] ;
buffer.get(resp) ;
return resp ;
}