Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Dec 27, 2024
1 parent b9c73d2 commit ad257c3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
16 changes: 12 additions & 4 deletions examples/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TMessageHolder<TLogEntry> TKv::Prepare(TMessageHolder<TCommandRequest> command)
}

template<typename TPoller, typename TSocket>
NNet::TFuture<void> Client(TPoller& poller, TSocket socket) {
NNet::TFuture<void> Client(TPoller& poller, TSocket socket, uint32_t flags) {
using TFileHandle = typename TPoller::TFileHandle;
TFileHandle input{0, poller}; // stdin
co_await socket.Connect();
Expand Down Expand Up @@ -112,11 +112,14 @@ NNet::TFuture<void> Client(TPoller& poller, TSocket socket) {
auto key = strtok(nullptr, sep);
auto size = strlen(key);
auto mes = NewHoldedMessage<TReadKv>(sizeof(TReadKv) + size);
mes->Flags = flags;
mes->KeySize = size;
memcpy(mes->Data, key, size);
req = mes;
} else if (!strcmp(prefix, "list")) {
req = NewHoldedMessage<TReadKv>(sizeof(TReadKv));
auto mes = NewHoldedMessage<TReadKv>(sizeof(TReadKv));
mes->Flags = flags;
req = mes;
} else if (!strcmp(prefix, "del")) {
auto key = strtok(nullptr, sep);
auto size = strlen(key);
Expand Down Expand Up @@ -145,7 +148,7 @@ NNet::TFuture<void> Client(TPoller& poller, TSocket socket) {
}

void usage(const char* prog) {
std::cerr << prog << "--client|--server --id myid --node ip:port:id [--node ip:port:id ...] [--persist]" << "\n";
std::cerr << prog << "--client|--server --id myid --node ip:port:id [--node ip:port:id ...] [--persist] [--stale] [--consistent]" << "\n";
exit(0);
}

Expand All @@ -157,6 +160,7 @@ int main(int argc, char** argv) {
uint32_t id = 0;
bool server = false;
bool persist = false;
uint32_t flags = 0;
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "--server")) {
server = true;
Expand All @@ -167,6 +171,10 @@ int main(int argc, char** argv) {
id = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--persist")) {
persist = true;
} else if (!strcmp(argv[i], "--stale")) {
flags |= TCommandRequest::EStale;
} else if (!strcmp(argv[i], "--consistent")) {
flags |= TCommandRequest::EConsistent;
} else if (!strcmp(argv[i], "--help")) {
usage(argv[0]);
}
Expand Down Expand Up @@ -212,7 +220,7 @@ int main(int argc, char** argv) {
NNet::TAddress addr{hosts[0].Address, hosts[0].Port};
NNet::TSocket socket(std::move(addr), loop.Poller());

auto h = Client(loop.Poller(), std::move(socket));
auto h = Client(loop.Poller(), std::move(socket), flags);
while (!h.done()) {
loop.Step();
}
Expand Down
5 changes: 4 additions & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ NNet::TVoidTask TRaftServer<TSocket>::InboundConnection(TSocket socket) {
Nodes.insert(client);
while (true) {
auto mes = co_await TMessageReader(client->Sock()).Read();
// client request
// client request
if (auto maybeCommandRequest = mes.template Maybe<TCommandRequest>()) {
RequestProcessor->OnCommandRequest(std::move(maybeCommandRequest.Cast()), client);
} else if (auto maybeCommandResponse = mes.template Maybe<TCommandResponse>()) {
Expand Down Expand Up @@ -170,6 +170,9 @@ NNet::TVoidTask TRaftServer<TSocket>::OutboundServe(std::shared_ptr<TNode<TSocke
while (true) {
bool error = false;
try {
if (!node->IsConnected()) {
throw std::runtime_error("Not connected");
}
auto mes = co_await TMessageReader(node->Sock()).Read();
if (auto maybeCommandResponse = mes.template Maybe<TCommandResponse>()) {
RequestProcessor->OnCommandResponse(std::move(maybeCommandResponse.Cast()));
Expand Down

0 comments on commit ad257c3

Please sign in to comment.