Redis -Xread stream with block

I’m stuck on Stage #Blocking reads#bs1

I’ve tried … Debug with print statements and AI, but I had no success.

Here are my logs:

include relevant logs here ([tester::#BS1] [client-2] ✔︎ Received "0-2"
[tester::#BS1] Expecting 1 client to receive response of XREAD command
[tester::#BS1] [client-1] Received bytes: "$-1\r\n"
[tester::#BS1] [client-1] Received RESP null bulk string: "$-1\r\n"
[tester::#BS1] Expected array, got null bulk string
[tester::#BS1] Test failed)

And here’s a snippet of my code:

include relevant code here: 
This is the logic in my xadd method: (async def handle_xadd(self, writer, commands, lock):
        try:
            if len(commands) < 5:
                writer.write(b'-ERR command need a least 4 arguments \r\n')
                await writer.drain()
                return
            

            key = commands[1]
            id = None
            if commands[2] == "*":
                id = int(time.time()) * 1000
                id = f'{id}-0'
            elif commands[2] == "0-*":
                id = "0-1"
            else:
                id = commands[2]
            args_key = commands[3]
            args_val = commands[4]

            # split id for auto generated id
            id_clone = id
            milli_sec, seq_num = id.split("-")
            if id == "0-0":
                writer.write(b'-ERR The ID specified in XADD must be greater than 0-0\r\n')
                await writer.drain()
                return 

            
            obj = {'args_key': args_key, 'args_val': args_val}
            entry = OrderedDict(obj)

            try:
                async with lock:

                    if key in self.stream_data:
                        old_id = list(self.stream_data[key].keys())[-1]

                        is_id_valid = await self.validate_id(old_id,commands[2], writer)
                        if not is_id_valid:
                            return
                        id = is_id_valid
                        self.stream_data[key][id] = entry
                        # send the id back in bulk

                        # add functionality for coroutine asyncio.Event()
                        # Unblock the correct number of clients
                        if key in self.blocked_clients and self.blocked_clients[key]:
                            ms, seq_num = map(int, id.split("-"))
                            clients_to_notify = []
                            still_waiting = []
                            
                            for start_id, client_event in self.blocked_clients[key]:
                                start_ms, start_seq_num = map(int, start_id.split("-"))
                                
                                # Check if the new entry comes AFTER the client's start_id
                                if ms > start_ms or (ms == start_ms and seq_num > start_seq_num):
                                    # This client should be notified
                                    clients_to_notify.append(client_event)
                                else:
                                    # This client is still waiting
                                    still_waiting.append((start_id, client_event))
                            
                            # Notify all clients that should be unblocked
                            for client_event in clients_to_notify:
                                client_event.set()
                            
                            # Update the blocked_clients list with only those still waiting
                            self.blocked_clients[key] = still_waiting


                        resp = f"${len(id)}\r\n{id}\r\n"
                        writer.write(resp.encode())
                        await writer.drain()

                    elif key not in self.stream_data:
                        if not await self.validate_first_id(writer, id):
                            return
                        self.stream_data[key] = OrderedDict()
                        self.stream_data[key][id] = entry

                        resp = f"${len(id)}\r\n{id}\r\n"
                        writer.write(resp.encode())
                        await writer.drain()

            except TypeError as e:
                writer.write(e.encode())
                await writer.drain()
            except Exception:
                writer.write(b'-ERR key does not exist in the stream\r\n')
                await writer.drain()
        except Exception:
            writer.write(b'-ERR problem with XADD processing \r\n')
            await writer.drain())
This is the logic in my xread method: (async def handle_xread(self, writer, commands, lock):
        keys = None
        _ids = None
        key = None
        start_id = None
        block_id = None
        timeout = (float(commands[2]) / 1000.0) if "block" in commands else None
        
        if "block" in commands:
            # XREAD block 1000 streams stream_key 0-2
            _, _, _, _, key, block_id = commands
            start_id = block_id  # Use same variable name consistently
            if len(commands) > 6:
                new_commands = commands[4:]
                keys = new_commands[:len(new_commands) // 2]
                _ids = new_commands[len(new_commands) // 2:]
        
        if len(commands) > 4 and "block" not in commands:
            new_commands = commands[2:]
            keys = new_commands[:len(new_commands) // 2]
            _ids = new_commands[len(new_commands) // 2:]
        elif len(commands) == 4:
            command, _type, key, start_id = commands
        
        async with lock:
            if "block" in commands:
                if key in self.stream_data and self.stream_data[key]:
                    # Check if there are entries AFTER block_id
                    has_new_data = False
                    block_ms, block_seq = map(int, block_id.split("-"))
                    
                    for entry_id in self.stream_data[key]:
                        entry_ms, entry_seq = map(int, entry_id.split("-"))
                        if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
                            has_new_data = True
                            break
                    
                    if has_new_data:
                        # Return immediately with new data
                        resp = ['*1\r\n']  # Initialize here
                        format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                        if format_result:
                            resp.append(format_result)
                            resp_str = "".join(resp)
                            writer.write(resp_str.encode())
                            await writer.drain()
                            return
                    else:
                        # No new data, block and wait
                        if key not in self.blocked_clients:
                            self.blocked_clients[key] = []
                        
                        client_event = asyncio.Event()
                        self.blocked_clients[key].append((block_id, client_event))
                        
                        try:
                            wait_task = asyncio.create_task(client_event.wait())
                            await asyncio.wait_for(wait_task, timeout=timeout)
                            
                            # After waking up, check for new data AFTER block_id
                            has_new_data = False
                            for entry_id in self.stream_data[key]:
                                entry_ms, entry_seq = map(int, entry_id.split("-"))
                                if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
                                    has_new_data = True
                                    break
                            
                            if has_new_data:
                                format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                                if format_result:
                                    resp.append(format_result)
                            
                        except asyncio.TimeoutError:
                            if (block_id, client_event) in self.blocked_clients[key]:
                                self.blocked_clients[key].remove((block_id, client_event))
                            
                            writer.write(b'$-1\r\n')
                            await writer.drain()
                            return
                else:
                    # Stream doesn't exist yet, block and wait
                    if key not in self.blocked_clients:
                        self.blocked_clients[key] = []
                    
                    client_event = asyncio.Event()
                    self.blocked_clients[key].append((block_id, client_event))
                    
                    try:
                        wait_task = asyncio.create_task(client_event.wait())
                        await asyncio.wait_for(wait_task, timeout=timeout)
                        
                        # After waking up, return new data
                        if key in self.stream_data:
                            format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                            if format_result:
                                resp.append(format_result)
                        
                    except asyncio.TimeoutError:
                        if (block_id, client_event) in self.blocked_clients[key]:
                            self.blocked_clients[key].remove((block_id, client_event))
                        
                        writer.write(b'$-1\r\n')
                        await writer.drain()
                        return
            
            elif len(commands) == 4:
                resp = ['*1\r\n']
                if key in self.stream_data:
                    format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
                    if format_result:
                        resp.append(format_result)
                resp_str = "".join(resp)
                writer.write(resp_str.encode())
                await writer.drain()
            
            elif len(commands) > 4 and "block" not in commands:
                resp = [f'*{len(keys)}\r\n']
                for i in range(len(keys)):
                    key = keys[i]
                    start_id = _ids[i]
                    if key in self.stream_data:
                        format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
                        if format_result:
                            resp.append(format_result)
                resp_str = "".join(resp)
                writer.write(resp_str.encode())
                await writer.drain())
and finally this is my formating Resp method: (async def format_xread_output(self, writer, key, start_id, resp, stream):
    
        start_ms, start_seq = map(int, start_id.split("-"))
        
        matching_entries = []
        for entry_id in stream.keys():
            entry_ms, entry_seq = map(int, entry_id.split("-"))
            
            if entry_ms > start_ms or (entry_ms == start_ms and entry_seq > start_seq):
                matching_entries.append((entry_id, stream[entry_id]))
        
        if not matching_entries:
            return None
        
        # Build response format:
        # *2
        # $<key_length>
        # <key>
        # *<num_entries>
        # *2
        # $<id_length>
        # <id>
        # *<num_fields>
        # $<field_length>
        # <field>
        # $<value_length>
        # <value>
        
        result = []
        
        # Stream array: *2 (key and entries)
        result.append('*2\r\n')
        
        # Key as bulk string
        result.append(f'${len(key)}\r\n{key}\r\n')
        
        # Number of entries
        result.append(f'*{len(matching_entries)}\r\n')
        
        # Each entry
        for entry_id, entry_data in matching_entries:
            # Entry is an array of 2: [id, [field, value, ...]]
            result.append('*2\r\n')
            
            # Entry ID as bulk string
            result.append(f'${len(entry_id)}\r\n{entry_id}\r\n')
            
            # Fields and values array
            num_fields = len(entry_data)
            result.append(f'*{num_fields * 2}\r\n')  # field-value pairs
            
            # Add each field and value
            for field, value in entry_data.items():
                result.append(f'${len(field)}\r\n{field}\r\n')
                result.append(f'${len(value)}\r\n{value}\r\n')
        
        return ''.join(result))
I use claude to help with the formatting because I was really confused. 

Thank you for your help. I have been at it for the passed 3 days, but nothing.

Hey @StephtheITSloth, I tried running your code against the previous stages, but it’s actually no longer passing a previous stage #UM0 (Streams - Query single stream using XREAD).

Suggestions:

  1. Use our CLI to test against previous stages by running:
codecrafters test --previous
  1. Focus on fixing the early stages first, as later stages depend on them.

Let me know if you’d like help debugging this one first!