I’m stuck on Stage #BS1
I’ve tried parsing the input, but the test case is still failing.
[tester::#BS1] ./your_program.sh
[tester::#BS1] $ redis-cli xadd "grape" "0-1" "temperature 21"
[tester::#BS1] $ redis-cli xread block 'Ϩ' streams "grape 0-1"
[tester::#BS1] $ redis-cli xadd "grape" "0-2" "temperature 21"
I’m getting this error:
[tester::#BS1] timed out, test exceeded 10 seconds
[tester::#BS1] Test failed
[tester::#BS1] Terminating program
[tester::#BS1] Program terminated successfully
Here is something I noticed that I think might be the problem:
[tester::#BS1] $ redis-cli xread block 'Ϩ' streams "grape 0-1"
[your_program] ****Printing encoded input commands : *****
[your_program] *6
[your_program] $5
[your_program] xread
[your_program] $5
[your_program] block
[your_program] $4
[your_program] 1000
[your_program] $7
[your_program] streams
[your_program] $5
[your_program] grape
[your_program] $3
[your_program] 0-1
The block time in the tester output is not an ASCII character, yet my program receives 1000 as the block time. Is this the reason for the failure?
Here are my logs:
[tester::#BS1] Running tests for Stage #BS1 (Streams - Blocking reads)
[tester::#BS1] $ ./your_program.sh
[tester::#BS1] $ redis-cli xadd "grape" "0-1" "temperature 21"
[your_program] Logs from your program will appear here!
[your_program] Waiting for a client to connect...
[your_program] clientId : 1
[your_program] clientId : 2
[your_program] ****Printing decoded commands : *****
[your_program] *5
[your_program] $4
[your_program] xadd
[your_program] $5
[your_program] grape
[your_program] $3
[your_program] 0-1
[your_program] $11
[your_program] temperature
[your_program] $2
[your_program] 21
[your_program]
[your_program] ****Printing decoded commands : *****
[your_program] xadd
[your_program] grape
[your_program] 0-1
[your_program] temperature
[your_program] 21
[your_program] *****Printing commands : *****
[your_program] decodedCommands xadd
[your_program] 0x7f62c8000dc0
[your_program] *****Printing response : *****
[your_program] $0-1
[your_program] *****Printing encodedResponse : *****
[your_program] $3\r\n0-1\r\n
[tester::#BS1] Received response: ""0-1""
[tester::#BS1] $ redis-cli xread block 'Ϩ' streams "grape 0-1"
[your_program] ****Printing decoded commands : *****
[your_program] *6
[your_program] $5
[your_program] xread
[your_program] $5
[your_program] block
[your_program] $4
[your_program] 1000
[your_program] $7
[your_program] streams
[your_program] $5
[your_program] grape
[your_program] $3
[your_program] 0-1
[your_program]
[your_program] ****Printing decoded commands : *****
[your_program] xread
[your_program] block
[your_program] 1000
[your_program] streams
[your_program] grape
[your_program] 0-1
[your_program] *****Printing commands : *****
[your_program] decodedCommands xread
[your_program] 0x7f62c8000b70
[your_program] *****Printing response : *****
[your_program] localInput$1000$see 4******
[your_program] $
[your_program] *****Printing encodedResponse : *****
[your_program] $-1\r\n
[tester::#BS1] $ redis-cli xadd "grape" "0-2" "temperature 21"
[your_program] ****Printing decoded commands : *****
[your_program] *5
[your_program] $4
[your_program] xadd
[your_program] $5
[your_program] grape
[your_program] $3
[your_program] 0-2
[your_program] $11
[your_program] temperature
[your_program] $2
[your_program] 21
[your_program]
[your_program] ****Printing decoded commands : *****
[your_program] xadd
[your_program] grape
[your_program] 0-2
[your_program] temperature
[your_program] 21
[your_program] *****Printing commands : *****
[tester::#BS1] Received response: ""0-2""
[your_program] decodedCommands xadd
[your_program] 0x7f62c8000d00
[your_program] *****Printing response : *****
[your_program] $0-2
[your_program] *****Printing encodedResponse : *****
[your_program] $3\r\n0-2\r\n
[tester::#BS1] timed out, test exceeded 10 seconds
[tester::#BS1] Test failed
[tester::#BS1] Terminating program
[tester::#BS1] Program terminated successfully
And here’s a snippet of my code:
// Server.cpp
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include "Parser/Parser.hpp"
#include "Store/KeyValueStore.hpp"
#include "CommandHandler/CommandFactory.hpp"
#define DEFAULT_BUFFER_DELIMITER "\r\n"
#define DEFAULT_BUFFER_SIZE 128
/**
 * Produces a printable copy of a string in which every carriage return is
 * shown as the two characters `\r` and every line feed as `\n`.
 * All other characters are copied through untouched.
 *
 * @param input Text to make printable (e.g. an encoded RESP reply).
 * @return The escaped copy of `input`.
 */
std::string getRawString(const std::string input) {
    std::string escaped;
    escaped.reserve(input.size());
    for (std::string::size_type idx = 0; idx < input.size(); ++idx) {
        const char current = input[idx];
        if (current == '\r') {
            escaped += "\\r";
        } else if (current == '\n') {
            escaped += "\\n";
        } else {
            escaped += current;
        }
    }
    return escaped;
}
/**
 * Per-connection worker: reads requests from one client socket, decodes each
 * with Parser, dispatches it to the command object returned by CommandFactory
 * and writes the encoded reply back to the same socket.
 *
 * @param arg Pointer to a heap-allocated int holding the client socket
 *            descriptor. The descriptor is copied out and the allocation is
 *            released here, so the caller must not touch it afterwards.
 * @return Always nullptr.
 *
 * The client socket is closed before the function returns.
 */
void* commandHandler(void* arg){
    if (arg == nullptr) {
        return nullptr;
    }
    int* fdHolder = static_cast<int*>(arg);
    const int fd = *fdHolder;
    // The holder only exists to carry the descriptor across the thread
    // boundary; release it right away so it cannot leak.
    free(fdHolder);

    // Writes the whole payload, retrying after short writes. Returns false as
    // soon as the socket reports an error.
    const auto sendAll = [fd](const std::string& payload) -> bool {
        std::string::size_type sent = 0;
        while (sent < payload.size()) {
            const ssize_t written = send(fd, payload.data() + sent, payload.size() - sent, 0);
            if (written <= 0) {
                return false;
            }
            sent += static_cast<std::string::size_type>(written);
        }
        return true;
    };

    // NOTE(review): a request longer than DEFAULT_BUFFER_SIZE bytes arrives in
    // several recv() chunks and each chunk is decoded on its own - confirm the
    // parser copes with that or accumulate until a full RESP frame is present.
    char buffer[DEFAULT_BUFFER_SIZE];
    while (true) {
        const ssize_t byteReceived = recv(fd, buffer, sizeof(buffer), 0);
        if (byteReceived == 0) {
            std::cout << "Info: Connection closed at the client end " << std::endl;
            break;
        }
        if (byteReceived < 0) {
            std::cout << "error: Cannot read from buffer " << std::endl;
            break;
        }
        const std::string::size_type received = static_cast<std::string::size_type>(byteReceived);

        Parser parser;
        CommandFactory factory;

        // First dump: the raw bytes exactly as they came off the wire.
        std::cout << "****Printing decoded commands : *****" << std::endl;
        std::cout.write(buffer, byteReceived);
        std::cout << std::endl;

        std::vector<std::string> decodedCommands = parser.decode(std::string(buffer, received));

        // Second dump: one decoded token per line.
        std::cout << "****Printing decoded commands : *****" << std::endl;
        for (const auto& command_str : decodedCommands) {
            std::cout << command_str << std::endl;
        }

        if (decodedCommands.empty()) {
            // Nothing to dispatch: answer with a RESP error instead of
            // indexing into an empty vector.
            std::cout << "Invalid command to server" << std::endl;
            if (!sendAll("-ERR invalid command\r\n")) {
                break;
            }
            continue;
        }

        std::cout << "*****Printing commands : *****" << std::endl;
        std::cout << "decodedCommands " << decodedCommands[0] << std::endl;
        // NOTE(review): ownership of the returned pointer is not visible from
        // here - if the factory allocates a fresh object per call it must be
        // deleted (or returned as std::unique_ptr) to avoid a leak per request.
        ICommand* command = factory.getCommand(decodedCommands[0]);
        std::cout << command << std::endl;
        if (command == nullptr) {
            if (!sendAll("-ERR unknown command '" + decodedCommands[0] + "'\r\n")) {
                break;
            }
            continue;
        }

        std::cout << "*****Printing response : *****" << std::endl;
        std::vector<std::string> response = command->execute(decodedCommands);
        for (const auto& response_str : response) {
            std::cout << response_str << std::endl;
        }

        std::cout << "*****Printing encodedResponse : *****" << std::endl;
        std::string encodedResponse = parser.encode(response);
        std::cout << getRawString(encodedResponse) << std::endl;
        if (!sendAll(encodedResponse)) {
            std::cout << "error: Cannot write to client " << std::endl;
            break;
        }
    }

    close(fd);
    return nullptr;
}
/**
 * Entry point: opens a TCP listening socket on port 6379 and hands every
 * accepted connection to its own detached worker thread running
 * commandHandler.
 *
 * @return 1 when the listening socket cannot be created, configured, bound or
 *         put into listening mode; otherwise the accept loop never returns.
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    // Flush after every std::cout / std::cerr
    std::cout << std::unitbuf;
    std::cerr << std::unitbuf;

    // You can use print statements as follows for debugging, they'll be visible when running tests.
    std::cout << "Logs from your program will appear here!\n";

    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        std::cerr << "Failed to create server socket\n";
        return 1;
    }

    // Setting SO_REUSEADDR ensures that we don't run into 'Address already in use' errors
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        std::cerr << "setsockopt failed\n";
        close(server_fd);
        return 1;
    }

    // Value-initialise so the padding bytes (sin_zero) are not left as garbage.
    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(6379);

    if (bind(server_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) != 0) {
        std::cerr << "Failed to bind to port 6379\n";
        close(server_fd);
        return 1;
    }

    int connection_backlog = 5;
    if (listen(server_fd, connection_backlog) != 0) {
        std::cerr << "listen failed\n";
        close(server_fd);
        return 1;
    }

    std::cout << "Waiting for a client to connect...\n";

    int clientId = 1;
    while (true) {
        std::cout << "clientId : " << clientId << std::endl;
        clientId += 1;

        // accept() treats the length as an in/out argument, so it has to be a
        // real socklen_t and must be reset before every call.
        struct sockaddr_in client_addr{};
        socklen_t client_addr_len = sizeof(client_addr);
        const int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_addr_len);
        if (client_fd < 0) {
            std::cerr << "accept\n";
            continue;
        }

        // The descriptor is passed through a heap cell so that each worker
        // gets its own copy, independent of this loop's locals.
        int* new_sock = static_cast<int*>(malloc(sizeof(int)));
        if (new_sock == nullptr) {
            std::cerr << "Failed to allocate client context\n";
            close(client_fd);
            continue;
        }
        *new_sock = client_fd;

        // Ownership of new_sock passes to the worker thread on success; only
        // release it here when the thread never started.
        pthread_t thread_id;
        if (pthread_create(&thread_id, nullptr, commandHandler, static_cast<void*>(new_sock)) != 0) {
            std::cerr << "Failed to create client thread\n";
            free(new_sock);
            close(client_fd);
            continue;
        }
        // Nobody joins the workers, so detach them to let the system reclaim
        // their resources when they finish.
        pthread_detach(thread_id);
    }

    return 0;
}
// XreadCommand.cpp
#include "XreadCommand.hpp"
#include <iostream>
#include <sstream>
#include "../Store/KeyValueStore.hpp"
#include "../Utils/utils.hpp"
#include <string>
#include <vector>
#include <cctype>
#include <typeinfo>
#include <iomanip>
// Default construction performs no work of its own.
XreadCommand::XreadCommand() = default;
std::vector<std::string> XreadCommand::execute(const std::vector<std::string>& args) {
// args = ["xread", "block", "Ϩ", "streams", "grape", "0-1"]
if (args.size() >= 6 && args.size()%2==0 ) {
auto pos = std::find(args.begin(), args.end(), "block");
std::string blockMilliSeconds = "-1";
int streamsStartsAt = 2;
bool block = false;
std::string localCopy = args[2];
if(pos!=args.end()){
streamsStartsAt = 4;
std::string localInput;
for (const auto& c : localCopy) {
if (isdigit(static_cast<unsigned char>(c))) {
localInput += c;
}
}
std::string blockMilliSeconds = args[2];
block = true;
}
std::vector<std::string> streamKeys = {};
std::vector<std::string> ids = {};
int lenOfStreamIDs = args.size()-streamsStartsAt;
for(int i = 0; i < lenOfStreamIDs;i++){
if(i<(lenOfStreamIDs/2)){
streamKeys.push_back(args[streamsStartsAt+i]);
}else{
ids.push_back(args[streamsStartsAt+i]);
}
}
std::vector<std::pair<Stream, std::string>> streams = {};
for(int i=0; i<streamKeys.size();i++){
std::string streamKey = streamKeys[i];
std::string id = ids[i];
// std::cout << "Stream is " << streamKey << std::endl;
// std::cout << "id is " << id << std::endl;
Stream stream = KeyValueStore::getInstance().xread(streamKey, id, block, blockMilliSeconds);
streams.push_back({stream, streamKey});
}
return formatForRESP(streams);
}
return {"-ERR wrong number of arguments"};
}
std::vector<std::string> XreadCommand::formatForRESP(const std::vector<std::pair<Stream, std::string>>& streams) {
if(streams.empty() || streams[0].first.empty()){
return {"$"};
}
std::vector<std::string> formattedEntries;
formattedEntries.push_back("*" + std::to_string(streams.size()));
for(const auto& streamPair : streams){
Stream stream = streamPair.first;
std::string streamKey = streamPair.second;
formattedEntries.push_back("*" + std::to_string(2));
formattedEntries.push_back("$" + streamKey);
formattedEntries.push_back("*" + std::to_string(stream.size())); // Size of the array
for (const auto& entry : stream) { formattedEntries.push_back("*2"); // Each entry has 2 elements (ID and array of fields)
formattedEntries.push_back("$" + entry.id); // Stream entry ID
formattedEntries.push_back("*" + std::to_string(entry.fields.size() * 2)); // Number of fields * 2 (field name and value)
for (const auto& field : entry.fields) {
formattedEntries.push_back("$" + field.first); // Field name
formattedEntries.push_back("$" + field.second); // Field value
}
}
}
return formattedEntries;
}