Redis - XREAD stream with BLOCK

I’m stuck on Stage #BS1 (Blocking reads).

I’ve tried debugging with print statements and AI, but I had no success.

Here are my logs:

[tester::#BS1] [client-2] ✔︎ Received "0-2"
[tester::#BS1] Expecting 1 client to receive response of XREAD command
[tester::#BS1] [client-1] Received bytes: "$-1\r\n"
[tester::#BS1] [client-1] Received RESP null bulk string: "$-1\r\n"
[tester::#BS1] Expected array, got null bulk string
[tester::#BS1] Test failed
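
For reference, this is roughly the shape of the RESP reply I understand the tester expects the blocked client to receive once the XADD lands (the stream key, entry ID, and field/value below are placeholders I made up), so a bare "$-1\r\n" means my server hit the timeout path instead of sending this array:

# Hypothetical example of a successful blocking-XREAD reply in RESP
expected = (
    b"*1\r\n"                  # one stream in the reply
    b"*2\r\n"                  # [stream name, entries]
    b"$10\r\nstream_key\r\n"   # stream name as a bulk string
    b"*1\r\n"                  # one new entry
    b"*2\r\n"                  # [entry id, field/value array]
    b"$3\r\n0-3\r\n"           # entry id
    b"*2\r\n"                  # one field-value pair -> 2 bulk strings
    b"$3\r\nfoo\r\n"
    b"$3\r\nbar\r\n"
)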

And here are the relevant snippets of my code.

This is the logic in my XADD method:

async def handle_xadd(self, writer, commands, lock):
        try:
            if len(commands) < 5:
                writer.write(b'-ERR command needs at least 4 arguments\r\n')
                await writer.drain()
                return
            

            key = commands[1]
            id = None
            if commands[2] == "*":
                id = int(time.time()) * 1000
                id = f'{id}-0'
            elif commands[2] == "0-*":
                id = "0-1"
            else:
                id = commands[2]
            args_key = commands[3]
            args_val = commands[4]

            # split id for auto generated id
            id_clone = id
            milli_sec, seq_num = id.split("-")
            if id == "0-0":
                writer.write(b'-ERR The ID specified in XADD must be greater than 0-0\r\n')
                await writer.drain()
                return 

            
            # store the entry as {field: value} so the field name round-trips on XREAD
            entry = OrderedDict({args_key: args_val})

            try:
                async with lock:

                    if key in self.stream_data:
                        old_id = list(self.stream_data[key].keys())[-1]

                        is_id_valid = await self.validate_id(old_id,commands[2], writer)
                        if not is_id_valid:
                            return
                        id = is_id_valid
                        self.stream_data[key][id] = entry
                        # send the id back in bulk

                        # add functionality for coroutine asyncio.Event()
                        # Unblock the correct number of clients
                        if key in self.blocked_clients and self.blocked_clients[key]:
                            ms, seq_num = map(int, id.split("-"))
                            clients_to_notify = []
                            still_waiting = []
                            
                            for start_id, client_event in self.blocked_clients[key]:
                                start_ms, start_seq_num = map(int, start_id.split("-"))
                                
                                # Check if the new entry comes AFTER the client's start_id
                                if ms > start_ms or (ms == start_ms and seq_num > start_seq_num):
                                    # This client should be notified
                                    clients_to_notify.append(client_event)
                                else:
                                    # This client is still waiting
                                    still_waiting.append((start_id, client_event))
                            
                            # Notify all clients that should be unblocked
                            for client_event in clients_to_notify:
                                client_event.set()
                            
                            # Update the blocked_clients list with only those still waiting
                            self.blocked_clients[key] = still_waiting


                        resp = f"${len(id)}\r\n{id}\r\n"
                        writer.write(resp.encode())
                        await writer.drain()

                    elif key not in self.stream_data:
                        if not await self.validate_first_id(writer, id):
                            return
                        self.stream_data[key] = OrderedDict()
                        self.stream_data[key][id] = entry

                        resp = f"${len(id)}\r\n{id}\r\n"
                        writer.write(resp.encode())
                        await writer.drain()

            except TypeError as e:
                writer.write(f'-ERR {e}\r\n'.encode())
                await writer.drain()
            except Exception:
                writer.write(b'-ERR key does not exist in the stream\r\n')
                await writer.drain()
        except Exception:
            writer.write(b'-ERR problem with XADD processing \r\n')
            await writer.drain()
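
As an aside, the ID comparisons above got easier for me to reason about as tuples; here is a minimal helper sketch (parse_stream_id is a name I made up, it is not part of my class yet):

def parse_stream_id(entry_id: str) -> tuple[int, int]:
    """Split a stream ID like '1526919030474-0' into (ms, seq) integers."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)

# Tuples compare element by element, so "is this entry newer than start_id?"
# collapses to: parse_stream_id(entry_id) > parse_stream_id(start_id)
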
This is the logic in my XREAD method:

async def handle_xread(self, writer, commands, lock):
        keys = None
        _ids = None
        key = None
        start_id = None
        block_id = None
        timeout = (float(commands[2]) / 1000.0) if "block" in commands else None
        
        if "block" in commands:
            # XREAD block 1000 streams stream_key 0-2
            _, _, _, _, key, block_id = commands
            start_id = block_id  # Use same variable name consistently
            if len(commands) > 6:
                new_commands = commands[4:]
                keys = new_commands[:len(new_commands) // 2]
                _ids = new_commands[len(new_commands) // 2:]
        
        if len(commands) > 4 and "block" not in commands:
            new_commands = commands[2:]
            keys = new_commands[:len(new_commands) // 2]
            _ids = new_commands[len(new_commands) // 2:]
        elif len(commands) == 4:
            command, _type, key, start_id = commands
        
        async with lock:
            if "block" in commands:
                if key in self.stream_data and self.stream_data[key]:
                    # Check if there are entries AFTER block_id
                    has_new_data = False
                    block_ms, block_seq = map(int, block_id.split("-"))
                    
                    for entry_id in self.stream_data[key]:
                        entry_ms, entry_seq = map(int, entry_id.split("-"))
                        if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
                            has_new_data = True
                            break
                    
                    if has_new_data:
                        # Return immediately with new data
                        resp = ['*1\r\n']  # Initialize here
                        format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                        if format_result:
                            resp.append(format_result)
                            resp_str = "".join(resp)
                            writer.write(resp_str.encode())
                            await writer.drain()
                            return
                    else:
                        # No new data, block and wait
                        if key not in self.blocked_clients:
                            self.blocked_clients[key] = []
                        
                        client_event = asyncio.Event()
                        self.blocked_clients[key].append((block_id, client_event))
                        
                        try:
                            wait_task = asyncio.create_task(client_event.wait())
                            await asyncio.wait_for(wait_task, timeout=timeout)
                            
                            # After waking up, check for new data AFTER block_id
                            has_new_data = False
                            for entry_id in self.stream_data[key]:
                                entry_ms, entry_seq = map(int, entry_id.split("-"))
                                if entry_ms > block_ms or (entry_ms == block_ms and entry_seq > block_seq):
                                    has_new_data = True
                                    break
                            
                            if has_new_data:
                                resp = ['*1\r\n']
                                format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                                if format_result:
                                    resp.append(format_result)
                                    writer.write("".join(resp).encode())
                                    await writer.drain()

                        except asyncio.TimeoutError:
                            if (block_id, client_event) in self.blocked_clients[key]:
                                self.blocked_clients[key].remove((block_id, client_event))
                            
                            writer.write(b'$-1\r\n')
                            await writer.drain()
                            return
                else:
                    # Stream doesn't exist yet, block and wait
                    if key not in self.blocked_clients:
                        self.blocked_clients[key] = []
                    
                    client_event = asyncio.Event()
                    self.blocked_clients[key].append((block_id, client_event))
                    
                    try:
                        wait_task = asyncio.create_task(client_event.wait())
                        await asyncio.wait_for(wait_task, timeout=timeout)
                        
                        # After waking up, return new data
                        if key in self.stream_data:
                            resp = ['*1\r\n']
                            format_result = await self.format_xread_output(writer, key, block_id, resp, self.stream_data[key])
                            if format_result:
                                resp.append(format_result)
                                writer.write("".join(resp).encode())
                                await writer.drain()
                        
                    except asyncio.TimeoutError:
                        if (block_id, client_event) in self.blocked_clients[key]:
                            self.blocked_clients[key].remove((block_id, client_event))
                        
                        writer.write(b'$-1\r\n')
                        await writer.drain()
                        return
            
            elif len(commands) == 4:
                resp = ['*1\r\n']
                if key in self.stream_data:
                    format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
                    if format_result:
                        resp.append(format_result)
                resp_str = "".join(resp)
                writer.write(resp_str.encode())
                await writer.drain()
            
            elif len(commands) > 4 and "block" not in commands:
                resp = [f'*{len(keys)}\r\n']
                for i in range(len(keys)):
                    key = keys[i]
                    start_id = _ids[i]
                    if key in self.stream_data:
                        format_result = await self.format_xread_output(writer, key, start_id, resp, self.stream_data[key])
                        if format_result:
                            resp.append(format_result)
                resp_str = "".join(resp)
                writer.write(resp_str.encode())
                await writer.drain()
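
To keep the blocking part straight in my head, I also wrote a stripped-down sketch of the wait pattern I am aiming for with asyncio.Event and asyncio.wait_for (the function name and the blocked_clients/lock arguments are simplified stand-ins, not my actual method):

import asyncio

async def wait_for_new_entry(blocked_clients, key, start_id, timeout, lock):
    """Register an Event for this client, then wait on it without holding the lock."""
    event = asyncio.Event()
    async with lock:
        blocked_clients.setdefault(key, []).append((start_id, event))
    try:
        # timeout=None waits forever (e.g. BLOCK 0); a float is seconds
        await asyncio.wait_for(event.wait(), timeout=timeout)
        return True   # XADD set the event, so new data should be available
    except asyncio.TimeoutError:
        return False  # timed out, so the caller replies with a null
    finally:
        async with lock:
            if (start_id, event) in blocked_clients.get(key, []):
                blocked_clients[key].remove((start_id, event))
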
And finally, this is my RESP formatting method:

async def format_xread_output(self, writer, key, start_id, resp, stream):
    
        start_ms, start_seq = map(int, start_id.split("-"))
        
        matching_entries = []
        for entry_id in stream.keys():
            entry_ms, entry_seq = map(int, entry_id.split("-"))
            
            if entry_ms > start_ms or (entry_ms == start_ms and entry_seq > start_seq):
                matching_entries.append((entry_id, stream[entry_id]))
        
        if not matching_entries:
            return None
        
        # Build response format:
        # *2
        # $<key_length>
        # <key>
        # *<num_entries>
        # *2
        # $<id_length>
        # <id>
        # *<num_fields>
        # $<field_length>
        # <field>
        # $<value_length>
        # <value>
        
        result = []
        
        # Stream array: *2 (key and entries)
        result.append('*2\r\n')
        
        # Key as bulk string
        result.append(f'${len(key)}\r\n{key}\r\n')
        
        # Number of entries
        result.append(f'*{len(matching_entries)}\r\n')
        
        # Each entry
        for entry_id, entry_data in matching_entries:
            # Entry is an array of 2: [id, [field, value, ...]]
            result.append('*2\r\n')
            
            # Entry ID as bulk string
            result.append(f'${len(entry_id)}\r\n{entry_id}\r\n')
            
            # Fields and values array
            num_fields = len(entry_data)
            result.append(f'*{num_fields * 2}\r\n')  # field-value pairs
            
            # Add each field and value
            for field, value in entry_data.items():
                result.append(f'${len(field)}\r\n{field}\r\n')
                result.append(f'${len(value)}\r\n{value}\r\n')
        
        return ''.join(result)
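
As a sanity check, this is what I expect that formatter to return for a made-up one-entry stream (the key, ID, and field below are hypothetical):

from collections import OrderedDict

# Hypothetical stream shaped like self.stream_data["stream_key"]
stream = OrderedDict({"0-2": {"temperature": "96"}})

# output = await self.format_xread_output(None, "stream_key", "0-1", [], stream)
# Expected string:
# *2\r\n$10\r\nstream_key\r\n*1\r\n*2\r\n$3\r\n0-2\r\n*2\r\n$11\r\ntemperature\r\n$2\r\n96\r\n
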
I used Claude to help with the formatting because I was really confused.

Thank you for your help. I have been at it for the past 3 days, but nothing has worked.

Hey @StephtheITSloth, I tried running your code against the previous stages, and it’s actually no longer passing an earlier one: #UM0 (Streams - Query single stream using XREAD).

Suggestions:

  1. Use our CLI to test against previous stages by running:
codecrafters test --previous
  2. Focus on fixing the early stages first, as later stages depend on them.

Let me know if you’d like help debugging this one first!

Closing this thread due to inactivity. If you still need assistance, feel free to reopen or start a new discussion!

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.