Expected Correlation ID to be X, got Y

I’m stuck on Stage #nh4

I have been trying to handle multiple requests from the same client.

So far, the first test always passes, but the second test consistently fails. Judging by the logs, the first request of the second test always seems to get lost, and I mean always. That made me suspect the cause might not be my code. Bold claim, I know, but I’ve tried everything I could think of and I am losing it.

Here are the logs:

[tester::#NH4] Running tests for Stage #NH4 (Concurrent Clients - Serial requests)
[tester::#NH4] $ ./your_program.sh /tmp/server.properties
[tester::#NH4] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[tester::#NH4] Connection to broker at localhost:9092 successful
[tester::#NH4] Sending request 1 of 3: "ApiVersions" (version: 4) request (Correlation id: 1775586861)
[tester::#NH4] Hexdump of sent "ApiVersions" request:
[tester::#NH4] Idx  | Hex                                             | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 69 d5 4e 2d 00 09 6b 61 | ...#....i.N-..ka
[tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#NH4] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
[tester::#NH4]
[your_program] SERVER: new connection have been made : ===========================
[your_program] NEW REQUEST : request's correlation is :1775586861
[tester::#NH4] Hexdump of received "ApiVersions" response:
[tester::#NH4] Idx  | Hex                                             | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 69 d5 4e 2d 00 00 02 00 12 00 03 00 04 00 00 00 | i.N-............
[tester::#NH4] 0010 | 00 00 00                                        | ...
[tester::#NH4]
[tester::#NH4] [Decoder] - .ResponseHeader
[tester::#NH4] [Decoder]   - .correlation_id (1775586861)
[tester::#NH4] [Decoder] - .ResponseBody
[tester::#NH4] [Decoder]   - .error_code (0)
[tester::#NH4] [Decoder]   - .num_api_keys (1)
[tester::#NH4] [Decoder]   - .ApiKeys[0]
[tester::#NH4] [Decoder]     - .api_key (18)
[tester::#NH4] [Decoder]     - .min_version (3)
[tester::#NH4] [Decoder]     - .max_version (4)
[tester::#NH4] [Decoder]     - .TAG_BUFFER
[tester::#NH4] [Decoder]   - .throttle_time_ms (0)
[tester::#NH4] [Decoder]   - .TAG_BUFFER
[tester::#NH4] ✓ Correlation ID: 1775586861
[tester::#NH4] ✓ Error code: 0 (NO_ERROR)
[tester::#NH4] ✓ API keys array is non-empty
[tester::#NH4] ✓ API version 4 is supported for API_VERSIONS
[tester::#NH4] ✓ Test 1 of 3: Passed
[tester::#NH4] Sending request 2 of 3: "ApiVersions" (version: 4) request (Correlation id: 2086313682)
[tester::#NH4] Hexdump of sent "ApiVersions" request:
[tester::#NH4] Idx  | Hex                                             | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 00 00 00 23 00 12 00 04 7c 5a 9e d2 00 09 6b 61 | ...#....|Z....ka
[tester::#NH4] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#NH4] 0020 | 6c 69 04 30 2e 31 00                            | li.0.1.
[tester::#NH4]
[your_program] NEW REQUEST : request's correlation is :1668049152
[tester::#NH4] Hexdump of received "ApiVersions" response:
[tester::#NH4] Idx  | Hex                                             | ASCII
[tester::#NH4] -----+-------------------------------------------------+-----------------
[tester::#NH4] 0000 | 63 6c 69 00 00 23 02 00 12 00 03 00 04 00 00 00 | cli..#..........
[tester::#NH4] 0010 | 00 00 00                                        | ...
[tester::#NH4]
[tester::#NH4] [Decoder] - .ResponseHeader
[tester::#NH4] [Decoder]   - .correlation_id (1668049152)
[tester::#NH4] [Decoder] - .ResponseBody
[tester::#NH4] [Decoder]   - .error_code (35)
[tester::#NH4] [Decoder]   - .num_api_keys (1)
[tester::#NH4] [Decoder]   - .ApiKeys[0]
[tester::#NH4] [Decoder]     - .api_key (18)
[tester::#NH4] [Decoder]     - .min_version (3)
[tester::#NH4] [Decoder]     - .max_version (4)
[tester::#NH4] [Decoder]     - .TAG_BUFFER
[tester::#NH4] [Decoder]   - .throttle_time_ms (0)
[tester::#NH4] [Decoder]   - .TAG_BUFFER
[tester::#NH4] Expected Correlation ID to be 2086313682, got 1668049152
[tester::#NH4] Test failed
[tester::#NH4] Terminating program
[your_program] NEW REQUEST : request's correlation is :1818821680
[your_program] NEW REQUEST : request's correlation is :301991036
[your_program] SERVER, error : java.net.SocketException: Broken pipe
[tester::#NH4] Program terminated successfully

The program only receives (or skips to) the second request. As you can see, the .correlation_id received by the client matches what the server prints as supposedly the first request.

And here’s a snippet of my code:


// the server class is a thread, inside the run function : 
try{
                Socket client = server.accept() ;
                System.out.println("SERVER: new connection have been made : ===========================");
                while(client.getInputStream()!=null){
                    Request request = new Request() ;
                    request.readRequestFromStream(client.getInputStream());
                    Response response = Response.fromRequest(request) ;
                    int responseLength = response.encodeResponse().length ;
                    client.getOutputStream().write(PrimitiveOperations.fromIntToByteArray(responseLength));
                    client.getOutputStream().write(response.encodeResponse());
                    client.getOutputStream().flush();
                }
            }catch (IOException e){
                System.out.println("SERVER, error : " + e.toString());
            }

// The read from Stream inside the request : 
public void readRequestFromStream(InputStream rawRequest){
        try{
            this.length = fromByteArrayToInt(rawRequest.readNBytes(4)) ;
            this.header.setApikey(fromByteArrayToShort(rawRequest.readNBytes(2)));
            this.header.setApiVersion(fromByteArrayToShort(rawRequest.readNBytes(2)));
            this.header.setCorrelationId(fromByteArrayToInt(rawRequest.readNBytes(4)));

        } catch (IOException e){
            System.out.println(" Request Service : a problem occurred constructing the request : "+ e.toString());
        }
    }

// here is the from request in the Response: 
    public static Response fromRequest(Request req){
        Response output = new Response() ;
        output.getHeader().setCorrelationId(req.getHeader().getCorrelationId());
        if(req.getHeader().getApiVersion()<=-1 ||req.getHeader().getApiVersion()>=5) {
            output.getBody().setError_code( (short) 35 );
        }
        else{
            output.getBody().setError_code((short)0);
        }
        output.getBody().setArrayLength(2); // logic of all of this should change to handle an array of apiKeys *low priority for now. 
        output.getBody().setApiKey(req.getHeader().getApikey());
        output.getBody().setMinVersion((short)3);
        output.getBody().setMaxVersion((short)4);
        output.getBody().setArray_tagged_fields((byte)0);
        output.getBody().setThrottle_time(0);
        output.getBody().setOuter_tagged_fields((byte)0);

        return output ;
    }

// encode Response : 
public byte[] encodeResponse(){
        ByteBuffer buffer = ByteBuffer.allocate(1024)
                .putInt(header.getCorrelationId())
                .putShort(body.getError_code())
                .put((byte) body.getArrayLength())
                .putShort((short)18)
                .putShort(body.getMinVersion())
                .putShort(body.getMaxVersion())
                .put(body.getArray_tagged_fields())
                .putInt(body.getThrottle_time())
                .put(body.getOuter_tagged_fields())
                .flip() ;

        byte[] resp = new byte[buffer.remaining()] ;
        buffer.get(resp) ;

        return resp ;
    }

@zarhouni21 Could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

I’ll try to get back to you next week, as I’m a bit tied up this week.

Hello Andy, I hope you get the rest you need.
Here is the code uploaded: GitHub - zarhouni21/codecrafters-kafka-java
You can take your time with it

7c5a9ed2 converted to decimal is 2086313682, but your code decoded it as 1668049152.

I recommend making sure you are decoding the request properly.

I thought that might be the problem, but it wouldn’t make much sense to consistently decode it correctly in the first test and incorrectly only in the second, right? Unless I am missing something. Either way, I’ll examine the decoding and see what I get :D

@zarhouni21 I’ll give you another hint:

  • In the 2nd request, you decoded the correlation ID as 1668049152 which converted to hexadecimal is 636C6900
  • 636C6900 is indeed found in the request, but nowhere near the beginning of the request where the correlation ID really is.
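
If it helps, the arithmetic in these two bullets can be checked with a few lines of Java. This is only a sketch: the class and helper names (CorrelationIdCheck, hexToBytes, int32At) are made up for illustration, and the bytes are copied from the tester's hexdump of the first request earlier in the thread.

```java
import java.nio.ByteBuffer;

public class CorrelationIdCheck {
    // Bytes of the first request, copied from the tester's hexdump
    // (4-byte length prefix followed by the 35-byte message).
    public static final String FIRST_REQUEST_HEX =
            "0000002300120004" + "69d54e2d" + "00096b61"
          + "666b612d" + "636c6900" + "0a6b6166" + "6b612d63"
          + "6c6904302e3100";

    // Hypothetical helper: turns a hex string into raw bytes.
    public static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Reads a big-endian 32-bit integer at the given byte offset.
    public static int int32At(byte[] data, int offset) {
        return ByteBuffer.wrap(data).getInt(offset);
    }

    public static void main(String[] args) {
        byte[] request = hexToBytes(FIRST_REQUEST_HEX);

        // The wrongly decoded ID, shown as hex.
        System.out.println(Integer.toHexString(1668049152)); // 636c6900

        // Offset 8 is where the correlation ID really lives
        // (after the 4-byte length, 2-byte API key, 2-byte API version).
        System.out.println(int32At(request, 8));  // 1775586861

        // Offset 20 sits inside the client ID string ("...-cli" plus a zero byte).
        System.out.println(int32At(request, 20)); // 1668049152
    }
}
```

So the value the server logged for the "second" request is really four bytes from the middle of the first one.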

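I don’t read Java very well, so I can’t pinpoint where in your code the issue is, but you aren’t resetting the cursor of your request decoder after finishing reading the first request. Your decoder reads 12 bytes (length, API key, API version, correlation ID) and leaves the rest of the request unread in the stream. This means that your second read started at byte offset 12 of the first request instead of at the beginning of the second request.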
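
One way to avoid this, sketched under assumptions rather than taken from the actual repository: use the 4-byte length prefix to consume the whole message before parsing the header, so the stream is always positioned at the start of the next request. The names FramedRequestReader, Header, readRequest and frame are hypothetical and only stand in for the Request class from the original post.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class FramedRequestReader {
    /** Hypothetical holder for the three header fields used in this thread. */
    public static final class Header {
        public final short apiKey;
        public final short apiVersion;
        public final int correlationId;

        Header(short apiKey, short apiVersion, int correlationId) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
            this.correlationId = correlationId;
        }
    }

    /** Reads exactly one length-prefixed request and leaves nothing of it behind. */
    public static Header readRequest(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in); // does not buffer ahead
        int messageSize = data.readInt();               // 4-byte big-endian length prefix
        byte[] message = new byte[messageSize];
        data.readFully(message);                        // consume the WHOLE message
        ByteBuffer buf = ByteBuffer.wrap(message);
        short apiKey = buf.getShort();
        short apiVersion = buf.getShort();
        int correlationId = buf.getInt();
        // Client ID and tagged fields remain in `message`, unparsed but consumed.
        return new Header(apiKey, apiVersion, correlationId);
    }

    /** Builds a test request with trailing bytes standing in for the client ID etc. */
    public static byte[] frame(int correlationId) {
        byte[] trailing = {0x00, 0x03, 'c', 'l', 'i', 0x00};
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + trailing.length);
        buf.putInt(8 + trailing.length);                // message size excludes the prefix
        buf.putShort((short) 18).putShort((short) 4).putInt(correlationId);
        buf.put(trailing);
        return buf.array();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream twoRequests = new ByteArrayOutputStream();
        twoRequests.write(frame(1775586861));
        twoRequests.write(frame(2086313682));
        InputStream in = new ByteArrayInputStream(twoRequests.toByteArray());

        System.out.println(readRequest(in).correlationId); // 1775586861
        System.out.println(readRequest(in).correlationId); // 2086313682
    }
}
```

Reading the full message into a byte array first also makes it easy to parse the remaining fields later without touching the socket again.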

DUDE I HAD NO IDEA THAT THIS WAS A THING THANK YOU SO MUCH THAT IS INDEED THE PROBLEM I am forever grateful that you took from your time to answer this
