I’m stuck on Stage #ZU2.
Here are my logs:
[tester::#ZU2] Running tests for Stage #ZU2 (Handle concurrent clients)
[tester::#ZU2] $ ./your_program.sh
[your_program] Added new connection with index 0
[tester::#ZU2] client-1: $ redis-cli PING
[tester::#ZU2] client-1: Sent bytes: "*1\r\n$4\r\nPING\r\n"
[your_program] Read from connection 0:
[your_program] *1
[your_program] Read from connection 0:
[your_program] $4
[your_program] Read from connection 0:
[your_program] PING
[your_program] Sending PONG
[tester::#ZU2] client-1: Received bytes: "+PONG\r\n"
[tester::#ZU2] client-1: Received RESP simple string: "PONG"
[tester::#ZU2] Received "PONG"
[tester::#ZU2] client-2: $ redis-cli PING
[tester::#ZU2] client-2: Sent bytes: "*1\r\n$4\r\nPING\r\n"
[tester::#ZU2] Received: "" (no content received)
[tester::#ZU2] ^ error
[tester::#ZU2] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
[tester::#ZU2] Test failed
[tester::#ZU2] Terminating program
[tester::#ZU2] Program terminated successfully
However, when I connect with several redis-cli
instances to my server locally, everything works. I tested this by connecting manually in different terminal sessions and by using GNU parallel:
parallel -j 6 redis-cli -e "PING" ::: {1..6}
PONG
PONG
PONG
PONG
PONG
PONG
Here is my code:
const std = @import("std");
const net = std.net;
pub fn main() !void {
const address = try net.Address.resolveIp("0.0.0.0", 6379);
var listener = try address.listen(.{
.reuse_address = true,
.force_nonblocking = true,
});
defer listener.deinit();
var list: [255]net.Server.Connection = undefined;
var active: [255]bool = [_]bool{false} ** 255;
var buf: [256]u8 = undefined;
while (true) {
if (std.mem.indexOfScalar(bool, &active, false)) |idx| { // if we can accept new connections
if (listener.accept()) |conn| {
list[idx] = conn;
active[idx] = true;
std.debug.print("Added new connection with index {d}\n", .{idx});
} else |err| {
if (err != error.WouldBlock) {
std.debug.print("Could not accept new connecion. Error: {any}\n", .{err});
}
}
} else {
std.debug.print("Could not accept new connection, all 256 slots are in use!\n", .{});
}
for (active, 0..) |a, i| {
if (!a) continue;
const conn = &list[i];
const reader = conn.stream.reader();
if (reader.readUntilDelimiter(&buf, 0x0a)) |slice| {
std.debug.print("Read from connection {d}:\n{s}\n", .{ i, slice });
if (std.mem.eql(u8, slice, "PING\r")) {
std.debug.print("Sending PONG\n", .{});
_ = try conn.stream.write("+PONG\r\n");
}
if (std.mem.eql(u8, slice, "DOCS\r")) {
std.debug.print("Sending DOCS\n", .{});
_ = try conn.stream.write("%0\r\n");
}
} else |err| {
switch (err) {
error.WouldBlock => {},
error.EndOfStream => {
std.debug.print("Client closed connection {d}\n", .{i});
active[i] = false;
list[i] = undefined;
},
else => {
std.debug.print("Cannot read from connection {d}, error: {any}\n", .{ i, err });
active[i] = false;
list[i] = undefined;
},
}
}
}
}
}
Any help is greatly appreciated.