Skip to content

Commit

Permalink
New coroutine library and minor tidups
Browse files Browse the repository at this point in the history
  • Loading branch information
dallison committed Aug 3, 2023
1 parent 6206dee commit 805f872
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 23 deletions.
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ http_archive(

http_archive(
name = "coroutines",
urls = ["https://github.com/dallison/co/archive/refs/tags/1.3.1.tar.gz"],
strip_prefix = "co-1.3.1",
sha256 = "6661713fdea9758f6dcc12ce42cad3f069a855f9757a3c027b0e3122da067c04"
urls = ["https://github.com/dallison/co/archive/refs/tags/1.3.2.tar.gz"],
strip_prefix = "co-1.3.2",
sha256 = "2a005dc6e86e2e8ac732605bf7ec464b39cb6ee64d3bbc40e3205f8348e790d1"
)

# For local debugging of co coroutine library.
Expand Down
14 changes: 4 additions & 10 deletions server/client_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,32 @@ void ClientHandler::Run(co::Coroutine *c) {
absl::StatusOr<ssize_t> n =
socket_.ReceiveMessage(buffer_, sizeof(buffer_), c);
if (!n.ok()) {
server_->CloseHandler(this);
return;
}
subspace::Request request;
if (request.ParseFromArray(buffer_, *n)) {
std::vector<toolbelt::FileDescriptor> fds;
subspace::Response response;
if (absl::Status s = HandleMessage(request, response, fds); !s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
server_->CloseHandler(this);
server_->logger_.Log(toolbelt::LogLevel::kError, "%s\n", s.ToString().c_str());
return;
}

if (!response.SerializeToArray(sendbuf, kSendBufLen)) {
fprintf(stderr, "Failed to serialize response\n");
server_->CloseHandler(this);
server_->logger_.Log(toolbelt::LogLevel::kError, "Failed to serialize response\n");
return;
}
size_t msglen = response.ByteSizeLong();
absl::StatusOr<ssize_t> n = socket_.SendMessage(sendbuf, msglen, c);
if (!n.ok()) {
server_->CloseHandler(this);
return;
}
if (absl::Status status = socket_.SendFds(fds, c); !status.ok()) {
fprintf(stderr, "%s\n", status.ToString().c_str());
server_->CloseHandler(this);
server_->logger_.Log(toolbelt::LogLevel::kError, "%s\n", status.ToString().c_str());
return;
}
} else {
fprintf(stderr, "Failed to parse message\n");
server_->CloseHandler(this);
server_->logger_.Log(toolbelt::LogLevel::kError, "Failed to parse message\n");
return;
}
}
Expand Down
24 changes: 14 additions & 10 deletions server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void Server::CloseHandler(ClientHandler *handler) {
// This coroutine listens for incoming client connections on the given
// UDS and spawns a handler coroutine to handle the communication with
// the client.
void Server::ListenerCoroutine(toolbelt::UnixSocket& listen_socket,
void Server::ListenerCoroutine(toolbelt::UnixSocket &listen_socket,
co::Coroutine *c) {
for (;;) {
absl::Status status = HandleIncomingConnection(listen_socket, c);
Expand Down Expand Up @@ -217,12 +217,12 @@ absl::Status Server::Run() {
[this](co::Coroutine *c) { coroutines_.erase(c); });

// Start the listener coroutine.
coroutines_.insert(std::make_unique<co::Coroutine>(
co_scheduler_,
[this, &listen_socket](co::Coroutine *c) {
ListenerCoroutine(listen_socket, c);
},
"Listener UDS"));
coroutines_.insert(
std::make_unique<co::Coroutine>(co_scheduler_,
[this, &listen_socket](co::Coroutine *c) {
ListenerCoroutine(listen_socket, c);
},
"Listener UDS"));

// Start the channel directory coroutine.
coroutines_.insert(std::make_unique<co::Coroutine>(
Expand Down Expand Up @@ -270,9 +270,13 @@ Server::HandleIncomingConnection(toolbelt::UnixSocket &listen_socket,
std::make_unique<ClientHandler>(this, std::move(*s)));
ClientHandler *handler_ptr = client_handlers_.back().get();

coroutines_.insert(std::make_unique<co::Coroutine>(
co_scheduler_, [handler_ptr](co::Coroutine *c) { handler_ptr->Run(c); },
"Client handler"));
coroutines_.insert(
std::make_unique<co::Coroutine>(co_scheduler_,
[this, handler_ptr](co::Coroutine *c) {
handler_ptr->Run(c);
CloseHandler(handler_ptr);
},
"Client handler"));

return absl::OkStatus();
}
Expand Down

0 comments on commit 805f872

Please sign in to comment.