Replica doesn't receive replication commands - #YG4 test

I’m stuck on Stage #YH3. The problem isn't really specific to #YH3, though: when I run the tests from earlier stages, the #YG4 test is flaky, sometimes passing and sometimes failing.

I’ve tried logging what the replica reads from the replication connection. After the handshake completes and the replica successfully processes the 3 SET commands, it never answers the GET command. My suspicion was that the tester's master sends the replication commands too fast, but that shouldn't be an issue because I read the connection with buffered I/O.

Here are my logs:

[tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
[tester::#YG4] Master is running on port 6379
[tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PING"]
[tester::#YG4] [handshake] Received ["PING"]
[tester::#YG4] [handshake] master: Sent "PONG"
[tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
[tester::#YG4] [handshake] master: Sent "OK"
[tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
[tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YG4] [handshake] Sending RDB file...
[tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YG4] [handshake] Sent RDB file.
[your_program] Replica: Starting to process replication commands
[tester::#YG4] [propagation] master: > SET foo 123
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
[tester::#YG4] [propagation] master: > SET bar 456
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
[tester::#YG4] [propagation] master: > SET baz 789
[tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
[tester::#YG4] [test] Getting key foo
[tester::#YG4] [test] client: $ redis-cli GET foo
[tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
[your_program] [replication conn] received cmd: [SET foo 123]
[your_program] [replication conn] received cmd: [SET bar 456]
[your_program] [replication conn] received cmd: [SET baz 789]
[tester::#YG4] Received: "" (no content received)
[tester::#YG4]            ^ error
[tester::#YG4] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#YG4] Test failed
[tester::#YG4] Terminating program
[your_program] [replication conn] ReadRedisCommand err: EOF
[tester::#YG4] Program terminated successfully

And here’s a snippet of my code:
main.go:

func main() {
	...

	if cfg.Role == "slave" {
		replica := server.NewReplica(cfg, kvStore)

		// If replica mode is enabled, connect to master
		err := replica.ConnectToMaster()
		if err != nil {
			fmt.Printf("Error connecting to master: %v\n", err)
			os.Exit(1)
		}

		// Start the replica server
		err = replica.Start()
		if err != nil {
			fmt.Printf("Error starting replica server: %v\n", err)
			os.Exit(1)
		}
	}
}

replica.go:

func (server *Replica) ConnectToMaster() error {
	masterConn, reader, err := server.sendHandshake()
	if err != nil {
		fmt.Printf("Error sending handshake to master: %v\n", err)
		return err
	}
	go server.processReplicationCommands(masterConn, reader)

	return nil
}

func (server *Replica) sendHandshake() (net.Conn, *bufio.Reader, error) {
	// HANDSHAKE:
	// PART 1: send `PING to master`
	// PART 2a: send `REPLCONF listening-port <PORT>`
	// PART 2b: send `REPLCONF capa psync2`
	// PART 3: send `PSYNC <replicationID> <masterOffset>`
	// PART 4: accept RDB file

	// PART 1
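	parts := strings.Split(server.cfg.MasterHostAndPort, " ")
	if len(parts) != 2 {
		// Report a bad --replicaof value instead of silently exiting with status 0
		return nil, nil, fmt.Errorf("invalid --replicaof value %q, expected \"<host> <port>\"", server.cfg.MasterHostAndPort)
	}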
	masterHost, masterPort := parts[0], parts[1]
	masterAddr := masterHost + ":" + masterPort
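	masterConn, err := net.Dial("tcp", masterAddr)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to connect to master node on %s: %w", masterAddr, err)
	}
	// Create exactly one buffered reader for this connection and reuse it for
	// the handshake, the RDB transfer, and every propagated command afterwards.
	reader := bufio.NewReader(masterConn)
	// masterConn.SetReadDeadline(time.Now().Add(10 * time.Second))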
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray([]string{"PING"})))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't send PING to master node")
	}

	// read response: should get PONG
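	pongLine, err := reader.ReadString('\n')
	if err != nil {
		// Any read error (not only io.EOF) means the handshake cannot continue
		return nil, nil, fmt.Errorf("failed to read PING response: %w", err)
	}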
	pongLine = strings.TrimSpace(pongLine)
	if pongLine != "+PONG" {
		return nil, nil, fmt.Errorf("unexpected response, at PART1: %s", pongLine)
	}

	// PART 2a
	replConfCmds1 := []string{"REPLCONF", "listening-port", fmt.Sprint(server.cfg.Port)}
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds1)))
	if err != nil {
		return nil, nil, err
	}
	okLine, err := reader.ReadString('\n')
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read REPLCONF listening-port response: %w", err)
	}
	okLine = strings.TrimSpace(okLine)
	if okLine != "+OK" {
		return nil, nil, fmt.Errorf("unexpected response, at PART2a: %s", okLine)
	}

	// PART 2b
	replConfCmds2 := []string{"REPLCONF", "capa", "psync2"}
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds2)))
	if err != nil {
		return nil, nil, err
	}

	okLine, err = reader.ReadString('\n')
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read REPLCONF capa response: %w", err)
	}
	okLine = strings.TrimSpace(okLine)
	if okLine != "+OK" {
		return nil, nil, fmt.Errorf("unexpected response, at PART2b: %s", okLine)
	}

	// PART 3
	psyncCmds := []string{"PSYNC"}
	if server.cfg.MasterReplID == "" {
		// first time connecting to master
		psyncCmds = append(psyncCmds, "?")
	} else {
		psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplID))
	}
	psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplOffset))
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(psyncCmds)))
	if err != nil {
		return nil, nil, err
	}

	fullSyncLine, err := reader.ReadString('\n')
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read FULLRESYNC: %v", err)
	}
	fullSyncLine = strings.TrimSpace(fullSyncLine)
	if !strings.HasPrefix(fullSyncLine, "+FULLRESYNC") {
		return nil, nil, fmt.Errorf("expected FULLRESYNC, got: %s", fullSyncLine)
	}
	parts = strings.Split(fullSyncLine, " ")
	if len(parts) != 3 {
		return nil, nil, fmt.Errorf("invalid FULLRESYNC format: %s", fullSyncLine)
	}
	server.cfg.MasterReplID = parts[1] // Store the replication ID

	// PART 4
	rdbSizeLine, err := reader.ReadString('\n')
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read RDB size: %v", err)
	}
	rdbSizeLine = strings.TrimSpace(rdbSizeLine)
	if !strings.HasPrefix(rdbSizeLine, "$") {
		return nil, nil, fmt.Errorf("expected RDB size marker ($), got: %s", rdbSizeLine)
	}
	rdbSize, err := strconv.Atoi(rdbSizeLine[1:])
	if err != nil {
		return nil, nil, fmt.Errorf("invalid RDB size: %s", rdbSizeLine[1:])
	}
	rdbData := make([]byte, rdbSize)
	_, err = io.ReadFull(reader, rdbData)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read RDB data: %v", err)
	}

	return masterConn, reader, nil
}

func (server *Replica) processReplicationCommands(masterConn net.Conn, reader *bufio.Reader) {
	defer masterConn.Close()
	server.cfg.MasterReplOffset = 0

	fmt.Printf("Replica: Starting to process replication commands\n")
	for {
		// fmt.Printf("Replica: Waiting for next command from master...\n")
		cmd, bytesProcessed, err := protocol.ReadRedisCommand(reader)
		if err != nil {
			fmt.Printf("[replication conn] ReadRedisCommand err: %v\n", err)
			return
		}

		if len(cmd) == 0 {
			continue
		}
		fmt.Printf("[replication conn] received cmd: %v\n", cmd)

		switch strings.ToLower(cmd[0]) {
		case "ping":
			// master alive
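		case "set":
			// Propagated writes use the form SET key value [PX milliseconds]
			if len(cmd) == 3 {
				_ = server.kvStore.Set(cmd[1], cmd[2], -1)
			} else if len(cmd) == 5 && strings.ToLower(cmd[3]) == "px" {
				expireTimeMs, err := strconv.Atoi(cmd[4])
				if err != nil {
					fmt.Printf("expire time is not a number: %q\n", cmd[4])
					break // leave the switch; the offset below is still advanced
				}
				_ = server.kvStore.Set(cmd[1], cmd[2], expireTimeMs)
			}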
		case "replconf":
			if len(cmd) >= 3 && strings.ToLower(cmd[1]) == "getack" {
				ackCmd := []string{"REPLCONF", "ACK", fmt.Sprintf("%d", server.cfg.MasterReplOffset)}
				respData := protocol.FormatRESPArray(ackCmd)
				_, err := masterConn.Write([]byte(respData))
				if err != nil {
					fmt.Printf("Replica: Error sending ACK: %v\n", err)
				}
				fmt.Printf("[replication conn] sent: %q\n", []byte(respData))
			}
		default:
			fmt.Printf("Replica got unknown replication command: %v\n", cmd)
		}
		server.cfg.MasterReplOffset += bytesProcessed
	}
}

resp.go:

func ReadRedisCommand(reader *bufio.Reader) ([]string, int, error) {
	bytesProcessed := 0
	// Read the first line which should be the array marker
	line, err := reader.ReadString('\n')
	if err != nil {
		return nil, 0, err
	}
	bytesProcessed += len(line)
	line = strings.TrimSpace(line)

	// Check if this is an array
	if !strings.HasPrefix(line, "*") {
		return nil, 0, fmt.Errorf("expected array, got: %s", line)
	}

	// Parse number of elements in the array (a negative count would make make() panic)
	count, err := strconv.Atoi(line[1:])
	if err != nil || count < 0 {
		return nil, 0, fmt.Errorf("invalid array length: %s", line[1:])
	}

	// Read each element in the command
	cmd := make([]string, count)
	for i := 0; i < count; i++ {
		// Read bulk string marker
		line, err := reader.ReadString('\n')
		if err != nil {
			return nil, 0, err
		}
		bytesProcessed += len(line)
		line = strings.TrimSpace(line)

		if !strings.HasPrefix(line, "$") {
			return nil, 0, fmt.Errorf("expected bulk string, got: %s", line)
		}

		// Parse string length (reject negative lengths before allocating)
		strLen, err := strconv.Atoi(line[1:])
		if err != nil || strLen < 0 {
			return nil, 0, fmt.Errorf("invalid string length: %s", line[1:])
		}

		// Read exactly strLen bytes
		value := make([]byte, strLen)
		n, err := io.ReadFull(reader, value)
		if err != nil {
			return nil, 0, err
		}
		bytesProcessed += n

		// Read the trailing \r\n
		trailingLine, err := reader.ReadString('\n')
		if err != nil {
			return nil, 0, err
		}
		bytesProcessed += len(trailingLine)

		cmd[i] = string(value)
	}

	return cmd, bytesProcessed, nil
}

For context, I previously had a problem reading from this connection: I was creating multiple readers on the same connection, so an old reader would buffer bytes that belonged to newer commands. Creating only a single reader fixed that, but now I'm seeing the flaky behaviour again.

Here is my source code: redis-go/app at redis-streams · zulfkhar00/redis-go · GitHub

@andy1li, I know it’s Friday, sorry about that, but would you mind looking into this when you have time? I'd appreciate it :slightly_smiling_face:

Alright, my mistake: the issue was in how I read commands on the main thread (the client connections). I was reading into a static, fixed-size buffer instead of using dynamic, buffered reading. It's fixed now.

