I’m stuck on Stage #(change to your stage, ex. #JM3).
I’ve tried .. (mention what you’ve tried so far).
Here are my logs:
Running tests for Stage #VT6 (Listing Partitions - List for an unknown topic)
remote: [tester::#VT6] $ ./your_program.sh /tmp/server.properties
remote: [tester::#VT6] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [your_program] Server is running on port 9092...
remote: [tester::#VT6] Connection to broker at localhost:9092 successful
remote: [tester::#VT6] Sending "DescribeTopicPartitions" (version: 0) request (Correlation id: 435372330)
remote: [tester::#VT6] Hexdump of sent "DescribeTopicPartitions" request:
remote: [tester::#VT6] Idx | Hex | ASCII
remote: [tester::#VT6] -----+-------------------------------------------------+-----------------
remote: [tester::#VT6] 0000 | 00 00 00 31 00 4b 00 00 19 f3 41 2a 00 0c 6b 61 | ...1.K....A*..ka
remote: [tester::#VT6] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk
remote: [tester::#VT6] 0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 61 7a 00 00 | nown-topic-saz..
remote: [tester::#VT6] 0030 | 00 00 01 ff 00 | .....
remote: [tester::#VT6]
remote: [your_program] Accepted connection from ('127.0.0.1', 46550)
remote: [your_program] b'\x19\xf3A*' 435372330
remote: [your_program] 12
remote: [your_program] Error handling client: 'bytes' object has no attribute 'to_bytes'
remote: [tester::#VT6] EOF
remote: [tester::#VT6] Test failed
remote: [tester::#VT6] Terminating program
remote: [tester::#VT6] Program terminated successfully
And here’s a snippet of my code:
import socket
import threading
# Encoding helper functions
def encode_unsigned_varint(value):
"""Encode an integer as UNSIGNED_VARINT."""
encoded = []
while value > 0x7F:
encoded.append((value & 0x7F) | 0x80)
value >>= 7
encoded.append(value)
return bytes(encoded)
def encode_compact_array(items):
"""Encode a COMPACT_ARRAY."""
if items is None:
return b'\x00' # Null array
encoded_items = b"".join(items)
encoded_length = encode_unsigned_varint(len(items) + 1)
return encoded_length + encoded_items
def encode_string(value):
"""Encode a string with its length as UNSIGNED_VARINT."""
encoded_length = encode_unsigned_varint(len(value) + 1)
return encoded_length + value.encode("utf-8")
def encode_uuid():
"""Encode a UUID as 16 zero bytes."""
return b'\x00' * 16
# Request processing helpers
def get_corr_id(data):
"""Extract the correlation ID from the incoming data."""
corr_id = data[8:12]
print(corr_id, int.from_bytes(corr_id, byteorder="big"))
return corr_id
def check_api_version(data):
"""Check the API version and return the appropriate error code."""
err_code = 35
api_version = int.from_bytes(data[6:8], byteorder="big")
if 0 <= api_version <= 4:
err_code = 0
return err_code.to_bytes(2, byteorder="big")
def get_topic_and_partition(data):
"""Extract topic name and partition index from the DescribeTopicPartitions request."""
topic_length = int.from_bytes(data[12:14], byteorder='big')
topic_name = data[14:14 + topic_length].decode("utf-8")
partition_index = int.from_bytes(data[14 + topic_length:14 + topic_length + 4], byteorder='big')
return topic_name, partition_index
def response_api_key_75(id, cursor, array_length, length, topic_name):
tag_buffer = b"\x00"
response_header = id.to_bytes(4, byteorder="big") + tag_buffer
error_code = int(3).to_bytes(2, byteorder="big")
throttle_time_ms = int(0).to_bytes(4, byteorder="big")
is_internal = int(0).to_bytes(1, byteorder="big")
topic_authorized_operations = b"\x00\x00\x0d\xf8"
topic_id = int(0).to_bytes(16, byteorder="big")
partition_array = b"\x01"
array_length = array_length if isinstance(array_length, int) else int(array_length)
length = length if isinstance(length, int) else int(length)
response_body = (
throttle_time_ms
+ int(array_length).to_bytes(1, byteorder="big")
+ error_code
+ int(length).to_bytes(1, byteorder="big")
+ topic_name
+ topic_id
+ is_internal
+ partition_array
+ topic_authorized_operations
+ tag_buffer
+ cursor
+ tag_buffer
)
total_len = len(response_header) + len(response_body)
return int(total_len).to_bytes(4, byteorder="big") + response_header + response_body
def create_msg(id, api_key: int, error_code: int = 0):
response_header = id.to_bytes(4, byteorder="big")
err = error_code.to_bytes(2, byteorder="big")
# Client handling
def handle_client(addr, conn):
"""
Handle incoming client requests:
- Decode data.
- Construct a response according to Kafka protocol.
"""
try:
# # Simulate known topics and their partitions
# topics = ["example_topic"]
# response_partition_limit = 10
# cursor_data = {
# "topic_name": "example_topic",
# "partition_index": 2,
# }
# topics_encoded=b""
# for topic in topics:
# topic_encoded=(
# encode_string(topic)+b"\x00"
# )
# topics_encoded+=topic_encoded
while True:
# Wait for data from the client
data = conn.recv(1024)
if not data: # Client disconnected
print(f"Client {addr} disconnected")
break
# Step 1: Extract correlation ID and check API version
api_key = int.from_bytes(data[4:6], byteorder="big")
corr_id = get_corr_id(data)
error_code = check_api_version(data)
# Step 2: Extract topic name and partition index
topic_name, partition_index = get_topic_and_partition(data)
# Step 3: Validate topic and partition
# if topic_name not in topics or partition_index >= topics.get(topic_name, 0):
# error_code = int(3).to_bytes(2, byteorder='big')
# Step 4: Encode the topic response
encoded_topic_name = encode_string(topic_name)
topic_id = encode_uuid()
partitions = encode_unsigned_varint(0)
response_unknown = error_code + encoded_topic_name + topic_id + partitions
# Adjust response size
response_size = len(response_unknown)
response_unknown = (
response_size.to_bytes(4, byteorder='big')
+ response_unknown)
# Step 5: Construct the API keys array
api_keys = [
int(18).to_bytes(2, byteorder="big") + # INT16 api_key
int(0).to_bytes(2, byteorder="big") + # INT16 min_version
int(4).to_bytes(2, byteorder="big") + # INT16 max_version
int(0).to_bytes(1, byteorder="big"), # Extra byte (compact array format)
int(75).to_bytes(2, byteorder="big") + # INT16 api_key
int(0).to_bytes(2, byteorder="big") + # INT16 min_version
int(0).to_bytes(2, byteorder="big") + # INT16 max_version
int(0).to_bytes(1, byteorder="big") # Extra byte (compact array format)
]
compact_api_keys = encode_compact_array(api_keys)
# Step 6: Combine response components
response = (
(4 + len(corr_id) + len(error_code) + len(compact_api_keys) + 1).to_bytes(4, byteorder="big") # Response size
+ corr_id
+ error_code
+ compact_api_keys # COMPACT_ARRAY for API keys
+ int(0).to_bytes(4, byteorder="big") # Throttle time
)
if api_key == 75:
client_id_len = int.from_bytes(data[12:14], byteorder="big", signed=False)
print(client_id_len)
if client_id_len > 0:
cliend_id = data[14 : 14 + client_id_len]
tagged = data[14 + client_id_len]
else:
cliend_id = ""
tagged = [14]
array_len_finder = 14 + client_id_len + 1
array_length = data[array_len_finder]
topic_name_length = data[array_len_finder + 1]
topic_name_starter = array_len_finder + 2
topic_name = bytes(
data[
topic_name_starter : topic_name_starter
+ (topic_name_length - 1)
]
)
cursor_length = topic_name_starter + topic_name_length + 4
cursor = data[cursor_length]
cursor_bytes = cursor if isinstance(cursor, bytes) else int(cursor).to_bytes(1, byteorder="big")
response = response_api_key_75(
corr_id,
cursor_bytes,
array_length,
topic_name_length,
topic_name,
)
conn.sendall(response)
else:
version = {0, 1, 2, 3, 4}
errorr_code = 0 if error_code in version else 35
response = create_msg(corr_id,api_key, errorr_code)
conn.sendall(response)
# Step 7: Send the response back to the client
# conn.sendall(response_unknown)
except Exception as e:
print(f"Error handling client: {e}")
finally:
conn.close()
# Server setup
def main():
"""
Main function to set up the server:
- Listens on port 9092.
- Accepts multiple client connections using threads.
"""
print("Logs from your program will appear here!")
# Create a server socket on localhost and port 9092
server = socket.create_server(("localhost", 9092), reuse_port=True)
print("Server is running on port 9092...")
try:
while True:
# Accept a new client connection
conn, addr = server.accept()
print(f"Accepted connection from {addr}")
# Handle the client in a separate thread
client_thread = threading.Thread(target=handle_client, args=(addr, conn))
client_thread.start()
except KeyboardInterrupt:
print("Shutting down server...")
finally:
server.close()
if __name__ == "__main__":
main()