'bytes' object has no attribute 'to_bytes'

I’m stuck on Stage #(change to your stage, ex. #JM3).

I’ve tried .. (mention what you’ve tried so far).

Here are my logs:

Running tests for Stage #VT6 (Listing Partitions - List for an unknown topic)
remote: [tester::#VT6] $ ./your_program.sh /tmp/server.properties
remote: [tester::#VT6] Connecting to broker at: localhost:9092
remote: [your_program] Logs from your program will appear here!
remote: [your_program] Server is running on port 9092...
remote: [tester::#VT6] Connection to broker at localhost:9092 successful
remote: [tester::#VT6] Sending "DescribeTopicPartitions" (version: 0) request (Correlation id: 435372330)
remote: [tester::#VT6] Hexdump of sent "DescribeTopicPartitions" request:
remote: [tester::#VT6] Idx  | Hex                                             | ASCII
remote: [tester::#VT6] -----+-------------------------------------------------+-----------------
remote: [tester::#VT6] 0000 | 00 00 00 31 00 4b 00 00 19 f3 41 2a 00 0c 6b 61 | ...1.K....A*..ka
remote: [tester::#VT6] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk
remote: [tester::#VT6] 0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 61 7a 00 00 | nown-topic-saz..
remote: [tester::#VT6] 0030 | 00 00 01 ff 00                                  | .....
remote: [tester::#VT6]
remote: [your_program] Accepted connection from ('127.0.0.1', 46550)
remote: [your_program] b'\x19\xf3A*' 435372330
remote: [your_program] 12
remote: [your_program] Error handling client: 'bytes' object has no attribute 'to_bytes'
remote: [tester::#VT6] EOF
remote: [tester::#VT6] Test failed
remote: [tester::#VT6] Terminating program
remote: [tester::#VT6] Program terminated successfully

And here’s a snippet of my code:

import socket
import threading

# Encoding helper functions
def encode_unsigned_varint(value):
    """Encode an integer as UNSIGNED_VARINT."""
    encoded = []
    while value > 0x7F:
        encoded.append((value & 0x7F) | 0x80)
        value >>= 7
    encoded.append(value)
    return bytes(encoded)

def encode_compact_array(items):
    """Encode a COMPACT_ARRAY."""
    if items is None:
        return b'\x00'  # Null array
    encoded_items = b"".join(items)
    encoded_length = encode_unsigned_varint(len(items) + 1)
    return encoded_length + encoded_items

def encode_string(value):
    """Encode a string with its length as UNSIGNED_VARINT."""
    encoded_length = encode_unsigned_varint(len(value) + 1)
    return encoded_length + value.encode("utf-8")

def encode_uuid():
    """Encode a UUID as 16 zero bytes."""
    return b'\x00' * 16

# Request processing helpers
def get_corr_id(data):
    """Extract the correlation ID from the incoming data."""
    corr_id = data[8:12]
    print(corr_id, int.from_bytes(corr_id, byteorder="big"))
    return corr_id

def check_api_version(data):
    """Check the API version and return the appropriate error code."""
    err_code = 35
    api_version = int.from_bytes(data[6:8], byteorder="big")
    if 0 <= api_version <= 4:
        err_code = 0
    return err_code.to_bytes(2, byteorder="big")

def get_topic_and_partition(data):
    """Extract topic name and partition index from the DescribeTopicPartitions request."""
    topic_length = int.from_bytes(data[12:14], byteorder='big')
    topic_name = data[14:14 + topic_length].decode("utf-8")
    partition_index = int.from_bytes(data[14 + topic_length:14 + topic_length + 4], byteorder='big')
    return topic_name, partition_index

def response_api_key_75(id, cursor, array_length, length, topic_name):
    tag_buffer = b"\x00"
    response_header = id.to_bytes(4, byteorder="big") + tag_buffer
    error_code = int(3).to_bytes(2, byteorder="big")
    throttle_time_ms = int(0).to_bytes(4, byteorder="big")
    is_internal = int(0).to_bytes(1, byteorder="big")
    topic_authorized_operations = b"\x00\x00\x0d\xf8"
    topic_id = int(0).to_bytes(16, byteorder="big")
    partition_array = b"\x01"
    array_length = array_length if isinstance(array_length, int) else int(array_length)
    length = length if isinstance(length, int) else int(length)
    response_body = (
        throttle_time_ms
        + int(array_length).to_bytes(1, byteorder="big")
        + error_code
        + int(length).to_bytes(1, byteorder="big")
        + topic_name
        + topic_id
        + is_internal
        + partition_array
        + topic_authorized_operations
        + tag_buffer
        + cursor
        + tag_buffer
    )
    total_len = len(response_header) + len(response_body)
    return int(total_len).to_bytes(4, byteorder="big") + response_header + response_body

def create_msg(id, api_key: int, error_code: int = 0):
    response_header = id.to_bytes(4, byteorder="big")
    err = error_code.to_bytes(2, byteorder="big")

# Client handling
def handle_client(addr, conn):
    """
    Handle incoming client requests:
    - Decode data.
    - Construct a response according to Kafka protocol.
    """
    try:
        # # Simulate known topics and their partitions
        # topics = ["example_topic"]
        # response_partition_limit = 10
        # cursor_data = {
        #     "topic_name": "example_topic",
        #     "partition_index": 2,
        # }
        
 
        # topics_encoded=b""
        # for topic in topics:
        #     topic_encoded=(
        #         encode_string(topic)+b"\x00"
        #     )
        #     topics_encoded+=topic_encoded

        while True:
            # Wait for data from the client
            data = conn.recv(1024)
            if not data:  # Client disconnected
                print(f"Client {addr} disconnected")
                break

            # Step 1: Extract correlation ID and check API version
            api_key = int.from_bytes(data[4:6], byteorder="big")
            corr_id = get_corr_id(data)
            error_code = check_api_version(data)

            # Step 2: Extract topic name and partition index
            topic_name, partition_index = get_topic_and_partition(data)

            # Step 3: Validate topic and partition
            # if topic_name not in topics or partition_index >= topics.get(topic_name, 0):
            #     error_code = int(3).to_bytes(2, byteorder='big')

            # Step 4: Encode the topic response
            encoded_topic_name = encode_string(topic_name)
            topic_id = encode_uuid()
            partitions = encode_unsigned_varint(0)
            response_unknown = error_code + encoded_topic_name + topic_id + partitions

            # Adjust response size
            response_size = len(response_unknown)
            response_unknown = (
                response_size.to_bytes(4, byteorder='big')
                + response_unknown)

            # Step 5: Construct the API keys array
            api_keys = [
                int(18).to_bytes(2, byteorder="big") +  # INT16 api_key
                int(0).to_bytes(2, byteorder="big") +   # INT16 min_version
                int(4).to_bytes(2, byteorder="big") +   # INT16 max_version
                int(0).to_bytes(1, byteorder="big"),    # Extra byte (compact array format)

                int(75).to_bytes(2, byteorder="big") +  # INT16 api_key
                int(0).to_bytes(2, byteorder="big") +   # INT16 min_version
                int(0).to_bytes(2, byteorder="big") +   # INT16 max_version
                int(0).to_bytes(1, byteorder="big")     # Extra byte (compact array format)
            ]

            compact_api_keys = encode_compact_array(api_keys)

            # Step 6: Combine response components
            response = (
                (4 + len(corr_id) + len(error_code) + len(compact_api_keys) + 1).to_bytes(4, byteorder="big")  # Response size
                + corr_id
                + error_code
                + compact_api_keys  # COMPACT_ARRAY for API keys
                + int(0).to_bytes(4, byteorder="big")  # Throttle time
            )

            if api_key == 75:
                client_id_len = int.from_bytes(data[12:14], byteorder="big", signed=False)
                print(client_id_len)
                if client_id_len > 0:
                    cliend_id = data[14 : 14 + client_id_len]
                    tagged = data[14 + client_id_len]
                else:                                                       
                    cliend_id = ""
                    tagged = [14]

                array_len_finder = 14 + client_id_len + 1
                array_length = data[array_len_finder]
                topic_name_length = data[array_len_finder + 1]
                topic_name_starter = array_len_finder + 2
                topic_name = bytes(
                    data[
                        topic_name_starter : topic_name_starter
                        + (topic_name_length - 1)
                    ]
                )
                cursor_length = topic_name_starter + topic_name_length + 4
                cursor = data[cursor_length]
                cursor_bytes = cursor if isinstance(cursor, bytes) else int(cursor).to_bytes(1, byteorder="big")
                response = response_api_key_75(
                    corr_id,
                    cursor_bytes,
                    array_length,
                    topic_name_length,
                    topic_name,
                )
                conn.sendall(response)
            else:
                version = {0, 1, 2, 3, 4}
                errorr_code = 0 if error_code in version else 35
                response = create_msg(corr_id,api_key, errorr_code)
                conn.sendall(response)


            # Step 7: Send the response back to the client
            # conn.sendall(response_unknown)
    except Exception as e:
        print(f"Error handling client: {e}")
    finally:
        conn.close()

# Server setup
def main():
    """
    Main function to set up the server:
    - Listens on port 9092.
    - Accepts multiple client connections using threads.
    """
    print("Logs from your program will appear here!")

    # Create a server socket on localhost and port 9092
    server = socket.create_server(("localhost", 9092), reuse_port=True)
    print("Server is running on port 9092...")

    try:
        while True:
            # Accept a new client connection
            conn, addr = server.accept()
            print(f"Accepted connection from {addr}")

            # Handle the client in a separate thread
            client_thread = threading.Thread(target=handle_client, args=(addr, conn))
            client_thread.start()
    except KeyboardInterrupt:
        print("Shutting down server...")
    finally:
        server.close()

if __name__ == "__main__":
    main()

Hey @faith-in-het, could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

@faith-in-het I tried running your code against the previous stages, but it’s actually no longer passing a previous stage #NV3 (Send Correlation ID).

Suggestions:

  1. Use our CLI to test against previous stages by running:
codecrafters test --previous
  1. Focus on fixing the early stages first, as later stages depend on them.

Closing this thread due to inactivity. If you still need assistance, feel free to reopen or start a new discussion!