YG4,client get command before propagation set command, so get ""

I’m stuck on Stage #(YG4).

Hello, guys, my question is about YG4. I will post my Java code and logs.
Here is my issue: I have started a separate thread specifically for handling handshake and propagation, and I have also started threads to handle client requests. This means there is a multithreading issue here because I cannot control the timing of propagation and client requests As a result, I cannot pass the YG4 test because when the client issues a get command, my propagation thread has not yet completed the set command. This is evident from the logs, where there is a time gap between the set and get commands. If I make the client thread handling the get command sleep for a while, it works, but I believe this is inappropriate.
how can I ensure that the propagation set command is executed before the get command?

help me please, thank you guys .

Here are my logs:



And here’s my code:

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


class KeyExpiry {
    public long expTime;
    public boolean isActive;
    public long startTime;

    public KeyExpiry(long expTime, boolean isActive, long startTime) {
        this.expTime = expTime;
        this.isActive = isActive;
        this.startTime = startTime;
    }
}

public class Main {
    private static HashSet<Socket> replicaSockets = new HashSet<>();
    private static Map<String, String> map;
    private static Map<String, KeyExpiry> expMap;
    private static String currentKey = "";
    private static int port = 6379;
    private static boolean isMaster = true;



    public static void main(String[] args) {

        map = new ConcurrentHashMap<>();
        expMap = new ConcurrentHashMap<>();
        // You can use print statements as follows for debugging, they'll be visible when running tests.
        System.out.println("Logs from your program will appear here!");

        //  Uncomment this block to pass the first stage
        ServerSocket serverSocket = null;

        handleArgs(List.of(args));
        try {
            serverSocket = new ServerSocket(port);

            // Since the tester restarts your program quite often, setting SO_REUSEADDR
            // ensures that we don't run into 'Address already in use' errors
            serverSocket.setReuseAddress(true);
            // Wait for connection from client.
            while (true) {
                Socket clientSocket = serverSocket.accept();

                new Thread(() -> {
                    try {
                        processClients(clientSocket);
                    } catch (Exception e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }).start();
            }

        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }

    private static void handleArgs(List<String> args) {
        if (args.contains("--port")) {
            port = Integer.parseInt(args.get(args.indexOf("--port") + 1));
        }
        if (args.contains("--replicaof")) {
            String[] replicaArgs = args.get(args.indexOf("--replicaof") + 1).split(" ");
            String masterHostname = replicaArgs[0];
            int masterPortNumber = Integer.parseInt(replicaArgs[1]);
            isMaster = false;
            replicaHandshake(masterHostname, masterPortNumber);
        }
    }

    private static void replicaHandshake(String masterHostname, int masterPortNumber) {
        try {
            new Thread(() -> {
                try {
                    processMasterPropagation(masterHostname,masterPortNumber);
                } catch (Exception e) {
                    System.out.println("Exception: " + e.getMessage());
                }
            }).start();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private static void sendToReplica(Vector<String> command) throws IOException {
        String toSend = "";
        String arr[] = new String[command.size()];
        for (int i = 0; i < command.size(); i++) {
            arr[i] = command.get(i);
        }
        toSend = encodeRESPArr(arr);
        for (Socket s : replicaSockets) {
            OutputStream os = s.getOutputStream();
            System.out.println("Writing to a replica");
            os.write(toSend.getBytes());
        }
    }

    public static String encodeRESPArr(String[] arr) {
        int n = arr.length;
        String send = "*" + n + "\r\n";
        for (String s : arr) {
            send += encodeRESP(s);
        }
        return send;
    }

    public static String encodeRESP(String s) {
        int size = s.length();
        String ret = "$" + size + "\r\n" + s + "\r\n";
        return ret;
    }


    private static void processMasterPropagation(String masterHostname , int masterPortNumber) {

        try  {
            String request_ping = "*1\r\n$4\r\nping\r\n";
            String request_replconf_1 = "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n";
            String request_replconf_2 = "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n";
            String request_PSYNC = "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n";


            Socket masterSocket = new Socket(masterHostname, masterPortNumber);
            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            DataOutputStream dos = new DataOutputStream(masterSocket.getOutputStream());
            dos.write(request_ping.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            String content = reader.readLine();
            System.out.println("ping->" + content);

            dos.write(request_replconf_1.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_1->" + content);

            dos.write(request_replconf_2.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_2->" + content);

            dos.write(request_PSYNC.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_PSYNC->" + content);


            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(masterSocket.getOutputStream()));

            while ((content = reader.readLine()) != null) {
                System.out.println("  processMasterPropagation 读到的内容read content->::" + content);
                if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    masterSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("SET".equals(content)) {
                    System.out.println("processMasterPropagation 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processMasterPropagation   BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processMasterPropagation END.....SET");

                    System.out.println("this is a message from master propagation");

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processMasterPropagation 进入GET命令  this is get");
                    //Thread.sleep(50);
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processMasterPropagation  BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processMasterPropagation  END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void processClients(Socket clientSocket) {

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(clientSocket.getOutputStream()));) {
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("processClients  读到的内容read content->::" + content);
                if ("ping".equalsIgnoreCase(content)) {
                    writer.write("+PONG\r\n");
                    writer.flush();
                } else if ("REPLCONF".equalsIgnoreCase(content)) {
                    writer.write("+OK\r\n");
                    writer.flush();
                } else if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    clientSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("PSYNC".equalsIgnoreCase(content)) {

                    String replid = "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb";
                    String resp = "+FULLRESYNC " + replid + " 0\r\n";
                    writer.write(resp);
                    writer.flush();

                    byte[] contents = HexFormat.of().parseHex("524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2");
                    resp = "$" + contents.length + "\r\n";
                    clientSocket.getOutputStream().write(resp.getBytes());
                    clientSocket.getOutputStream().write(contents);

                    replicaSockets.add(clientSocket);

                } else if ("eof".equalsIgnoreCase(content)) {
                    System.out.println("eof");
                } else if ("ECHO".equalsIgnoreCase(content)) {
                    reader.readLine(); // Read and ignore the next line (length of the message)
                    String message = reader.readLine(); // Read the actual message
                    clientSocket.getOutputStream().write(
                            String.format("$%d\r\n%s\r\n", message.length(), message)
                                    .getBytes()); // Send the formatted message as a RESP bulk string
                } else if ("SET".equals(content)) {
                    System.out.println("processClients 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processClients  BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processClients   END.....SET");

                    writer.write("$2\r\nOK\r\n");
                    writer.flush();

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processClients  进入GET命令  this is get");
                    //Thread.sleep(50); // if i uncomment this , then it will pass the test, but i think it's not a proper solution.
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processClients    BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processClients     END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                } else if ("info".equalsIgnoreCase(content)) {
                    System.out.println("info->" + content);
                    StringBuilder sb = new StringBuilder();
                    if (isMaster) {
                        sb.append("role:master\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    } else {
                        sb.append("role:slave\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    }
                    writer.write("$" + sb.toString().length() + "\r\n" + sb + "\r\n");
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (clientSocket != null) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                System.out.println("IOException: " + e.getMessage());
            }
        }
    }
}

Hey @show1999, there might be multiple issues at play. The first thing I noticed is that the handshake is being handled in a thread:

This isn’t ideal because the replica shouldn’t accept client connections until the handshake with the master is complete. In other words, the handshake should be blocking until it finishes.

Could you try moving the handshake out of the thread? Then we can proceed to address other potential issues.

1 Like

OK, yes, thank you ,you’re right. I forgot about that point. Now I have moved the handshake out, but actually, before this, my handshake was placed outside this thread, and then there would be reception issues causing XV6 to fail. However, it’s not reproducing now, which is strange.
Alright, I have now moved the handshake out, but YG4 still has the same issue—it can’t get the value. (Moreover, I noticed that in the YG4 phase, the replica did not read the RDB file sent by the master, which might be the reception issue I mentioned. It didn’t occur in the XV6 phase, but it appeared in the YG4 phase.)
So, I have two questions I need your help with:

  1. Regarding the replica receiving data sent by the master, it seems unstable. For example, this time in the YG4 phase, it didn’t receive the RDB file.
  2. Regarding the issue of the get command executing before the set command, resulting in not getting the value.

I appreciate your help very much.

here is my log





here is my code ,moving handshake out

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


class KeyExpiry {
    public long expTime;
    public boolean isActive;
    public long startTime;

    public KeyExpiry(long expTime, boolean isActive, long startTime) {
        this.expTime = expTime;
        this.isActive = isActive;
        this.startTime = startTime;
    }
}

public class Main {
    private static HashSet<Socket> replicaSockets = new HashSet<>();
    private static Map<String, String> map;
    private static Map<String, KeyExpiry> expMap;
    private static String currentKey = "";
    private static int port = 6379;
    private static boolean isMaster = true;



    public static void main(String[] args) {

        map = new ConcurrentHashMap<>();
        expMap = new ConcurrentHashMap<>();
        // You can use print statements as follows for debugging, they'll be visible when running tests.
        System.out.println("Logs from your program will appear here!");

        //  Uncomment this block to pass the first stage
        ServerSocket serverSocket = null;

        handleArgs(List.of(args));
        try {
            serverSocket = new ServerSocket(port);

            // Since the tester restarts your program quite often, setting SO_REUSEADDR
            // ensures that we don't run into 'Address already in use' errors
            serverSocket.setReuseAddress(true);
            // Wait for connection from client.
            while (true) {
                Socket clientSocket = serverSocket.accept();

                new Thread(() -> {
                    try {
                        processClients(clientSocket);
                    } catch (Exception e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }).start();
            }

        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }

    private static void handleArgs(List<String> args) {
        if (args.contains("--port")) {
            port = Integer.parseInt(args.get(args.indexOf("--port") + 1));
        }
        if (args.contains("--replicaof")) {
            String[] replicaArgs = args.get(args.indexOf("--replicaof") + 1).split(" ");
            String masterHostname = replicaArgs[0];
            int masterPortNumber = Integer.parseInt(replicaArgs[1]);
            isMaster = false;
            replicaHandshake(masterHostname, masterPortNumber);
        }
    }

    private static void replicaHandshake(String masterHostname, int masterPortNumber) {
        try {
            String request_ping = "*1\r\n$4\r\nping\r\n";
            String request_replconf_1 = "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n";
            String request_replconf_2 = "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n";
            String request_PSYNC = "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n";

            Socket masterSocket = new Socket(masterHostname, masterPortNumber);
            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            DataOutputStream dos = new DataOutputStream(masterSocket.getOutputStream());

            dos.write(request_ping.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            String content = reader.readLine();
            System.out.println("ping->" + content);

            dos.write(request_replconf_1.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_1->" + content);

            dos.write(request_replconf_2.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_2->" + content);

            dos.write(request_PSYNC.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_PSYNC->" + content);

            new Thread(() -> {
                try {
                    processMasterPropagation(masterSocket);
                } catch (Exception e) {
                    System.out.println("Exception: " + e.getMessage());
                }
            }).start();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private static void sendToReplica(Vector<String> command) throws IOException {
        String toSend = "";
        String arr[] = new String[command.size()];
        for (int i = 0; i < command.size(); i++) {
            arr[i] = command.get(i);
        }
        toSend = encodeRESPArr(arr);
        for (Socket s : replicaSockets) {
            OutputStream os = s.getOutputStream();
            System.out.println("Writing to a replica");
            os.write(toSend.getBytes());
        }
    }

    public static String encodeRESPArr(String[] arr) {
        int n = arr.length;
        String send = "*" + n + "\r\n";
        for (String s : arr) {
            send += encodeRESP(s);
        }
        return send;
    }

    public static String encodeRESP(String s) {
        int size = s.length();
        String ret = "$" + size + "\r\n" + s + "\r\n";
        return ret;
    }


    private static void processMasterPropagation(Socket masterSocket) {

        try  {
            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(masterSocket.getOutputStream()));
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("  processMasterPropagation 读到的内容read content->::" + content);
                if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    masterSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("SET".equals(content)) {
                    System.out.println("processMasterPropagation 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processMasterPropagation   BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processMasterPropagation END.....SET");

                    System.out.println("this is a message from master propagation");

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processMasterPropagation 进入GET命令  this is get");
                    //Thread.sleep(50);
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processMasterPropagation  BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processMasterPropagation  END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void processClients(Socket clientSocket) {

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(clientSocket.getOutputStream()));) {
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("processClients  读到的内容read content->::" + content);
                if ("ping".equalsIgnoreCase(content)) {
                    writer.write("+PONG\r\n");
                    writer.flush();
                } else if ("REPLCONF".equalsIgnoreCase(content)) {
                    writer.write("+OK\r\n");
                    writer.flush();
                } else if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    clientSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("PSYNC".equalsIgnoreCase(content)) {

                    String replid = "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb";
                    String resp = "+FULLRESYNC " + replid + " 0\r\n";
                    writer.write(resp);
                    writer.flush();

                    byte[] contents = HexFormat.of().parseHex("524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2");
                    resp = "$" + contents.length + "\r\n";
                    clientSocket.getOutputStream().write(resp.getBytes());
                    clientSocket.getOutputStream().write(contents);

                    replicaSockets.add(clientSocket);

                } else if ("eof".equalsIgnoreCase(content)) {
                    System.out.println("eof");
                } else if ("ECHO".equalsIgnoreCase(content)) {
                    reader.readLine(); // Read and ignore the next line (length of the message)
                    String message = reader.readLine(); // Read the actual message
                    clientSocket.getOutputStream().write(
                            String.format("$%d\r\n%s\r\n", message.length(), message)
                                    .getBytes()); // Send the formatted message as a RESP bulk string
                } else if ("SET".equals(content)) {
                    System.out.println("processClients 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processClients  BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processClients   END.....SET");

                    writer.write("$2\r\nOK\r\n");
                    writer.flush();

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processClients  进入GET命令  this is get");
                    //Thread.sleep(50); // if i uncomment this , then it will pass the test, but i think it's not a proper solution.
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processClients    BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processClients     END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                } else if ("info".equalsIgnoreCase(content)) {
                    System.out.println("info->" + content);
                    StringBuilder sb = new StringBuilder();
                    if (isMaster) {
                        sb.append("role:master\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    } else {
                        sb.append("role:slave\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    }
                    writer.write("$" + sb.toString().length() + "\r\n" + sb + "\r\n");
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (clientSocket != null) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                System.out.println("IOException: " + e.getMessage());
            }
        }
    }
}


1 Like
  1. Regarding the replica receiving data sent by the master, it seems unstable. For example, this time in the YG4 phase, it didn’t receive the RDB file.

Could you point me to where you’re handling RDB reading?

Please note that while RDB may sometimes be read along with PSYNC, this isn’t guaranteed and shouldn’t be relied upon.

  1. Regarding the issue of the get command executing before the set command, resulting in not getting the value.

Let’s address this after ensuring RDB is handled properly.

1 Like

I believe this is where the RDB file is being read. As you can see, in the XV6 phase, the RDB file was successfully read, but in the YG4 phase, it was not read.


Am I misunderstanding? Could you please explain it in more detail? Thank you.

Hmm, I see. It seems that you’re implicitly handling and ignoring RDB there.

You might want to make sure RDB is properly handled/ignored before starting the thread for processMasterPropagation, as RDB is part of the handshake/resynchronization process.


I also noticed is that two BufferedReaders are being created from masterSocket. This could cause RDB data to be buffered in the first one, potentially leading to instability in RDB handling.

There may be other issues as well, but let’s tackle them as we go.

1 Like

ok,love you, so question 1 is solved,
help me about question2 , appreciate your help very much @andy1li

@show1999 Feel free to tag me once you’ve addressed this:

You might want to make sure RDB is properly handled/ignored before starting the thread for processMasterPropagation , as RDB is part of the handshake/resynchronization process.

When that’s done, I’ll take another look at your code.

1 Like

@andy1li
yes,i have already solved the question1 with your help, just keep one reader.

here is my new code

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


class KeyExpiry {
    public long expTime;
    public boolean isActive;
    public long startTime;

    public KeyExpiry(long expTime, boolean isActive, long startTime) {
        this.expTime = expTime;
        this.isActive = isActive;
        this.startTime = startTime;
    }
}

public class Main {
    private static HashSet<Socket> replicaSockets = new HashSet<>();
    private static Map<String, String> map;
    private static Map<String, KeyExpiry> expMap;
    private static String currentKey = "";
    private static int port = 6379;
    private static boolean isMaster = true;



    public static void main(String[] args) {

        map = new ConcurrentHashMap<>();
        expMap = new ConcurrentHashMap<>();
        // You can use print statements as follows for debugging, they'll be visible when running tests.
        System.out.println("Logs from your program will appear here!");

        //  Uncomment this block to pass the first stage
        ServerSocket serverSocket = null;

        handleArgs(List.of(args));
        try {
            serverSocket = new ServerSocket(port);

            // Since the tester restarts your program quite often, setting SO_REUSEADDR
            // ensures that we don't run into 'Address already in use' errors
            serverSocket.setReuseAddress(true);
            // Wait for connection from client.
            while (true) {
                Socket clientSocket = serverSocket.accept();

                new Thread(() -> {
                    try {
                        processClients(clientSocket);
                    } catch (Exception e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }).start();
            }

        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }

    private static void handleArgs(List<String> args) {
        if (args.contains("--port")) {
            port = Integer.parseInt(args.get(args.indexOf("--port") + 1));
        }
        if (args.contains("--replicaof")) {
            String[] replicaArgs = args.get(args.indexOf("--replicaof") + 1).split(" ");
            String masterHostname = replicaArgs[0];
            int masterPortNumber = Integer.parseInt(replicaArgs[1]);
            isMaster = false;
            replicaHandshake(masterHostname, masterPortNumber);
        }
    }

    private static void replicaHandshake(String masterHostname, int masterPortNumber) {
        try {
            String request_ping = "*1\r\n$4\r\nping\r\n";
            String request_replconf_1 = "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n";
            String request_replconf_2 = "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n";
            String request_PSYNC = "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n";

            Socket masterSocket = new Socket(masterHostname, masterPortNumber);
            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            DataOutputStream dos = new DataOutputStream(masterSocket.getOutputStream());

            dos.write(request_ping.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            String content = reader.readLine();
            System.out.println("ping->" + content);

            dos.write(request_replconf_1.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_1->" + content);

            dos.write(request_replconf_2.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_2->" + content);

            dos.write(request_PSYNC.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_PSYNC->" + content);

            new Thread(() -> {
                try {
                    processMasterPropagation(masterSocket,reader);
                } catch (Exception e) {
                    System.out.println("Exception: " + e.getMessage());
                }
            }).start();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private static void sendToReplica(Vector<String> command) throws IOException {
        String toSend = "";
        String arr[] = new String[command.size()];
        for (int i = 0; i < command.size(); i++) {
            arr[i] = command.get(i);
        }
        toSend = encodeRESPArr(arr);
        for (Socket s : replicaSockets) {
            OutputStream os = s.getOutputStream();
            System.out.println("Writing to a replica");
            os.write(toSend.getBytes());
        }
    }

    public static String encodeRESPArr(String[] arr) {
        int n = arr.length;
        String send = "*" + n + "\r\n";
        for (String s : arr) {
            send += encodeRESP(s);
        }
        return send;
    }

    public static String encodeRESP(String s) {
        int size = s.length();
        String ret = "$" + size + "\r\n" + s + "\r\n";
        return ret;
    }


    private static void processMasterPropagation(Socket masterSocket,BufferedReader reader) {

        try  {
//            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(masterSocket.getOutputStream()));
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("  processMasterPropagation 读到的内容read content->::" + content);
                if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    masterSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("SET".equals(content)) {
                    System.out.println("processMasterPropagation 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processMasterPropagation   BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processMasterPropagation END.....SET");

                    System.out.println("this is a message from master propagation");

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processMasterPropagation 进入GET命令  this is get");
                    //Thread.sleep(50);
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processMasterPropagation  BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processMasterPropagation  END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void processClients(Socket clientSocket) {

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(clientSocket.getOutputStream()));) {
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("processClients  读到的内容read content->::" + content);
                if ("ping".equalsIgnoreCase(content)) {
                    writer.write("+PONG\r\n");
                    writer.flush();
                } else if ("REPLCONF".equalsIgnoreCase(content)) {
                    writer.write("+OK\r\n");
                    writer.flush();
                } else if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    clientSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("PSYNC".equalsIgnoreCase(content)) {

                    String replid = "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb";
                    String resp = "+FULLRESYNC " + replid + " 0\r\n";
                    writer.write(resp);
                    writer.flush();

                    byte[] contents = HexFormat.of().parseHex("524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2");
                    resp = "$" + contents.length + "\r\n";
                    clientSocket.getOutputStream().write(resp.getBytes());
                    clientSocket.getOutputStream().write(contents);

                    replicaSockets.add(clientSocket);

                } else if ("eof".equalsIgnoreCase(content)) {
                    System.out.println("eof");
                } else if ("ECHO".equalsIgnoreCase(content)) {
                    reader.readLine(); // Read and ignore the next line (length of the message)
                    String message = reader.readLine(); // Read the actual message
                    clientSocket.getOutputStream().write(
                            String.format("$%d\r\n%s\r\n", message.length(), message)
                                    .getBytes()); // Send the formatted message as a RESP bulk string
                } else if ("SET".equals(content)) {
                    System.out.println("processClients 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processClients  BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processClients   END.....SET");

                    writer.write("$2\r\nOK\r\n");
                    writer.flush();

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processClients  进入GET命令  this is get");
                    //Thread.sleep(50); // if i uncomment this , then it will pass the test, but i think it's not a proper solution.
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processClients    BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processClients     END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                } else if ("info".equalsIgnoreCase(content)) {
                    System.out.println("info->" + content);
                    StringBuilder sb = new StringBuilder();
                    if (isMaster) {
                        sb.append("role:master\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    } else {
                        sb.append("role:slave\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    }
                    writer.write("$" + sb.toString().length() + "\r\n" + sb + "\r\n");
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (clientSocket != null) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                System.out.println("IOException: " + e.getMessage());
            }
        }
    }
}

@show1999 Good job using only one reader!

Your latest submission still shows that RDB is being handled inside the thread rather than before starting it:

This may or may not be affecting Issue 2, but until it’s fixed, we can’t be certain. :sweat_smile:

1 Like

@andy1li

sorry i misunderstood your meaning before, so now I really receive RDB file before start a new thread.

it appears a new problem, see my log please.

you can see my latest code and log ,right?

anyway i post my code and log here,

here is my log

here is my new code

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


class KeyExpiry {
    public long expTime;
    public boolean isActive;
    public long startTime;

    public KeyExpiry(long expTime, boolean isActive, long startTime) {
        this.expTime = expTime;
        this.isActive = isActive;
        this.startTime = startTime;
    }
}

public class Main {
    private static HashSet<Socket> replicaSockets = new HashSet<>();
    private static Map<String, String> map;
    private static Map<String, KeyExpiry> expMap;
    private static String currentKey = "";
    private static int port = 6379;
    private static boolean isMaster = true;



    public static void main(String[] args) {

        map = new ConcurrentHashMap<>();
        expMap = new ConcurrentHashMap<>();
        // You can use print statements as follows for debugging, they'll be visible when running tests.
        System.out.println("Logs from your program will appear here!");

        //  Uncomment this block to pass the first stage
        ServerSocket serverSocket = null;

        handleArgs(List.of(args));
        try {
            serverSocket = new ServerSocket(port);

            // Since the tester restarts your program quite often, setting SO_REUSEADDR
            // ensures that we don't run into 'Address already in use' errors
            serverSocket.setReuseAddress(true);
            // Wait for connection from client.
            while (true) {
                Socket clientSocket = serverSocket.accept();

                new Thread(() -> {
                    try {
                        processClients(clientSocket);
                    } catch (Exception e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }).start();
            }

        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }

    private static void handleArgs(List<String> args) {
        if (args.contains("--port")) {
            port = Integer.parseInt(args.get(args.indexOf("--port") + 1));
        }
        if (args.contains("--replicaof")) {
            String[] replicaArgs = args.get(args.indexOf("--replicaof") + 1).split(" ");
            String masterHostname = replicaArgs[0];
            int masterPortNumber = Integer.parseInt(replicaArgs[1]);
            isMaster = false;
            replicaHandshake(masterHostname, masterPortNumber);
        }
    }

    private static void replicaHandshake(String masterHostname, int masterPortNumber) {
        try {
            String request_ping = "*1\r\n$4\r\nping\r\n";
            String request_replconf_1 = "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n";
            String request_replconf_2 = "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n";
            String request_PSYNC = "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n";

            Socket masterSocket = new Socket(masterHostname, masterPortNumber);
            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            DataOutputStream dos = new DataOutputStream(masterSocket.getOutputStream());

            dos.write(request_ping.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            String content = reader.readLine();
            System.out.println("ping->" + content);

            dos.write(request_replconf_1.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_1->" + content);

            dos.write(request_replconf_2.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_replconf_2->" + content);

            dos.write(request_PSYNC.getBytes(StandardCharsets.UTF_8));
            dos.flush();
            content = reader.readLine();
            System.out.println("request_PSYNC->" + content);

            content = reader.readLine();
            System.out.println("RDB----part1------>" + content);

            content = reader.readLine();
            System.out.println("RDB----part2------>" + content);

            content = reader.readLine();
            System.out.println("RDB----part3------>" + content);

            new Thread(() -> {
                try {
                    processMasterPropagation(masterSocket,reader);
                } catch (Exception e) {
                    System.out.println("Exception: " + e.getMessage());
                }
            }).start();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private static void sendToReplica(Vector<String> command) throws IOException {
        String toSend = "";
        String arr[] = new String[command.size()];
        for (int i = 0; i < command.size(); i++) {
            arr[i] = command.get(i);
        }
        toSend = encodeRESPArr(arr);
        for (Socket s : replicaSockets) {
            OutputStream os = s.getOutputStream();
            System.out.println("Writing to a replica");
            os.write(toSend.getBytes());
        }
    }

    public static String encodeRESPArr(String[] arr) {
        int n = arr.length;
        String send = "*" + n + "\r\n";
        for (String s : arr) {
            send += encodeRESP(s);
        }
        return send;
    }

    public static String encodeRESP(String s) {
        int size = s.length();
        String ret = "$" + size + "\r\n" + s + "\r\n";
        return ret;
    }


    private static void processMasterPropagation(Socket masterSocket,BufferedReader reader) {

        try  {
//            BufferedReader reader = new BufferedReader(new InputStreamReader(masterSocket.getInputStream()));
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(masterSocket.getOutputStream()));
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("  processMasterPropagation 读到的内容read content->::" + content);
                if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    masterSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("SET".equals(content)) {
                    System.out.println("processMasterPropagation 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processMasterPropagation   BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processMasterPropagation END.....SET");

                    System.out.println("this is a message from master propagation");

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processMasterPropagation 进入GET命令  this is get");
                    //Thread.sleep(50);
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processMasterPropagation  BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processMasterPropagation  END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void processClients(Socket clientSocket) {

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(clientSocket.getOutputStream()));) {
            String content;
            while ((content = reader.readLine()) != null) {
                System.out.println("processClients  读到的内容read content->::" + content);
                if ("ping".equalsIgnoreCase(content)) {
                    writer.write("+PONG\r\n");
                    writer.flush();
                } else if ("REPLCONF".equalsIgnoreCase(content)) {
                    writer.write("+OK\r\n");
                    writer.flush();
                } else if ("GETACK".equalsIgnoreCase(content)) {
                    String toSend[] = {"REPLCONF", "ACK", "0"};
                    clientSocket.getOutputStream().write(encodeRESPArr(toSend).getBytes());
                } else if ("PSYNC".equalsIgnoreCase(content)) {

                    String replid = "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb";
                    String resp = "+FULLRESYNC " + replid + " 0\r\n";
                    writer.write(resp);
                    writer.flush();

                    byte[] contents = HexFormat.of().parseHex("524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2");
                    resp = "$" + contents.length + "\r\n";
                    clientSocket.getOutputStream().write(resp.getBytes());
                    clientSocket.getOutputStream().write(contents);

                    replicaSockets.add(clientSocket);

                } else if ("eof".equalsIgnoreCase(content)) {
                    System.out.println("eof");
                } else if ("ECHO".equalsIgnoreCase(content)) {
                    reader.readLine(); // Read and ignore the next line (length of the message)
                    String message = reader.readLine(); // Read the actual message
                    clientSocket.getOutputStream().write(
                            String.format("$%d\r\n%s\r\n", message.length(), message)
                                    .getBytes()); // Send the formatted message as a RESP bulk string
                } else if ("SET".equals(content)) {
                    System.out.println("processClients 进入set命令 ,this is set");
                    Vector<String> command = new Vector<>();

                    reader.readLine();
                    String key = reader.readLine();
                    reader.readLine();
                    String value = reader.readLine();

                    command.addElement("SET");
                    command.addElement(key);
                    command.addElement(value);

                    sendToReplica(command);

                    currentKey = key;
                    System.out.println("processClients  BEGIN....SET");
                    map.put(key, value);
                    expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                    System.out.println("processClients   END.....SET");

                    writer.write("$2\r\nOK\r\n");
                    writer.flush();

                } else if ("px".equalsIgnoreCase(content)) {
                    reader.readLine();
                    long exp = Long.valueOf(reader.readLine());
                    expMap.put(currentKey,
                            new KeyExpiry(exp, true, System.currentTimeMillis()));
                } else if ("GET".equals(content)) {
                    System.out.println("processClients  进入GET命令  this is get");
                    //Thread.sleep(50); // if i uncomment this , then it will pass the test, but i think it's not a proper solution.
                    reader.readLine();
                    content = reader.readLine();
                    System.out.println("processClients    BEGIN...... GET");
                    KeyExpiry keyExpiry = expMap.get(content);
                    System.out.println("processClients     END ..... ..GET");
                    String value = null;
                    System.out.println(keyExpiry.isActive);
                    if (keyExpiry.isActive) {
                        long currTime = System.currentTimeMillis();
                        long duration = currTime - keyExpiry.startTime;
                        if (duration < keyExpiry.expTime) {
                            value = map.get(content);
                        } else {
                            keyExpiry.isActive = false;
                        }
                    }
                    if (value == null) {
                        writer.write("$-1\r\n");
                        writer.flush();
                        continue;
                    }
                    StringBuilder sb = new StringBuilder();
                    sb.append("$" + value.length());
                    sb.append("\r\n").append(value).append("\r\n");
                    System.out.println(sb);
                    writer.write(sb.toString());
                    writer.flush();
                } else if ("info".equalsIgnoreCase(content)) {
                    System.out.println("info->" + content);
                    StringBuilder sb = new StringBuilder();
                    if (isMaster) {
                        sb.append("role:master\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    } else {
                        sb.append("role:slave\n").append("master_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb\n").append("master_repl_offset:0");
                    }
                    writer.write("$" + sb.toString().length() + "\r\n" + sb + "\r\n");
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (clientSocket != null) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                System.out.println("IOException: " + e.getMessage());
            }
        }
    }
}

I logged each character from content in the format: char (hex | decimal).

You can see that the hex value fa is not being read correctly:

The issue seems to be caused by reading data as characters instead of as raw bytes, because \xfa is not a valid UTF-8 character.

You might need to use DataInputStream instead of BufferedReader to correctly handle raw byte data.

1 Like

@andy1li
use DataInputStream instead of
BufferedReader
I don’t understand why this issue would affect the result, but I made the changes anyway. However, the outcome seems to remain the same. :upside_down_face:
thank you for your replys , appreciate it very much
here is part of my log:


and my code just change - → use DataInputStream instead of
BufferedReader

Honestly, I’m not sure about the root cause yet, as there could be multiple factors involved. I can only identify them one at a time. :handshake:

1 Like

ok , you are right ,just fix it one by one

just help me please , appreciate it.

1 Like

master: Sent bytes: “$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2”

The next issue is that the empty RDB file does not end with a newline, causing readLine to wait indefinitely for the newline that never arrives:

To resolve this, read the exact number of bytes (88 in this case) instead of relying on readLine.

1 Like

@andy1li

I’m not sure if I understand correctly. Are you saying that the main thread will block here and stop running further because the readLine method did not encounter a newline character? I tried it and found that it can continue to run.

Anyway, I still tried to change the readLine method to accurately read 88 bytes. I expected it to throw the same error (connection refused) to prove that it has nothing to do with readLine , but it actually didn’t encounter the connection refused error.
This has left me very confused. Could you please explain this in more details?

Thank you. :heart:

Are you saying that the main thread will block here and stop running further because the readLine method did not encounter a newline character?

Yep, and that’s one of the issues we’re trying to resolve.

I tried it and found that it can continue to run.

Could you share more details about your findings?

1 Like

@andy1li
hi


This is part of my log. It printed “yes,” indicating that the main thread can continue running and is not blocked by readLine.

am I right? i dont know。 :upside_down_face:

and also , why can i pass xv6

i’m very confused.

1 Like

Ah, got it!

It was blocking until a client in the tester tried to connect to your replica, but the connection failed with dial tcp [::1]:6380: connect: connection refused:

As a result, the tester tried to terminate your replica, causing it to stop blocking and proceed to print the buffered content for RDB----Part 3.

Does that make sense?

1 Like