I’m stuck at this stage( Parse compressed packet)
I’ve tried … (mention what you’ve tried so far).
import socket
import struct
import sys
# DNS header format and size
DNS_HEADER_FORMAT = "!HHHHHH"
DNS_HEADER_SIZE = 12
# Constants for DNS query and response types
DNS_QUERY = 0
DNS_RESPONSE = 1
DNS_OPCODE_QUERY = 0
DNS_RCODE_NOERROR = 0
COMPRESSED_NAME_FLAG = 0xC0 # DNS compression pointer flag
def decode_dns_name(data, offset):
"""
Decode a DNS name from the query packet (handles compression).
:param data: The DNS query data.
:param offset: The current offset in the query packet.
:return: The decoded domain name and the new offset in the packet.
"""
labels = []
while True:
if offset >= len(data):
raise IndexError("Offset exceeds packet length")
length = data[offset]
offset += 1
if length == 0:
break # End of domain name
if length & COMPRESSED_NAME_FLAG == COMPRESSED_NAME_FLAG: # Pointer (compression)
if offset + 1 >= len(data):
raise IndexError("Pointer offset exceeds packet length")
pointer_offset = struct.unpack('!H', data[offset:offset + 2])[0] & 0x3FFF
if pointer_offset >= len(data):
raise IndexError("Pointer offset is outside packet bounds")
# Recursively decode the name from the pointer offset
labels.append(decode_dns_name(data, pointer_offset)[0])
offset += 2 # Move past the pointer
break
else:
# Normal label, just decode the label and move offset
label = data[offset + 1:offset + 1 + length].decode('utf-8', errors='ignore')
labels.append(label)
offset += 1 + length # Move past the label and its length byte
domain_name = '.'.join(labels)
return domain_name, offset
def parse_dns_header(data):
"""
Parse the DNS header from the data.
:param data: The DNS query data.
:return: A tuple of (id, flags, qdcount, ancount, nscount, arcount).
"""
if len(data) < DNS_HEADER_SIZE:
raise ValueError("DNS packet is too short to contain a valid header")
header = struct.unpack(DNS_HEADER_FORMAT, data[:DNS_HEADER_SIZE])
return header
def handle_dns_query(data, addr):
"""
Handle the incoming DNS query.
:param data: The DNS query data.
:param addr: The client address from which the query was received.
:return: The DNS response or None if there is an error.
"""
try:
# Parse the DNS header
id, flags, qdcount, ancount, nscount, arcount = parse_dns_header(data)
except Exception as e:
print(f"Error parsing DNS header: {e}")
return None
offset = DNS_HEADER_SIZE
questions = []
try:
# Decode all the questions in the query
for _ in range(qdcount):
domain_name, offset = decode_dns_name(data, offset)
qtype, qclass = struct.unpack("!HH", data[offset:offset + 4])
questions.append((domain_name, qtype, qclass))
offset += 4 # Move past QTYPE and QCLASS fields
except Exception as e:
print(f"Error decoding DNS query: {e}")
return None
# Construct the DNS response
response = bytearray()
# Add the DNS header to the response
response.extend(struct.pack(DNS_HEADER_FORMAT, id, DNS_RESPONSE << 15, DNS_OPCODE_QUERY << 11, qdcount, ancount, nscount, arcount))
# Add the questions section to the response
for domain_name, qtype, qclass in questions:
response.extend(encode_dns_name(domain_name))
response.extend(struct.pack("!HH", qtype, qclass))
# Add a simple answer (mocked) to the response
for domain_name, qtype, qclass in questions:
response.extend(encode_dns_name(domain_name))
response.extend(struct.pack("!HH", qtype, qclass))
response.extend(struct.pack("!I", 300)) # TTL
response.extend(struct.pack("!H", 4)) # Data length (IPv4 address)
response.extend(struct.pack("!4B", 192, 168, 1, 1)) # Mocked answer IP address
return bytes(response)
def encode_dns_name(domain_name):
"""
Encode a domain name as DNS-compatible format (including compression).
:param domain_name: The domain name to encode.
:return: The encoded domain name in byte format.
"""
labels = domain_name.split('.')
encoded_name = bytearray()
for label in labels:
length = len(label)
encoded_name.append(length)
encoded_name.extend(label.encode('utf-8'))
encoded_name.append(0) # Null byte at the end of the name
return bytes(encoded_name)
def start_dns_server():
"""
Start the DNS server and handle incoming queries.
"""
server_address = ('127.0.0.1', 2053)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(server_address)
print(f"DNS server running on {server_address}")
while True:
try:
# Receive DNS query from client
data, addr = server_socket.recvfrom(512) # 512-byte buffer size for DNS queries
print(f"Received query from {addr}")
# Handle the query
response = handle_dns_query(data, addr)
if response:
# Send DNS response to client
server_socket.sendto(response, addr)
print(f"Sent response to {addr}")
else:
print(f"Error handling query from {addr}")
except Exception as e:
print(f"Error in server loop: {e}")
if __name__ == "__main__":
try:
start_dns_server()
except KeyboardInterrupt:
print("DNS server stopped manually.")
sys.exit(0)
Logs:
remote: [tester::#YC9] Running tests for Stage #YC9 (Parse compressed packet)
remote: [tester::#YC9] Starting DNS server on 127.0.0.1:2053
remote: [tester::#YC9] Running program
remote: [tester::#YC9] DNS resolver listening on 127.0.0.1:5354
remote: [tester::#YC9] Connecting to 127.0.0.1:2053 using UDP
remote: [your_program] DNS server running on ('127.0.0.1', 2053)
remote: [your_program] Received query from ('127.0.0.1', 47046)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 47046)
remote: [tester::#YC9] Did not receive response from DNS server, retrying
remote: [tester::#YC9] read udp 127.0.0.1:47046->127.0.0.1:2053: i/o timeout
remote: [your_program] Received query from ('127.0.0.1', 47459)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 47459)
remote: [tester::#YC9] Did not receive response from DNS server, retrying
remote: [tester::#YC9] read udp 127.0.0.1:47459->127.0.0.1:2053: i/o timeout
remote: [your_program] Received query from ('127.0.0.1', 43681)
remote: [your_program] Error decoding DNS query: Offset exceeds packet length
remote: [your_program] Error handling query from ('127.0.0.1', 43681)
remote: [tester::#YC9] Shutting down DNS resolver server...
remote: [tester::#YC9] read udp 127.0.0.1:43681->127.0.0.1:2053: i/o timeout
remote: [tester::#YC9] Test failed
remote: [tester::#YC9] Terminating program
remote: [tester::#YC9] Program terminated successfully