I’m stuck on Stage #YG4. This is linked to #XV6 because I cannot get passed REPLCONF GETACK * command during replication.
I’ve tried to locally replicate this issue but it doesn’t come out.
Here are my logs:
[tester::#YD3] Running tests for Stage #YD3 (Replication - ACKs with commands)
[tester::#YD3] Master is running on port 6379
[tester::#YD3] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
[tester::#YD3] [handshake] master: Waiting for replica to initiate handshake with "PING" command
[tester::#YD3] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["PING"]
[tester::#YD3] [handshake] Received ["PING"]
[tester::#YD3] [handshake] master: Sent "PONG"
[tester::#YD3] [handshake] master: Sent bytes: "+PONG\r\n"
[tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
[tester::#YD3] [handshake] Received ["REPLCONF", "listening-port", "6380"]
[tester::#YD3] [handshake] master: Sent "OK"
[tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
[tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF capa" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
[tester::#YD3] [handshake] Received ["REPLCONF", "capa", "psync2"]
[your_program] Replica sent: [REPLCONF listening-port 6380]
[tester::#YD3] [handshake] master: Sent "OK"
[tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
[your_program] Replica sent: [PSYNC ? -1]
[tester::#YD3] [handshake] master: Waiting for replica to send "PSYNC" command
[tester::#YD3] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
[tester::#YD3] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
[tester::#YD3] [handshake] Received ["PSYNC", "?", "-1"]
[tester::#YD3] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
[tester::#YD3] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
[tester::#YD3] [handshake] Sending RDB file...
[tester::#YD3] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[tester::#YD3] [handshake] Sent RDB file.
[tester::#YD3] [test] master: > REPLCONF GETACK *
[tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
[your_program] Received RDB file: "REDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
[your_program] Replica: Starting to process replication commands
[your_program] Replica: Waiting for next command from master...
[tester::#YD3] Received: "" (no content received)
[tester::#YD3] ^ error
[tester::#YD3] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#YD3] Test failed
[tester::#YD3] Terminating program
[your_program] [masterConnection] ReadRedisCommand err: EOF
[tester::#YD3] Program terminated successfully
And here’s a snippet of my code:
func (server *Replica) ConnectToMaster() error {
masterConn, err := server.sendHandshake()
if err != nil {
fmt.Printf("Error sending handshake to master: %v\n", err)
return err
}
go server.processReplicationCommands(masterConn)
return nil
}
func (server *Replica) sendHandshake() (net.Conn, error) {
// HANDSHAKE:
// PART 1: send `PING to master`
// PART 2a: send `REPLCONF listening-port <PORT>`
// PART 2b: send `REPLCONF capa psync2`
// PART 3: send `PSYNC <replicationID> <masterOffset>`
// PART 4: accept RDB file
// PART 1
parts := strings.Split(server.cfg.MasterHostAndPort, " ")
if len(parts) != 2 {
os.Exit(0)
}
masterHost, masterPort := parts[0], parts[1]
masterAddr := masterHost + ":" + masterPort
masterConn, err := net.Dial("tcp", masterAddr)
// connection.SetReadDeadline(time.Now().Add(10 * time.Second))
if err != nil {
return nil, fmt.Errorf("tried to connect to master node on %s", masterAddr)
}
_, err = masterConn.Write([]byte(protocol.FormatRESPArray([]string{"PING"})))
if err != nil {
return nil, fmt.Errorf("couldn't send PING to master node")
}
// read response: should get PONG
reader := bufio.NewReader(masterConn)
pongLine, err := reader.ReadString('\n')
if errors.Is(err, io.EOF) {
return nil, err
}
pongLine = strings.TrimSpace(pongLine)
if pongLine != "+PONG" {
return nil, fmt.Errorf("unexpected response, at PART1: %s", pongLine)
}
// PART 2a
replConfCmds1 := []string{"REPLCONF", "listening-port", fmt.Sprint(server.cfg.Port)}
fmt.Printf("Replica sent: %v\n", replConfCmds1)
_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds1)))
if err != nil {
return nil, err
}
okLine, err := reader.ReadString('\n')
if errors.Is(err, io.EOF) {
return nil, err
}
okLine = strings.TrimSpace(okLine)
if okLine != "+OK" {
return nil, fmt.Errorf("unexpected response, at PART2a: %s", okLine)
}
// PART 2b
replConfCmds2 := []string{"REPLCONF", "capa", "psync2"}
_, err = masterConn.Write([]byte(protocol.FormatRESPArray(replConfCmds2)))
if err != nil {
return nil, err
}
okLine, err = reader.ReadString('\n')
if errors.Is(err, io.EOF) {
return nil, err
}
okLine = strings.TrimSpace(okLine)
if okLine != "+OK" {
return nil, fmt.Errorf("unexpected response, at PART2b: %s", okLine)
}
// PART 3
psyncCmds := []string{"PSYNC"}
if server.cfg.MasterReplID == "" {
// first time connecting to master
psyncCmds = append(psyncCmds, "?")
} else {
psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplID))
}
psyncCmds = append(psyncCmds, fmt.Sprint(server.cfg.MasterReplOffset))
fmt.Printf("Replica sent: %v\n", psyncCmds)
_, err = masterConn.Write([]byte(protocol.FormatRESPArray(psyncCmds)))
if err != nil {
return nil, err
}
fullSyncLine, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read FULLRESYNC: %v", err)
}
fullSyncLine = strings.TrimSpace(fullSyncLine)
if !strings.HasPrefix(fullSyncLine, "+FULLRESYNC") {
return nil, fmt.Errorf("expected FULLRESYNC, got: %s", fullSyncLine)
}
parts = strings.Split(fullSyncLine, " ")
if len(parts) != 3 {
return nil, fmt.Errorf("invalid FULLRESYNC format: %s", fullSyncLine)
}
server.cfg.MasterReplID = parts[1] // Store the replication ID
// PART 4
rdbSizeLine, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read RDB size: %v", err)
}
rdbSizeLine = strings.TrimSpace(rdbSizeLine)
if !strings.HasPrefix(rdbSizeLine, "$") {
return nil, fmt.Errorf("expected RDB size marker ($), got: %s", rdbSizeLine)
}
rdbSize, err := strconv.Atoi(rdbSizeLine[1:])
if err != nil {
return nil, fmt.Errorf("invalid RDB size: %s", rdbSizeLine[1:])
}
rdbData := make([]byte, rdbSize)
_, err = reader.Read(rdbData)
// _, err = io.ReadFull(reader, rdbData)
if err != nil {
return nil, fmt.Errorf("failed to read RDB data: %v", err)
}
fmt.Printf("Received RDB file: %q\n", rdbData)
return masterConn, nil
}
func (server *Replica) processReplicationCommands(masterConn net.Conn) {
defer masterConn.Close()
server.cfg.MasterReplOffset = 0
reader := bufio.NewReader(masterConn)
fmt.Printf("Replica: Starting to process replication commands\n")
for {
fmt.Printf("Replica: Waiting for next command from master...\n")
cmd, err := protocol.ReadRedisCommand(reader)
if err != nil {
fmt.Printf("[masterConnection] ReadRedisCommand err: %v\n", err)
return
}
fmt.Printf("replication received: %v\n", cmd)
if len(cmd) == 0 {
continue
}
switch strings.ToLower(cmd[0]) {
case "ping":
err := handlePingReplicaCommand(masterConn)
if err != nil {
fmt.Printf("handlePingReplicaCommand error: %v\n", err)
}
case "set":
var result string
if len(cmd) == 3 {
result = server.kvStore.Set(cmd[1], cmd[2], -1)
} else if len(cmd) == 4 {
expireTimeMs, err := strconv.Atoi(cmd[3])
if err != nil {
fmt.Printf("expire time is not a number")
}
result = server.kvStore.Set(cmd[1], cmd[2], expireTimeMs)
}
fmt.Printf("Replica got %v, result: %s\n", cmd, result)
server.cfg.MasterReplOffset += 1
case "replconf":
fmt.Printf("Replica: Processing REPLCONF command: %v\n", cmd)
if len(cmd) >= 3 && strings.ToLower(cmd[1]) == "getack" {
fmt.Printf("Replica: Processing GETACK subcommand\n")
ackCmd := []string{"REPLCONF", "ACK", fmt.Sprintf("%d", server.cfg.MasterReplOffset)}
respData := protocol.FormatRESPArray(ackCmd)
fmt.Printf("Replica: Sending ACK response: %q\n", respData)
n, err := masterConn.Write([]byte(respData))
if err != nil {
fmt.Printf("Replica: Error sending ACK: %v\n", err)
} else {
fmt.Printf("Replica: Successfully wrote %d bytes for ACK\n", n)
}
}
default:
fmt.Printf("Replica got unknown replication command: %v\n", cmd)
}
}
}
func ReadRedisCommand(reader *bufio.Reader) ([]string, error) {
// Read the first line which should be the array marker
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
// Check if this is an array
if !strings.HasPrefix(line, "*") {
return nil, fmt.Errorf("expected array, got: %s", line)
}
// Parse number of elements in the array
count, err := strconv.Atoi(line[1:])
if err != nil {
return nil, fmt.Errorf("invalid array length: %s", line[1:])
}
// Read each element in the command
cmd := make([]string, count)
for i := 0; i < count; i++ {
// Read bulk string marker
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "$") {
return nil, fmt.Errorf("expected bulk string, got: %s", line)
}
// Parse string length
strLen, err := strconv.Atoi(line[1:])
if err != nil {
return nil, fmt.Errorf("invalid string length: %s", line[1:])
}
// Read exactly strLen bytes
value := make([]byte, strLen)
_, err = io.ReadFull(reader, value)
if err != nil {
return nil, err
}
// Read the trailing \r\n
_, err = reader.ReadString('\n')
if err != nil {
return nil, err
}
cmd[i] = string(value)
}
return cmd, nil
}
here is the link to full codebase: GitHub - zulfkhar00/redis-go