I’m stuck on Stage #(YG4).
Hello, guys, my question is about YG4. I will post my Java code and logs.
Here is my issue: I have started a separate thread specifically for handling the handshake and propagation, and I have also started threads to handle client requests. This creates a multithreading issue, because I cannot control the relative timing of propagation and client requests. As a result, I cannot pass the YG4 test: when the client issues a GET command, my propagation thread has not yet completed the SET command. This is evident from the logs, where there is a time gap between the SET and GET commands. If I make the client thread handling the GET command sleep for a while, it works, but I believe this is inappropriate.
How can I ensure that the propagated SET command is executed before the GET command?
Please help me. Thank you, guys.
Here are my logs:
And here’s my code:
import java.io.*;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Mutable expiry record kept per key alongside the stored value.
 *
 * <p>A key is treated as live while {@code isActive} is true and
 * {@code System.currentTimeMillis() - startTime < expTime}. Fields are public
 * and non-final because callers read them directly and flip {@code isActive}
 * in place once a key is found to be expired.
 */
class KeyExpiry {
    /** Time-to-live in milliseconds, measured from {@link #startTime}. */
    public long expTime;

    /** False once a lookup has observed that the key has expired. */
    public boolean isActive;

    /** Epoch-millisecond timestamp from which {@link #expTime} is counted. */
    public long startTime;

    /**
     * Creates an expiry record.
     *
     * @param expTime   time-to-live in milliseconds
     * @param isActive  whether the key is currently considered live
     * @param startTime epoch-millisecond start of the time-to-live window
     */
    public KeyExpiry(long expTime, boolean isActive, long startTime) {
        this.startTime = startTime;
        this.isActive = isActive;
        this.expTime = expTime;
    }
}
public class Main {
private static HashSet<Socket> replicaSockets = new HashSet<>();
private static Map<String, String> map;
private static Map<String, KeyExpiry> expMap;
private static String currentKey = "";
private static int port = 6379;
private static boolean isMaster = true;
/**
 * Program entry point: initialises the key/value stores, applies command-line
 * options, then accepts client connections forever, serving each one on its
 * own thread.
 *
 * @param args command-line arguments, forwarded to {@code handleArgs}
 */
public static void main(String[] args) {
    map = new ConcurrentHashMap<>();
    expMap = new ConcurrentHashMap<>();
    // You can use print statements as follows for debugging, they'll be visible when running tests.
    System.out.println("Logs from your program will appear here!");
    handleArgs(List.of(args));
    // Create the socket unbound so SO_REUSEADDR can be set BEFORE bind(); toggling
    // it on an already-bound socket has undefined behaviour per the ServerSocket docs.
    // try-with-resources releases the listening port if the accept loop ever fails.
    try (ServerSocket serverSocket = new ServerSocket()) {
        // Since the tester restarts your program quite often, setting SO_REUSEADDR
        // ensures that we don't run into 'Address already in use' errors
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(port));
        // Wait for connections from clients.
        while (true) {
            Socket clientSocket = serverSocket.accept();
            new Thread(() -> {
                try {
                    processClients(clientSocket);
                } catch (Exception e) {
                    System.out.println("Exception: " + e.getMessage());
                }
            }).start();
        }
    } catch (IOException e) {
        System.out.println("IOException: " + e.getMessage());
    }
}
/**
 * Applies the supported command-line options.
 *
 * <ul>
 *   <li>{@code --port <n>}: sets the listening port.</li>
 *   <li>{@code --replicaof "<host> <port>"} (a single argument) or
 *       {@code --replicaof <host> <port>} (two arguments): switches this
 *       instance to replica mode and starts the handshake with the master.</li>
 * </ul>
 *
 * @param args the program arguments
 * @throws IllegalArgumentException if a flag is missing its value or the
 *         value is malformed (a non-numeric port surfaces as
 *         {@link NumberFormatException}, a subclass)
 */
private static void handleArgs(List<String> args) {
    int portIndex = args.indexOf("--port");
    if (portIndex >= 0) {
        port = Integer.parseInt(flagValue(args, portIndex, "--port"));
    }
    int replicaIndex = args.indexOf("--replicaof");
    if (replicaIndex >= 0) {
        String[] replicaArgs = flagValue(args, replicaIndex, "--replicaof").trim().split("\\s+");
        String masterHostname = replicaArgs[0];
        String masterPortText;
        if (replicaArgs.length >= 2) {
            // Quoted form: --replicaof "host port"
            masterPortText = replicaArgs[1];
        } else if (replicaIndex + 2 < args.size()) {
            // Unquoted form: --replicaof host port
            masterPortText = args.get(replicaIndex + 2);
        } else {
            throw new IllegalArgumentException("--replicaof requires \"<host> <port>\"");
        }
        if (masterHostname.isEmpty()) {
            throw new IllegalArgumentException("--replicaof requires a non-empty host");
        }
        int masterPortNumber = Integer.parseInt(masterPortText);
        isMaster = false;
        replicaHandshake(masterHostname, masterPortNumber);
    }
}

/** Returns the argument following the flag at {@code flagIndex}, or fails with a clear message. */
private static String flagValue(List<String> args, int flagIndex, String flag) {
    if (flagIndex + 1 >= args.size()) {
        throw new IllegalArgumentException(flag + " requires a value");
    }
    return args.get(flagIndex + 1);
}
/**
 * Starts a background thread that connects to the master and runs
 * {@code processMasterPropagation} on it. Any exception escaping that call is
 * logged to stdout; a failure to start the thread is rethrown wrapped in a
 * {@link RuntimeException}.
 *
 * @param masterHostname   host name of the master
 * @param masterPortNumber TCP port of the master
 */
private static void replicaHandshake(String masterHostname, int masterPortNumber) {
    Runnable replicationTask = () -> {
        try {
            processMasterPropagation(masterHostname, masterPortNumber);
        } catch (Exception failure) {
            System.out.println("Exception: " + failure.getMessage());
        }
    };
    try {
        Thread replicationThread = new Thread(replicationTask);
        replicationThread.start();
    } catch (Exception startFailure) {
        throw new RuntimeException(startFailure);
    }
}
/**
 * Broadcasts one command, encoded as a RESP array, to every socket currently
 * registered in {@code replicaSockets}.
 *
 * @param command command name followed by its arguments
 * @throws IOException if obtaining a replica's stream or writing to it fails;
 *         replicas later in the iteration are then not written to
 */
private static void sendToReplica(Vector<String> command) throws IOException {
    // Encode once, with an explicit charset so the bytes do not depend on the platform default.
    byte[] payload = encodeRESPArr(command.toArray(new String[0]))
            .getBytes(StandardCharsets.UTF_8);

    // replicaSockets is a plain HashSet shared between threads. Iterate over a
    // snapshot taken under its monitor so blocking socket I/O happens outside the lock.
    // NOTE(review): this only excludes writers that also lock on replicaSockets.
    List<Socket> targets;
    synchronized (replicaSockets) {
        targets = new ArrayList<>(replicaSockets);
    }

    for (Socket replica : targets) {
        OutputStream os = replica.getOutputStream();
        System.out.println("Writing to a replica");
        os.write(payload);
    }
}
/**
 * Encodes the given strings as a RESP array: a {@code *<count>\r\n} header
 * followed by each element encoded by {@code encodeRESP}.
 *
 * @param arr elements of the array, in order
 * @return the RESP text for the whole array
 */
public static String encodeRESPArr(String[] arr) {
    StringBuilder encoded = new StringBuilder("*").append(arr.length).append("\r\n");
    for (int i = 0; i < arr.length; i++) {
        encoded.append(encodeRESP(arr[i]));
    }
    return encoded.toString();
}
/**
 * Encodes a string as a RESP bulk string: {@code $<length>\r\n<data>\r\n}.
 *
 * <p>The length prefix is the number of bytes in the UTF-8 encoding of
 * {@code s}, not the number of {@code char}s; the two differ for any
 * non-ASCII content, and RESP readers consume exactly that many bytes.
 *
 * @param s the payload; must not be null
 * @return the RESP bulk-string text
 */
public static String encodeRESP(String s) {
    int byteLength = s.getBytes(StandardCharsets.UTF_8).length;
    return "$" + byteLength + "\r\n" + s + "\r\n";
}
/**
 * Replica side of replication. Connects to the master, performs the handshake
 * (PING, REPLCONF listening-port, REPLCONF capa, PSYNC), then processes
 * whatever the master sends until the connection closes.
 *
 * <p>Parsing is line-oriented: RESP array and length header lines are not
 * interpreted; the loop reacts only to lines equal to a known command word
 * ({@code GETACK}, {@code SET}, {@code px}, {@code GET}) and then consumes the
 * argument lines that follow.
 *
 * @param masterHostname   host name of the master
 * @param masterPortNumber TCP port of the master
 * @throws RuntimeException wrapping any {@link IOException} on the connection
 */
private static void processMasterPropagation(String masterHostname, int masterPortNumber) {
    String pingRequest = "*1\r\n$4\r\nping\r\n";
    // Advertise the port this instance actually listens on instead of a fixed 6380.
    String listeningPortRequest =
            encodeRESPArr(new String[] {"REPLCONF", "listening-port", String.valueOf(port)});
    String capaRequest = "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n";
    String psyncRequest = "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n";

    // Key of the most recent SET on this connection, so a following "px" applies to
    // it. Kept local: a field shared with other connection threads could be
    // overwritten between the SET and its px.
    String lastSetKey = "";

    // try-with-resources closes the master connection when the stream ends or an error occurs.
    try (Socket masterSocket = new Socket(masterHostname, masterPortNumber)) {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(masterSocket.getInputStream(), StandardCharsets.UTF_8));
        OutputStream out = masterSocket.getOutputStream();

        sendToMaster(out, pingRequest);
        String content = reader.readLine();
        System.out.println("ping->" + content);

        sendToMaster(out, listeningPortRequest);
        content = reader.readLine();
        System.out.println("request_replconf_1->" + content);

        sendToMaster(out, capaRequest);
        content = reader.readLine();
        System.out.println("request_replconf_2->" + content);

        sendToMaster(out, psyncRequest);
        content = reader.readLine();
        System.out.println("request_PSYNC->" + content);

        while ((content = reader.readLine()) != null) {
            System.out.println(" processMasterPropagation 读到的内容read content->::" + content);
            if ("GETACK".equalsIgnoreCase(content)) {
                String[] ack = {"REPLCONF", "ACK", "0"};
                sendToMaster(out, encodeRESPArr(ack));
            } else if ("SET".equals(content)) {
                System.out.println("processMasterPropagation 进入set命令 ,this is set");
                Vector<String> command = new Vector<>();
                reader.readLine(); // skip the key's length line
                String key = reader.readLine();
                reader.readLine(); // skip the value's length line
                String value = reader.readLine();
                command.addElement("SET");
                command.addElement(key);
                command.addElement(value);
                sendToReplica(command);
                lastSetKey = key;
                System.out.println("processMasterPropagation BEGIN....SET");
                map.put(key, value);
                // TTL 0 with a start of Long.MAX_VALUE makes (now - start) hugely
                // negative, so the liveness test in GET always passes: no expiry.
                expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                System.out.println("processMasterPropagation END.....SET");
                System.out.println("this is a message from master propagation");
            } else if ("px".equalsIgnoreCase(content)) {
                reader.readLine(); // skip the TTL's length line
                long exp = Long.parseLong(reader.readLine());
                expMap.put(lastSetKey,
                        new KeyExpiry(exp, true, System.currentTimeMillis()));
            } else if ("GET".equals(content)) {
                System.out.println("processMasterPropagation 进入GET命令 this is get");
                reader.readLine(); // skip the key's length line
                String key = reader.readLine();
                System.out.println("processMasterPropagation BEGIN...... GET");
                KeyExpiry keyExpiry = expMap.get(key);
                System.out.println("processMasterPropagation END ..... ..GET");
                String value = null;
                // A key that was never SET has no expiry record; treat it as absent
                // instead of dereferencing null and killing the replication thread.
                boolean active = keyExpiry != null && keyExpiry.isActive;
                System.out.println(active);
                if (active) {
                    long duration = System.currentTimeMillis() - keyExpiry.startTime;
                    if (duration < keyExpiry.expTime) {
                        value = map.get(key);
                    } else {
                        keyExpiry.isActive = false;
                    }
                }
                if (value == null) {
                    sendToMaster(out, "$-1\r\n");
                    continue;
                }
                String reply = encodeRESP(value);
                System.out.println(reply);
                sendToMaster(out, reply);
            }
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

/** Writes {@code payload} to the master connection as UTF-8 and flushes. */
private static void sendToMaster(OutputStream out, String payload) throws IOException {
    out.write(payload.getBytes(StandardCharsets.UTF_8));
    out.flush();
}
/**
 * Serves one accepted connection until the peer closes it, then closes the
 * socket.
 *
 * <p>Parsing is line-oriented: RESP array and length header lines are not
 * interpreted; the loop reacts only to lines equal to a known command word
 * (PING, REPLCONF, GETACK, PSYNC, ECHO, SET, px, GET, INFO) and then consumes
 * the argument lines that follow. A connection that issues PSYNC is registered
 * in {@code replicaSockets} so later SETs are forwarded to it.
 *
 * @param clientSocket the accepted connection
 * @throws RuntimeException wrapping any {@link IOException} on the connection
 */
private static void processClients(Socket clientSocket) {
    final String replid = "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb";

    // Key of the most recent SET on this connection, so a following "px" applies to
    // it. Kept local: a field shared with other connection threads could be
    // overwritten between the SET and its px.
    String lastSetKey = "";

    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(clientSocket.getInputStream(), StandardCharsets.UTF_8))) {
        OutputStream out = clientSocket.getOutputStream();
        String content;
        while ((content = reader.readLine()) != null) {
            System.out.println("processClients 读到的内容read content->::" + content);
            if ("ping".equalsIgnoreCase(content)) {
                replyToClient(out, "+PONG\r\n");
            } else if ("REPLCONF".equalsIgnoreCase(content)) {
                replyToClient(out, "+OK\r\n");
            } else if ("GETACK".equalsIgnoreCase(content)) {
                String[] ack = {"REPLCONF", "ACK", "0"};
                replyToClient(out, encodeRESPArr(ack));
            } else if ("PSYNC".equalsIgnoreCase(content)) {
                replyToClient(out, "+FULLRESYNC " + replid + " 0\r\n");
                // Hex dump of an RDB snapshot, sent as "$<len>\r\n<bytes>" with no trailing CRLF.
                byte[] contents = HexFormat.of().parseHex("524544495330303131fa0972656469732d76657205372e322e30fa0a72656469732d62697473c040fa056374696d65c26d08bc65fa08757365642d6d656dc2b0c41000fa08616f662d62617365c000fff06e3bfec0ff5aa2");
                replyToClient(out, "$" + contents.length + "\r\n");
                out.write(contents);
                // replicaSockets is a plain HashSet touched by several threads;
                // mutate it only while holding its monitor.
                synchronized (replicaSockets) {
                    replicaSockets.add(clientSocket);
                }
            } else if ("eof".equalsIgnoreCase(content)) {
                System.out.println("eof");
            } else if ("ECHO".equalsIgnoreCase(content)) {
                reader.readLine(); // skip the message's length line
                String message = reader.readLine();
                replyToClient(out, encodeRESP(message));
            } else if ("SET".equals(content)) {
                System.out.println("processClients 进入set命令 ,this is set");
                Vector<String> command = new Vector<>();
                reader.readLine(); // skip the key's length line
                String key = reader.readLine();
                reader.readLine(); // skip the value's length line
                String value = reader.readLine();
                command.addElement("SET");
                command.addElement(key);
                command.addElement(value);
                sendToReplica(command);
                lastSetKey = key;
                System.out.println("processClients BEGIN....SET");
                map.put(key, value);
                // TTL 0 with a start of Long.MAX_VALUE makes (now - start) hugely
                // negative, so the liveness test in GET always passes: no expiry.
                expMap.put(key, new KeyExpiry(0, true, Long.MAX_VALUE));
                System.out.println("processClients END.....SET");
                // SET answers with the simple string OK, not a bulk string.
                replyToClient(out, "+OK\r\n");
            } else if ("px".equalsIgnoreCase(content)) {
                reader.readLine(); // skip the TTL's length line
                long exp = Long.parseLong(reader.readLine());
                expMap.put(lastSetKey,
                        new KeyExpiry(exp, true, System.currentTimeMillis()));
            } else if ("GET".equals(content)) {
                System.out.println("processClients 进入GET命令 this is get");
                // NOTE(review): the original author observed that sleeping ~50 ms here
                // hides a race with replication; a sleep is a workaround, not a fix.
                reader.readLine(); // skip the key's length line
                String key = reader.readLine();
                System.out.println("processClients BEGIN...... GET");
                KeyExpiry keyExpiry = expMap.get(key);
                System.out.println("processClients END ..... ..GET");
                String value = null;
                // A key that was never SET has no expiry record; answer with a null
                // bulk string instead of dereferencing null and dropping the connection.
                boolean active = keyExpiry != null && keyExpiry.isActive;
                System.out.println(active);
                if (active) {
                    long duration = System.currentTimeMillis() - keyExpiry.startTime;
                    if (duration < keyExpiry.expTime) {
                        value = map.get(key);
                    } else {
                        keyExpiry.isActive = false;
                    }
                }
                if (value == null) {
                    replyToClient(out, "$-1\r\n");
                    continue;
                }
                String reply = encodeRESP(value);
                System.out.println(reply);
                replyToClient(out, reply);
            } else if ("info".equalsIgnoreCase(content)) {
                System.out.println("info->" + content);
                String role = isMaster ? "master" : "slave";
                String info = "role:" + role + "\n"
                        + "master_replid:" + replid + "\n"
                        + "master_repl_offset:0";
                replyToClient(out, encodeRESP(info));
            }
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    } finally {
        // A closed socket must not stay registered, otherwise the next broadcast
        // to replicas fails on it and takes the issuing client's connection down.
        synchronized (replicaSockets) {
            replicaSockets.remove(clientSocket);
        }
        try {
            if (clientSocket != null) {
                clientSocket.close();
            }
        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }
}

/** Writes {@code reply} to the client connection as UTF-8 and flushes. */
private static void replyToClient(OutputStream out, String reply) throws IOException {
    out.write(reply.getBytes(StandardCharsets.UTF_8));
    out.flush();
}
}