Issue with Adding DescribeTopicPartitions API to ApiVersions Response

Hi everyone,

I’m currently working on the task to implement the ApiVersions response and need to add the entry for the DescribeTopicPartitions API (API key 75) in the response.

Below is my current implementation, and I’m facing some issues. The logic seems almost correct, but the response body format doesn’t seem to be accepted in the test, specifically regarding the number of API entries and the handling of the DescribeTopicPartitions API.

import socket  # noqa: F401
import threading

HOST = 'localhost'
PORT = 9092

SUPPORTED_API_VERSIONS = set(range(0, 5))
SUPPORTED_API_KEYS = {18, 75}  # APIVersions (18), DescribeTopicPartitions (75)
UNSUPPORTED_VERSION_ERROR_CODE = 35
UNSUPPORTED_API_KEY_ERROR_CODE = 35  # Using same error code for consistency
NO_ERROR = 0

def parse_request_length(header: bytes) -> int:
    return int.from_bytes(header, byteorder="big", signed=True)

def parse_request(request: bytes):
    request_api_key = int.from_bytes(request[:2], byteorder="big", signed=True)
    request_api_version = int.from_bytes(request[2:4], byteorder="big", signed=True)
    correlation_id = int.from_bytes(request[4:8], byteorder="big", signed=True)
    # Parse client_id as a Kafka STRING (2-byte length + UTF-8 bytes)
    client_id_len = int.from_bytes(request[8:10], byteorder="big", signed=True)
    client_id = request[10:10 + client_id_len].decode("utf-8")
    return request_api_key, request_api_version, correlation_id, client_id

def create_response(api_key, api_version, id, client_id):
    message_bytes = id.to_bytes(4, byteorder="big", signed=True)
    throttle_time_ms = 0
    tag_buffer = b"\x00"  # UNSIGNED_VARINT for empty tag buffer

    # Check if API key is supported
    if api_key not in SUPPORTED_API_KEYS:
        error_bytes = UNSUPPORTED_API_KEY_ERROR_CODE.to_bytes(2, byteorder="big", signed=True)
        message_bytes += error_bytes
        # Minimal response: no API entries, just throttle time and tag buffer
        message_bytes += int(0).to_bytes(4, byteorder="big", signed=True)  # Empty API array
        message_bytes += throttle_time_ms.to_bytes(4, byteorder="big", signed=True)
        message_bytes += tag_buffer
    else:
        # Handle supported API keys
        if api_version in SUPPORTED_API_VERSIONS:
            error_bytes = NO_ERROR.to_bytes(2, byteorder="big", signed=True)
        else:
            error_bytes = UNSUPPORTED_VERSION_ERROR_CODE.to_bytes(2, byteorder="big", signed=True)

        message_bytes += error_bytes
        # Number of API entries (2: one for APIVersions, one for DescribeTopicPartitions)
        message_bytes += int(0).to_bytes(1, byteorder="big", signed=True)
        
        # API entry for APIVersions (API key 18)
        message_bytes += int(18).to_bytes(2, byteorder="big", signed=True)  # API key
        message_bytes += int(0).to_bytes(2, byteorder="big", signed=True)   # Min version
        message_bytes += int(4).to_bytes(2, byteorder="big", signed=True)   # Max version
        message_bytes += tag_buffer

        # API entry for DescribeTopicPartitions (API key 75)
        message_bytes += int(75).to_bytes(2, byteorder="big", signed=True)  # API key
        message_bytes += int(0).to_bytes(2, byteorder="big", signed=True)   # Min version
        message_bytes += int(0).to_bytes(2, byteorder="big", signed=True)   # Max version
        message_bytes += tag_buffer

        # Throttle time and final tag buffer
        message_bytes += throttle_time_ms.to_bytes(4, byteorder="big", signed=True)
        message_bytes += tag_buffer

    req_len = len(message_bytes).to_bytes(4, byteorder="big", signed=True)
    response = req_len + message_bytes
    return response

def handle_client(client_conn, addr):
    print(f"Accepted connection from {addr}")

    with client_conn:
        while True:
            try:
                # Receive message length
                message_len = parse_request_length(client_conn.recv(4))
                if not message_len:
                    break
                # Receive request (adjusted for correct length)
                request_bytes = client_conn.recv(message_len)
                if not request_bytes:
                    break
                # Create response
                api_key, api_version, id, client_id = parse_request(request_bytes)
                print(f"api_key: {api_key}")
                print(f"api_version: {api_version}")
                print(f"id: {id}")
                print(f"client_id: {client_id}")
                response = create_response(api_key, api_version, id, client_id)
                print(f"response: {response}")
                client_conn.sendall(response)
                print(f"Response sent")
            except Exception as e:
                print(f"Error processing request: {e}")
                break

def main():
    print("Logs from your program will appear here!")

    server = socket.create_server((HOST, PORT), reuse_port=True)
    print("Listening on port 9092...")

    while True:
        client_conn, addr = server.accept()
        client_thread = threading.Thread(target=handle_client, args=(client_conn, addr))
        client_thread.daemon = True
        client_thread.start()

if __name__ == "__main__":
    main()

Looking forward to your feedback!

Hi everyone,

I’ve cleared the tests for this stage, but I’ve encountered a bit of confusion regarding the num_api_keys field in the ApiVersions response.

In the code snippet below, I initially had num_api_keys set to 0. However, I modified it to 2, since we currently support API key 18 (APIVersions) and API key 75 (DescribeTopicPartitions).

message_bytes += int(3).to_bytes(1, byteorder="big", signed=True)

Now, the tests are expecting 3 instead of 2. I’m unclear on why we need to pass 3 when we only support two API keys.

Could there be an implicit third API key that’s required, or is there something in Kafka’s protocol or the test framework that’s expecting this third entry? Has anyone else faced this situation?

Looking forward to your insights!

Thanks!

That is the nature of the encoding scheme used by Kafka. Most of the arrays used in the requests and responses are compact arrays, whose encoding follows these rules:

  • If the array is NULL, just put a 0 byte.
  • Otherwise, it the length of the array is N, put N + 1 into the stream encoded as an UNSIGNED_VARINT. Then you should put the bytes of each of the N instances in order.
1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.