Broken pipeline issue #YG4

I’m stuck on Stage #YG4.

I am using event loop architecture instead of threads and it was all going great till now. I am stuck at command propagation to the replica server and I have tried everything. I am getting the error (screenshot attached). I think it comes when the replica attempts to send back data when it should have been silent.
I have tried starting master and replica servers locally and have observed that my set commands are propagated to the replica server and I am able to run get commands as well. So, locally it is working perfectly. Not sure if the error is maybe because it is not optimized for the test environment.
Is the issue because of me using event loop and not threads or it can be done without threads?
Please help, I have spent way too much time on this.

Here are my logs:


And here’s a snippet of my code:

//Event Loop
while(true){
    FD_ZERO(&read_fds);
    FD_SET(server_fd,&read_fds);
    int max_fd = server_fd;

    for(int fd: client_fds){
      FD_SET(fd, &read_fds);
      if(fd>max_fd) max_fd = fd;
    }

    int activity = select(max_fd+1, &read_fds, NULL, NULL, NULL);
    if(activity < 0){
      std::cerr << "select error\n";
      break;
    }

    if(FD_ISSET(server_fd, &read_fds)){
      struct sockaddr_in client_addr;
      int client_addr_len = sizeof(client_addr);
      int new_client_fd = accept(server_fd, (struct sockaddr *)&client_addr, (socklen_t *) &client_addr_len);
      if(new_client_fd < 0){
        std::cerr << "accept error\n";
        break;
      }
      cout<<"New client connected with FD: "<<new_client_fd<<"\n";
      client_fds.push_back(new_client_fd); 
    }

    for(auto it = client_fds.begin(); it != client_fds.end();){
      int client_fd = *it;
      if (FD_ISSET(client_fd, &read_fds)) {
        char buffer[256];
        memset(buffer, 0, sizeof(buffer));
        int n = read(client_fd, buffer, sizeof(buffer) - 1);
        if (n <= 0) {
            std::cerr << "Client disconnected: FD = " << client_fd << "\n";
            // close(client_fd);
            it = client_fds.erase(it); // Remove from the list
            continue;
        }
        std::cout << "Received message from client FD " << client_fd << ": " << buffer;
        istringstream stream(string(buffer, n));
        vector<string> commands = decodeStream(stream);
        for(string command : commands) cout<<"command "<<command<<" ";
        cout<<"Helklow sdlkfj "<<endl;
        if(strcasecmp(commands[0].c_str(),"ping") == 0)
          send(client_fd, "+PONG\r\n", 7, 0);
        else if(strcasecmp(commands[0].c_str(),"echo") == 0){
          string resp =  "+" + commands[1] + "\r\n";
          send(client_fd, resp.c_str() , resp.length(), 0);
        }
        else if(strcasecmp(commands[0].c_str(),"set") == 0){
          auto current_time = chrono::steady_clock::now();
          if(commands.size()==5 && strcasecmp(commands[3].c_str(),"px") == 0){
            auto expiry_time = current_time + chrono::milliseconds(stoi(commands[4]));
            db[commands[1]] = make_pair(commands[2],expiry_time);
          }
          else{
            auto max_time = current_time + chrono::milliseconds(10000000);
            db[commands[1]] = make_pair(commands[2], max_time);
          } 
          
          if(client_fd != masterFd)
            propagateToReplicas(replicas, string(buffer, n));
          if(client_fd != masterFd)
            send(client_fd, "+OK\r\n", 5, 0);
        }
        else if(strcasecmp(commands[0].c_str(),"get") == 0){
          auto current_time = chrono::steady_clock::now();
          if(db[commands[1]].second < current_time){
            string resp = "$-1\r\n";
            send(client_fd, resp.c_str(), resp.length(), 0);
          }
          else {
            string resp = "+" + db[commands[1]].first + "\r\n";
            cout<<"resp: "<<resp<<endl;
            send(client_fd, resp.c_str(), resp.length(), 0);
          }
        }
        else if(strcasecmp(commands[0].c_str(),"config") == 0){
          if(strcasecmp(commands[1].c_str(),"get") == 0){
            string resp = "";
            if(strcasecmp(commands[2].c_str(),"dir") == 0){
              resp = "*2\r\n$3\r\ndir\r\n$" + to_string(dir.length()) +"\r\n"+ dir +"\r\n";
            }
            else if(strcasecmp(commands[2].c_str(),"dbfilename") == 0){
              resp = "*2\r\n$10\r\ndbfilename\r\n$" + to_string(dbfilename.length()) +"\r\n"+ dbfilename +"\r\n";
            }
            cout<<"resp2: "<<resp<<endl;
            send(client_fd, resp.c_str(), resp.length(), 0);
          }
        }
        else if(strcasecmp(commands[0].c_str(),"keys") == 0){
          vector<string>keys;
          if(commands[1] == "*"){
            for(auto it = db.begin();it!=db.end();it++){
              cout<<"Key: "<<it->first << " value: "<<it->second.first<<endl;
              keys.push_back(it->first);
            }
            string resp="*";
            resp+=to_string(keys.size()) + "\r\n";
            for(string key:keys){
              resp+="$"+to_string(key.length())+"\r\n";
              resp+=key+"\r\n";
            }
            send(client_fd, resp.c_str(), resp.length(), 0);
          }
        }else if(strcasecmp(commands[0].c_str(),"info") == 0){
          string resp="";
          string value = "role:"+role + "\nmaster_replid:"+master_replid+"\nmaster_repl_offset:"+master_repl_offset;
          resp ="$"+ to_string(value.length()) + "\r\n";
          resp+=value;
          resp+="\r\n"; 
          send(client_fd, resp.c_str(), resp.length(), 0);
        }else if(strcasecmp(commands[0].c_str(),"replconf") == 0){
          if(strcasecmp(commands[1].c_str(),"listening-port") == 0){
            slavePort = stoi(commands[2]);
          }
          string resp="+OK\r\n";
          send(client_fd, resp.c_str(), resp.length(), 0);
        }else if(strcasecmp(commands[0].c_str(),"psync") == 0){
          string resp="+FULLRESYNC "+master_replid+" "+master_repl_offset+"\r\n";
          send(client_fd, resp.c_str(), resp.length(), 0);
          string empty_rdb = "\x52\x45\x44\x49\x53\x30\x30\x31\x31\xfa\x09\x72\x65\x64\x69\x73\x2d\x76\x65\x72\x05\x37\x2e\x32\x2e\x30\xfa\x0a\x72\x65\x64\x69\x73\x2d\x62\x69\x74\x73\xc0\x40\xfa\x05\x63\x74\x69\x6d\x65\xc2\x6d\x08\xbc\x65\xfa\x08\x75\x73\x65\x64\x2d\x6d\x65\x6d\xc2\xb0\xc4\x10\x00\xfa\x08\x61\x6f\x66\x2d\x62\x61\x73\x65\xc0\x00\xff\xf0\x6e\x3b\xfe\xc0\xff\x5a\xa2";
          resp = "$"+to_string(empty_rdb.length())+"\r\n"+empty_rdb;
          cout<<"replica port "<<slavePort<<endl;
          replicas.push_back(make_pair(slaveHost, slavePort));
          send(client_fd, resp.c_str(), resp.length(), 0);
        }
      }
      ++it;
    }
  }

  for (int fd : client_fds) {
    close(fd);
  }
  return 0;

Hi @tarun0110, could you upload your code to GitHub and share the link? It will be much easier to debug if I can run it directly.

Hi @andy1li , thanks for your reply. Here is my github repo link:

Thank you for sharing the link! I’ll take a look and get back to you by the end of next week, as I’m currently catching up on posts from this week.

1 Like

Hi @andy1li , just wanted to check in if you could find time to check this out.

Is there anyone who has tried to solve this challenge with event loop? if yes, can you share your code link or try to see what’s the issue with mine when I got #YG4. My codebase link is there in the top comments. Thanks in advance.

Hi @tarun0110, I tried running your code against the previous stages, but it’s no longer passing #ZN8 (Replication - Single-replica propagation).

Suggestions:

  1. Use our CLI to test against previous stages by running:
codecrafters test --previous
  1. Focus on fixing the early stages first, as later stages depend on them.