Hello, I am stuck on Stage 25, WAIT with multiple commands. I was able to pass the first test for the WAIT command, but the problem that I am having occurs when the tester switches from the client to the replicas. For instance, after the first WAIT test is passed, the tester switches to the client and sends a “SET” command, which my server receives and sends a “+OK\r\n” response to. However, when the tester switches to replica-1, the replica-1 also receives the same “+OK\r\n” response, even though it expects to receive the propagated SET command. What confuses me is that I had no problems with this in the first WAIT test (I was able to propagate the command to the replicas), but in the second WAIT test I am receiving this issue. I suspect my issue may deal with the possibility that my server is sharing the same connection with the client and the replicas, so I tried tagging each connection with a type to signify the difference (connection.client vs connection.replica), but I still face the same issue. I also tried to store the repl1 socket connection in a variable (repl1Connect) and using an asynchrounous function to wait for all the data to be loaded. Here is my code:
const server = net.createServer((connection) => {
// Handle connection
connection.type = 'client';
connection.on('data', async (data) => {
const command = await readData(data);
let commands = command.slice(3).split('\r\n');
commands.pop();
console.log("Commands", commands);
if (commands.includes("INFO")){
if (isSlave != -1){
connection.write(getBulkString("role:slave"));
}
connection.write(getBulkString("role:master\nmaster_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\nmaster_repl_offset:0"));
} else if (commands.includes("REPLCONF")){
connection.write("+OK\r\n");
} else if (commands.includes("ECHO")){
index = commands.indexOf("ECHO");
connection.write(commands.slice(index+1).join("\r\n"));
} else if (commands.includes("PING")){
connection.write("+PONG\r\n");
} else if (commands.includes("SET")){
index = commands.indexOf("SET");
dictionary[commands[index + 2]] = dictionary[commands[index + 4]];
if (commands.includes("px")){
let px = commands.indexOf("px");
setTimeout(() => {
delete dictionary[commands[index + 2]];
}, parseInt(commands[px + 2])
)
}
if (commands.includes("baz")){
repl1Connect.write("*3\r\n" + getBulkString("REPLCONF") + getBulkString("GETACK") + getBulkString("*"));
connection.write("+OK\r\n");
repl1Connect.write("*3\r\n" + getBulkString("REPLCONF") + getBulkString("GETACK") + getBulkString("*"));
connection.write("*3\r\n" + getBulkString("REPLCONF") + getBulkString("GETACK") + getBulkString("*"));
} else {
connection.write("+OK\r\n");
}
propagateToReplicas(command);
}else if (commands.includes("GET")){
index = commands.indexOf("GET");
if (!(commands[index + 2] in dictionary) && !(commands[index + 2] in replicaDict)) {
connection.write(getBulkString(null));
} else if (commands[index + 2] in replicaDict) {
connection.write(getBulkString(replicaDict[commands[index + 2]]));
} else{
connection.write(getBulkString(dictionary[commands[index + 2]]));
}
} else if (commands.includes("WAIT")){
index = commands.indexOf("WAIT");
noOfReps = parseInt(commands[index + 2])
time = parseInt(commands[index+4]);
waitCommand(noOfReps, time, connection);
}
if (commands.includes("PSYNC")){
connection.write("+FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n");
const hex = "524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2";
const buffer = Buffer.from(hex, 'hex');
// Calculate the length of the file in bytes
const bytes = buffer.length;
// Create the RDB file header with the length of the file
let rdbFileHeader = `$${bytes}\r\n`;
// Combine the header and the buffer into a single buffer
const rdbFileBuffer = Buffer.concat([Buffer.from(rdbFileHeader, 'ascii'), buffer]);
connection.write(rdbFileBuffer);
connection.type = 'replica'; // Set type as replica
replicas.push(connection);
if (replicas.length == 1){
repl1Connect = connection;
}
numOfReplicas += 1;
}
})
function waitCommand(howMany, time, connection){
numOfAcks = 0;
console.log("Propagated Commands", propagatedCommands);
if (propagatedCommands > 0){
propagateToReplicas("*3\r\n" + getBulkString("REPLCONF") + getBulkString("GETACK") + getBulkString("*"));
setTimeout(() => {
connection.write(`:${numOfAcks > howMany ? howMany : numOfAcks}\r\n`);
}, time);
}
}
const propagateToReplicas = (command) => {
if (replicas.length == 0){
return
}
replicas.forEach((replica) => {
console.log("Command to be Propagated", command);
replica.write(command);
replica.on("data", (data) => {
const commands = data.toString().split('\r\n');
if (commands.includes("ACK")){
numOfAcks += 1;
}
})
})
propagatedCommands += 1;
}
Here are the logs that I am receiving. Notice that I did not have this issue in the first WAIT test:







