I stuck on this stage(header answer section).
I tried so far :
import socket
import struct
class DomainLabel:
def __init__(self, name):
self.name = name
self.len = len(name)
def bytes(self):
return bytes([self.len]) + self.name.encode()
class LabelSequence:
def __init__(self, labels):
self.labels = labels
def bytes(self):
qName = bytearray()
for label in self.labels:
qName.extend(label.bytes())
qName.append(0)
return bytes(qName)
class DNSAnswer:
def __init__(self, name, qtype, qclass, ttl, data):
self.name = name
self.type = qtype
self.class_ = qclass
self.ttl = ttl
self.len = len(data)
self.data = data
def bytes(self):
ansName = self.name.bytes()
ansType = struct.pack('!H', self.type)
ansClass = struct.pack('!H', self.class_)
ansTTL = struct.pack('!I', self.ttl)
ansLen = struct.pack('!H', self.len)
return ansName + ansType + ansClass + ansTTL + ansLen + self.data
domain_to_ip = {
"example.com": "192.0.2.1",
"www.example.com": "192.0.2.1",
"google.com": "142.250.64.14",
"github.com": "140.82.112.3",
}
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(("127.0.0.1", 2053))
print("DNS server listening on 127.0.0.1:2053")
while True:
try:
buf, source = udp_socket.recvfrom(512)
print(f"Received data from: {source}")
response = build_dns_response(buf)
udp_socket.sendto(response, source)
print(f"Sent response to: {source}")
except Exception as e:
print(f"Error receiving data: {e}")
continue
def build_dns_response(query):
id, flags, qdcount, _, _, _ = struct.unpack("!HHHHHH", query[:12])
question_name, qtype, qclass = parse_question_section(query[12:])
ip_address = domain_to_ip.get(question_name)
if ip_address is None:
print(f"No record found for {question_name}")
return build_error_response(id, 3)
answer_section = build_answer_section(question_name, qtype, ip_address)
qr = 1
opcode = (flags >> 11) & 0b1111
aa = 1
tc = 0
rd = (flags >> 8) & 0b1
ra = 0
z = 0
rcode = 0
response_flags = (qr << 15) | (opcode << 11) | (aa << 10) | (tc << 9) | (rd << 8) | (ra << 7) | rcode
header = struct.pack("!HHHHHH",
id,
response_flags,
qdcount,
1,
0,
0)
response = header + query[12:] + answer_section
print(f"Built DNS Response:\n{response.hex()}")
return response
def parse_question_section(question):
labels = []
idx = 0
while question[idx] != 0:
length = question[idx]
labels.append(question[idx + 1: idx + 1 + length])
idx += length + 1
question_name = b".".join(labels).decode('utf-8')
qtype, qclass = struct.unpack("!HH", question[idx + 1: idx + 5])
return question_name, qtype, qclass
def build_answer_section(domain_name, qtype, ip_address):
if qtype == 1:
packed_ip = socket.inet_aton(ip_address)
answer = DNSAnswer(
name=LabelSequence([DomainLabel(domain_name)]),
qtype=1,
qclass=1,
ttl=60,
data=packed_ip
)
else:
print(f"Unsupported query type: {qtype}")
return b''
return answer.bytes()
def build_error_response(query_id, rcode):
response_flags = 0b10000000 | rcode
header = struct.pack("!HHHHHH", query_id, response_flags, 1, 0, 0, 0)
return header
if __name__ == "__main__":
main()
Error:
remote: [your_program] Received data from: ('127.0.0.1', 42373)
remote: [your_program] No record found for codecrafters.io
remote: [your_program] Sent response to: ('127.0.0.1', 42373)
remote: [tester::#HD8] Querying: ;reddit.com. IN A
remote: [tester::#HD8] Sending Request: (Messages with >>> prefix are part of this log)
remote: [tester::#HD8] >>> ;; opcode: QUERY, status: NOERROR, id: 13890
remote: [tester::#HD8] >>> ;; flags: rd; QUERY: 1, ANSWER: 0, AUTHORITY: 0, ADDITIONAL: 0
remote: [tester::#HD8] >>>
remote: [tester::#HD8] >>> ;; QUESTION SECTION:
remote: [tester::#HD8] >>> ;reddit.com. IN A
remote: [tester::#HD8] >>>
remote: [your_program] Received data from: ('127.0.0.1', 40356)
remote: [your_program] No record found for reddit.com
remote: [your_program] Sent response to: ('127.0.0.1', 40356)
remote: [tester::#HD8] Shutting down DNS resolver server...
remote: [tester::#HD8] Expected QR field to be set to 1. 1 indicates that it is a response. Got 0
remote: [tester::#HD8] Test failed
remote: [tester::#HD8] Terminating program
remote: [tester::#HD8] Program terminated successfully