Skip to content
This repository has been archived by the owner on Sep 21, 2020. It is now read-only.

Commit

Permalink
Dispatcher: Implement transmit combining.
Browse files Browse the repository at this point in the history
This handles most of the major cases, but not a loop of delete/assign, which
should be uncommon anyway.
  • Loading branch information
PeterJohnson committed Aug 3, 2015
1 parent 53a0531 commit a86f65d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 4 deletions.
1 change: 0 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

## Functionality

* Dispatcher: Combine multiple updates
* Automatic persistent saves
* RPC

Expand Down
98 changes: 95 additions & 3 deletions src/Dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ void DispatcherBase::DispatchThreadMain() {
connections.push_back(ConnectionRef());
connections.back().net = conn.net.get();
connections.back().outgoing.swap(conn.outgoing);
conn.last_update.resize(0); // clear "previous" updates
}
if (!m_server && conn.net->state() == NetworkConnection::kDead)
reconnect = true;
Expand All @@ -192,8 +193,6 @@ void DispatcherBase::DispatchThreadMain() {
}
}

// scan outgoing messages to remove unnecessary updates

// send outgoing messages
for (auto& conn : connections) {
if (!conn.outgoing.empty())
Expand All @@ -202,6 +201,99 @@ void DispatcherBase::DispatchThreadMain() {
}
}

void DispatcherBase::Connection::QueueOutgoing(std::shared_ptr<Message> msg) {
// Merge with previous. One case we don't combine: delete/assign loop.
switch (msg->type()) {
case Message::kEntryAssign:
case Message::kEntryUpdate: {
// don't do this for unassigned id's
unsigned int id = msg->id();
if (id == 0xffff) {
outgoing.push_back(msg);
break;
}
if (id < last_update.size() && last_update[id].first != 0) {
// overwrite the previous one for this id
auto& oldmsg = outgoing[last_update[id].first - 1];
if (oldmsg && oldmsg->Is(Message::kEntryAssign) &&
msg->Is(Message::kEntryUpdate)) {
// need to update assignment with new seq_num and value
oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(),
msg->value(), oldmsg->flags());
} else
oldmsg = msg; // easy update
} else {
// new, but remember it
std::size_t pos = outgoing.size();
outgoing.push_back(msg);
if (id >= last_update.size()) last_update.resize(id + 1);
last_update[id].first = pos + 1;
}
break;
}
case Message::kEntryDelete: {
// don't do this for unassigned id's
unsigned int id = msg->id();
if (id == 0xffff) {
outgoing.push_back(msg);
break;
}

// clear previous updates
if (id < last_update.size()) {
if (last_update[id].first != 0) {
outgoing[last_update[id].first - 1].reset();
last_update[id].first = 0;
}
if (last_update[id].second != 0) {
outgoing[last_update[id].second - 1].reset();
last_update[id].second = 0;
}
}

// add deletion
outgoing.push_back(msg);
break;
}
case Message::kFlagsUpdate: {
// don't do this for unassigned id's
unsigned int id = msg->id();
if (id == 0xffff) {
outgoing.push_back(msg);
break;
}
if (id < last_update.size() && last_update[id].second != 0) {
// overwrite the previous one for this id
outgoing[last_update[id].second - 1] = msg;
} else {
// new, but remember it
std::size_t pos = outgoing.size();
outgoing.push_back(msg);
if (id >= last_update.size()) last_update.resize(id + 1);
last_update[id].second = pos + 1;
}
break;
}
case Message::kClearEntries: {
// knock out all previous assigns/updates!
for (auto& i : outgoing) {
if (!i) continue;
auto t = i->type();
if (t == Message::kEntryAssign || t == Message::kEntryUpdate ||
t == Message::kFlagsUpdate || t == Message::kEntryDelete ||
t == Message::kClearEntries)
i.reset();
}
last_update.resize(0);
outgoing.push_back(msg);
break;
}
default:
outgoing.push_back(msg);
break;
}
}

void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
NetworkConnection* only,
NetworkConnection* except) {
Expand All @@ -212,7 +304,7 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
auto state = conn.net->state();
if (state != NetworkConnection::kSynchronized &&
state != NetworkConnection::kActive) continue;
conn.outgoing.push_back(msg);
conn.QueueOutgoing(msg);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ class DispatcherBase {
Connection() = default;
explicit Connection(std::unique_ptr<NetworkConnection> net_)
: net(std::move(net_)) {}
void QueueOutgoing(std::shared_ptr<Message> msg);

std::unique_ptr<NetworkConnection> net;
NetworkConnection::Outgoing outgoing;
std::vector<std::pair<std::size_t, std::size_t>> last_update;
};
std::vector<Connection> m_connections;
std::string m_identity;
Expand Down

0 comments on commit a86f65d

Please sign in to comment.