I’m stuck on Redis Streams Stage 11: Blocking reads.
Here are my logs:
Debug = true
[streams-11] Running tests for Streams > Stage #11: Blocking reads
[streams-11] $ ./spawn_redis_server.sh
[streams-11] $ redis-cli xadd "mango" "0-1" "temperature 2"
[your_program] Initializing DB {. dump.rdb}
[your_program] Message Received: "*5\r\n$4\r\nxadd\r\n$5\r\nmango\r\n$3\r\n0-1\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"
[your_program] writing messages to conn: ["$3\r\n0-1\r\n"]
[streams-11] Received response: ""0-1""
[streams-11] $ redis-cli xread block 'Ϩ' streams "mango 0-1"
[your_program] Message Received: "*6\r\n$5\r\nxread\r\n$5\r\nblock\r\n$4\r\n1000\r\n$7\r\nstreams\r\n$5\r\nmango\r\n$3\r\n0-1\r\n"
[streams-11] $ redis-cli xadd "mango" "0-2" "temperature 2"
[your_program] Message Received: "*5\r\n$4\r\nxadd\r\n$5\r\nmango\r\n$3\r\n0-2\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"
[your_program] writing messages to conn: ["$3\r\n0-2\r\n"]
[streams-11] Received response: ""0-2""
[your_program] writing messages to conn: ["*1\r\n*2\r\n$5\r\nmango\r\n*1\r\n*2\r\n$3\r\n0-2\r\n*2\r\n$11\r\ntemperature\r\n$1\r\n2\r\n"]
[streams-11] Received response: "[
[streams-11] {
[streams-11] "Stream": "mango",
[streams-11] "Messages": [
[streams-11] {
[streams-11] "ID": "0-2",
[streams-11] "Values": {
[streams-11] "temperature": "2"
[streams-11] }
[streams-11] }
[streams-11] ]
[streams-11] }
[streams-11] ]"
[streams-11] $ redis-cli xread block 'Ϩ' streams "mango 0-2"
[your_program] Message Received: "*6\r\n$5\r\nxread\r\n$5\r\nblock\r\n$4\r\n1000\r\n$7\r\nstreams\r\n$5\r\nmango\r\n$3\r\n0-2\r\n"
[your_program] writing messages to conn: ["-1\r\n"]
[streams-11] 1
[streams-11] Test failed
[streams-11] Terminating program
[streams-11] Program terminated successfully
And here’s a snippet of my code:
func (ch *Commands) XReadHandler(requestLines []string) ([]string, error) {
	if len(requestLines) < 9 {
		return nil, fmt.Errorf("invalid command received. XRANGE should have more arguments: %s", requestLines)
	}
	indexJ := 5
	if Command(strings.ToUpper(requestLines[4])) == BLOCK {
		indexJ += 4
		blockTimeout, err := strconv.Atoi(requestLines[6])
		if err == nil {
			time.Sleep(time.Duration(blockTimeout) * time.Millisecond)
		}
	}
	xreadStreamCount := len(requestLines[indexJ:]) / 4
	var streamKeys, entryIDs []string
	for i := 0; i < xreadStreamCount; i++ {
		streamKeys = append(streamKeys, requestLines[indexJ+1])
		indexJ += 2
	}
	for i := 0; i < xreadStreamCount; i++ {
		entryIDs = append(entryIDs, requestLines[indexJ+1])
		indexJ += 2
	}
	readStreams := make(map[string][]store.StreamValues)
	for i := 0; i < xreadStreamCount; i++ {
		readStreams[streamKeys[i]] = ch.Store.StreamStore.ReadEntry(streamKeys[i], entryIDs[i])
	}
	if len(readStreams) == 0 || (len(readStreams) == 1 && len(readStreams[streamKeys[0]]) == 0) {
		return NullResponse(), nil
	}
	var resp string
	resp = fmt.Sprintf("*%v\r\n", len(readStreams))
	for streamName, streamValues := range readStreams {
		resp += "*2\r\n"
		resp += ResponseBuilder(BulkStringsRespType, streamName)
		resp += fmt.Sprintf("*%v\r\n", len(streamValues))
		for _, val := range streamValues {
			resp += "*2\r\n"
			resp += ResponseBuilder(BulkStringsRespType, val.ID)
			innerResp := make([]string, 0)
			for _, entry := range val.Entry {
				innerResp = append(innerResp, entry.Key, entry.Value)
			}
			resp += ResponseBuilder(ArraysRespType, innerResp...)
		}
	}
	return []string{resp}, nil
}
According to my logs, ["-1\r\n"] was written to the connection as the RESP null reply for the second XREAD. Is that not the expected response? The failure log only prints "1" before "Test failed", so the cause is not clear to me.
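For comparison, here is a minimal sketch of how a RESP2 client tells reply types apart by the first byte. In RESP2 a null bulk string is encoded as "$-1\r\n" and a null array as "*-1\r\n", while a leading "-" marks an error reply. Under that reading, "-1\r\n" would be parsed as an error whose message is "1", which may be why the tester printed a bare "1". Which null encoding the stage expects is something to confirm against the stage instructions. The replyKind helper is a hypothetical name for illustration and is not part of my code.

```go
package main

import "fmt"

// nullBulkString is the RESP2 encoding of a null bulk string.
const nullBulkString = "$-1\r\n"

// replyKind names the RESP2 reply type indicated by the first byte of a reply.
// It is a hypothetical helper for illustration only.
func replyKind(reply string) string {
	if reply == "" {
		return "empty"
	}
	switch reply[0] {
	case '+':
		return "simple string"
	case '-':
		return "error"
	case ':':
		return "integer"
	case '$':
		return "bulk string"
	case '*':
		return "array"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(replyKind("-1\r\n"))       // error
	fmt.Println(replyKind(nullBulkString)) // bulk string
}
```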
I would really appreciate it if someone could help me look at the logs, and I am hoping for a prompt response!
Please let me know if you have any questions or require more context on my code snippet, thanks!