diff --git a/examples/kv.cpp b/examples/kv.cpp index 581ff7f..a14e8a6 100644 --- a/examples/kv.cpp +++ b/examples/kv.cpp @@ -74,7 +74,7 @@ TMessageHolder TKv::Prepare(TMessageHolder command) } template -NNet::TFuture Client(TPoller& poller, TSocket socket) { +NNet::TFuture Client(TPoller& poller, TSocket socket, uint32_t flags) { using TFileHandle = typename TPoller::TFileHandle; TFileHandle input{0, poller}; // stdin co_await socket.Connect(); @@ -112,11 +112,14 @@ NNet::TFuture Client(TPoller& poller, TSocket socket) { auto key = strtok(nullptr, sep); auto size = strlen(key); auto mes = NewHoldedMessage(sizeof(TReadKv) + size); + mes->Flags = flags; mes->KeySize = size; memcpy(mes->Data, key, size); req = mes; } else if (!strcmp(prefix, "list")) { - req = NewHoldedMessage(sizeof(TReadKv)); + auto mes = NewHoldedMessage(sizeof(TReadKv)); + mes->Flags = flags; + req = mes; } else if (!strcmp(prefix, "del")) { auto key = strtok(nullptr, sep); auto size = strlen(key); @@ -145,7 +148,7 @@ NNet::TFuture Client(TPoller& poller, TSocket socket) { } void usage(const char* prog) { - std::cerr << prog << "--client|--server --id myid --node ip:port:id [--node ip:port:id ...] [--persist]" << "\n"; + std::cerr << prog << "--client|--server --id myid --node ip:port:id [--node ip:port:id ...] [--persist] [--stale] [--consistent]" << "\n"; exit(0); } @@ -157,6 +160,7 @@ int main(int argc, char** argv) { uint32_t id = 0; bool server = false; bool persist = false; + uint32_t flags = 0; for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--server")) { server = true; @@ -167,6 +171,10 @@ int main(int argc, char** argv) { id = atoi(argv[++i]); } else if (!strcmp(argv[i], "--persist")) { persist = true; + } else if (!strcmp(argv[i], "--stale")) { + flags |= TCommandRequest::EStale; + } else if (!strcmp(argv[i], "--consistent")) { + flags |= TCommandRequest::EConsistent; } else if (!strcmp(argv[i], "--help")) { usage(argv[0]); } @@ -212,7 +220,7 @@ int main(int argc, char** argv) { NNet::TAddress addr{hosts[0].Address, hosts[0].Port}; NNet::TSocket socket(std::move(addr), loop.Poller()); - auto h = Client(loop.Poller(), std::move(socket)); + auto h = Client(loop.Poller(), std::move(socket), flags); while (!h.done()) { loop.Step(); } diff --git a/src/server.cpp b/src/server.cpp index c79307b..d1c16f9 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -122,7 +122,7 @@ NNet::TVoidTask TRaftServer::InboundConnection(TSocket socket) { Nodes.insert(client); while (true) { auto mes = co_await TMessageReader(client->Sock()).Read(); - // client request + // client request if (auto maybeCommandRequest = mes.template Maybe()) { RequestProcessor->OnCommandRequest(std::move(maybeCommandRequest.Cast()), client); } else if (auto maybeCommandResponse = mes.template Maybe()) { @@ -170,6 +170,9 @@ NNet::TVoidTask TRaftServer::OutboundServe(std::shared_ptrIsConnected()) { + throw std::runtime_error("Not connected"); + } auto mes = co_await TMessageReader(node->Sock()).Read(); if (auto maybeCommandResponse = mes.template Maybe()) { RequestProcessor->OnCommandResponse(std::move(maybeCommandResponse.Cast()));