Hi everyone,
I’m currently working on the task to implement the ApiVersions response and need to add the entry for the DescribeTopicPartitions
API (API key 75
) in the response.
Below is my current implementation, and I’m facing some issues. The logic seems almost correct, but the response body format doesn’t seem to be accepted in the test, specifically regarding the number of API entries and the handling of the DescribeTopicPartitions
API.
import socket # noqa: F401
import threading
HOST = 'localhost'
PORT = 9092
SUPPORTED_API_VERSIONS = set(range(0, 5))
SUPPORTED_API_KEYS = {18, 75} # APIVersions (18), DescribeTopicPartitions (75)
UNSUPPORTED_VERSION_ERROR_CODE = 35
UNSUPPORTED_API_KEY_ERROR_CODE = 35 # Using same error code for consistency
NO_ERROR = 0
def parse_request_length(header: bytes) -> int:
return int.from_bytes(header, byteorder="big", signed=True)
def parse_request(request: bytes):
request_api_key = int.from_bytes(request[:2], byteorder="big", signed=True)
request_api_version = int.from_bytes(request[2:4], byteorder="big", signed=True)
correlation_id = int.from_bytes(request[4:8], byteorder="big", signed=True)
# Parse client_id as a Kafka STRING (2-byte length + UTF-8 bytes)
client_id_len = int.from_bytes(request[8:10], byteorder="big", signed=True)
client_id = request[10:10 + client_id_len].decode("utf-8")
return request_api_key, request_api_version, correlation_id, client_id
def create_response(api_key, api_version, id, client_id):
message_bytes = id.to_bytes(4, byteorder="big", signed=True)
throttle_time_ms = 0
tag_buffer = b"\x00" # UNSIGNED_VARINT for empty tag buffer
# Check if API key is supported
if api_key not in SUPPORTED_API_KEYS:
error_bytes = UNSUPPORTED_API_KEY_ERROR_CODE.to_bytes(2, byteorder="big", signed=True)
message_bytes += error_bytes
# Minimal response: no API entries, just throttle time and tag buffer
message_bytes += int(0).to_bytes(4, byteorder="big", signed=True) # Empty API array
message_bytes += throttle_time_ms.to_bytes(4, byteorder="big", signed=True)
message_bytes += tag_buffer
else:
# Handle supported API keys
if api_version in SUPPORTED_API_VERSIONS:
error_bytes = NO_ERROR.to_bytes(2, byteorder="big", signed=True)
else:
error_bytes = UNSUPPORTED_VERSION_ERROR_CODE.to_bytes(2, byteorder="big", signed=True)
message_bytes += error_bytes
# Number of API entries (2: one for APIVersions, one for DescribeTopicPartitions)
message_bytes += int(0).to_bytes(1, byteorder="big", signed=True)
# API entry for APIVersions (API key 18)
message_bytes += int(18).to_bytes(2, byteorder="big", signed=True) # API key
message_bytes += int(0).to_bytes(2, byteorder="big", signed=True) # Min version
message_bytes += int(4).to_bytes(2, byteorder="big", signed=True) # Max version
message_bytes += tag_buffer
# API entry for DescribeTopicPartitions (API key 75)
message_bytes += int(75).to_bytes(2, byteorder="big", signed=True) # API key
message_bytes += int(0).to_bytes(2, byteorder="big", signed=True) # Min version
message_bytes += int(0).to_bytes(2, byteorder="big", signed=True) # Max version
message_bytes += tag_buffer
# Throttle time and final tag buffer
message_bytes += throttle_time_ms.to_bytes(4, byteorder="big", signed=True)
message_bytes += tag_buffer
req_len = len(message_bytes).to_bytes(4, byteorder="big", signed=True)
response = req_len + message_bytes
return response
def handle_client(client_conn, addr):
print(f"Accepted connection from {addr}")
with client_conn:
while True:
try:
# Receive message length
message_len = parse_request_length(client_conn.recv(4))
if not message_len:
break
# Receive request (adjusted for correct length)
request_bytes = client_conn.recv(message_len)
if not request_bytes:
break
# Create response
api_key, api_version, id, client_id = parse_request(request_bytes)
print(f"api_key: {api_key}")
print(f"api_version: {api_version}")
print(f"id: {id}")
print(f"client_id: {client_id}")
response = create_response(api_key, api_version, id, client_id)
print(f"response: {response}")
client_conn.sendall(response)
print(f"Response sent")
except Exception as e:
print(f"Error processing request: {e}")
break
def main():
print("Logs from your program will appear here!")
server = socket.create_server((HOST, PORT), reuse_port=True)
print("Listening on port 9092...")
while True:
client_conn, addr = server.accept()
client_thread = threading.Thread(target=handle_client, args=(client_conn, addr))
client_thread.daemon = True
client_thread.start()
if __name__ == "__main__":
main()
Looking forward to your feedback!