I’m stuck on Stage 19 #YG4 redis
I’ve tried to make a client to test and the command propagation is working.
Here are my logs:
remote: [tester::#YG4] Running tests for Stage #YG4 (Replication - Command Processing)
remote: [tester::#YG4] Master is running on port 6379
remote: [tester::#YG4] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
remote: [tester::#YG4] [handshake] master: Waiting for replica to initiate handshake with "PING" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["PING"]
remote: [tester::#YG4] [handshake] Received ["PING"]
remote: [tester::#YG4] [handshake] master: Sent "PONG"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+PONG\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] [handshake] Received ["REPLCONF", "listening-port", "6380"]
remote: [tester::#YG4] [handshake] master: Sent "OK"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "REPLCONF capa" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] [handshake] Received ["REPLCONF", "capa", "psync2"]
remote: [tester::#YG4] [handshake] master: Sent "OK"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YG4] [handshake] master: Waiting for replica to send "PSYNC" command
remote: [tester::#YG4] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
remote: [tester::#YG4] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
remote: [tester::#YG4] [handshake] Received ["PSYNC", "?", "-1"]
remote: [tester::#YG4] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
remote: [tester::#YG4] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
remote: [tester::#YG4] [handshake] Sending RDB file...
remote: [tester::#YG4] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
remote: [tester::#YG4] [handshake] Sent RDB file.
remote: [tester::#YG4] [propagation] master: > SET foo 123
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n"
remote: [tester::#YG4] [propagation] master: > SET bar 456
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbar\r\n$3\r\n456\r\n"
remote: [tester::#YG4] [propagation] master: > SET baz 789
remote: [tester::#YG4] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\n789\r\n"
remote: [tester::#YG4] [test] Getting key foo
remote: [tester::#YG4] [test] client: $ redis-cli GET foo
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n"
remote: [your_program] SET > foo > 123
remote: [your_program] GET > foo
remote: [tester::#YG4] [test] client: Received bytes: "$3\r\n123\r\n"
remote: [tester::#YG4] [test] client: Received RESP bulk string: "123"
remote: [tester::#YG4] [test] Received "123"
remote: [tester::#YG4] [test] Getting key bar
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (1/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (2/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] Retrying... (3/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (4/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] [test] Retrying... (5/5 attempts)
remote: [tester::#YG4] [test] client: > GET bar
remote: [tester::#YG4] [test] client: Sent bytes: "*2\r\n$3\r\nGET\r\n$3\r\nbar\r\n"
remote: [your_program] GET > bar
remote: [tester::#YG4] [test] client: Received bytes: "$-1\r\n"
remote: [tester::#YG4] [test] client: Received RESP null bulk string: "$-1\r\n"
remote: [tester::#YG4] Expected simple string or bulk string, got NIL
remote: [tester::#YG4] Test failed
remote: [tester::#YG4] Terminating program
remote: [tester::#YG4] Program terminated successfully
And here’s a snippet of my code:
public static void Main(string[] args)
{
parseArgs(args);
Server server = new Server(serverRole, ip, port);
ServerListener listener = new ServerListener(server);
}
internal class Server
{
public readonly string serverRole;
public readonly IPAddress ip;
public readonly int port;
public readonly TcpListener connection;
public readonly DataBank dataBank;
public Server(string serverRole, IPAddress ip, int port)
{
this.serverRole = serverRole;
this.ip = ip;
this.port = port;
this.dataBank = new DataBank();
TcpListener tcpServer = new TcpListener(ip, port);
tcpServer.Start();
connection = tcpServer;
}
}
internal class ServerListener
{
public static List<Socket> listeningSockets = new List<Socket>();
Server server;
CommandManager cmdManager = new CommandManager();
public ServerListener(Server server)
{
this.server = server;
startListening();
}
public void startListening()
{
while (true)
{
lookForNewClient();
foreach (var socket in listeningSockets)
{
if (socket.Connected)
{
cmdManager.handle(socket, server.dataBank);
}
}
}
}
public void lookForNewClient()
{
try
{
listeningSockets.Add(server.connection.AcceptSocket());
}
catch (Exception e)
{
}
}
}
internal class CommandManager
{
private static List<ICommand> commands = new List<ICommand>();
public CommandManager(){
commands.Add(new Echo());
commands.Add(new Ping());
commands.Add(new Set());
commands.Add(new Get());
commands.Add(new Info());
commands.Add(new ReplConf());
commands.Add(new Psync());
}
internal async void handle(Socket clientSocket, DataBank dataBank) {
while (clientSocket.Connected)
{
var buffer = new byte[1024];
await clientSocket.ReceiveAsync(buffer);
string cmd = System.Text.Encoding.Default.GetString(buffer);
if (cmd.StartsWith("+") || cmd.StartsWith("$"))
{
continue;
}
string[] atoms = RespParser.PARSE(cmd).ToArray();
string cmdName = atoms[0];
string[] cmdArgs = new string[atoms.Length - 1];
Console.WriteLine(string.Join(" > ", atoms));
Array.Copy(atoms, 1, cmdArgs, 0, cmdArgs.Length);
foreach (ICommand command in commands)
{
if (command.getName().Equals(cmdName.ToUpper()) || command.getAlies().Contains(cmdName))
{
command.handle(new CommandContext(dataBank,cmdArgs,clientSocket));
break;
}
}
}
}
}
internal class Set : ICommand
{
public List<string> getAlies()
{
return new List<string>() { getName() };
}
public string getHelp()
{
return getName();
}
public string getName()
{
return "SET";
}
public void handle(CommandContext context)
{
Socket socket = context.socket;
DataBank db = context.dataBank;
string[] args = context.args;
switch (args.Length){
case 2:
db.setData(args[0], args[1]);
foreach(SlaveServer slave in SlaveServer.Slaves){
Socket slaveSocket = slave.socket;
slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1] }));
}
if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket))) {
Console.WriteLine("+OK");
socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));
}
return;
case 4:
switch (args[2].ToUpper()){
case "PX":
if (int.TryParse(args[3], out int ms)){
db.setData(args[0], args[1], ms);
foreach (SlaveServer slave in SlaveServer.Slaves)
{
Socket slaveSocket = slave.socket;
slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1], "PX", args[3] }));
}
if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket)))
{
socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));
}
return;
}
break;
case "EX":
if (int.TryParse(args[3], out int s)){
db.setData(args[0], args[1], s*1000);
foreach (SlaveServer slave in SlaveServer.Slaves)
{
Socket slaveSocket = slave.socket;
slaveSocket.SendAsync(RespEncoder.ENCODE_ARRAY_AS_BYTEARRAY(new List<String>() { "SET", args[0], args[1], "EX", args[3] }));
}
if (!(MasterServer.Master != null && MasterServer.Master.socket.Equals(socket)))
socket.SendAsync(RespEncoder.ENCODE_STRING_AS_BYTEARRAY("OK"));
return;
}
break;
default:
break;
}
socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
return;
default:
socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
return;
}
}
}
internal class Get : ICommand
{
public List<string> getAlies()
{
return new List<string>() { getName() };
}
public string getHelp()
{
return getName();
}
public string getName()
{
return "GET";
}
public void handle(CommandContext context)
{
DataBank db = context.dataBank;
string[] args = context.args;
Socket socket = context.socket;
string data = db.getData(args[0]);
if (data != null)
{
socket.SendAsync(RespEncoder.ENCODE_BULK_STRING_AS_BYTEARRAY(data));
return;
}
socket.SendAsync(RespEncoder.ECODED_NULL_BULK_STRING_AS_BYTEARRAY());
}
}