I’m stuck on Stage #Blocking reads#bs1
I’ve tried … Debug with print statements and AI, but I had no success.
Here are my logs:
include relevant logs here ([tester::#BS1] [client-2] ✔︎ Received "0-2"
[tester::#BS1] Expecting 1 client to receive response of XREAD command
[tester::#BS1] [client-1] Received bytes: "$-1\r\n"
[tester::#BS1] [client-1] Received RESP null bulk string: "$-1\r\n"
[tester::#BS1] Expected array, got null bulk string
[tester::#BS1] Test failed)
And here’s a snippet of my code:
include relevant code here:
This is the logic in my xadd method: (async def handle_xadd(self, writer, commands, lock):
try:
if len(commands) < 5:
writer.write(b'-ERR command need a least 4 arguments \r\n')
await writer.drain()
return
key = commands[1]
id = None
if commands[2] == "*":
id = int(time.time()) * 1000
id = f'{id}-0'
elif commands[2] == "0-*":
id = "0-1"
else:
id = commands[2]
args_key = commands[3]
args_val = commands[4]
# split id for auto generated id
id_clone = id
milli_sec, seq_num = id.split("-")
if id == "0-0":
writer.write(b'-ERR The ID specified in XADD must be greater than 0-0\r\n')
await writer.drain()
return
obj = {'args_key': args_key, 'args_val': args_val}
entry = OrderedDict(obj)
try:
async with lock:
if key in self.stream_data:
old_id = list(self.stream_data[key].keys())[-1]
is_id_valid = await self.validate_id(old_id,commands[2], writer)
if not is_id_valid:
return
id = is_id_valid
self.stream_data[key][id] = entry
# send the id back in bulk
# add functionality for coroutine asyncio.Event()
# Unblock the correct number of clients
if key in self.blocked_clients and self.blocked_clients[key]:
ms, seq_num = map(int, id.split("-"))
clients_to_notify = []
still_waiting = []
for start_id, client_event in self.blocked_clients[key]:
start_ms, start_seq_num = map(int, start_id.split("-"))
# Check if the new entry comes AFTER the client's start_id
if ms > start_ms or (ms == start_ms and seq_num > start_seq_num):
# This client should be notified
clients_to_notify.append(client_event)
else:
# This client is still waiting
still_waiting.append((start_id, client_event))
# Notify all clients that should be unblocked
for client_event in clients_to_notify:
client_event.set()
# Update the blocked_clients list with only those still waiting
self.blocked_clients[key] = still_waiting
resp = f"${len(id)}\r\n{id}\r\n"
writer.write(resp.encode())
await writer.drain()
elif key not in self.stream_data:
if not await self.validate_first_id(writer, id):
return
self.stream_data[key] = OrderedDict()
self.stream_data[key][id] = entry
resp = f"${len(id)}\r\n{id}\r\n"
writer.write(resp.encode())
await writer.drain()
except TypeError as e:
writer.write(e.encode())
await writer.drain()
except Exception:
writer.write(b'-ERR key does not exist in the stream\r\n')
await writer.drain()
except Exception:
writer.write(b'-ERR problem with XADD processing \r\n')
await writer.drain())
This is the logic in my xread method: (async def handle_xread(self, writer, commands, lock):
keys = None
_ids = None
key = None
start_id = None
block_id = None
timeout = (float(commands[2]) / 1000.0) if "block" in commands else None
if "block" in commands:
# XREAD block 1000 streams stream_key 0-2
_, _, _, _, key, block_id = commands
start_id = block_id # Use same variable name consistently
if len(commands) > 6:
new_commands = commands[4:]
keys = new_commands[:len(new_commands) // 2]
_ids = new_commands[len(new_commands) // 2:]
if len(commands) > 4 and "block" not in commands:
new_commands = commands[2:]
keys = new_commands[:len(new_commands) // 2]
_ids = new_commands[len(new_commands) // 2:]
elif len(commands) == 4:
command, _type, key, start_id = commands
async with lock:
if "block" in commands:
if key in self.stream_data and self.stream_data[key]:
# Check if there are entries AFTER block_id
has_new_data = False
block_ms, block_seq = map(int, block_id.split("-"))
for entry_id in self.stream_data[key]:
entry_ms, entry_seq = map(int, entry_id.split("-"))
if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
has_new_data = True
break
if has_new_data:
# Return immediately with new data
resp = ['*1\r\n'] # Initialize here
format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
if format_result:
resp.append(format_result)
resp_str = "".join(resp)
writer.write(resp_str.encode())
await writer.drain()
return
else:
# No new data, block and wait
if key not in self.blocked_clients:
self.blocked_clients[key] = []
client_event = asyncio.Event()
self.blocked_clients[key].append((block_id, client_event))
try:
wait_task = asyncio.create_task(client_event.wait())
await asyncio.wait_for(wait_task, timeout=timeout)
# After waking up, check for new data AFTER block_id
has_new_data = False
for entry_id in self.stream_data[key]:
entry_ms, entry_seq = map(int, entry_id.split("-"))
if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
has_new_data = True
break
if has_new_data:
format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
if format_result:
resp.append(format_result)
except asyncio.TimeoutError:
if (block_id, client_event) in self.blocked_clients[key]:
self.blocked_clients[key].remove((block_id, client_event))
writer.write(b'$-1\r\n')
await writer.drain()
return
else:
# Stream doesn't exist yet, block and wait
if key not in self.blocked_clients:
self.blocked_clients[key] = []
client_event = asyncio.Event()
self.blocked_clients[key].append((block_id, client_event))
try:
wait_task = asyncio.create_task(client_event.wait())
await asyncio.wait_for(wait_task, timeout=timeout)
# After waking up, return new data
if key in self.stream_data:
format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
if format_result:
resp.append(format_result)
except asyncio.TimeoutError:
if (block_id, client_event) in self.blocked_clients[key]:
self.blocked_clients[key].remove((block_id, client_event))
writer.write(b'$-1\r\n')
await writer.drain()
return
elif len(commands) == 4:
resp = ['*1\r\n']
if key in self.stream_data:
format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
if format_result:
resp.append(format_result)
resp_str = "".join(resp)
writer.write(resp_str.encode())
await writer.drain()
elif len(commands) > 4 and "block" not in commands:
resp = [f'*{len(keys)}\r\n']
for i in range(len(keys)):
key = keys[i]
start_id = _ids[i]
if key in self.stream_data:
format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
if format_result:
resp.append(format_result)
resp_str = "".join(resp)
writer.write(resp_str.encode())
await writer.drain())
and finally this is my formating Resp method: (async def format_xread_output(self, writer, key, start_id, resp, stream):
start_ms, start_seq = map(int, start_id.split("-"))
matching_entries = []
for entry_id in stream.keys():
entry_ms, entry_seq = map(int, entry_id.split("-"))
if entry_ms > start_ms or (entry_ms == start_ms and entry_seq > start_seq):
matching_entries.append((entry_id, stream[entry_id]))
if not matching_entries:
return None
# Build response format:
# *2
# $<key_length>
# <key>
# *<num_entries>
# *2
# $<id_length>
# <id>
# *<num_fields>
# $<field_length>
# <field>
# $<value_length>
# <value>
result = []
# Stream array: *2 (key and entries)
result.append('*2\r\n')
# Key as bulk string
result.append(f'${len(key)}\r\n{key}\r\n')
# Number of entries
result.append(f'*{len(matching_entries)}\r\n')
# Each entry
for entry_id, entry_data in matching_entries:
# Entry is an array of 2: [id, [field, value, ...]]
result.append('*2\r\n')
# Entry ID as bulk string
result.append(f'${len(entry_id)}\r\n{entry_id}\r\n')
# Fields and values array
num_fields = len(entry_data)
result.append(f'*{num_fields * 2}\r\n') # field-value pairs
# Add each field and value
for field, value in entry_data.items():
result.append(f'${len(field)}\r\n{field}\r\n')
result.append(f'${len(value)}\r\n{value}\r\n')
return ''.join(result))
I use claude to help with the formatting because I was really confused.
Thank you for your help. I have been at it for the passed 3 days, but nothing.