#YG4 and #XV6 Replica cannot read the message sent by master

I’m stuck on Stage #YG4. This is linked to #XV6 because I cannot get past the REPLCONF GETACK * command during replication.

I’ve tried to reproduce this issue locally, but it doesn’t occur there.

Here are my logs:


[tester::#YD3] Running tests for Stage #YD3 (Replication - ACKs with commands)
[tester::#YD3] Master is running on port 6379
[tester::#YD3] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[tester::#YD3] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YD3] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["PING"]
[tester::#YD3] [handshake] Received ["PING"]
[tester::#YD3] [handshake] master: Sent "PONG"
[tester::#YD3] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YD3] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YD3] [handshake] master: Sent "OK"
[tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YD3] [handshake] Received ["REPLCONF", "capa", "psync2"]
[your_program] Replica sent: [REPLCONF listening-port 6380]
[tester::#YD3] [handshake] master: Sent "OK"
[tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
[your_program] Replica sent: [PSYNC ? -1]
[tester::#YD3] [handshake] master: Waiting for replica to send "PSYNC" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YD3] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YD3] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YD3] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YD3] [handshake] Sending RDB file...
[tester::#YD3] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YD3] [handshake] Sent RDB file.
[tester::#YD3] [test] master: > REPLCONF GETACK *
[tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
[your_program] Received RDB file: "REDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[your_program] Replica: Starting to process replication commands
[your_program] Replica: Waiting for next command from master...
[tester::#YD3] Received: "" (no content received)
[tester::#YD3]            ^ error
[tester::#YD3] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#YD3] Test failed
[tester::#YD3] Terminating program
[your_program] [masterConnection] ReadRedisCommand err: EOF
[tester::#YD3] Program terminated successfully

And here’s a snippet of my code:

func (server *Replica) ConnectToMaster() error {
	masterConn, err := server.sendHandshake()
	if err != nil {
		fmt.Printf("Error sending handshake to master: %v\n", err)
		return err
	}
	go server.processReplicationCommands(masterConn)

	return nil
}

func (server *Replica) sendHandshake() (net.Conn, error) {
	// HANDSHAKE:
	// PART 1: send `PING to master`
	// PART 2a: send `REPLCONF listening-port <PORT>`
	// PART 2b: send `REPLCONF capa psync2`
	// PART 3: send `PSYNC <replicationID> <masterOffset>`
	// PART 4: accept RDB file

	// PART 1
	parts := strings.Split(server.cfg.MasterHostAndPort, " ")
	if len(parts) != 2 {
		os.Exit(0)
	}
	masterHost, masterPort := parts[0], parts[1]
	masterAddr := masterHost + ":" + masterPort
	masterConn, err := net.Dial("tcp", masterAddr)
	// connection.SetReadDeadline(time.Now().Add(10 * time.Second))
	if err != nil {
		return nil, fmt.Errorf("tried to connect to master node on %s", masterAddr)
	}
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray([]string{"PING"})))
	if err != nil {
		return nil, fmt.Errorf("couldn't send PING to master node")
	}

	// read response: should get PONG
	reader := bufio.NewReader(masterConn)
	pongLine, err := reader.ReadString('\n')
	if errors.Is(err, io.EOF) {
		return nil, err
	}
	pongLine = strings.TrimSpace(pongLine)
	if pongLine != "+PONG" {
		return nil, fmt.Errorf("unexpected response, at PART1: %s", pongLine)
	}

	// PART 2a
	replConfCmds1 := []string{"REPLCONF", "listening-port", fmt.Sprint(server.cfg.Port)}
	fmt.Printf("Replica sent: %v\n", replConfCmds1)
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds1)))
	if err != nil {
		return nil, err
	}
	okLine, err := reader.ReadString('\n')
	if errors.Is(err, io.EOF) {
		return nil, err
	}
	okLine = strings.TrimSpace(okLine)
	if okLine != "+OK" {
		return nil, fmt.Errorf("unexpected response, at PART2a: %s", okLine)
	}

	// PART 2b
	replConfCmds2 := []string{"REPLCONF", "capa", "psync2"}
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds2)))
	if err != nil {
		return nil, err
	}

	okLine, err = reader.ReadString('\n')
	if errors.Is(err, io.EOF) {
		return nil, err
	}
	okLine = strings.TrimSpace(okLine)
	if okLine != "+OK" {
		return nil, fmt.Errorf("unexpected response, at PART2b: %s", okLine)
	}

	// PART 3
	psyncCmds := []string{"PSYNC"}
	if server.cfg.MasterReplID == "" {
		// first time connecting to master
		psyncCmds = append(psyncCmds, "?")
	} else {
		psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplID))
	}
	psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplOffset))
	fmt.Printf("Replica sent: %v\n", psyncCmds)
	_, err = masterConn.Write([]byte(protocol.FormatRESPArray(psyncCmds)))
	if err != nil {
		return nil, err
	}

	fullSyncLine, err := reader.ReadString('\n')
	if err != nil {
		return nil, fmt.Errorf("failed to read FULLRESYNC: %v", err)
	}
	fullSyncLine = strings.TrimSpace(fullSyncLine)
	if !strings.HasPrefix(fullSyncLine, "+FULLRESYNC") {
		return nil, fmt.Errorf("expected FULLRESYNC, got: %s", fullSyncLine)
	}
	parts = strings.Split(fullSyncLine, " ")
	if len(parts) != 3 {
		return nil, fmt.Errorf("invalid FULLRESYNC format: %s", fullSyncLine)
	}
	server.cfg.MasterReplID = parts[1] // Store the replication ID

	// PART 4
	rdbSizeLine, err := reader.ReadString('\n')
	if err != nil {
		return nil, fmt.Errorf("failed to read RDB size: %v", err)
	}
	rdbSizeLine = strings.TrimSpace(rdbSizeLine)
	if !strings.HasPrefix(rdbSizeLine, "$") {
		return nil, fmt.Errorf("expected RDB size marker ($), got: %s", rdbSizeLine)
	}
	rdbSize, err := strconv.Atoi(rdbSizeLine[1:])
	if err != nil {
		return nil, fmt.Errorf("invalid RDB size: %s", rdbSizeLine[1:])
	}
	rdbData := make([]byte, rdbSize)
	_, err = reader.Read(rdbData)
	// _, err = io.ReadFull(reader, rdbData)
	if err != nil {
		return nil, fmt.Errorf("failed to read RDB data: %v", err)
	}
	fmt.Printf("Received RDB file: %q\n", rdbData)

	return masterConn, nil
}

func (server *Replica) processReplicationCommands(masterConn net.Conn) {
	defer masterConn.Close()
	server.cfg.MasterReplOffset = 0

	reader := bufio.NewReader(masterConn)
	fmt.Printf("Replica: Starting to process replication commands\n")
	for {
		fmt.Printf("Replica: Waiting for next command from master...\n")
		cmd, err := protocol.ReadRedisCommand(reader)
		if err != nil {
			fmt.Printf("[masterConnection] ReadRedisCommand err: %v\n", err)
			return
		}

		fmt.Printf("replication received: %v\n", cmd)

		if len(cmd) == 0 {
			continue
		}

		switch strings.ToLower(cmd[0]) {
		case "ping":
			err := handlePingReplicaCommand(masterConn)
			if err != nil {
				fmt.Printf("handlePingReplicaCommand error: %v\n", err)
			}
		case "set":
			var result string
			if len(cmd) == 3 {
				result = server.kvStore.Set(cmd[1], cmd[2], -1)
			} else if len(cmd) == 4 {
				expireTimeMs, err := strconv.Atoi(cmd[3])
				if err != nil {
					fmt.Printf("expire time is not a number")
				}
				result = server.kvStore.Set(cmd[1], cmd[2], expireTimeMs)
			}
			fmt.Printf("Replica got %v, result: %s\n", cmd, result)
			server.cfg.MasterReplOffset += 1
		case "replconf":
			fmt.Printf("Replica: Processing REPLCONF command: %v\n", cmd)
			if len(cmd) >= 3 && strings.ToLower(cmd[1]) == "getack" {
				fmt.Printf("Replica: Processing GETACK subcommand\n")
				ackCmd := []string{"REPLCONF", "ACK", fmt.Sprintf("%d", server.cfg.MasterReplOffset)}
				respData := protocol.FormatRESPArray(ackCmd)
				fmt.Printf("Replica: Sending ACK response: %q\n", respData)
				n, err := masterConn.Write([]byte(respData))
				if err != nil {
					fmt.Printf("Replica: Error sending ACK: %v\n", err)
				} else {
					fmt.Printf("Replica: Successfully wrote %d bytes for ACK\n", n)
				}
			}
		default:
			fmt.Printf("Replica got unknown replication command: %v\n", cmd)
		}
	}
}

func ReadRedisCommand(reader *bufio.Reader) ([]string, error) {
	// Read the first line which should be the array marker
	line, err := reader.ReadString('\n')
	if err != nil {
		return nil, err
	}
	line = strings.TrimSpace(line)

	// Check if this is an array
	if !strings.HasPrefix(line, "*") {
		return nil, fmt.Errorf("expected array, got: %s", line)
	}

	// Parse number of elements in the array
	count, err := strconv.Atoi(line[1:])
	if err != nil {
		return nil, fmt.Errorf("invalid array length: %s", line[1:])
	}

	// Read each element in the command
	cmd := make([]string, count)
	for i := 0; i < count; i++ {
		// Read bulk string marker
		line, err := reader.ReadString('\n')
		if err != nil {
			return nil, err
		}
		line = strings.TrimSpace(line)

		if !strings.HasPrefix(line, "$") {
			return nil, fmt.Errorf("expected bulk string, got: %s", line)
		}

		// Parse string length
		strLen, err := strconv.Atoi(line[1:])
		if err != nil {
			return nil, fmt.Errorf("invalid string length: %s", line[1:])
		}

		// Read exactly strLen bytes
		value := make([]byte, strLen)
		_, err = io.ReadFull(reader, value)
		if err != nil {
			return nil, err
		}

		// Read the trailing \r\n
		_, err = reader.ReadString('\n')
		if err != nil {
			return nil, err
		}

		cmd[i] = string(value)
	}

	return cmd, nil
}

Here is the link to the full codebase: GitHub - zulfkhar00/redis-go

Hey @zulfkhar00, looks like you’ve got past this stage. Do you happen to remember what was wrong? Would love to see if we can improve the tester / instructions.

@andy1li Yes. The test was flaky: sometimes it passed and sometimes it failed. When I submitted my code for the previous stage, the test passed, but when I run the tests for the current stage, they fail on the GETACK logic from the previous stage. The issue was, and still is, REPLCONF GETACK *: my master sends it to the replica, the replica never receives it, and so the master gets empty content.

@zulfkhar00 The flakiness is caused by creating two bufio.NewReader instances on the same masterConn: one in sendHandshake and another in processReplicationCommands.

A bufio.Reader reads ahead, so the message REPLCONF GETACK * could get buffered by the first reader, leaving the second one with nothing to read.
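
To make the buffering effect concrete, here is a minimal sketch that does not use the code from this thread. It assumes a strings.Reader as a stand-in for masterConn, and the helper name twoReaders is hypothetical. The first bufio.Reader is only asked for one line, yet it pulls the whole stream into its internal buffer, so a second bufio.Reader created on the same source finds nothing left.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// twoReaders (hypothetical helper) reads one line through a first
// bufio.Reader, then reads whatever is still available through a second
// bufio.Reader created on the same underlying source.
func twoReaders(stream string) (string, string, error) {
	src := strings.NewReader(stream) // stand-in for masterConn

	first := bufio.NewReader(src)
	firstLine, err := first.ReadString('\n')
	if err != nil {
		return "", "", err
	}

	// The first reader has already drained src into its own buffer,
	// so the second reader starts from an empty source.
	second := bufio.NewReader(src)
	rest, err := io.ReadAll(second)
	if err != nil {
		return "", "", err
	}
	return firstLine, string(rest), nil
}

func main() {
	stream := "+OK\r\n" + "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
	line, rest, err := twoReaders(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("first reader got:  %q\n", line)
	fmt.Printf("second reader got: %q\n", rest) // prints ""
}
```

With a real TCP connection the outcome depends on timing: if the command arrives after the first reader's last read, the second reader still sees it, which would explain why the test only fails some of the time.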

@andy1li So that means if I create the reader once and just pass it down, there shouldn’t be a problem, right? I will try to fix it now and see if that is the case.
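
A minimal sketch of that approach, under stated assumptions: the helpers readLength, readRDB, readCommand and handshakeThenCommand are hypothetical and simplified (no null bulk strings, no inline commands), and a strings.Reader again stands in for masterConn. One bufio.Reader is created and used both for the RDB payload and for the command that follows it. The payload is read with io.ReadFull, because a single Read call may return fewer bytes than requested.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// readLength reads a header line such as "$88\r\n" or "*3\r\n" and
// returns the number that follows the prefix.
func readLength(r *bufio.Reader, prefix string) (int, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return 0, err
	}
	line = strings.TrimSpace(line)
	if !strings.HasPrefix(line, prefix) {
		return 0, fmt.Errorf("expected %q prefix, got %q", prefix, line)
	}
	n, err := strconv.Atoi(line[len(prefix):])
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid length in %q", line)
	}
	return n, nil
}

// readRDB reads "$<len>\r\n<payload>"; the payload has no trailing \r\n.
func readRDB(r *bufio.Reader) ([]byte, error) {
	n, err := readLength(r, "$")
	if err != nil {
		return nil, err
	}
	data := make([]byte, n)
	// io.ReadFull keeps reading until all n bytes have arrived.
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, err
	}
	return data, nil
}

// readCommand reads one RESP array of bulk strings.
func readCommand(r *bufio.Reader) ([]string, error) {
	count, err := readLength(r, "*")
	if err != nil {
		return nil, err
	}
	cmd := make([]string, 0, count)
	for i := 0; i < count; i++ {
		n, err := readLength(r, "$")
		if err != nil {
			return nil, err
		}
		buf := make([]byte, n+2) // value plus its trailing \r\n
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
		cmd = append(cmd, string(buf[:n]))
	}
	return cmd, nil
}

// handshakeThenCommand reads the RDB payload and the next command
// through the same reader, so buffered bytes are never lost.
func handshakeThenCommand(stream string) (string, []string, error) {
	reader := bufio.NewReader(strings.NewReader(stream)) // created once
	rdb, err := readRDB(reader)
	if err != nil {
		return "", nil, err
	}
	cmd, err := readCommand(reader) // same reader, not a new one
	if err != nil {
		return "", nil, err
	}
	return string(rdb), cmd, nil
}

func main() {
	stream := "$5\r\nHELLO" + "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
	rdb, cmd, err := handshakeThenCommand(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("rdb=%q cmd=%v\n", rdb, cmd) // prints: rdb="HELLO" cmd=[REPLCONF GETACK *]
}
```

Applied to the code in this thread, one option would be for sendHandshake to return its reader alongside masterConn, and for processReplicationCommands to accept that reader instead of calling bufio.NewReader again.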

@andy1li, you are right: by the time I created the second buffered reader, the first reader had already buffered REPLCONF GETACK *.
Thanks a lot!
