remote: [tester::#YD3] Running tests for Stage #YD3 (Replication - ACKs with commands)
remote: [tester::#YD3] Master is running on port 6379
remote: [tester::#YD3] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
remote: [tester::#YD3] [handshake] master: Waiting for replica to initiate handshake with "PING" command
remote: [tester::#YD3] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#YD3] [handshake] master: Received RESP array: ["PING"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] Received ["PING"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] master: Sent "PONG"
remote: [tester::#YD3] [handshake] master: Sent bytes: "+PONG\r\n"
remote: [tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
remote: [tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
remote: [tester::#YD3] [handshake] master: Received RESP array: [
remote: [tester::#YD3] [handshake] "REPLCONF",
remote: [tester::#YD3] [handshake] "listening-port",
remote: [tester::#YD3] [handshake] "6380"
remote: [tester::#YD3] [handshake] ]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] Received [
remote: [tester::#YD3] [handshake] "REPLCONF",
remote: [tester::#YD3] [handshake] "listening-port",
remote: [tester::#YD3] [handshake] "6380"
remote: [tester::#YD3] [handshake] ]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] master: Sent "OK"
remote: [tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YD3] [handshake] master: Waiting for replica to send "REPLCONF capa" command
remote: [tester::#YD3] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
remote: [tester::#YD3] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] Received ["REPLCONF", "capa", "psync2"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] master: Sent "OK"
remote: [tester::#YD3] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#YD3] [handshake] master: Waiting for replica to send "PSYNC" command
remote: [tester::#YD3] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
remote: [tester::#YD3] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] Received ["PSYNC", "?", "-1"]
remote: [tester::#YD3] [handshake]
remote: [tester::#YD3] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
remote: [tester::#YD3] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
remote: [tester::#YD3] [handshake] Sending RDB file...
remote: [tester::#YD3] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
remote: [tester::#YD3] [handshake] Sent RDB file.
remote: [tester::#YD3] [test] master: > REPLCONF GETACK *
remote: [tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
remote: [your_program] 88
remote: [your_program] current read bytes 0
remote: [your_program] current read bytes 88
remote: [tester::#YD3] [test] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$1\r\n0\r\n"
remote: [tester::#YD3] [test] master: Received RESP array: ["REPLCONF", "ACK", "0"]
remote: [tester::#YD3] [test]
remote: [your_program] hand shake successed
remote: [your_program] *****
remote: [your_program] REPLCONF
remote: [your_program] GETACK
remote: [your_program] *
remote: [your_program] *****
remote: [your_program] command = replconf arg = getack
remote: [tester::#YD3] [test] Received ["REPLCONF", "ACK", "0"]
remote: [tester::#YD3] [test]
remote: [tester::#YD3] [propagation] master: > PING
remote: [tester::#YD3] [propagation] master: Sent bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#YD3] [test] master: > REPLCONF GETACK *
remote: [tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
remote: [your_program] *****
remote: [your_program] PING
remote: [your_program] *****
remote: [your_program] *****
remote: [your_program] REPLCONF
remote: [your_program] GETACK
remote: [your_program] *
remote: [your_program] *****
remote: [your_program] command = replconf arg = getack
remote: [tester::#YD3] [test] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$2\r\n51\r\n"
remote: [tester::#YD3] [test] master: Received RESP array: ["REPLCONF", "ACK", "51"]
remote: [tester::#YD3] [test]
remote: [tester::#YD3] [test] Received ["REPLCONF", "ACK", "51"]
remote: [tester::#YD3] [test]
remote: [tester::#YD3] [propagation] master: > SET pear blueberry
remote: [tester::#YD3] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$4\r\npear\r\n$9\r\nblueberry\r\n"
remote: [tester::#YD3] [propagation] master: > SET banana strawberry
remote: [tester::#YD3] [propagation] master: Sent bytes: "*3\r\n$3\r\nSET\r\n$6\r\nbanana\r\n$10\r\nstrawberry\r\n"
remote: [tester::#YD3] [test] master: > REPLCONF GETACK *
remote: [tester::#YD3] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
remote: [your_program] *****
remote: [tester::#YD3] [test] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$3\r\n168\r\n"
remote: [tester::#YD3] [test] master: Received RESP array: ["REPLCONF", "ACK", "168"]
remote: [tester::#YD3] [test]
remote: [your_program] SET
remote: [your_program] pear
remote: [your_program] blueberry
remote: [your_program] *****
remote: [your_program] *****
remote: [your_program] SET
remote: [your_program] banana
remote: [your_program] strawberry
remote: [your_program] *****
remote: [your_program] *****
remote: [your_program] REPLCONF
remote: [your_program] GETACK
remote: [your_program] *
remote: [your_program] *****
remote: [your_program] command = replconf arg = getack
remote: [tester::#YD3] [test] Received ["REPLCONF", "ACK", "168"]
remote: [tester::#YD3] [test]
remote: [tester::#YD3] Test passed.
remote: [tester::#YD3] Terminating program
remote: [tester::#YD3] Program terminated successfully
remote:
remote: [tester::#XV6] Running tests for Stage #XV6 (Replication - ACKs with no commands)
remote: [tester::#XV6] Master is running on port 6379
remote: [tester::#XV6] $ ./your_program.sh --port 6380 --replicaof "localhost 6379"
remote: [tester::#XV6] [handshake] master: Waiting for replica to initiate handshake with "PING" command
remote: [tester::#XV6] [handshake] master: Received bytes: "*1\r\n$4\r\nPING\r\n"
remote: [tester::#XV6] [handshake] master: Received RESP array: ["PING"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] Received ["PING"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] master: Sent "PONG"
remote: [tester::#XV6] [handshake] master: Sent bytes: "+PONG\r\n"
remote: [tester::#XV6] [handshake] master: Waiting for replica to send "REPLCONF listening-port 6380" command
remote: [tester::#XV6] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"
remote: [tester::#XV6] [handshake] master: Received RESP array: [
remote: [tester::#XV6] [handshake] "REPLCONF",
remote: [tester::#XV6] [handshake] "listening-port",
remote: [tester::#XV6] [handshake] "6380"
remote: [tester::#XV6] [handshake] ]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] Received [
remote: [tester::#XV6] [handshake] "REPLCONF",
remote: [tester::#XV6] [handshake] "listening-port",
remote: [tester::#XV6] [handshake] "6380"
remote: [tester::#XV6] [handshake] ]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] master: Sent "OK"
remote: [tester::#XV6] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#XV6] [handshake] master: Waiting for replica to send "REPLCONF capa" command
remote: [tester::#XV6] [handshake] master: Received bytes: "*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"
remote: [tester::#XV6] [handshake] master: Received RESP array: ["REPLCONF", "capa", "psync2"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] Received ["REPLCONF", "capa", "psync2"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] master: Sent "OK"
remote: [tester::#XV6] [handshake] master: Sent bytes: "+OK\r\n"
remote: [tester::#XV6] [handshake] master: Waiting for replica to send "PSYNC" command
remote: [tester::#XV6] [handshake] master: Received bytes: "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"
remote: [tester::#XV6] [handshake] master: Received RESP array: ["PSYNC", "?", "-1"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] Received ["PSYNC", "?", "-1"]
remote: [tester::#XV6] [handshake]
remote: [tester::#XV6] [handshake] master: Sent "FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0"
remote: [tester::#XV6] [handshake] master: Sent bytes: "+FULLRESYNC 75cd7bc10c49047e0d163660f3b90625b1af31dc 0\r\n"
remote: [tester::#XV6] [handshake] Sending RDB file...
remote: [tester::#XV6] [handshake] master: Sent bytes: "$88\r\nREDIS0011\xfa\tredis-ver\x057.2.0\xfa\nredis-bits\xc0@\xfa\x05ctime\xc2m\b\xbce\xfa\bused-mem°\xc4\x10\x00\xfa\baof-base\xc0\x00\xff\xf0n;\xfe\xc0\xffZ\xa2"
remote: [tester::#XV6] [handshake] Sent RDB file.
remote: [tester::#XV6] [test] master: > REPLCONF GETACK *
remote: [tester::#XV6] [test] master: Sent bytes: "*3\r\n$8\r\nREPLCONF\r\n$6\r\nGETACK\r\n$1\r\n*\r\n"
remote: [tester::#XV6] Received: "" (no content received)
remote: [tester::#XV6] ^ error
remote: [tester::#XV6] Error: Expected start of a new RESP2 value (either +, -, :, $ or *)
remote: [tester::#XV6] Test failed
remote: [tester::#XV6] Terminating program
remote: [tester::#XV6] Program terminated successfully
remote:
remote: Try our CLI to run tests faster without Git: https://codecrafters.io/cli
remote:
remote: View our article on debugging test failures: https://codecrafters.io/debug
remote:
To https://git.codecrafters.io/816e049dbd9e2229
8294e28..8200e06 master -> master
Why do I pass some tests but fail the #XV6 test? The handshake in the two tests looks identical!
#ifndef REDIS_HH
#define REDIS_HH
#include <netinet/in.h>
#include <ostream>
#include <string>
#include <stdexcept>
#include <netdb.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sstream>
#include <cctype>
#include <algorithm>
#include <vector>
#include <unordered_map>
#include <chrono>
#include "Protocol.hpp"
#include "RDBParser.hpp"
#include "util.hpp"
#include <filesystem>
#include <cstring>
namespace fs = std::filesystem;
class Redis {
private:
int sockfd;
std::string host;
int port;
std::stringstream buffer;
int connection_backlog = 5;
int cur_db;
std::vector<std::unordered_map<std::string, std::string>> kvs;
std::vector<std::unordered_map<std::string, int64_t>> key_elapsed_time_dbs;
std::unordered_map<std::string, std::string> metadata;
bool is_master{true};
std::string master_host;
int master_port;
std::vector<int> slave_fds;
int _master_fd{-1};
size_t processed_bytes{0};
public:
Redis(std::string dir, std::string dbfilename, int cur_db = 0, int port = Protocol::DEFAULT_PORT, bool is_master = true, std::string replicaof = "", const std::string& host = Protocol::DEFAULT_HOST, int connection_backlog = 5)
: sockfd(-1), host(host), port(port), is_master(is_master), connection_backlog(connection_backlog), cur_db(0), kvs(16), key_elapsed_time_dbs(16){
if(!dir.empty() && !dbfilename.empty()) {
fs::path filepath = fs::path(dir) / dbfilename;
if(fs::exists(filepath)) {
RDBParser rdb_parser(dir, dbfilename);
rdb_parser.parseMetadata(metadata);
rdb_parser.parseDatabase(kvs, key_elapsed_time_dbs);
}
}
if (!replicaof.empty()) {
int space_pos = replicaof.find(' ');
master_host = replicaof.substr(0, space_pos);
master_port = std::stoi(replicaof.substr(space_pos + 1));
int master_fd = socket(AF_INET, SOCK_STREAM, 0);
if (master_fd < 0) {
throw std::runtime_error("socket creation failed");
}
struct addrinfo hints{}, *res;
hints.ai_family = AF_INET; // IPv4
hints.ai_socktype = SOCK_STREAM; // TCP stream socket
int err = getaddrinfo(master_host.c_str(), std::to_string(master_port).c_str(), &hints, &res);
if (err != 0) {
close(master_fd);
throw std::runtime_error("getaddrinfo failed: " + std::string(gai_strerror(err)));
}
if (connect(master_fd, res->ai_addr, res->ai_addrlen) < 0) {
freeaddrinfo(res);
close(master_fd);
throw std::runtime_error("connect failed to master");
}
freeaddrinfo(res); // Clean up addrinfo
// Send PING to master after connection
sendCommand({makeArray({makeBulk("PING")})}, master_fd);
char buf[65536];
RedisReply reply = readOneReply(master_fd);
// REPLCONF listening-port <PORT>
sendCommand({makeArray({makeBulk("REPLCONF"), makeBulk("listening-port"), makeBulk(std::to_string(port))})}, master_fd);
// REPLCONF capa psync2
sendCommand({makeArray({makeBulk("REPLCONF"), makeBulk("capa"), makeBulk("psync2")})}, master_fd);
reply = readOneReply(master_fd);
reply = readOneReply(master_fd);
sendCommand({makeArray({makeBulk("PSYNC"), makeBulk("?"), makeBulk("-1")})}, master_fd);
reply = readOneReply(master_fd);
int rdb_len = readBulkStringLen(master_fd);
std::string rdb_data(rdb_len, '\0');
std::cout << rdb_len << std::endl;
size_t total_read = 0;
while (total_read < rdb_len) {
ssize_t n = ::recv(master_fd, &rdb_data[total_read], rdb_len - total_read, 0);
std::cout << "current read bytes " << total_read << std::endl;
if (n < 0) {
throw std::runtime_error("recv error while reading RDB data");
} else if (n == 0) {
throw std::runtime_error("connection closed before RDB fully received");
}
total_read += n;
}
std::cout << "current read bytes " << total_read << std::endl;
std::cout << "hand shake successed " << std::endl;
_master_fd = master_fd;
}
metadata.insert_or_assign("dir", dir);
metadata.insert_or_assign("dbfilename", dbfilename);
metadata.insert_or_assign("master_replid", "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb");
metadata.insert_or_assign("master_repl_offset", "0");
bind_listen();
}
~Redis() {
if (sockfd != -1) close(sockfd);
if(_master_fd != -1) close(_master_fd);
}
int server_fd() const {
return sockfd;
}
int master_fd() {
return _master_fd;
}
void bind_listen() {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
if (sockfd < 0) throw std::runtime_error("Socket creation failed");
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
throw std::runtime_error("Socket set failed");
}
sockaddr_in server_addr {};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
inet_pton(AF_INET, host.c_str(), &server_addr.sin_addr);
if (bind(sockfd, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
throw std::runtime_error("bind failed");
}
if (listen(sockfd, connection_backlog) != 0) {
throw std::runtime_error("listen failed");
}
}
void sendCommand(const std::vector<RedisReply> &items, const int client_fd) {
std::string formatted = formatCommand(items);
::send(client_fd, formatted.c_str(), formatted.size(), 0);
}
int readBulkStringLen(const int client_fd) {
char buf[1024];
for(int i = 0; i < 1024; i++) {
if(::recv(client_fd, buf + i, 1, 0) != 1) {
throw std::runtime_error("process bulk string len failed");
}
if(buf[i] == '\n') {
buffer.write(buf, i + 1);
break;
}
}
RedisInputStream ris(buffer);
return Protocol::processBulkStringlen(ris);
}
std::vector<RedisReply> readAllAvailableReplies(const int client_fd) {
char buf[65536];
ssize_t n = ::recv(client_fd, buf, sizeof(buf), 0);
if (n < 0) throw std::runtime_error("Read error or connection closed");
else if (n == 0) return {}; // EOF
buffer.write(buf, n); // ĺ°†ć•°ćŤ®čż˝ĺŠ čż› buffer
std::vector<RedisReply> replies;
RedisInputStream ris(buffer);
while (true) {
std::streampos prevPos = buffer.tellg();
try {
RedisReply reply = Protocol::read(ris);
replies.push_back(std::move(reply));
} catch (const std::runtime_error& e) {
buffer.clear();
buffer.seekg(prevPos);
return replies;
}
}
return replies;
}
RedisReply readOneReply(const int client_fd) {
while (true) {
RedisInputStream ris(buffer);
std::streampos prevPos = buffer.tellg();
try {
RedisReply reply = Protocol::read(ris);
return reply;
} catch (const std::runtime_error& e) {
buffer.clear();
buffer.seekg(prevPos);
char buf[65536];
ssize_t n = ::recv(client_fd, buf, sizeof(buf), 0);
if (n < 0) throw std::runtime_error("recv error");
if (n == 0) throw std::runtime_error("connection closed (EOF)");
buffer.write(buf, n); // append new data into buffer
}
}
}
void process_command(std::vector<RedisReply> replys, const int client_fd) {
for(auto &reply : replys) {
std::vector<RedisReply> &items = reply.elements;
std::string command = items.front().strVal;
std::transform(command.begin(), command.end(), command.begin(), ::tolower);
std::cout << "*****" << std::endl;
for(auto item : items) {
std::cout << item.strVal << " " << std::endl;
}
std::cout << "*****" << std::endl;
if (command == "echo") {
items.erase(items.begin());
sendCommand(items, client_fd);
} else if (command == "ping" && client_fd != _master_fd) {
RedisReply pong;
pong.type = REPLY_STRING;
pong.strVal = "PONG";
sendCommand({pong}, client_fd);
} else if (command == "set") {
if (items.size() < 3) return;
std::string key = items[1].strVal;
std::string value = items[2].strVal;
if(items.size() == 5) {
std::transform(items[3].strVal.begin(), items[3].strVal.end(), command.begin(), ::tolower);
if(items[3].strVal == "px") {
key_elapsed_time_dbs[cur_db].insert_or_assign(key, get_millis() + std::stoi(items[4].strVal));
}
}
// store[key] = value;
kvs[cur_db].insert_or_assign(key, value);
// std::cout << "is_master = " << is_master << " SET " << key << " " << value << std::endl;
if(is_master) {
RedisReply ok;
ok.type = REPLY_STRING;
ok.strVal = "OK";
sendCommand({ok}, client_fd);
for(int fd: slave_fds) {
RedisReply slave_sync_content;
slave_sync_content.elements = reply.elements;
slave_sync_content.type = REPLY_ARRAY;
sendCommand({reply}, fd);
// std::cout << "fd " << fd << " Send Slave:" << " KEY " << key << " VALUE " << value << std::endl;
}
}
} else if (command == "get") {
if (items.size() < 2) return;
std::string key = items[1].strVal;
std::cout << "is_master = " << is_master << " GET " << key << std::endl;
if(key_elapsed_time_dbs[cur_db].count(key)) {
if(key_elapsed_time_dbs[cur_db][key] <= get_millis()) {
key_elapsed_time_dbs[cur_db].erase(key);
kvs[cur_db].erase(key);
}
}
RedisReply result;
if (kvs[cur_db].count(key)) {
result.type = REPLY_BULK;
result.strVal = kvs[cur_db][key];
} else {
result.type = REPLY_NIL;
}
sendCommand({result}, client_fd);
} else if(command == "config") {
command = items[1].strVal;
std::transform(command.begin(), command.end(), command.begin(), ::tolower);
if(command == "get") {
auto response = makeArray({
makeBulk("dir"),
makeBulk(metadata["dir"])
});
sendCommand({response}, client_fd);
}
} else if(command == "keys") {
std::string pattern = items[1].strVal;
RedisReply reply;
reply.type = RedisReplyType::REPLY_ARRAY;
for (const auto& [key, _] : kvs[cur_db]) {
if(matchPattern(pattern, key)) {
reply.elements.emplace_back(std::move(makeBulk(key)));
}
}
sendCommand({reply}, client_fd);
} else if(command == "info") {
std::string &arg = items[1].strVal;
std::transform(arg.begin(), arg.end(), arg.begin(), ::tolower);
if (arg == "replication") {
std::ostringstream oss;
if (is_master) {
oss << "role:master\n";
oss << "master_replid:" << metadata["master_replid"] << "\n";
oss << "master_repl_offset:" << metadata["master_repl_offset"] << "\n";
} else {
oss << "role:slave\n";
}
sendCommand({makeBulk(oss.str())}, client_fd);
}
} else if(command == "replconf") {
std::string &arg = items[1].strVal;
std::transform(arg.begin(), arg.end(), arg.begin(), ::tolower);
std::cout << "command = " << command << " arg = " << arg << std::endl;
if(arg == "getack") {
// *3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$1\r\n0\r\n
sendCommand({makeArray({makeBulk("REPLCONF"), makeBulk("ACK"), makeBulk(std::to_string(processed_bytes))})}, client_fd);
} else {
sendCommand({makeString("OK")}, client_fd);
}
} else if(command == "psync") {
sendCommand({makeString("FULLRESYNC " + metadata["master_replid"] + " " + metadata["master_repl_offset"])}, client_fd);
const std::string empty_rdb = "\x52\x45\x44\x49\x53\x30\x30\x31\x31\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65"
"\x72\x05\x37\x2e\x32\x2e\x30\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73"
"\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x6d\x08\xbc\x65\xfa\x08\x75\x73\x65"
"\x64\x2d\x6d\x65\x6d\xc2\xb0\xc4\x10\x00\xfa\x08\x61\x6f\x66\x2d\x62\x61\x73"
"\x65\xc0\x00\xff\xf0\x6e\x3b\xfe\xc0\xff\x5a\xa2";
std::string content = "$" + std::to_string(empty_rdb.size()) + "\r\n" + empty_rdb;
if(send(client_fd, content.c_str(), content.size(), 0) != content.size()) {
throw std::runtime_error("send RDB failed");
}
slave_fds.emplace_back(client_fd);
// std::cout << "slave client fd = " << client_fd << std::endl;
}
// std::cout << "processed_bytes = " << processed_bytes << std::endl;
if(client_fd == _master_fd) {
processed_bytes += reply.len;
}
}
}
private:
std::string formatCommand(const std::vector<RedisReply>& items) {
std::ostringstream oss;
for (const auto& r : items) {
switch (r.type) {
case REPLY_STRING:
oss << "+" << r.strVal << "\r\n";
break;
case REPLY_ERROR:
oss << "-" << r.strVal << "\r\n";
break;
case REPLY_INTEGER:
oss << ":" << r.intVal << "\r\n";
break;
case REPLY_BULK:
oss << "$" << r.strVal.size() << "\r\n" << r.strVal << "\r\n";
break;
case REPLY_NIL:
oss << "$-1\r\n";
break;
case REPLY_ARRAY:
oss << "*" << r.elements.size() << "\r\n";
for (const auto& sub : r.elements) {
oss << formatCommand({sub});
}
break;
}
}
return oss.str();
}
int64_t get_millis() {
auto now = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
}
RedisReply makeBulk(const std::string& s) {
RedisReply r;
r.type = REPLY_BULK;
r.strVal = s;
return r;
}
RedisReply makeArray(const std::vector<RedisReply>& elements) {
RedisReply r;
r.type = REPLY_ARRAY;
r.elements = elements;
return r;
}
RedisReply makeString(const std::string &s) {
RedisReply r;
r.type = REPLY_STRING;
r.strVal = s;
return r;
}
};
#endif // REDIS_HH
This is my code. I handle the connection between the slave and the master in the constructor, and I believe that part is correct.