I’m stuck on Stage #VT6
My current implementation is returning the correct response , at least appears to be ^^, but still get as tested failed:
[tester::#VT6] Expected TopicResponse[0] Topic Name to be unknown-topic-paz, got unknown-topic-paz
Here are my logs:
your_program] Handling request: &{0 75 0 1623390873 kafka-tester [0 2 18 117 110 107 110 111 119 110 45 116 111 112 105 99 45 112 97 122 0 0 0 0 1 255 0]}
[tester::#VT6] Hexdump of received "DescribeTopicPartitions" response:
[tester::#VT6] Idx | Hex | ASCII
[tester::#VT6] -----+-------------------------------------------------+-----------------
[tester::#VT6] 0000 | 00 00 00 3e 60 c2 fa 99 00 00 00 00 00 02 00 03 | ...>`...........
[tester::#VT6] 0010 | 13 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 70 | .unknown-topic-p
[tester::#VT6] 0020 | 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | az..............
[tester::#VT6] 0030 | 00 00 00 00 00 01 00 00 00 00 00 01 00 00 00 00 | ................
[tester::#VT6] 0040 | 00 00 | ..
[tester::#VT6]
[tester::#VT6] [Decoder] - .ResponseHeader
[tester::#VT6] [Decoder] - .correlation_id (1623390873)
[tester::#VT6] [Decoder] - .TAG_BUFFER
[tester::#VT6] [Decoder] - .ResponseBody
[tester::#VT6] [Decoder] - .throttle_time_ms (0)
[tester::#VT6] [Decoder] - .topic.length (1)
[tester::#VT6] [Decoder] - .Topics[0]
[tester::#VT6] [Decoder] - .error_code (3)
[tester::#VT6] [Decoder] - .name (unknown-topic-paz)
[tester::#VT6] [Decoder] - .topic_id (00000000-0000-0000-0000-000000000000)
[tester::#VT6] [Decoder] - .is_internal (false)
[tester::#VT6] [Decoder] - .num_partitions (0)
[tester::#VT6] [Decoder] - .topic_authorized_operations (16777216)
[tester::#VT6] [Decoder] - .TAG_BUFFER
[tester::#VT6] [Decoder] - .next_cursor
[tester::#VT6] [Decoder] - .topic_name ()
[tester::#VT6] [Decoder] - .partition_index (0)
[tester::#VT6] [Decoder] - .TAG_BUFFER
[tester::#VT6] [Decoder] - .TAG_BUFFER
[tester::#VT6] ✓ Correlation ID: 1623390873
[tester::#VT6] ✓ Throttle Time: 0
[tester::#VT6] ✓ TopicResponse[0] Error code: 3
[tester::#VT6] Expected TopicResponse[0] Topic Name to be unknown-topic-paz, got unknown-topic-paz
[tester::#VT6] Test failed
[tester::#VT6] Terminating program
[your_program] Handling DescribeTopicPartition request &{0 75 0 1623390873 kafka-tester [0 2 18 117 110 107 110 111 119 110 45 116 111 112 105 99 45 112 97 122 0 0 0 0 1 255 0]}
[your_program] Partition limit: 130816
[your_program] Parsed topic name: [unknown-topic-paz]
[your_program] Writing ApiVersions response:
[your_program] &{CorrelationID:1623390873 Topics:[{TopicName:unknown-topic-paz TopicID:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] ErrorCode:3 Partitions:[] TaggedFields:0}] TaggedFields:0}
[your_program] Final buffer: [0 0 0 62 96 194 250 153 0 0 0 0 0 2 0 3 19 117 110 107 110 111 119 110 45 116 111 112 105 99 45 112 97 122 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 0]
[tester::#VT6] Program terminated successfully
And here’s a snippet of my code:
func writeDescribeTopicPartitionResponse(conn net.Conn, response *DescribeTopicPartitionResponse) error {
fmt.Printf("Writing ApiVersions response:\n %+v\n", response)
// Serialize the response
buff := make([]byte, 0)
// Write the correlation ID
corrId := make([]byte, 4)
binary.BigEndian.PutUint32(corrId, uint32(response.CorrelationID))
buff = append(buff, corrId...)
//fmt.Println("Value after correlationID")
//fmt.Println(buff)
buff = append(buff, 0) // tag buffer
throttleTime := make([]byte, 4)
binary.BigEndian.PutUint32(throttleTime, uint32(0))
buff = append(buff, throttleTime...)
//fmt.Println("Value after throttleTime")
//fmt.Println(buff)
buff = append(buff, byte(len(response.Topics)+1))
//fmt.Println("Value after topics array length")
//fmt.Println(buff)
for _, topic := range response.Topics {
errorCode := make([]byte, 2)
binary.BigEndian.PutUint16(errorCode, uint16(3))
buff = append(buff, errorCode...)
//fmt.Println("Value after topic error code")
//fmt.Println(buff)
buff = append(buff, uint8(len(topic.TopicName)+1))
buff = append(buff, []byte(topic.TopicName)...)
buff = append(buff, 0) // tag buffer after topic name
//fmt.Println("Value after topic name length")
//fmt.Println(buff)
buff = append(buff, topic.TopicID[:]...)
//fmt.Println("Value after topic ID")
//fmt.Println(buff)
buff = append(buff, 0) // isInternal
//fmt.Println("Value after isInternal")
//fmt.Println(buff)
buff = append(buff, 1) // partitionArray
//fmt.Println("Value after partition array length")
//fmt.Println(buff)
authorizedOperation := make([]byte, 4)
binary.BigEndian.PutUint32(authorizedOperation, uint32(0))
buff = append(buff, authorizedOperation...)
//fmt.Print("Value after authorized operation")
//fmt.Println(buff)
buff = append(buff, 0) // tagbuffer
//fmt.Println("Value after topic tag buffer")
//fmt.Println(buff)
}
// Write next_cursor as a valid empty COMPACT_STRING
buff = append(buff, 1) // length = 1 (VARINT)
buff = append(buff, 0) // tag buffer
// Write partition_index (INT32, 4 bytes, usually 0)
partitionIndex := make([]byte, 4)
binary.BigEndian.PutUint32(partitionIndex, 0)
buff = append(buff, partitionIndex...)
// Write tag buffer after partition_index
buff = append(buff, 0)
// Calculate the size before prepending
totalLen := make([]byte, 4)
binary.BigEndian.PutUint32(totalLen, uint32(len(buff)))
finalBuff := append(totalLen, buff...)
fmt.Println("Final buffer: ", finalBuff)
_, err := conn.Write(finalBuff)
return err
I’m still getting familiar with golang syntax/language, so I believe it could be better written, so I apologize in antecedence
I also found this other question Unexpected EoD in List for an Unknown Topic #VT6 , that to some point reach the same error, but looking into my implementation and changing to uint8 did not help.
This is my first project and question in the Forum / platform, please let me know if I should share more details to better assist .
Thank you