I’m stuck on Redis Replication Stage 18.
After receiving the WAIT on the master, I send REPLCONFs to all the replicas. Then I start a thread for every replica, where I try to receive a response from it (within the timeout). The master’s main thread waits until one of two conditions is met (the timeout expires or min acks is reached) and then responds with the current number of confirmations.
However, 95% of the time, I don’t get the expected number of replica responses before the timeout expires. This happens even though I start the timer after sending the REPLCONFs to the replicas and starting the receiving threads, to account for the overhead of those steps.
Here are some logs:
[replication-18] Testing Replica : 4
[replication-18] Received ["SET", "baz", "789"]
[your_program] recvd from 47954: 99
[replication-18] Received ["REPLCONF", "GETACK", "*"]
[your_program] timeout from 47956 with 500ms
[your_program] timeout from 47944 with 500ms
[your_program] timeout from 47930 with 2000ms
[your_program] timeout from 47956 with 2000ms
[your_program] timeout from 47954 with 2000ms
[your_program] Exception in thread Thread-1 (respond):
[replication-18] Expected 3, got 1
[replication-18] Test failed (try setting 'debug: true' in your codecrafters.yml to see more details)
[your_program] Traceback (most recent call last):
[your_program] File "/usr/local/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
And here’s a snippet of my code:
Handling the WAITs
min_acks, timeout = int(params[1]), int(params[2])
confirmed = MutableInteger()
required_offset = writes.offset + (writes.num - 1) * ACK_BYTES_SIZE
for repl_conn in replicas:
    repl_conn.send(encode_resp(["REPLCONF", "GETACK", "*"]))
for repl_conn in replicas:
    t = threading.Thread(
        target=recv_repl_ack,
        args=(repl_conn, required_offset, confirmed, timeout),
    )
    t.start()
end = time.time() + (timeout / 1000)
while (time.time() < end) and (confirmed.val < min_acks):
    time.sleep(0.2)
conn.send(integer(confirmed.val))
Receiving acks from replicas
def recv_repl_ack(
    conn: socket.socket,
    required_offset: int,
    confirmed: MutableInteger,
    timeout: int,
):
    try:
        conn.settimeout(timeout / 1000)
        response = conn.recv(DEFAULT_RECV_BUFFER)
    except TimeoutError:
        print(f"timeout from {conn.getpeername()[1]} with {timeout}ms")
    else:
        response = decode_resp(response)[0]
        offset = int(response[2])
        print(f"recvd from {conn.getpeername()[1]}: {offset}")
        if offset >= required_offset:
            confirmed.val += 1
    finally:
        conn.settimeout(None)
    return
Extras:
class MutableInteger:
    def __init__(self) -> None:
        self.val = 0
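For comparison, here is a minimal sketch of the same "wait until min acks or timeout, whichever comes first" idea using a condition variable instead of polling with time.sleep(0.2). The AckCounter class and its increment / wait_for methods are hypothetical names, not part of my actual code, and I have not confirmed that this changes the test result. It only shows how the counter could be made thread-safe and how the waiter could wake up as soon as enough acks arrive.

```python
import threading


class AckCounter:
    """Hypothetical thread-safe replacement for MutableInteger."""

    def __init__(self) -> None:
        self._val = 0
        self._cond = threading.Condition()

    def increment(self) -> None:
        # Called by a receiving thread when a replica's ack is accepted.
        with self._cond:
            self._val += 1
            self._cond.notify_all()  # wake the waiter so it can re-check the count

    def wait_for(self, target: int, timeout_ms: int) -> int:
        # Blocks until the count reaches target or timeout_ms elapses,
        # then returns the count observed at that moment.
        with self._cond:
            self._cond.wait_for(
                lambda: self._val >= target, timeout=timeout_ms / 1000
            )
            return self._val
```

With something like this, the WAIT handler would call confirmed.wait_for(min_acks, timeout) in place of the while loop, and recv_repl_ack would call confirmed.increment() instead of confirmed.val += 1.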