I’m stuck on Stage #YH3. The problem isn’t really specific to #YH3, though: when I run the tests from earlier stages, the #YG4 test sometimes fails and sometimes succeeds.
I’ve logged what the replica reads from the replication connection: after the handshake is set up and the 3 SET commands are processed successfully, the replica doesn’t receive the GET command.
I suspect the tester’s master server sends replication commands too fast, but that should not be an issue because I read from the connection with buffered I/O.
Here are my logs:
[tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
[tester::#YG4] Master is running on port 6379
[tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PING"]
[tester::#YG4] [handshake] Received ["PING"]
[tester::#YG4] [handshake] master: Sent "PONG"
[tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YG4] [handshake] Sending RDB file...
[tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YG4] [handshake] Sent RDB file.
[your_program] Replica: Starting to process replication commands
[tester::#YG4] [propagation] master: > SET foo 123
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
[tester::#YG4] [propagation] master: > SET bar 456
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
[tester::#YG4] [propagation] master: > SET baz 789
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
[tester::#YG4] [test] Getting key foo
[tester::#YG4] [test] client: $ redis-cli GET foo
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
[your_program] [replication conn] received cmd: [SET foo 123]
[your_program] [replication conn] received cmd: [SET bar 456]
[your_program] [replication conn] received cmd: [SET baz 789]
[tester::#YG4] Received: "" (no content received)
[tester::#YG4] ^ error
[tester::#YG4] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#YG4] Test failed
[tester::#YG4] Terminating program
[your_program] [replication conn] ReadRedisCommand err: EOF
[tester::#YG4] Program terminated successfully
And here’s a snippet of my code:
main.go
// main (excerpt; earlier setup is elided by the "..." below). When the node is
// configured with the "slave" role it builds a Replica, performs the handshake
// with the master, and then starts the replica's own server. Either step
// failing prints the error and exits with status 1.
func main() {
...
if cfg.Role == "slave" {
replica := server.NewReplica(cfg, kvStore)
// ConnectToMaster returns only after the handshake has finished; on success
// it leaves a goroutine running that consumes the replication stream.
err := replica.ConnectToMaster()
if err != nil {
fmt.Printf("Error connecting to master: %v\n", err)
os.Exit(1)
}
// Start the replica server.
// NOTE(review): Start is not shown here; presumably it begins accepting
// client connections only at this point, i.e. after the handshake — confirm.
err = replica.Start()
if err != nil {
fmt.Printf("Error starting replica server: %v\n", err)
os.Exit(1)
}
}
}
replica.go
:
// ConnectToMaster performs the replication handshake with the master and, on
// success, starts a goroutine that applies the commands the master propagates
// over that connection.
//
// It returns a non-nil error if the handshake fails. The error is wrapped
// rather than printed here, so the caller reports it exactly once.
func (server *Replica) ConnectToMaster() error {
	masterConn, reader, err := server.sendHandshake()
	if err != nil {
		return fmt.Errorf("handshake with master: %w", err)
	}

	// The same buffered reader used during the handshake is handed over, so any
	// bytes it already buffered past the RDB payload are not lost.
	go server.processReplicationCommands(masterConn, reader)
	return nil
}
// sendHandshake dials the master and runs the replication handshake:
//
//	PART 1:  send PING, expect +PONG
//	PART 2a: send REPLCONF listening-port <PORT>, expect +OK
//	PART 2b: send REPLCONF capa psync2, expect +OK
//	PART 3:  send PSYNC <replicationID> <masterOffset>, expect +FULLRESYNC <id> <offset>
//	PART 4:  read the RDB payload ($<len>\r\n followed by <len> raw bytes)
//
// On success it returns the open connection together with the single buffered
// reader that was used for every read, so the caller can keep reading the
// replication stream without losing bytes that are already buffered. On any
// failure after dialing, the connection is closed before returning.
//
// A malformed master address (not "<host> <port>") is treated as a fatal
// misconfiguration: the process exits with a non-zero status.
func (server *Replica) sendHandshake() (net.Conn, *bufio.Reader, error) {
	parts := strings.Split(server.cfg.MasterHostAndPort, " ")
	if len(parts) != 2 {
		fmt.Fprintf(os.Stderr, "invalid master address %q: expected \"<host> <port>\"\n", server.cfg.MasterHostAndPort)
		os.Exit(1)
	}
	masterAddr := net.JoinHostPort(parts[0], parts[1])

	masterConn, err := net.Dial("tcp", masterAddr)
	if err != nil {
		return nil, nil, fmt.Errorf("tried to connect to master node on %s: %w", masterAddr, err)
	}
	// Close the connection on every failure path below.
	handshakeDone := false
	defer func() {
		if !handshakeDone {
			masterConn.Close()
		}
	}()
	reader := bufio.NewReader(masterConn)

	// readLine reads one CRLF-terminated reply line and strips the terminator.
	readLine := func(stage string) (string, error) {
		line, err := reader.ReadString('\n')
		if errors.Is(err, io.EOF) {
			return "", fmt.Errorf("master closed the connection at %s: %w", stage, err)
		}
		if err != nil {
			return "", fmt.Errorf("reading reply at %s: %w", stage, err)
		}
		return strings.TrimSpace(line), nil
	}
	// exchange sends one command and returns the master's one-line reply.
	exchange := func(stage string, args []string) (string, error) {
		if _, err := masterConn.Write([]byte(protocol.FormatRESPArray(args))); err != nil {
			return "", fmt.Errorf("couldn't send %s to master node at %s: %w", args[0], stage, err)
		}
		return readLine(stage)
	}
	// expect is exchange plus a check that the reply is exactly want.
	expect := func(stage string, args []string, want string) error {
		reply, err := exchange(stage, args)
		if err != nil {
			return err
		}
		if reply != want {
			return fmt.Errorf("unexpected response, at %s: %s", stage, reply)
		}
		return nil
	}

	// PART 1
	if err := expect("PART1", []string{"PING"}, "+PONG"); err != nil {
		return nil, nil, err
	}

	// PART 2a
	listeningPort := []string{"REPLCONF", "listening-port", fmt.Sprint(server.cfg.Port)}
	if err := expect("PART2a", listeningPort, "+OK"); err != nil {
		return nil, nil, err
	}

	// PART 2b
	if err := expect("PART2b", []string{"REPLCONF", "capa", "psync2"}, "+OK"); err != nil {
		return nil, nil, err
	}

	// PART 3
	replID := "?" // first time connecting to master
	if server.cfg.MasterReplID != "" {
		replID = fmt.Sprint(server.cfg.MasterReplID)
	}
	psyncCmd := []string{"PSYNC", replID, fmt.Sprint(server.cfg.MasterReplOffset)}
	fullSyncLine, err := exchange("PART3", psyncCmd)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read FULLRESYNC: %w", err)
	}
	if !strings.HasPrefix(fullSyncLine, "+FULLRESYNC") {
		return nil, nil, fmt.Errorf("expected FULLRESYNC, got: %s", fullSyncLine)
	}
	fields := strings.Split(fullSyncLine, " ")
	if len(fields) != 3 {
		return nil, nil, fmt.Errorf("invalid FULLRESYNC format: %s", fullSyncLine)
	}
	server.cfg.MasterReplID = fields[1] // Store the replication ID

	// PART 4
	rdbSizeLine, err := readLine("PART4")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read RDB size: %w", err)
	}
	if !strings.HasPrefix(rdbSizeLine, "$") {
		return nil, nil, fmt.Errorf("expected RDB size marker ($), got: %s", rdbSizeLine)
	}
	rdbSize, err := strconv.Atoi(rdbSizeLine[1:])
	if err != nil || rdbSize < 0 {
		// A negative length would make the allocation below panic.
		return nil, nil, fmt.Errorf("invalid RDB size: %s", rdbSizeLine[1:])
	}
	// The RDB payload is NOT followed by a trailing CRLF, so read exactly
	// rdbSize bytes and nothing more; the contents are discarded.
	rdbData := make([]byte, rdbSize)
	if _, err := io.ReadFull(reader, rdbData); err != nil {
		return nil, nil, fmt.Errorf("failed to read RDB data: %w", err)
	}

	handshakeDone = true
	return masterConn, reader, nil
}
// processReplicationCommands consumes the replication stream from the master
// until a read fails, applying each propagated command to the local store and
// answering REPLCONF GETACK requests. It closes masterConn when it returns.
//
// reader must be the same buffered reader that was used for the handshake.
//
// The replication offset is reset to 0 on entry and advanced by the encoded
// size of every command after that command has been handled, so the offset
// reported in an ACK does not include the GETACK request itself.
func (server *Replica) processReplicationCommands(masterConn net.Conn, reader *bufio.Reader) {
	defer masterConn.Close()
	server.cfg.MasterReplOffset = 0
	fmt.Printf("Replica: Starting to process replication commands\n")
	for {
		cmd, bytesProcessed, err := protocol.ReadRedisCommand(reader)
		if err != nil {
			fmt.Printf("[replication conn] ReadRedisCommand err: %v\n", err)
			return
		}
		if len(cmd) == 0 {
			// An empty array still occupies bytes in the stream.
			server.cfg.MasterReplOffset += bytesProcessed
			continue
		}
		fmt.Printf("[replication conn] received cmd: %v\n", cmd)
		switch strings.ToLower(cmd[0]) {
		case "ping":
			// master alive
		case "set":
			expireTimeMs, err := replicatedSetExpiryMs(cmd)
			if err != nil {
				// Do not store the key with a bogus expiry; report and move on.
				fmt.Printf("Replica: ignoring %v: %v\n", cmd, err)
				break
			}
			// NOTE(review): the result of Set is discarded, as in the original;
			// if it is an error it should probably be logged — confirm its type.
			_ = server.kvStore.Set(cmd[1], cmd[2], expireTimeMs)
		case "replconf":
			if len(cmd) >= 3 && strings.EqualFold(cmd[1], "getack") {
				ackCmd := []string{"REPLCONF", "ACK", strconv.Itoa(server.cfg.MasterReplOffset)}
				respData := protocol.FormatRESPArray(ackCmd)
				if _, err := masterConn.Write([]byte(respData)); err != nil {
					fmt.Printf("Replica: Error sending ACK: %v\n", err)
				} else {
					fmt.Printf("[replication conn] sent: %q\n", []byte(respData))
				}
			}
		default:
			fmt.Printf("Replica got unknown replication command: %v\n", cmd)
		}
		server.cfg.MasterReplOffset += bytesProcessed
	}
}

// replicatedSetExpiryMs returns the expiry, in milliseconds, requested by a
// propagated SET command, or -1 when the command carries no expiry.
//
// Accepted shapes:
//
//	SET key value            -> -1
//	SET key value <ms>       -> ms (the 4-element form this code already used)
//	SET key value PX <ms>    -> ms
//	SET key value EX <sec>   -> sec * 1000
//
// Any other shape, or a non-numeric expiry, yields an error.
func replicatedSetExpiryMs(cmd []string) (int, error) {
	switch len(cmd) {
	case 3:
		return -1, nil
	case 4:
		ms, err := strconv.Atoi(cmd[3])
		if err != nil {
			return 0, fmt.Errorf("expire time %q is not a number", cmd[3])
		}
		return ms, nil
	case 5:
		amount, err := strconv.Atoi(cmd[4])
		if err != nil {
			return 0, fmt.Errorf("expire time %q is not a number", cmd[4])
		}
		switch strings.ToLower(cmd[3]) {
		case "px":
			return amount, nil
		case "ex":
			return amount * 1000, nil
		}
		return 0, fmt.Errorf("unsupported SET option %q", cmd[3])
	}
	return 0, fmt.Errorf("unsupported SET form with %d elements", len(cmd))
}
resp.go
:
// ReadRedisCommand reads one command from reader, encoded as a RESP array of
// bulk strings (e.g. "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n").
//
// It returns the command's elements, the exact number of bytes the command
// occupied on the wire, and an error. On error the slice is nil and the byte
// count is 0. An empty array ("*0\r\n") yields an empty, non-nil slice.
//
// Errors are returned when the underlying read fails (io.EOF if the stream
// ends cleanly before a command starts), when the input is not an array of
// bulk strings, when a length is negative, non-numeric or larger than 512 MiB,
// or when a bulk string is not terminated by CRLF.
func ReadRedisCommand(reader *bufio.Reader) ([]string, int, error) {
	const (
		// maxBulkLen caps a single element so a bogus length cannot trigger a
		// huge allocation; 512 MiB is Redis' default proto-max-bulk-len.
		maxBulkLen = 512 << 20
		// maxPrealloc bounds the capacity reserved up front from the
		// peer-supplied element count; the slice still grows as needed.
		maxPrealloc = 64
	)
	bytesProcessed := 0

	// Array header: "*<count>\r\n".
	header, err := reader.ReadString('\n')
	if err != nil {
		return nil, 0, err
	}
	bytesProcessed += len(header)
	header = strings.TrimSpace(header)
	if !strings.HasPrefix(header, "*") {
		return nil, 0, fmt.Errorf("expected array, got: %s", header)
	}
	count, err := strconv.Atoi(header[1:])
	if err != nil || count < 0 {
		return nil, 0, fmt.Errorf("invalid array length: %s", header[1:])
	}

	prealloc := count
	if prealloc > maxPrealloc {
		prealloc = maxPrealloc
	}
	cmd := make([]string, 0, prealloc)

	for i := 0; i < count; i++ {
		// Bulk string header: "$<len>\r\n".
		sizeLine, err := reader.ReadString('\n')
		if err != nil {
			return nil, 0, err
		}
		bytesProcessed += len(sizeLine)
		sizeLine = strings.TrimSpace(sizeLine)
		if !strings.HasPrefix(sizeLine, "$") {
			return nil, 0, fmt.Errorf("expected bulk string, got: %s", sizeLine)
		}
		strLen, err := strconv.Atoi(sizeLine[1:])
		if err != nil || strLen < 0 || strLen > maxBulkLen {
			return nil, 0, fmt.Errorf("invalid string length: %s", sizeLine[1:])
		}

		// Read the payload and its CRLF terminator in one exact-size read. The
		// payload is binary-safe and may itself contain '\n', so it must not
		// be read line-wise.
		payload := make([]byte, strLen+2)
		n, err := io.ReadFull(reader, payload)
		if err != nil {
			return nil, 0, err
		}
		if payload[strLen] != '\r' || payload[strLen+1] != '\n' {
			return nil, 0, fmt.Errorf("bulk string not terminated by CRLF, got: %q", payload[strLen:])
		}
		bytesProcessed += n
		cmd = append(cmd, string(payload[:strLen]))
	}
	return cmd, bytesProcessed, nil
}
For context, I previously had an issue with reading from this connection: I was creating multiple readers from the same connection, which led to an old reader buffering new commands. I fixed that by creating only a single reader, and it resolved the issue. But now I am seeing flaky behaviour again.
Here is my source code: redis-go/app at redis-streams · zulfkhar00/redis-go · GitHub