Hey, I’m stuck on Stage #pv1 with an error “error reading from connection: unexpected EOF”
Here are my logs:
Debug = true
[tester::#PV1] Running tests for Stage #PV1 (Handle APIVersions requests)
[tester::#PV1] $ ./your_program.sh /tmp/server.properties
[tester::#PV1] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[your_program] Connection accepted
[tester::#PV1] Connection to broker at localhost:9092 successful
[tester::#PV1] Sending "ApiVersions" (version: 4) request (Correlation id: 1556724858)
[tester::#PV1] Hexdump of sent "ApiVersions" request:
[tester::#PV1] Idx | Hex | ASCII
[tester::#PV1] -----+-------------------------------------------------+-----------------
[tester::#PV1] 0000 | 00 00 00 23 00 12 00 04 5c c9 bc 7a 00 09 6b 61 | ...#....\..z..ka
[tester::#PV1] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#PV1] 0020 | 6c 69 04 30 2e 31 00 | li.0.1.
[tester::#PV1]
[tester::#PV1] error reading from connection: unexpected EOF
[tester::#PV1] Test failed
[tester::#PV1] Terminating program
[your_program] Correlation ID: 1556724858
[your_program] Api Version: 4
[your_program] Request type is: ApiVersions
[your_program] [0 0 0 35 92 201 188 122 0 0]
[tester::#PV1] Program terminated successfully
Hey @scofield-ua , could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.
Here is the code for main.go
, thanks:
package main
import (
"encoding/binary"
"fmt"
"net"
"os"
)
// Ensures gofmt doesn't remove the "net" and "os" imports in stage 1 (feel free to remove this!)
var _ = net.Listen
var _ = os.Exit
func main() {
// You can use print statements as follows for debugging, they'll be visible when running tests.
fmt.Println("Logs from your program will appear here!")
// Uncomment this block to pass the first stage
//
l, err := net.Listen("tcp", "0.0.0.0:9092")
if err != nil {
fmt.Println("Failed to bind to port 9092")
os.Exit(1)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting connection: ", err.Error())
os.Exit(1)
}
fmt.Println("Connection accepted")
go processRequest(conn)
}
}
func processRequest(conn net.Conn) {
defer conn.Close()
reqBuff := make([]byte, 1024)
n, err := conn.Read(reqBuff)
if err != nil {
fmt.Println("Error reading:", err)
return
}
if n < 8 {
fmt.Println("Received data is too short to contain a correlation ID")
return
}
messageSize := reqBuff[:4]
requestApiKey := reqBuff[4:6]
requestApiVer := reqBuff[6:8]
correlationId := reqBuff[8:12]
correlationIdN := binary.BigEndian.Uint32(correlationId)
requestApiKeyN := binary.BigEndian.Uint16(requestApiKey)
requestApiVerN := binary.BigEndian.Uint16(requestApiVer)
fmt.Println("Correlation ID: ", correlationIdN)
fmt.Println("Api Version: ", requestApiVerN)
if requestApiVerN > 4 {
fmt.Println("ApiVersions not valid")
response := messageSize
response = append(response, correlationId...)
response = append(response, []byte{0, 35}...)
fmt.Println(response)
conn.Write(response)
} else {
// requst: ApiVersions
if requestApiKeyN == 18 {
fmt.Println("Request type is: ApiVersions")
response := messageSize
response = append(response, correlationId...)
response = append(response, []byte{0, 0}...)
fmt.Println(response)
conn.Write(response)
return
}
response := messageSize
response = append(response, requestApiKey...)
response = append(response, requestApiVer...)
response = append(response, correlationId...)
fmt.Println(response)
conn.Write(response)
}
}
@scofield-ua It doesn’t look like a body is included in the APIVersions response.
For reference, here’s what the body should look like:
1 Like
I see, thanks for the tip.
Here is updated code which now include response body for API Versions request:
package main
import (
"encoding/binary"
"fmt"
"net"
"os"
)
// Ensures gofmt doesn't remove the "net" and "os" imports in stage 1 (feel free to remove this!)
var _ = net.Listen
var _ = os.Exit
func main() {
// You can use print statements as follows for debugging, they'll be visible when running tests.
fmt.Println("Logs from your program will appear here!")
// Uncomment this block to pass the first stage
//
l, err := net.Listen("tcp", "0.0.0.0:9092")
if err != nil {
fmt.Println("Failed to bind to port 9092")
os.Exit(1)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting connection: ", err.Error())
os.Exit(1)
}
fmt.Println("Connection accepted")
go processRequest(conn)
}
}
func processRequest(conn net.Conn) {
defer conn.Close()
reqBuff := make([]byte, 1024)
n, err := conn.Read(reqBuff)
if err != nil {
fmt.Println("Error reading:", err)
return
}
if n < 8 {
fmt.Println("Received data is too short to contain a correlation ID")
return
}
messageSize := reqBuff[:4]
requestApiKey := reqBuff[4:6]
requestApiVer := reqBuff[6:8]
correlationId := reqBuff[8:12]
correlationIdN := binary.BigEndian.Uint32(correlationId)
requestApiKeyN := binary.BigEndian.Uint16(requestApiKey)
requestApiVerN := binary.BigEndian.Uint16(requestApiVer)
fmt.Println("Correlation ID: ", correlationIdN)
fmt.Println("Api Key: ", requestApiKeyN)
fmt.Println("Api Version: ", requestApiVerN)
if requestApiVerN > 4 {
fmt.Println("ApiVersions not valid")
response := messageSize
response = append(response, correlationId...)
response = append(response, []byte{0, 35}...)
fmt.Println(response)
conn.Write(response)
} else {
// requst: ApiVersions
if requestApiKeyN == 18 {
fmt.Println("Request type is: ApiVersions")
response := messageSize
response = append(response, correlationId...)
response = append(response, []byte{0, 0}...) // error code (0 - no error)
response = append(response, []byte{2}...) // num of api keys (num of keys + 1)
response = append(response, []byte{0, 18}...) // api key
response = append(response, []byte{0, 0}...) // min api version
response = append(response, []byte{0, 4}...) // max api version
response = append(response, []byte{0}...) // tag buffer
response = append(response, []byte{0, 0, 0, 0}...) // throttle time ms
response = append(response, []byte{0}...) // tag buffer
fmt.Println(response)
conn.Write(response)
return
}
response := messageSize
response = append(response, requestApiKey...)
response = append(response, requestApiVer...)
response = append(response, correlationId...)
fmt.Println(response)
conn.Write(response)
}
}
Logs:
[tester::#PV1] Running tests for Stage #PV1 (Handle APIVersions requests)
[tester::#PV1] $ ./your_program.sh /tmp/server.properties
[tester::#PV1] Connecting to broker at: localhost:9092
[your_program] Logs from your program will appear here!
[tester::#PV1] Connection to broker at localhost:9092 successful
[tester::#PV1] Sending "ApiVersions" (version: 4) request (Correlation id: 1402220540)
[your_program] Connection accepted
[tester::#PV1] Hexdump of sent "ApiVersions" request:
[tester::#PV1] Idx | Hex | ASCII
[tester::#PV1] -----+-------------------------------------------------+-----------------
[tester::#PV1] 0000 | 00 00 00 23 00 12 00 04 53 94 2f fc 00 09 6b 61 | ...#....S./...ka
[tester::#PV1] 0010 | 66 6b 61 2d 63 6c 69 00 0a 6b 61 66 6b 61 2d 63 | fka-cli..kafka-c
[tester::#PV1] 0020 | 6c 69 04 30 2e 31 00 | li.0.1.
[tester::#PV1]
[your_program] Correlation ID: 1402220540
[your_program] Api Key: 18
[tester::#PV1] error reading from connection: unexpected EOF
[tester::#PV1] Test failed
[tester::#PV1] Terminating program
[your_program] Api Version: 4
[your_program] Request type is: ApiVersions
[your_program] [0 0 0 35 83 148 47 252 0 0 1 0 18 0 0 0 4 0 0 0 0 0 0]
[tester::#PV1] Program terminated successfully
In all of your responses, you are using a hardcoded message size which is equal to the request’s message size, which is incorrect.
messageSize := reqBuff[:4]
The actual logic is first, you should read 4
bytes from the network (Big Endian Unsigned Int), denoting the message length. Then you should read that many more bytes from the network, and parse them as a Kafka request! The same is true for response as well. The first 4 bytes of the response should be the actual length of the response body in bytes.
You were able to pass the previous stages coincidentally as the request and response bodies happened to be of the same size. This fails for stage #PV1
.
Here is one way to handle the same:
if requestApiKeyN == 18 {
fmt.Println("Request type is: ApiVersions")
// response := messageSize
response := make([]byte, 0) // Initialize an empty byte slice for the response
response = append(response, correlationId...)
response = append(response, []byte{0, 0}...) // error code (0 - no error)
response = append(response, []byte{2}...) // num of api keys (num of keys + 1)
response = append(response, []byte{0, 18}...) // api key
response = append(response, []byte{0, 0}...) // min api version
response = append(response, []byte{0, 4}...) // max api version
response = append(response, []byte{0}...) // tag buffer
response = append(response, []byte{0, 0, 0, 0}...) // throttle time ms
response = append(response, []byte{0}...) // tag buffer
fmt.Println(response)
messageLen := len(response)
dataToSend := binary.BigEndian.AppendUint32(nil, uint32(messageLen))
dataToSend = append(dataToSend, response...)
conn.Write(dataToSend)
return
}
You might want to refactor the current implementation a bit to handle this in a more general manner for upcoming stages!
2 Likes
Yeah, you were right.
Looks like I’m not fully understand the format of request\response.
1 Like
system
Closed
June 19, 2025, 12:31am
13
This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.