From a86f65db1ed92561f877539722923dd60d4cd295 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 2 Aug 2015 23:21:23 -0700 Subject: [PATCH] Dispatcher: Implement transmit combining. This handles most of the major cases, but not a loop of delete/assign, which should be uncommon anyway. --- TODO.md | 1 - src/Dispatcher.cpp | 98 ++++++++++++++++++++++++++++++++++++++++++++-- src/Dispatcher.h | 3 ++ 3 files changed, 98 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index b41da6d..b1621c9 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,6 @@ ## Functionality -* Dispatcher: Combine multiple updates * Automatic persistent saves * RPC diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 104b199..151ffa3 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -181,6 +181,7 @@ void DispatcherBase::DispatchThreadMain() { connections.push_back(ConnectionRef()); connections.back().net = conn.net.get(); connections.back().outgoing.swap(conn.outgoing); + conn.last_update.resize(0); // clear "previous" updates } if (!m_server && conn.net->state() == NetworkConnection::kDead) reconnect = true; @@ -192,8 +193,6 @@ void DispatcherBase::DispatchThreadMain() { } } - // scan outgoing messages to remove unnecessary updates - // send outgoing messages for (auto& conn : connections) { if (!conn.outgoing.empty()) @@ -202,6 +201,99 @@ void DispatcherBase::DispatchThreadMain() { } } +void DispatcherBase::Connection::QueueOutgoing(std::shared_ptr msg) { + // Merge with previous. One case we don't combine: delete/assign loop. + switch (msg->type()) { + case Message::kEntryAssign: + case Message::kEntryUpdate: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + outgoing.push_back(msg); + break; + } + if (id < last_update.size() && last_update[id].first != 0) { + // overwrite the previous one for this id + auto& oldmsg = outgoing[last_update[id].first - 1]; + if (oldmsg && oldmsg->Is(Message::kEntryAssign) && + msg->Is(Message::kEntryUpdate)) { + // need to update assignment with new seq_num and value + oldmsg = Message::EntryAssign(oldmsg->str(), id, msg->seq_num_uid(), + msg->value(), oldmsg->flags()); + } else + oldmsg = msg; // easy update + } else { + // new, but remember it + std::size_t pos = outgoing.size(); + outgoing.push_back(msg); + if (id >= last_update.size()) last_update.resize(id + 1); + last_update[id].first = pos + 1; + } + break; + } + case Message::kEntryDelete: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + outgoing.push_back(msg); + break; + } + + // clear previous updates + if (id < last_update.size()) { + if (last_update[id].first != 0) { + outgoing[last_update[id].first - 1].reset(); + last_update[id].first = 0; + } + if (last_update[id].second != 0) { + outgoing[last_update[id].second - 1].reset(); + last_update[id].second = 0; + } + } + + // add deletion + outgoing.push_back(msg); + break; + } + case Message::kFlagsUpdate: { + // don't do this for unassigned id's + unsigned int id = msg->id(); + if (id == 0xffff) { + outgoing.push_back(msg); + break; + } + if (id < last_update.size() && last_update[id].second != 0) { + // overwrite the previous one for this id + outgoing[last_update[id].second - 1] = msg; + } else { + // new, but remember it + std::size_t pos = outgoing.size(); + outgoing.push_back(msg); + if (id >= last_update.size()) last_update.resize(id + 1); + last_update[id].second = pos + 1; + } + break; + } + case Message::kClearEntries: { + // knock out all previous assigns/updates! + for (auto& i : outgoing) { + if (!i) continue; + auto t = i->type(); + if (t == Message::kEntryAssign || t == Message::kEntryUpdate || + t == Message::kFlagsUpdate || t == Message::kEntryDelete || + t == Message::kClearEntries) + i.reset(); + } + last_update.resize(0); + outgoing.push_back(msg); + break; + } + default: + outgoing.push_back(msg); + break; + } +} + void DispatcherBase::QueueOutgoing(std::shared_ptr msg, NetworkConnection* only, NetworkConnection* except) { @@ -212,7 +304,7 @@ void DispatcherBase::QueueOutgoing(std::shared_ptr msg, auto state = conn.net->state(); if (state != NetworkConnection::kSynchronized && state != NetworkConnection::kActive) continue; - conn.outgoing.push_back(msg); + conn.QueueOutgoing(msg); } } diff --git a/src/Dispatcher.h b/src/Dispatcher.h index f83cbfe..865d966 100644 --- a/src/Dispatcher.h +++ b/src/Dispatcher.h @@ -85,8 +85,11 @@ class DispatcherBase { Connection() = default; explicit Connection(std::unique_ptr net_) : net(std::move(net_)) {} + void QueueOutgoing(std::shared_ptr msg); + std::unique_ptr net; NetworkConnection::Outgoing outgoing; + std::vector> last_update; }; std::vector m_connections; std::string m_identity;