DNS Parse compressed packet

I’m stuck at this stage( Parse compressed packet)

I’ve tried … (mention what you’ve tried so far).

import socket
import struct
import sys

# DNS header format and size
DNS_HEADER_FORMAT = "!HHHHHH"
DNS_HEADER_SIZE = 12

# Constants for DNS query and response types
DNS_QUERY = 0
DNS_RESPONSE = 1
DNS_OPCODE_QUERY = 0
DNS_RCODE_NOERROR = 0
COMPRESSED_NAME_FLAG = 0xC0  # DNS compression pointer flag

def decode_dns_name(data, offset):
    """
    Decode a DNS name from the query packet (handles compression).
    :param data: The DNS query data.
    :param offset: The current offset in the query packet.
    :return: The decoded domain name and the new offset in the packet.
    """
    labels = []
    while True:
        if offset >= len(data):
            raise IndexError("Offset exceeds packet length")
        
        length = data[offset]
        offset += 1

        if length == 0:
            break  # End of domain name

        if length & COMPRESSED_NAME_FLAG == COMPRESSED_NAME_FLAG:  # Pointer (compression)
            if offset + 1 >= len(data):
                raise IndexError("Pointer offset exceeds packet length")
            pointer_offset = struct.unpack('!H', data[offset:offset + 2])[0] & 0x3FFF
            if pointer_offset >= len(data):
                raise IndexError("Pointer offset is outside packet bounds")
            # Recursively decode the name from the pointer offset
            labels.append(decode_dns_name(data, pointer_offset)[0])
            offset += 2  # Move past the pointer
            break
        else:
            # Normal label, just decode the label and move offset
            label = data[offset + 1:offset + 1 + length].decode('utf-8', errors='ignore')
            labels.append(label)
            offset += 1 + length  # Move past the label and its length byte

    domain_name = '.'.join(labels)
    return domain_name, offset

def parse_dns_header(data):
    """
    Parse the DNS header from the data.
    :param data: The DNS query data.
    :return: A tuple of (id, flags, qdcount, ancount, nscount, arcount).
    """
    if len(data) < DNS_HEADER_SIZE:
        raise ValueError("DNS packet is too short to contain a valid header")
    
    header = struct.unpack(DNS_HEADER_FORMAT, data[:DNS_HEADER_SIZE])
    return header

def handle_dns_query(data, addr):
    """
    Handle the incoming DNS query.
    :param data: The DNS query data.
    :param addr: The client address from which the query was received.
    :return: The DNS response or None if there is an error.
    """
    try:
        # Parse the DNS header
        id, flags, qdcount, ancount, nscount, arcount = parse_dns_header(data)
    except Exception as e:
        print(f"Error parsing DNS header: {e}")
        return None

    offset = DNS_HEADER_SIZE

    questions = []
    try:
        # Decode all the questions in the query
        for _ in range(qdcount):
            domain_name, offset = decode_dns_name(data, offset)
            qtype, qclass = struct.unpack("!HH", data[offset:offset + 4])
            questions.append((domain_name, qtype, qclass))
            offset += 4  # Move past QTYPE and QCLASS fields
    except Exception as e:
        print(f"Error decoding DNS query: {e}")
        return None

    # Construct the DNS response
    response = bytearray()

    # Add the DNS header to the response
    response.extend(struct.pack(DNS_HEADER_FORMAT, id, DNS_RESPONSE << 15, DNS_OPCODE_QUERY << 11, qdcount, ancount, nscount, arcount))

    # Add the questions section to the response
    for domain_name, qtype, qclass in questions:
        response.extend(encode_dns_name(domain_name))
        response.extend(struct.pack("!HH", qtype, qclass))

    # Add a simple answer (mocked) to the response
    for domain_name, qtype, qclass in questions:
        response.extend(encode_dns_name(domain_name))
        response.extend(struct.pack("!HH", qtype, qclass))
        response.extend(struct.pack("!I", 300))  # TTL
        response.extend(struct.pack("!H", 4))    # Data length (IPv4 address)
        response.extend(struct.pack("!4B", 192, 168, 1, 1))  # Mocked answer IP address

    return bytes(response)

def encode_dns_name(domain_name):
    """
    Encode a domain name as DNS-compatible format (including compression).
    :param domain_name: The domain name to encode.
    :return: The encoded domain name in byte format.
    """
    labels = domain_name.split('.')
    encoded_name = bytearray()
    for label in labels:
        length = len(label)
        encoded_name.append(length)
        encoded_name.extend(label.encode('utf-8'))
    encoded_name.append(0)  # Null byte at the end of the name
    return bytes(encoded_name)

def start_dns_server():
    """
    Start the DNS server and handle incoming queries.
    """
    server_address = ('127.0.0.1', 2053)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind(server_address)
    print(f"DNS server running on {server_address}")

    while True:
        try:
            # Receive DNS query from client
            data, addr = server_socket.recvfrom(512)  # 512-byte buffer size for DNS queries
            print(f"Received query from {addr}")

            # Handle the query
            response = handle_dns_query(data, addr)

            if response:
                # Send DNS response to client
                server_socket.sendto(response, addr)
                print(f"Sent response to {addr}")
            else:
                print(f"Error handling query from {addr}")
        except Exception as e:
            print(f"Error in server loop: {e}")

if __name__ == "__main__":
    try:
        start_dns_server()
    except KeyboardInterrupt:
        print("DNS server stopped manually.")
        sys.exit(0)

Logs:

remote: [tester::#YC9] Running tests for Stage #YC9 (Parse compressed packet)
remote: [tester::#YC9] Starting DNS server on 127.0.0.1:2053
remote: [tester::#YC9] Running program
remote: [tester::#YC9] DNS resolver listening on 127.0.0.1:5354
remote: [tester::#YC9] Connecting to 127.0.0.1:2053 using UDP
remote: [your_program] DNS server running on ('127.0.0.1', 2053)
remote: [your_program] Received query from ('127.0.0.1', 47046)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 47046)
remote: [tester::#YC9] Did not receive response from DNS server, retrying
remote: [tester::#YC9] read udp 127.0.0.1:47046->127.0.0.1:2053: i/o timeout
remote: [your_program] Received query from ('127.0.0.1', 47459)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 47459)
remote: [tester::#YC9] Did not receive response from DNS server, retrying
remote: [tester::#YC9] read udp 127.0.0.1:47459->127.0.0.1:2053: i/o timeout
remote: [your_program] Received query from ('127.0.0.1', 43681)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 43681)
remote: [tester::#YC9] Shutting down DNS resolver server...
remote: [tester::#YC9] read udp 127.0.0.1:43681->127.0.0.1:2053: i/o timeout
remote: [tester::#YC9] Test failed
remote: [tester::#YC9] Terminating program
remote: [tester::#YC9] Program terminated successfully

@Roshan2115 I’ve edited your post with Markdown formatting.

To make code and logs easier to read, you can use triple backticks (```) to format them, like this:

Closing this thread due to inactivity. If you still need assistance, feel free to reopen or start a new discussion!

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.