Redis Streams Stage #ZN8: Blocking reads

I’m stuck on Redis Streams Stage 11: Blocking reads.

Here are my logs:

Debug = true
[streams-11] Running tests for Streams > Stage #11: Blocking reads
[streams-11] $ ./spawn_redis_server.sh
[streams-11] $ redis-cli xadd "mango" "0-1" "temperature 2"
[your_program] Initializing DB {. dump.rdb}
[your_program] Message Received: "*5\r\n$4\r\nxadd\r\n$5\r\nmango\r\n$3\r\n0-1\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"
[your_program] writing messages to conn: ["$3\r\n0-1\r\n"]
[streams-11] Received response: ""0-1""
[streams-11] $ redis-cli xread block 1000 streams "mango 0-1"
[your_program] Message Received: "*6\r\n$5\r\nxread\r\n$5\r\nblock\r\n$4\r\n1000\r\n$7\r\nstreams\r\n$5\r\nmango\r\n$3\r\n0-1\r\n"
[streams-11] $ redis-cli xadd "mango" "0-2" "temperature 2"
[your_program] Message Received: "*5\r\n$4\r\nxadd\r\n$5\r\nmango\r\n$3\r\n0-2\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"
[your_program] writing messages to conn: ["$3\r\n0-2\r\n"]
[streams-11] Received response: ""0-2""
[your_program] writing messages to conn: ["*1\r\n*2\r\n$5\r\nmango\r\n*1\r\n*2\r\n$3\r\n0-2\r\n*2\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"]
[streams-11] Received response: "[
[streams-11]   {
[streams-11]     "Stream": "mango",
[streams-11]     "Messages": [
[streams-11]       {
[streams-11]         "ID": "0-2",
[streams-11]         "Values": {
[streams-11]           "temperature": "2"
[streams-11]         }
[streams-11]       }
[streams-11]     ]
[streams-11]   }
[streams-11] ]"
[streams-11] $ redis-cli xread block 1000 streams "mango 0-2"
[your_program] Message Received: "*6\r\n$5\r\nxread\r\n$5\r\nblock\r\n$4\r\n1000\r\n$7\r\nstreams\r\n$5\r\nmango\r\n$3\r\n0-2\r\n"
[your_program] writing messages to conn: ["-1\r\n"]
[streams-11] 1
[streams-11] Test failed
[streams-11] Terminating program
[streams-11] Program terminated successfully

And here’s a snippet of my code:

func (ch *Commands) XReadHandler(requestLines []string) ([]string, error) {
	if len(requestLines) < 9 {
		return nil, fmt.Errorf("invalid command received. XREAD should have more arguments: %s", requestLines)
	}

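	// requestLines holds the raw RESP lines, so every argument takes two
	// entries: a "$<len>" header followed by the value. Index 4 is the first
	// argument after the command name (BLOCK or STREAMS).
	indexJ := 5
	if Command(strings.ToUpper(requestLines[4])) == BLOCK {
		// Skip the BLOCK keyword and its timeout (two arguments, four lines).
		indexJ += 4
		blockTimeout, err := strconv.Atoi(requestLines[6])
		if err == nil {
			// Sleeps for the full timeout before reading, even if an entry arrives earlier.
			time.Sleep(time.Duration(blockTimeout) * time.Millisecond)
		}
	}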

	xreadStreamCount := len(requestLines[indexJ:]) / 4

	var streamKeys, entryIDs []string
	for i := 0; i < xreadStreamCount; i++ {
		streamKeys = append(streamKeys, requestLines[indexJ+1])
		indexJ += 2
	}

	for i := 0; i < xreadStreamCount; i++ {
		entryIDs = append(entryIDs, requestLines[indexJ+1])
		indexJ += 2
	}

	readStreams := make(map[string][]store.StreamValues)
	for i := 0; i < xreadStreamCount; i++ {
		readStreams[streamKeys[i]] = ch.Store.StreamStore.ReadEntry(streamKeys[i], entryIDs[i])
	}

	if len(readStreams) == 0 || (len(readStreams) == 1 && len(readStreams[streamKeys[0]]) == 0) {
		return NullResponse(), nil
	}

	var resp string
	resp = fmt.Sprintf("*%v\r\n", len(readStreams))

	for streamName, streamValues := range readStreams {

		resp += "*2\r\n"
		resp += ResponseBuilder(BulkStringsRespType, streamName)

		resp += fmt.Sprintf("*%v\r\n", len(streamValues))
		for _, val := range streamValues {
			resp += "*2\r\n"
			resp += ResponseBuilder(BulkStringsRespType, val.ID)
	
			innerResp := make([]string, 0)
			for _, entry := range val.Entry {
				innerResp = append(innerResp, entry.Key, entry.Value)
			}
	
			resp += ResponseBuilder(ArraysRespType, innerResp...)
		}
	}

	return []string{resp}, nil
}

According to my logs, a RESP Null String [“-1\r\n”] was sent on the connection. Is that not expected? The failure log is not clear in this case.

I’d really appreciate someone helping me look at the logs, and I’m hoping for a prompt response!

Please let me know if you have any questions or require more context on my code snippet, thanks!

We’ve got a new RESP parser that emits more friendly logs, but it isn’t wired up for streams yet. Will keep this open until that’s done!

@sanchitdeora btw, I think the error here is that the representation of a null string is $-1\r\n, not -1\r\n
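
To make the difference concrete, here is a minimal Go sketch of the two encodings. The helper names bulkString and nullBulkString are hypothetical and are not taken from the code posted above. In RESP2, a leading "-" marks an error reply, so "-1\r\n" is read as an error whose message is "1", which is probably why the tester log only shows "1" before failing.

```go
package main

import (
	"fmt"
	"strconv"
)

// bulkString encodes s as a RESP2 bulk string: "$<len>\r\n<data>\r\n".
// (Hypothetical helper, for illustration only.)
func bulkString(s string) string {
	return "$" + strconv.Itoa(len(s)) + "\r\n" + s + "\r\n"
}

// nullBulkString returns the RESP2 null bulk string. The "$" type prefix
// is required; without it the client sees "-1\r\n", an error reply.
// (Hypothetical helper, for illustration only.)
func nullBulkString() string {
	return "$-1\r\n"
}

func main() {
	fmt.Printf("%q\n", bulkString("0-2"))
	fmt.Printf("%q\n", nullBulkString())
}
```

The fix in the handler would then amount to having NullResponse() return the "$-1\r\n" form, assuming that function is where the "-1\r\n" in the logs comes from.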

Thanks @rohitpaulk!
That was a silly mistake, seems like everything works well now.

It was a really fun challenge!

@sanchitdeora glad to hear!

@rohitpaulk for this stage, I wanted to know whether there can be multiple streams, like in the previous task?

@Jay-0331 Ah, I think it’s only one in this stage. Will see if we can improve the instructions here!
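
For anyone who wants to handle both cases anyway: in XREAD, everything after the STREAMS keyword is all the keys followed by the same number of IDs, so the list can be split in half. A rough Go sketch, assuming the tokens after STREAMS have already been extracted into a slice (splitStreamArgs is a hypothetical helper, not part of the code posted above):

```go
package main

import (
	"errors"
	"fmt"
)

// splitStreamArgs takes the tokens that follow the STREAMS keyword and
// splits them into keys (first half) and entry IDs (second half).
// (Hypothetical helper, for illustration only.)
func splitStreamArgs(args []string) (keys, ids []string, err error) {
	if len(args) == 0 || len(args)%2 != 0 {
		return nil, nil, errors.New("XREAD needs the same number of keys and IDs")
	}
	half := len(args) / 2
	return args[:half], args[half:], nil
}

func main() {
	// One stream, as in this stage.
	keys, ids, err := splitStreamArgs([]string{"mango", "0-1"})
	fmt.Println(keys, ids, err)

	// Two streams, as in the previous stage.
	keys, ids, err = splitStreamArgs([]string{"mango", "apple", "0-1", "0-0"})
	fmt.Println(keys, ids, err)
}
```

With a single stream the split is just one key and one ID, so the same code path covers this stage too.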

Just leaving a quick summary of what’s left before we mark this as solved:

  • Integrate new parser into streams extension

Note: I’ve updated the title of this post to include the stage ID (#ZN8). You can learn about the stages rename here: Upcoming change: Stages overhaul.

We’ve integrated the new parser into the first 5 stages of streams. It actually turned out to be a bit more difficult for the later stages since the expected output structures are different and would be very verbose with the way our new parser currently works.

Will mark this as closed for now and prioritize integrating the parser into the other stages based on whether we receive similar reports!