I’m stuck on Stage stage 19.
I’ve tried the below code.
Here are my logs:
remote: [tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
remote: [tester::#YG4] Master is running on port 6379
remote: [tester::#YG4] $ ./spawn_redis_server.sh --port 6380 --replicaof "localhost 6379"
remote: [your_program] Logs from your program will appear here!
remote: [tester::#YG4] master: Waiting for replica to initiate handshake with "PING" command
remote: [tester::#YG4] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#YG4] master: Received RESP value: ["PING"]
remote: [tester::#YG4] Received ["PING"]
remote: [tester::#YG4] master: Sent "PONG"
remote: [tester::#YG4] master: Sent bytes: "+PONG\r\n"
remote: [tester::#YG4] master: Waiting for replica to send "REPLCONF listening-port 6380" command
remote: [your_program] +PONG
remote: [your_program]
remote: [tester::#YG4] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
remote: [tester::#YG4] master: Received RESP value: ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] Received ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] master: Sent "OK"
remote: [tester::#YG4] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] master: Waiting for replica to send "REPLCONF capa" command
remote: [your_program] +OK
remote: [your_program]
remote: [tester::#YG4] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
remote: [tester::#YG4] master: Received RESP value: ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] Received ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] master: Sent "OK"
remote: [tester::#YG4] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] master: Waiting for replica to send "PSYNC" command
remote: [your_program] +OK
remote: [your_program]
remote: [tester::#YG4] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
remote: [tester::#YG4] master: Received RESP value: ["PSYNC", "?", "-1"]
remote: [tester::#YG4] Received ["PSYNC", "?", "-1"]
remote: [tester::#YG4] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
remote: [tester::#YG4] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
remote: [tester::#YG4] Sending RDB file...
remote: [tester::#YG4] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
remote: [tester::#YG4] Sent RDB file.
remote: [your_program] +FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0
remote: [your_program]
remote: [tester::#YG4] master: $ redis-cli SET foo 123
remote: [tester::#YG4] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
remote: [tester::#YG4] master: $ redis-cli SET bar 456
remote: [tester::#YG4] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
remote: [tester::#YG4] master: $ redis-cli SET baz 789
remote: [tester::#YG4] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
remote: [tester::#YG4] Getting key foo
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [tester::#YG4] Retrying... (1/5 attempts)
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [tester::#YG4] Retrying... (2/5 attempts)
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [tester::#YG4] Retrying... (3/5 attempts)
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] Retrying... (4/5 attempts)
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] Retrying... (5/5 attempts)
remote: [tester::#YG4] client: $ redis-cli GET foo
remote: [tester::#YG4] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] REPLICA --> {}
remote: [tester::#YG4] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] client: Received RESP value: NIL
remote: [tester::#YG4] Expected simple string or bulk string, got NIL
remote: [tester::#YG4] Test failed
remote: [tester::#YG4] Terminating program
remote: [tester::#YG4] Program terminated successfully
And here’s a snippet of my code:
public void handleRequest(Socket clientSocket) {
try {
while (clientSocket.isConnected()) {
byte[] input = new byte[1024];
int bytesRead = clientSocket.getInputStream().read(input);
String rawRequest = new String(input, 0, bytesRead);
String request = rawRequest.trim();
String[] parts = request.split("\r\n");
if (parts.length >= 2) {
if (parts[2].equalsIgnoreCase("SET")) {
String key = parts[4];
String value = parts[6];
long timeout = parts.length == 11 ? Long.parseLong(parts[10]) : 0;
Data data = new Data(value, timeout);
storage.put(key, data);
if (Role.REPLICA.name().equals(role.name())) {
System.out.println(role + " --> " + storage);
}
if (Role.MASTER.name().equals(role.name())) {
String response = "OK";
clientSocket.getOutputStream().write(("$" + response.length() + "\r\n" + response + "\r\n")
.getBytes());
sendToReplicas(rawRequest);
}
} else if (parts[2].equalsIgnoreCase("GET")) {
Object rawData = storage.get(parts[4]);
String value = null;
if (Objects.nonNull(rawData)) {
Data data = (Data) rawData;
if (data.expiry > 0 && data.expiry < System.currentTimeMillis()) {
storage.remove(parts[4]);
} else {
value = (String) data.value;
}
}
if (Role.REPLICA.name().equals(role.name())) {
System.out.println(role + " --> " + storage);
}
if (Objects.nonNull(value)) {
clientSocket.getOutputStream().write(
("$" + value.length() + "\r\n" + value + "\r\n").getBytes());
} else {
clientSocket.getOutputStream().write(
("$-1\r\n").getBytes());
}
} else if (parts[2].equalsIgnoreCase("INFO")) {
StringBuilder builder = new StringBuilder();
infoMap.forEach((key, value) -> builder.append(key).append(":").append(value));
String value = builder.toString();
clientSocket.getOutputStream().write(
("$" + value.length() + "\r\n" + value + "\r\n").getBytes());
} else if (parts[2].equalsIgnoreCase("REPLCONF") && Role.MASTER.name().equals(role.name())) {
String data = "OK";
clientSocket.getOutputStream().write(
("$" + data.length() + "\r\n" + data + "\r\n").getBytes());
} else if (parts[2].equalsIgnoreCase("PSYNC") && Role.MASTER.name().equals(role.name())) {
String replID = REPL_ID + UUID.randomUUID().toString().substring(25);
String data = String.format("+FULLRESYNC %s %d%s", replID, OFFSET, "\r\n");
clientSocket.getOutputStream().write(data.getBytes());
clientSocket.getOutputStream().flush();
String fileContents = "UkVESVMwMDEx+glyZWRpcy12ZXIFNy4yLjD6CnJlZGlzLWJpdHPAQPoFY3RpbWXCbQi8ZfoIdXNlZC1tZW3CsMQQAPoIYW9mLWJhc2XAAP/wbjv+wP9aog==";
byte[] bytes = Base64.getDecoder().decode(fileContents);
clientSocket.getOutputStream().write(("$" + bytes.length + "\r\n").getBytes());
clientSocket.getOutputStream().flush();
clientSocket.getOutputStream().write(bytes);
clientSocket.getOutputStream().flush();
replicaMap.put(replID, clientSocket);
} else if (parts[2].equalsIgnoreCase("ECHO")) {
String data = parts[4];
clientSocket.getOutputStream().write(
("$" + data.length() + "\r\n" + data + "\r\n").getBytes());
} else if (parts[2].equalsIgnoreCase("PING")) {
System.out.println("Sending Pong....");
clientSocket.getOutputStream().write("+PONG\r\n".getBytes());
} else {
clientSocket.getOutputStream().write(
"-ERR invalid request\r\n".getBytes());
}
} else {
clientSocket.getOutputStream().write(
"-ERR invalid request\r\n".getBytes());
}
}
} catch (IOException e) {
System.err.println("Error handling client: " + e.getMessage());
}
}
private void sendToReplicas(String request) {
replicaMap.forEach((replicaId, socket) -> {
CompletableFuture.runAsync(() -> {
try {
System.out.println("Sending to replicas");
socket.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();
String response = ReplicaRequestHandler.getResponse(socket);
System.out.println(response);
} catch (IOException e) {
e.printStackTrace();
}
});
});
}