Skip to content
This repository has been archived by the owner on Sep 21, 2020. It is now read-only.

Commit

Permalink
Use wpi::mutex instead of std::mutex. (#254)
Browse files Browse the repository at this point in the history
This uses a priority-aware mutex on Linux platforms.
  • Loading branch information
PeterJohnson authored Nov 13, 2017
1 parent 86d4899 commit 3438a17
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 129 deletions.
20 changes: 10 additions & 10 deletions src/main/native/cpp/CallbackManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

#include <atomic>
#include <climits>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

#include "llvm/raw_ostream.h"
#include "support/condition_variable.h"
#include "support/mutex.h"
#include "support/SafeThread.h"
#include "support/UidVector.h"

Expand Down Expand Up @@ -67,19 +67,19 @@ class CallbackThread : public wpi::SafeThread {
wpi::UidVector<ListenerData, 64> m_listeners;

std::queue<std::pair<unsigned int, NotifierData>> m_queue;
std::condition_variable m_queue_empty;
wpi::condition_variable m_queue_empty;

struct Poller {
void Terminate() {
{
std::lock_guard<std::mutex> lock(poll_mutex);
std::lock_guard<wpi::mutex> lock(poll_mutex);
terminating = true;
}
poll_cond.notify_all();
}
std::queue<NotifierData> poll_queue;
std::mutex poll_mutex;
std::condition_variable poll_cond;
wpi::mutex poll_mutex;
wpi::condition_variable poll_cond;
bool terminating = false;
bool cancelling = false;
};
Expand All @@ -92,7 +92,7 @@ class CallbackThread : public wpi::SafeThread {
auto poller = m_pollers[poller_uid];
if (!poller) return;
{
std::lock_guard<std::mutex> lock(poller->poll_mutex);
std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
poller->poll_queue.emplace(std::forward<Args>(args)...);
}
poller->poll_cond.notify_one();
Expand All @@ -102,7 +102,7 @@ class CallbackThread : public wpi::SafeThread {
template <typename Derived, typename TUserInfo, typename TListenerData,
typename TNotifierData>
void CallbackThread<Derived, TUserInfo, TListenerData, TNotifierData>::Main() {
std::unique_lock<std::mutex> lock(m_mutex);
std::unique_lock<wpi::mutex> lock(m_mutex);
while (m_active) {
while (m_queue.empty()) {
m_cond.wait(lock);
Expand Down Expand Up @@ -244,7 +244,7 @@ class CallbackManager {
if (!poller) return infos;
}

std::unique_lock<std::mutex> lock(poller->poll_mutex);
std::unique_lock<wpi::mutex> lock(poller->poll_mutex);
#if defined(_MSC_VER) && _MSC_VER < 1900
auto timeout_time = std::chrono::steady_clock::now() +
std::chrono::duration<int64_t, std::nano>(
Expand Down Expand Up @@ -295,7 +295,7 @@ class CallbackManager {
}

{
std::lock_guard<std::mutex> lock(poller->poll_mutex);
std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
poller->cancelling = true;
}
poller->poll_cond.notify_one();
Expand Down
44 changes: 22 additions & 22 deletions src/main/native/cpp/Dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void DispatcherBase::StartServer(
StringRef persist_filename,
std::unique_ptr<wpi::NetworkAcceptor> acceptor) {
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
if (m_active) return;
m_active = true;
}
Expand Down Expand Up @@ -148,7 +148,7 @@ void DispatcherBase::StartServer(

void DispatcherBase::StartClient() {
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
if (m_active) return;
m_active = true;
}
Expand All @@ -167,7 +167,7 @@ void DispatcherBase::Stop() {

// wake up client thread with a reconnect
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_client_connector = nullptr;
}
ClientReconnect();
Expand All @@ -181,7 +181,7 @@ void DispatcherBase::Stop() {

std::vector<std::shared_ptr<INetworkConnection>> conns;
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
conns.swap(m_connections);
}

Expand All @@ -199,14 +199,14 @@ void DispatcherBase::SetUpdateRate(double interval) {
}

void DispatcherBase::SetIdentity(llvm::StringRef name) {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_identity = name;
}

void DispatcherBase::Flush() {
auto now = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lock(m_flush_mutex);
std::lock_guard<wpi::mutex> lock(m_flush_mutex);
// don't allow flushes more often than every 10 ms
if ((now - m_last_flush) < std::chrono::milliseconds(10)) return;
m_last_flush = now;
Expand All @@ -219,7 +219,7 @@ std::vector<ConnectionInfo> DispatcherBase::GetConnections() const {
std::vector<ConnectionInfo> conns;
if (!m_active) return conns;

std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn->state() != NetworkConnection::kActive) continue;
conns.emplace_back(conn->info());
Expand All @@ -231,7 +231,7 @@ std::vector<ConnectionInfo> DispatcherBase::GetConnections() const {
bool DispatcherBase::IsConnected() const {
if (!m_active) return false;

std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn->state() == NetworkConnection::kActive) return true;
}
Expand All @@ -242,7 +242,7 @@ bool DispatcherBase::IsConnected() const {
unsigned int DispatcherBase::AddListener(
std::function<void(const ConnectionNotification& event)> callback,
bool immediate_notify) const {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
unsigned int uid = m_notifier.Add(callback);
// perform immediate notifications
if (immediate_notify) {
Expand All @@ -256,7 +256,7 @@ unsigned int DispatcherBase::AddListener(

unsigned int DispatcherBase::AddPolledListener(unsigned int poller_uid,
bool immediate_notify) const {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
unsigned int uid = m_notifier.AddPolled(poller_uid);
// perform immediate notifications
if (immediate_notify) {
Expand All @@ -269,17 +269,17 @@ unsigned int DispatcherBase::AddPolledListener(unsigned int poller_uid,
}

void DispatcherBase::SetConnector(Connector connector) {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_client_connector = std::move(connector);
}

void DispatcherBase::SetConnectorOverride(Connector connector) {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_client_connector_override = std::move(connector);
}

void DispatcherBase::ClearConnectorOverride() {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_client_connector_override = nullptr;
}

Expand All @@ -298,7 +298,7 @@ void DispatcherBase::DispatchThreadMain() {

// wait for periodic or when flushed
timeout_time += std::chrono::milliseconds(m_update_rate);
std::unique_lock<std::mutex> flush_lock(m_flush_mutex);
std::unique_lock<wpi::mutex> flush_lock(m_flush_mutex);
m_flush_cv.wait_until(flush_lock, timeout_time,
[&] { return !m_active || m_do_flush; });
m_do_flush = false;
Expand All @@ -316,7 +316,7 @@ void DispatcherBase::DispatchThreadMain() {
}

{
std::lock_guard<std::mutex> user_lock(m_user_mutex);
std::lock_guard<wpi::mutex> user_lock(m_user_mutex);
bool reconnect = false;

if (++count > 10) {
Expand Down Expand Up @@ -347,7 +347,7 @@ void DispatcherBase::DispatchThreadMain() {
void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
INetworkConnection* only,
INetworkConnection* except) {
std::lock_guard<std::mutex> user_lock(m_user_mutex);
std::lock_guard<wpi::mutex> user_lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn.get() == except) continue;
if (only && conn.get() != only) continue;
Expand Down Expand Up @@ -389,7 +389,7 @@ void DispatcherBase::ServerThreadMain() {
std::bind(&IStorage::ProcessIncoming, &m_storage, _1, _2,
std::weak_ptr<NetworkConnection>(conn)));
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
// reuse dead connection slots
bool placed = false;
for (auto& c : m_connections) {
Expand All @@ -414,7 +414,7 @@ void DispatcherBase::ClientThreadMain() {

// get next server to connect to
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
if (m_client_connector_override) {
connect = m_client_connector_override;
} else {
Expand All @@ -436,7 +436,7 @@ void DispatcherBase::ClientThreadMain() {
DEBUG("client connected");
m_networkMode = NT_NET_MODE_CLIENT;

std::unique_lock<std::mutex> lock(m_user_mutex);
std::unique_lock<wpi::mutex> lock(m_user_mutex);
using namespace std::placeholders;
auto conn = std::make_shared<NetworkConnection>(
++m_connections_uid, std::move(stream), m_notifier, m_logger,
Expand Down Expand Up @@ -466,7 +466,7 @@ bool DispatcherBase::ClientHandshake(
// get identity
std::string self_id;
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
self_id = m_identity;
}

Expand Down Expand Up @@ -573,7 +573,7 @@ bool DispatcherBase::ServerHandshake(

// Start with server hello. TODO: initial connection flag
if (proto_rev >= 0x0300) {
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
outgoing.emplace_back(Message::ServerHello(0u, m_identity));
}

Expand Down Expand Up @@ -630,7 +630,7 @@ bool DispatcherBase::ServerHandshake(
void DispatcherBase::ClientReconnect(unsigned int proto_rev) {
if ((m_networkMode & NT_NET_MODE_SERVER) != 0) return;
{
std::lock_guard<std::mutex> lock(m_user_mutex);
std::lock_guard<wpi::mutex> lock(m_user_mutex);
m_reconnect_proto_rev = proto_rev;
m_do_reconnect = true;
}
Expand Down
12 changes: 6 additions & 6 deletions src/main/native/cpp/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "llvm/StringRef.h"
#include "support/condition_variable.h"
#include "support/mutex.h"

#include "IDispatcher.h"
#include "INetworkConnection.h"
Expand Down Expand Up @@ -103,21 +103,21 @@ class DispatcherBase : public IDispatcher {
uint8_t m_connections_uid = 0;

// Mutex for user-accessible items
mutable std::mutex m_user_mutex;
mutable wpi::mutex m_user_mutex;
std::vector<std::shared_ptr<INetworkConnection>> m_connections;
std::string m_identity;

std::atomic_bool m_active; // set to false to terminate threads
std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms

// Condition variable for forced dispatch wakeup (flush)
std::mutex m_flush_mutex;
std::condition_variable m_flush_cv;
wpi::mutex m_flush_mutex;
wpi::condition_variable m_flush_cv;
std::chrono::steady_clock::time_point m_last_flush;
bool m_do_flush = false;

// Condition variable for client reconnect (uses user mutex)
std::condition_variable m_reconnect_cv;
wpi::condition_variable m_reconnect_cv;
unsigned int m_reconnect_proto_rev = 0x0300;
bool m_do_reconnect = true;

Expand Down
2 changes: 1 addition & 1 deletion src/main/native/cpp/DsClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void DsClient::Thread::Main() {
std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
unsigned int port;
{
std::unique_lock<std::mutex> lock(m_mutex);
std::unique_lock<wpi::mutex> lock(m_mutex);
m_cond.wait_until(lock, timeout_time, [&] { return !m_active; });
port = m_port;
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/native/cpp/InstanceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ using namespace nt;
std::atomic<int> InstanceImpl::s_default{-1};
std::atomic<InstanceImpl*> InstanceImpl::s_fast_instances[10];
wpi::UidVector<std::unique_ptr<InstanceImpl>, 10> InstanceImpl::s_instances;
std::mutex InstanceImpl::s_mutex;
wpi::mutex InstanceImpl::s_mutex;

using namespace std::placeholders;

Expand Down Expand Up @@ -43,7 +43,7 @@ InstanceImpl* InstanceImpl::Get(int inst) {
}

// slow path
std::lock_guard<std::mutex> lock(s_mutex);
std::lock_guard<wpi::mutex> lock(s_mutex);

// static fast-path block
if (static_cast<unsigned int>(inst) <
Expand All @@ -66,7 +66,7 @@ int InstanceImpl::GetDefaultIndex() {
if (inst >= 0) return inst;

// slow path
std::lock_guard<std::mutex> lock(s_mutex);
std::lock_guard<wpi::mutex> lock(s_mutex);

// double-check
inst = s_default;
Expand All @@ -79,7 +79,7 @@ int InstanceImpl::GetDefaultIndex() {
}

int InstanceImpl::Alloc() {
std::lock_guard<std::mutex> lock(s_mutex);
std::lock_guard<wpi::mutex> lock(s_mutex);
return AllocImpl();
}

Expand All @@ -96,7 +96,7 @@ int InstanceImpl::AllocImpl() {
}

void InstanceImpl::Destroy(int inst) {
std::lock_guard<std::mutex> lock(s_mutex);
std::lock_guard<wpi::mutex> lock(s_mutex);
if (inst < 0 || static_cast<unsigned int>(inst) >= s_instances.size()) return;

if (static_cast<unsigned int>(inst) <
Expand Down
4 changes: 2 additions & 2 deletions src/main/native/cpp/InstanceImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

#include <atomic>
#include <memory>
#include <mutex>

#include "support/mutex.h"
#include "support/UidVector.h"

#include "ConnectionNotifier.h"
Expand Down Expand Up @@ -52,7 +52,7 @@ class InstanceImpl {
static std::atomic<int> s_default;
static std::atomic<InstanceImpl*> s_fast_instances[10];
static wpi::UidVector<std::unique_ptr<InstanceImpl>, 10> s_instances;
static std::mutex s_mutex;
static wpi::mutex s_mutex;
};

} // namespace nt
Expand Down
Loading

0 comments on commit 3438a17

Please sign in to comment.