I’m stuck on Stage #YG4.
I am using event loop architecture instead of threads and it was all going great till now. I am stuck at command propagation to the replica server and I have tried everything. I am getting the error (screenshot attached). I think it comes when the replica attempts to send back data when it should have been silent.
I have tried starting master and replica servers locally and have observed that my set commands are propagated to the replica server and I am able to run get commands as well. So, locally it is working perfectly. Not sure if the error is maybe because it is not optimized for the test environment.
Is the issue because of me using event loop and not threads or it can be done without threads?
Please help, I have spent way too much time on this.
Here are my logs:
And here’s a snippet of my code:
//Event Loop
while(true){
FD_ZERO(&read_fds);
FD_SET(server_fd,&read_fds);
int max_fd = server_fd;
for(int fd: client_fds){
FD_SET(fd, &read_fds);
if(fd>max_fd) max_fd = fd;
}
int activity = select(max_fd+1, &read_fds, NULL, NULL, NULL);
if(activity < 0){
std::cerr << "select error\n";
break;
}
if(FD_ISSET(server_fd, &read_fds)){
struct sockaddr_in client_addr;
int client_addr_len = sizeof(client_addr);
int new_client_fd = accept(server_fd, (struct sockaddr *)&client_addr, (socklen_t *) &client_addr_len);
if(new_client_fd < 0){
std::cerr << "accept error\n";
break;
}
cout<<"New client connected with FD: "<<new_client_fd<<"\n";
client_fds.push_back(new_client_fd);
}
for(auto it = client_fds.begin(); it != client_fds.end();){
int client_fd = *it;
if (FD_ISSET(client_fd, &read_fds)) {
char buffer[256];
memset(buffer, 0, sizeof(buffer));
int n = read(client_fd, buffer, sizeof(buffer) - 1);
if (n <= 0) {
std::cerr << "Client disconnected: FD = " << client_fd << "\n";
// close(client_fd);
it = client_fds.erase(it); // Remove from the list
continue;
}
std::cout << "Received message from client FD " << client_fd << ": " << buffer;
istringstream stream(string(buffer, n));
vector<string> commands = decodeStream(stream);
for(string command : commands) cout<<"command "<<command<<" ";
cout<<"Helklow sdlkfj "<<endl;
if(strcasecmp(commands[0].c_str(),"ping") == 0)
send(client_fd, "+PONG\r\n", 7, 0);
else if(strcasecmp(commands[0].c_str(),"echo") == 0){
string resp = "+" + commands[1] + "\r\n";
send(client_fd, resp.c_str() , resp.length(), 0);
}
else if(strcasecmp(commands[0].c_str(),"set") == 0){
auto current_time = chrono::steady_clock::now();
if(commands.size()==5 && strcasecmp(commands[3].c_str(),"px") == 0){
auto expiry_time = current_time + chrono::milliseconds(stoi(commands[4]));
db[commands[1]] = make_pair(commands[2],expiry_time);
}
else{
auto max_time = current_time + chrono::milliseconds(10000000);
db[commands[1]] = make_pair(commands[2], max_time);
}
if(client_fd != masterFd)
propagateToReplicas(replicas, string(buffer, n));
if(client_fd != masterFd)
send(client_fd, "+OK\r\n", 5, 0);
}
else if(strcasecmp(commands[0].c_str(),"get") == 0){
auto current_time = chrono::steady_clock::now();
if(db[commands[1]].second < current_time){
string resp = "$-1\r\n";
send(client_fd, resp.c_str(), resp.length(), 0);
}
else {
string resp = "+" + db[commands[1]].first + "\r\n";
cout<<"resp: "<<resp<<endl;
send(client_fd, resp.c_str(), resp.length(), 0);
}
}
else if(strcasecmp(commands[0].c_str(),"config") == 0){
if(strcasecmp(commands[1].c_str(),"get") == 0){
string resp = "";
if(strcasecmp(commands[2].c_str(),"dir") == 0){
resp = "*2\r\n$3\r\ndir\r\n$" + to_string(dir.length()) +"\r\n"+ dir +"\r\n";
}
else if(strcasecmp(commands[2].c_str(),"dbfilename") == 0){
resp = "*2\r\n$10\r\ndbfilename\r\n$" + to_string(dbfilename.length()) +"\r\n"+ dbfilename +"\r\n";
}
cout<<"resp2: "<<resp<<endl;
send(client_fd, resp.c_str(), resp.length(), 0);
}
}
else if(strcasecmp(commands[0].c_str(),"keys") == 0){
vector<string>keys;
if(commands[1] == "*"){
for(auto it = db.begin();it!=db.end();it++){
cout<<"Key: "<<it->first << " value: "<<it->second.first<<endl;
keys.push_back(it->first);
}
string resp="*";
resp+=to_string(keys.size()) + "\r\n";
for(string key:keys){
resp+="$"+to_string(key.length())+"\r\n";
resp+=key+"\r\n";
}
send(client_fd, resp.c_str(), resp.length(), 0);
}
}else if(strcasecmp(commands[0].c_str(),"info") == 0){
string resp="";
string value = "role:"+role + "\nmaster_replid:"+master_replid+"\nmaster_repl_offset:"+master_repl_offset;
resp ="$"+ to_string(value.length()) + "\r\n";
resp+=value;
resp+="\r\n";
send(client_fd, resp.c_str(), resp.length(), 0);
}else if(strcasecmp(commands[0].c_str(),"replconf") == 0){
if(strcasecmp(commands[1].c_str(),"listening-port") == 0){
slavePort = stoi(commands[2]);
}
string resp="+OK\r\n";
send(client_fd, resp.c_str(), resp.length(), 0);
}else if(strcasecmp(commands[0].c_str(),"psync") == 0){
string resp="+FULLRESYNC "+master_replid+" "+master_repl_offset+"\r\n";
send(client_fd, resp.c_str(), resp.length(), 0);
string empty_rdb = "\x52\x45\x44\x49\x53\x30\x30\x31\x31\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x37\x2e\x32\x2e\x30\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x6d\x08\xbc\x65\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\xb0\xc4\x10\x00\xfa\x08\x61\x6f\x66\x2d\x62\x61\x73\x65\xc0\x00\xff\xf0\x6e\x3b\xfe\xc0\xff\x5a\xa2";
resp = "$"+to_string(empty_rdb.length())+"\r\n"+empty_rdb;
cout<<"replica port "<<slavePort<<endl;
replicas.push_back(make_pair(slaveHost, slavePort));
send(client_fd, resp.c_str(), resp.length(), 0);
}
}
++it;
}
}
for (int fd : client_fds) {
close(fd);
}
return 0;