I’m stuck on Stage #(change to your stage, ex. #JM3).
Here is what I’ve tried so far.
When the master receives a “SET” command, I send OK to the client, then count the bytes the command contained and add them to masterOffset. I then forward the command to the replicas, and also store that byte count as each replica's offset.
In the WAIT handler, I send “REPLCONF GETACK *” to all replicas and then read from their input streams. If a replica replies with “REPLCONF ACK <any number>”, I set that replica's ack flag to true. I then check whether the ack flag is true and the replica's offset is at least the master offset; if both hold, I count that replica.
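To make the check described above concrete, here is a minimal sketch of the comparison as it might look if the offset were read from the ACK reply itself rather than from a separately stored value. `AckCheck`, `parseAckOffset`, and `isCaughtUp` are hypothetical names that do not exist in my project, and the sketch assumes the reply has already been parsed into a list of strings.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of the code shown in this post.
final class AckCheck {
    private AckCheck() {}

    /**
     * Returns the offset carried by a parsed "REPLCONF ACK <offset>" reply,
     * or -1 if the command is something else or the offset is not a number.
     */
    static long parseAckOffset(List<String> command) {
        if (command == null || command.size() != 3) return -1;
        if (!command.get(0).equalsIgnoreCase("REPLCONF")) return -1;
        if (!command.get(1).equalsIgnoreCase("ACK")) return -1;
        try {
            return Long.parseLong(command.get(2));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** True when the acknowledged offset is valid and has reached the master offset. */
    static boolean isCaughtUp(long ackedOffset, long masterOffset) {
        return ackedOffset >= 0 && ackedOffset >= masterOffset;
    }
}
```

With this shape, a replica would only be counted when `isCaughtUp(parseAckOffset(command), masterOffset)` is true. Whether that matches what the stage expects is an assumption on my part.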
Here are my logs:
include relevant logs here (please make sure to keep the backticks around this!)
And here’s a snippet of my code:
(SET command handled by master) →
private static void handleSetCommand(List<String> command, int bytesConsumed, OutputStream outputStream) throws IOException {
    String key = command.get(1);
    String value = command.get(2);
    long expiryTime = Long.MAX_VALUE;
    if (command.size() >= 5 && command.get(3).equalsIgnoreCase("PX")) { // optional expiry in milliseconds
        try {
            long pxMillis = Long.parseLong(command.get(4));
            expiryTime = System.currentTimeMillis() + pxMillis;
        } catch (NumberFormatException e) {
            outputStream.write("-ERR invalid PX value\r\n".getBytes());
            return;
        }
    }
    // Advance the master offset by the size of the command received from the client
    master_offset += bytesConsumed;
    Main.map.put(key, new Main.ValueWithExpiry(value, expiryTime));
    outputStream.write("+OK\r\n".getBytes());
    outputStream.flush();
    // Forward the write to every connected replica
    for (ReplicaConnection replica : Main.replicaConnections) {
        replica.getOutputStream().write(buildRespArray("SET", key, value).getBytes());
        replica.getOutputStream().flush();
        replica.setOffset(bytesConsumed);
    }
}
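`buildRespArray` is not shown above. A helper of that shape could look roughly like the sketch below, which is an assumption for illustration and not necessarily identical to the one in my project; `RespSketch` and `encodedLength` are hypothetical names. It also shows how the byte length of a forwarded command can be computed, which matters because the forwarded `SET key value` leaves out any PX arguments and so may not be the same length as `bytesConsumed`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a RESP array encoder; names are illustrative only.
final class RespSketch {
    private RespSketch() {}

    /** Encodes the given parts as a RESP array of bulk strings. */
    static String buildRespArray(String... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            // Bulk string length is measured in bytes, not characters
            int byteLen = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLen).append("\r\n");
            sb.append(part).append("\r\n");
        }
        return sb.toString();
    }

    /** Number of bytes the encoded command occupies on the wire. */
    static int encodedLength(String... parts) {
        return buildRespArray(parts).getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Under this sketch, `SET foo bar` encodes to 31 bytes and `REPLCONF GETACK *` encodes to 37 bytes.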
(WAIT command handled by master) →
switch (cmd) {
    case "WAIT":
        long currentMasterOffset = master_offset;
        int required_replica = Integer.parseInt(command.get(1));
        int timeout = Integer.parseInt(command.get(2));
        System.out.println("entered wait conditon");
        System.out.println(master_offset);
        // Arguments: required replica count, timeout in ms, master offset at the time of WAIT
        int replicasAcked = ReplicaAckWaiter.waitForAcks(required_replica, timeout, currentMasterOffset);
        String resp1 = ":" + replicasAcked + "\r\n";
        outputStream.write(resp1.getBytes("UTF-8"));
        outputStream.flush();
        break;
(ReplicaAckWaiter.waitForAcks) →
public class ReplicaAckWaiter {
    // Example global list
    public static List<ReplicaConnection> replica_copiedList = new ArrayList<>();

    // Call this when processing WAIT <numReplicas> <timeoutMs>
    public static int waitForAcks(int requiredAcks, int timeoutMs, long masterOffset) throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        replica_copiedList.clear(); // optional: clear old data
        replica_copiedList.addAll(Main.replicaConnections);
        System.out.println(replica_copiedList);

        // Step 1: Send REPLCONF GETACK * to all replicas
        for (ReplicaConnection replica : replica_copiedList) {
            try {
                Writer writer = new OutputStreamWriter(replica.getOutputStream(), "UTF-8");
                writer.write("*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n");
                writer.flush();
            } catch (Exception e) {
                // Ignore write failure
            }
        }

        // Step 2: Read replies until enough replicas have acknowledged or the deadline passes
        List<ReplicaConnection> acked = new ArrayList<>();
        while (System.currentTimeMillis() < deadline) {
            for (ReplicaConnection replica : replica_copiedList) {
                if (acked.contains(replica)) continue;
                System.out.println("inside");
                InputStream in = replica.getInputStream();
                Main.ParseResult result = RESPParser.parseRESP(in);
                List<String> command = result.command;
                String cmd = command.get(0).toUpperCase();
                if (cmd.equalsIgnoreCase("REPLCONF") && command.get(1).equalsIgnoreCase("ACK")) {
                    replica.setack(true);
                }
                if (replica.getOffset() >= masterOffset && replica.getack()) {
                    acked.add(replica);
                }
            }
            if (acked.size() >= requiredAcks) break;
        }
        // try {
        //     Thread.sleep(10); // allow time for ACKs to arrive
        // } catch (InterruptedException ignored) {}
        // }
        // for (ReplicaConnection replica : replica_copiedList) {
        //     try {
        //         System.out.println(replica);
        //         System.out.println(replica.getOffset());
        //     } catch (Exception e) {
        //         // Ignore write failure
        //     }
        // }
        return acked.size();
    }
}
