Stuck on Stage #WAIT with multiple commands #na2

I’m stuck on Stage #NA2.

Here’s what I’ve tried so far.
When the master receives a “SET” command, I send OK to the client, then calculate the number of bytes the command contained and add it to masterOffset. I then send the command to the other replicas and also store that byte count in replicaOffset.
For the WAIT command, I send “REPLCONF GETACK *” to all replicas and then read each replica’s input stream. If a replica replies with “REPLCONF ACK any_number”, I set that replica’s ack flag to true. I then check whether the replica’s ack flag is true and its offset is at least the master offset; if so, I count that replica.
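
For anyone following the offset bookkeeping described above: a replica's reported offset counts the bytes of the commands it actually received from the master. One way to keep the two sides comparable is therefore to advance the master offset by the length of the propagated bytes rather than the length of the client's original command. The two can differ, for example when the client sends PX but only SET key value is forwarded, as in the snippet below. A minimal sketch, where `RespOffsets` and `encode` are hypothetical names and not part of the code in this thread:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the original code.
final class RespOffsets {

    // Encode a command as a RESP array of bulk strings.
    static byte[] encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(parts.length).append("\r\n");
        for (String p : parts) {
            // Bulk string length is measured in bytes, not characters.
            int len = p.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(len).append("\r\n").append(p).append("\r\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        long masterOffset = 0;
        byte[] propagated = encode("SET", "foo", "bar");
        // Advance the offset by exactly what is written to the replicas.
        masterOffset += propagated.length;
        System.out.println(masterOffset); // prints 31
    }
}
```

The same helper gives 37 bytes for `REPLCONF GETACK *`, which matters if the GETACK bytes are also counted toward the offset after they are sent.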


And here’s a snippet of my code:

(SET command handled by master) →
private static void handleSetCommand(List<String> command, int bytesConsumed, OutputStream outputStream) throws IOException {

    String key = command.get(1);
    String value = command.get(2);
    long expiryTime = Long.MAX_VALUE;

    if (command.size() >= 5 && command.get(3).equalsIgnoreCase("PX")) {// for expiry time only
        try {
            long pxMillis = Long.parseLong(command.get(4));
            expiryTime = System.currentTimeMillis() + pxMillis;
        } catch (NumberFormatException e) {
            outputStream.write("-ERR invalid PX value\r\n".getBytes());
            return;
        }
    }

    master_offset += bytesConsumed;
    Main.map.put(key, new Main.ValueWithExpiry(value, expiryTime));
    outputStream.write("+OK\r\n".getBytes());
    outputStream.flush();

    for (ReplicaConnection replica : Main.replicaConnections) {
        replica.getOutputStream().write(buildRespArray("SET", key, value).getBytes());
        replica.getOutputStream().flush();
        
        replica.setOffset(bytesConsumed);
    }
}

(WAIT command handled by master) →
switch (cmd) {

    case "WAIT":
        long currentMasterOffset = master_offset;
        int required_replica = Integer.parseInt(command.get(1));
        int timeout = Integer.parseInt(command.get(2));
        System.out.println("entered wait conditon");
        System.out.println(master_offset);
        // Arguments: required replicas, timeout in ms, master offset
        int replicasAcked = ReplicaAckWaiter.waitForAcks(required_replica, timeout, currentMasterOffset);
        String resp1 = ":" + replicasAcked + "\r\n";
        outputStream.write(resp1.getBytes("UTF-8"));
        outputStream.flush();

        break;

(ReplicaAckWaiter.waitForAcks) →
public class ReplicaAckWaiter {

// Example global list
public static List<ReplicaConnection> replica_copiedList = new ArrayList<>();

// Call this when processing WAIT <numReplicas> <timeoutMs>
public static int waitForAcks(int requiredAcks, int timeoutMs, long masterOffset) throws IOException {
    long deadline = System.currentTimeMillis() + timeoutMs;

    replica_copiedList.clear();                 // optional: clear old data
    replica_copiedList.addAll(Main.replicaConnections);   
    System.out.println(replica_copiedList);

    // Step 1: Send REPLCONF GETACK * to all replicas
    for (ReplicaConnection replica : replica_copiedList) {
        try {
            Writer writer = new OutputStreamWriter(replica.getOutputStream(), "UTF-8");
            writer.write("*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n");
            writer.flush();
        } catch (Exception e) {
            // Ignore write failure
        }
    }

    List<ReplicaConnection> acked = new ArrayList<>();

    while (System.currentTimeMillis() < deadline) {
        for (ReplicaConnection replica : replica_copiedList) {
            if (acked.contains(replica)) continue;
            System.out.println("inside");

            InputStream in= replica.getInputStream();
            Main.ParseResult result = RESPParser.parseRESP(in);
            
            List<String> command = result.command;
            String cmd = command.get(0).toUpperCase();
            if(cmd.equalsIgnoreCase("REPLCONF") && command.get(1).equalsIgnoreCase("ACK")){
                replica.setack(true);
            }
            

            if (replica.getOffset() >= masterOffset && replica.getack()) {
                acked.add(replica);
            }
        }

        if (acked.size() >= requiredAcks) break;
    }

    //     try {
    //         Thread.sleep(10);  // allow time for ACKs to arrive
    //     } catch (InterruptedException ignored) {}
    // }

    //  for (ReplicaConnection replica : replica_copiedList) {
    //     try {
    //         System.out.println(replica);
    //         System.out.println(replica.getOffset());
    //     } catch (Exception e) {
    //         // Ignore write failure
    //     }
    // }

    return acked.size();
}

}
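
One thing that may help when reading the loop above: the `REPLCONF ACK` reply carries the replica's own offset as its third element, and that reported number is what can be compared against the master offset. Below is a small sketch of extracting it from a parsed command. `AckParser` and `ackOffset` are hypothetical names, and the list-of-strings shape is assumed from `result.command` above:

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of the original code.
final class AckParser {

    // Returns the offset from ["REPLCONF", "ACK", "<offset>"], or empty
    // if the command is anything else or the offset is not a number.
    static OptionalLong ackOffset(List<String> command) {
        if (command == null || command.size() != 3) {
            return OptionalLong.empty();
        }
        if (!command.get(0).equalsIgnoreCase("REPLCONF")
                || !command.get(1).equalsIgnoreCase("ACK")) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(command.get(2)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        OptionalLong offset = ackOffset(List.of("REPLCONF", "ACK", "31"));
        System.out.println(offset.isPresent() && offset.getAsLong() >= 31); // prints true
    }
}
```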


Hey @Nitesh-neg, could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

Hi @andy1li, thank you for replying.

@Nitesh-neg Sorry for the delayed response! Looks like you’ve got past this stage. :tada:

Do you happen to remember what was wrong? Would love to see if we can improve the tester / instructions.

@andy1li There was a problem with my code, not with the tester.
I was calculating the number of replicas that acknowledged in the wrong place.
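
The thread does not show the final fix, so the following is only one possible arrangement and not necessarily what was changed. It records each replica's reported offset as its ACK arrives (for example from a per-replica reader thread, which is an assumption here) and counts the caught-up replicas in one place, after the ACKs are recorded and before replying to WAIT. `AckCounter` and its methods are hypothetical names:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch for illustration only; not the poster's actual fix.
final class AckCounter {

    // Latest offset reported by each replica, keyed by an assumed replica id.
    private final Map<String, Long> ackedOffsets = new ConcurrentHashMap<>();

    // Called whenever a REPLCONF ACK <offset> reply is read from a replica.
    void recordAck(String replicaId, long offset) {
        ackedOffsets.merge(replicaId, offset, Long::max);
    }

    // Count replicas whose reported offset has reached the target.
    int countCaughtUp(long targetOffset) {
        int count = 0;
        for (long offset : ackedOffsets.values()) {
            if (offset >= targetOffset) {
                count++;
            }
        }
        return count;
    }

    // Poll until enough replicas have caught up or the timeout expires,
    // then return however many have caught up at that point.
    int waitForAcks(int required, long timeoutMs, long targetOffset) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int count = countCaughtUp(targetOffset);
        while (count < required && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            count = countCaughtUp(targetOffset);
        }
        return count;
    }

    public static void main(String[] args) {
        AckCounter counter = new AckCounter();
        counter.recordAck("replica-1", 31);
        counter.recordAck("replica-2", 10);
        System.out.println(counter.waitForAcks(1, 100, 31)); // prints 1
    }
}
```

Because the count is recomputed from the recorded offsets on every pass, a replica is only counted once its own reported offset has reached the target, regardless of the order in which replies arrive.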

