diff --git a/src/main/native/cpp/CallbackManager.h b/src/main/native/cpp/CallbackManager.h index ff4a095..425af28 100644 --- a/src/main/native/cpp/CallbackManager.h +++ b/src/main/native/cpp/CallbackManager.h @@ -10,13 +10,13 @@ #include #include -#include #include -#include #include #include #include "llvm/raw_ostream.h" +#include "support/condition_variable.h" +#include "support/mutex.h" #include "support/SafeThread.h" #include "support/UidVector.h" @@ -67,19 +67,19 @@ class CallbackThread : public wpi::SafeThread { wpi::UidVector m_listeners; std::queue> m_queue; - std::condition_variable m_queue_empty; + wpi::condition_variable m_queue_empty; struct Poller { void Terminate() { { - std::lock_guard lock(poll_mutex); + std::lock_guard lock(poll_mutex); terminating = true; } poll_cond.notify_all(); } std::queue poll_queue; - std::mutex poll_mutex; - std::condition_variable poll_cond; + wpi::mutex poll_mutex; + wpi::condition_variable poll_cond; bool terminating = false; bool cancelling = false; }; @@ -92,7 +92,7 @@ class CallbackThread : public wpi::SafeThread { auto poller = m_pollers[poller_uid]; if (!poller) return; { - std::lock_guard lock(poller->poll_mutex); + std::lock_guard lock(poller->poll_mutex); poller->poll_queue.emplace(std::forward(args)...); } poller->poll_cond.notify_one(); @@ -102,7 +102,7 @@ class CallbackThread : public wpi::SafeThread { template void CallbackThread::Main() { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); while (m_active) { while (m_queue.empty()) { m_cond.wait(lock); @@ -244,7 +244,7 @@ class CallbackManager { if (!poller) return infos; } - std::unique_lock lock(poller->poll_mutex); + std::unique_lock lock(poller->poll_mutex); #if defined(_MSC_VER) && _MSC_VER < 1900 auto timeout_time = std::chrono::steady_clock::now() + std::chrono::duration( @@ -295,7 +295,7 @@ class CallbackManager { } { - std::lock_guard lock(poller->poll_mutex); + std::lock_guard lock(poller->poll_mutex); poller->cancelling = true; } poller->poll_cond.notify_one(); diff --git a/src/main/native/cpp/Dispatcher.cpp b/src/main/native/cpp/Dispatcher.cpp index ef10653..048b6da 100644 --- a/src/main/native/cpp/Dispatcher.cpp +++ b/src/main/native/cpp/Dispatcher.cpp @@ -118,7 +118,7 @@ void DispatcherBase::StartServer( StringRef persist_filename, std::unique_ptr acceptor) { { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); if (m_active) return; m_active = true; } @@ -148,7 +148,7 @@ void DispatcherBase::StartServer( void DispatcherBase::StartClient() { { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); if (m_active) return; m_active = true; } @@ -167,7 +167,7 @@ void DispatcherBase::Stop() { // wake up client thread with a reconnect { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_client_connector = nullptr; } ClientReconnect(); @@ -181,7 +181,7 @@ void DispatcherBase::Stop() { std::vector> conns; { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); conns.swap(m_connections); } @@ -199,14 +199,14 @@ void DispatcherBase::SetUpdateRate(double interval) { } void DispatcherBase::SetIdentity(llvm::StringRef name) { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_identity = name; } void DispatcherBase::Flush() { auto now = std::chrono::steady_clock::now(); { - std::lock_guard lock(m_flush_mutex); + std::lock_guard lock(m_flush_mutex); // don't allow flushes more often than every 10 ms if ((now - m_last_flush) < std::chrono::milliseconds(10)) return; m_last_flush = now; @@ -219,7 +219,7 @@ std::vector DispatcherBase::GetConnections() const { std::vector conns; if (!m_active) return conns; - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); for (auto& conn : m_connections) { if (conn->state() != NetworkConnection::kActive) continue; conns.emplace_back(conn->info()); @@ -231,7 +231,7 @@ std::vector DispatcherBase::GetConnections() const { bool DispatcherBase::IsConnected() const { if (!m_active) return false; - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); for (auto& conn : m_connections) { if (conn->state() == NetworkConnection::kActive) return true; } @@ -242,7 +242,7 @@ bool DispatcherBase::IsConnected() const { unsigned int DispatcherBase::AddListener( std::function callback, bool immediate_notify) const { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); unsigned int uid = m_notifier.Add(callback); // perform immediate notifications if (immediate_notify) { @@ -256,7 +256,7 @@ unsigned int DispatcherBase::AddListener( unsigned int DispatcherBase::AddPolledListener(unsigned int poller_uid, bool immediate_notify) const { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); unsigned int uid = m_notifier.AddPolled(poller_uid); // perform immediate notifications if (immediate_notify) { @@ -269,17 +269,17 @@ unsigned int DispatcherBase::AddPolledListener(unsigned int poller_uid, } void DispatcherBase::SetConnector(Connector connector) { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_client_connector = std::move(connector); } void DispatcherBase::SetConnectorOverride(Connector connector) { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_client_connector_override = std::move(connector); } void DispatcherBase::ClearConnectorOverride() { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_client_connector_override = nullptr; } @@ -298,7 +298,7 @@ void DispatcherBase::DispatchThreadMain() { // wait for periodic or when flushed timeout_time += std::chrono::milliseconds(m_update_rate); - std::unique_lock flush_lock(m_flush_mutex); + std::unique_lock flush_lock(m_flush_mutex); m_flush_cv.wait_until(flush_lock, timeout_time, [&] { return !m_active || m_do_flush; }); m_do_flush = false; @@ -316,7 +316,7 @@ void DispatcherBase::DispatchThreadMain() { } { - std::lock_guard user_lock(m_user_mutex); + std::lock_guard user_lock(m_user_mutex); bool reconnect = false; if (++count > 10) { @@ -347,7 +347,7 @@ void DispatcherBase::DispatchThreadMain() { void DispatcherBase::QueueOutgoing(std::shared_ptr msg, INetworkConnection* only, INetworkConnection* except) { - std::lock_guard user_lock(m_user_mutex); + std::lock_guard user_lock(m_user_mutex); for (auto& conn : m_connections) { if (conn.get() == except) continue; if (only && conn.get() != only) continue; @@ -389,7 +389,7 @@ void DispatcherBase::ServerThreadMain() { std::bind(&IStorage::ProcessIncoming, &m_storage, _1, _2, std::weak_ptr(conn))); { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); // reuse dead connection slots bool placed = false; for (auto& c : m_connections) { @@ -414,7 +414,7 @@ void DispatcherBase::ClientThreadMain() { // get next server to connect to { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); if (m_client_connector_override) { connect = m_client_connector_override; } else { @@ -436,7 +436,7 @@ void DispatcherBase::ClientThreadMain() { DEBUG("client connected"); m_networkMode = NT_NET_MODE_CLIENT; - std::unique_lock lock(m_user_mutex); + std::unique_lock lock(m_user_mutex); using namespace std::placeholders; auto conn = std::make_shared( ++m_connections_uid, std::move(stream), m_notifier, m_logger, @@ -466,7 +466,7 @@ bool DispatcherBase::ClientHandshake( // get identity std::string self_id; { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); self_id = m_identity; } @@ -573,7 +573,7 @@ bool DispatcherBase::ServerHandshake( // Start with server hello. TODO: initial connection flag if (proto_rev >= 0x0300) { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); outgoing.emplace_back(Message::ServerHello(0u, m_identity)); } @@ -630,7 +630,7 @@ bool DispatcherBase::ServerHandshake( void DispatcherBase::ClientReconnect(unsigned int proto_rev) { if ((m_networkMode & NT_NET_MODE_SERVER) != 0) return; { - std::lock_guard lock(m_user_mutex); + std::lock_guard lock(m_user_mutex); m_reconnect_proto_rev = proto_rev; m_do_reconnect = true; } diff --git a/src/main/native/cpp/Dispatcher.h b/src/main/native/cpp/Dispatcher.h index b8a48a8..07bcdfc 100644 --- a/src/main/native/cpp/Dispatcher.h +++ b/src/main/native/cpp/Dispatcher.h @@ -10,15 +10,15 @@ #include #include -#include #include #include -#include #include #include #include #include "llvm/StringRef.h" +#include "support/condition_variable.h" +#include "support/mutex.h" #include "IDispatcher.h" #include "INetworkConnection.h" @@ -103,7 +103,7 @@ class DispatcherBase : public IDispatcher { uint8_t m_connections_uid = 0; // Mutex for user-accessible items - mutable std::mutex m_user_mutex; + mutable wpi::mutex m_user_mutex; std::vector> m_connections; std::string m_identity; @@ -111,13 +111,13 @@ class DispatcherBase : public IDispatcher { std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms // Condition variable for forced dispatch wakeup (flush) - std::mutex m_flush_mutex; - std::condition_variable m_flush_cv; + wpi::mutex m_flush_mutex; + wpi::condition_variable m_flush_cv; std::chrono::steady_clock::time_point m_last_flush; bool m_do_flush = false; // Condition variable for client reconnect (uses user mutex) - std::condition_variable m_reconnect_cv; + wpi::condition_variable m_reconnect_cv; unsigned int m_reconnect_proto_rev = 0x0300; bool m_do_reconnect = true; diff --git a/src/main/native/cpp/DsClient.cpp b/src/main/native/cpp/DsClient.cpp index b8e8c00..a333581 100644 --- a/src/main/native/cpp/DsClient.cpp +++ b/src/main/native/cpp/DsClient.cpp @@ -63,7 +63,7 @@ void DsClient::Thread::Main() { std::chrono::steady_clock::now() + std::chrono::milliseconds(500); unsigned int port; { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); m_cond.wait_until(lock, timeout_time, [&] { return !m_active; }); port = m_port; } diff --git a/src/main/native/cpp/InstanceImpl.cpp b/src/main/native/cpp/InstanceImpl.cpp index 9bbd303..6c282a9 100644 --- a/src/main/native/cpp/InstanceImpl.cpp +++ b/src/main/native/cpp/InstanceImpl.cpp @@ -12,7 +12,7 @@ using namespace nt; std::atomic InstanceImpl::s_default{-1}; std::atomic InstanceImpl::s_fast_instances[10]; wpi::UidVector, 10> InstanceImpl::s_instances; -std::mutex InstanceImpl::s_mutex; +wpi::mutex InstanceImpl::s_mutex; using namespace std::placeholders; @@ -43,7 +43,7 @@ InstanceImpl* InstanceImpl::Get(int inst) { } // slow path - std::lock_guard lock(s_mutex); + std::lock_guard lock(s_mutex); // static fast-path block if (static_cast(inst) < @@ -66,7 +66,7 @@ int InstanceImpl::GetDefaultIndex() { if (inst >= 0) return inst; // slow path - std::lock_guard lock(s_mutex); + std::lock_guard lock(s_mutex); // double-check inst = s_default; @@ -79,7 +79,7 @@ int InstanceImpl::GetDefaultIndex() { } int InstanceImpl::Alloc() { - std::lock_guard lock(s_mutex); + std::lock_guard lock(s_mutex); return AllocImpl(); } @@ -96,7 +96,7 @@ int InstanceImpl::AllocImpl() { } void InstanceImpl::Destroy(int inst) { - std::lock_guard lock(s_mutex); + std::lock_guard lock(s_mutex); if (inst < 0 || static_cast(inst) >= s_instances.size()) return; if (static_cast(inst) < diff --git a/src/main/native/cpp/InstanceImpl.h b/src/main/native/cpp/InstanceImpl.h index ab89080..4d191af 100644 --- a/src/main/native/cpp/InstanceImpl.h +++ b/src/main/native/cpp/InstanceImpl.h @@ -10,8 +10,8 @@ #include #include -#include +#include "support/mutex.h" #include "support/UidVector.h" #include "ConnectionNotifier.h" @@ -52,7 +52,7 @@ class InstanceImpl { static std::atomic s_default; static std::atomic s_fast_instances[10]; static wpi::UidVector, 10> s_instances; - static std::mutex s_mutex; + static wpi::mutex s_mutex; }; } // namespace nt diff --git a/src/main/native/cpp/NetworkConnection.cpp b/src/main/native/cpp/NetworkConnection.cpp index eb33abd..9435106 100644 --- a/src/main/native/cpp/NetworkConnection.cpp +++ b/src/main/native/cpp/NetworkConnection.cpp @@ -49,7 +49,7 @@ void NetworkConnection::Start() { while (!m_outgoing.empty()) m_outgoing.pop(); // reset shutdown flags { - std::lock_guard lock(m_shutdown_mutex); + std::lock_guard lock(m_shutdown_mutex); m_read_shutdown = false; m_write_shutdown = false; } @@ -68,7 +68,7 @@ void NetworkConnection::Stop() { m_outgoing.push(Outgoing()); // wait for threads to terminate, with timeout if (m_write_thread.joinable()) { - std::unique_lock lock(m_shutdown_mutex); + std::unique_lock lock(m_shutdown_mutex); auto timeout_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(200); if (m_write_shutdown_cv.wait_until(lock, timeout_time, @@ -78,7 +78,7 @@ void NetworkConnection::Stop() { m_write_thread.detach(); // timed out, detach it } if (m_read_thread.joinable()) { - std::unique_lock lock(m_shutdown_mutex); + std::unique_lock lock(m_shutdown_mutex); auto timeout_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(200); if (m_read_shutdown_cv.wait_until(lock, timeout_time, @@ -104,12 +104,12 @@ void NetworkConnection::set_proto_rev(unsigned int proto_rev) { } NetworkConnection::State NetworkConnection::state() const { - std::lock_guard lock(m_state_mutex); + std::lock_guard lock(m_state_mutex); return m_state; } void NetworkConnection::set_state(State state) { - std::lock_guard lock(m_state_mutex); + std::lock_guard lock(m_state_mutex); // Don't update state any more once we've died if (m_state == kDead) return; // One-shot notify state changes @@ -121,12 +121,12 @@ void NetworkConnection::set_state(State state) { } std::string NetworkConnection::remote_id() const { - std::lock_guard lock(m_remote_id_mutex); + std::lock_guard lock(m_remote_id_mutex); return m_remote_id; } void NetworkConnection::set_remote_id(StringRef remote_id) { - std::lock_guard lock(m_remote_id_mutex); + std::lock_guard lock(m_remote_id_mutex); m_remote_id = remote_id; } @@ -177,7 +177,7 @@ void NetworkConnection::ReadThreadMain() { done: // use condition variable to signal thread shutdown { - std::lock_guard lock(m_shutdown_mutex); + std::lock_guard lock(m_shutdown_mutex); m_read_shutdown = true; m_read_shutdown_cv.notify_one(); } @@ -214,14 +214,14 @@ void NetworkConnection::WriteThreadMain() { // use condition variable to signal thread shutdown { - std::lock_guard lock(m_shutdown_mutex); + std::lock_guard lock(m_shutdown_mutex); m_write_shutdown = true; m_write_shutdown_cv.notify_one(); } } void NetworkConnection::QueueOutgoing(std::shared_ptr msg) { - std::lock_guard lock(m_pending_mutex); + std::lock_guard lock(m_pending_mutex); // Merge with previous. One case we don't combine: delete/assign loop. switch (msg->type()) { @@ -316,7 +316,7 @@ void NetworkConnection::QueueOutgoing(std::shared_ptr msg) { } void NetworkConnection::PostOutgoing(bool keep_alive) { - std::lock_guard lock(m_pending_mutex); + std::lock_guard lock(m_pending_mutex); auto now = std::chrono::steady_clock::now(); if (m_pending_outgoing.empty()) { if (!keep_alive) return; diff --git a/src/main/native/cpp/NetworkConnection.h b/src/main/native/cpp/NetworkConnection.h index 91d82cc..720b7b3 100644 --- a/src/main/native/cpp/NetworkConnection.h +++ b/src/main/native/cpp/NetworkConnection.h @@ -13,6 +13,8 @@ #include #include +#include "support/condition_variable.h" +#include "support/mutex.h" #include "support/ConcurrentQueue.h" #include "INetworkConnection.h" #include "Message.h" @@ -95,21 +97,21 @@ class NetworkConnection : public INetworkConnection { std::thread m_write_thread; std::atomic_bool m_active; std::atomic_uint m_proto_rev; - mutable std::mutex m_state_mutex; + mutable wpi::mutex m_state_mutex; State m_state; - mutable std::mutex m_remote_id_mutex; + mutable wpi::mutex m_remote_id_mutex; std::string m_remote_id; std::atomic_ullong m_last_update; std::chrono::steady_clock::time_point m_last_post; - std::mutex m_pending_mutex; + wpi::mutex m_pending_mutex; Outgoing m_pending_outgoing; std::vector> m_pending_update; // Condition variables for shutdown - std::mutex m_shutdown_mutex; - std::condition_variable m_read_shutdown_cv; - std::condition_variable m_write_shutdown_cv; + wpi::mutex m_shutdown_mutex; + wpi::condition_variable m_read_shutdown_cv; + wpi::condition_variable m_write_shutdown_cv; bool m_read_shutdown = false; bool m_write_shutdown = false; }; diff --git a/src/main/native/cpp/RpcServer.h b/src/main/native/cpp/RpcServer.h index a79e1e2..97d10db 100644 --- a/src/main/native/cpp/RpcServer.h +++ b/src/main/native/cpp/RpcServer.h @@ -9,6 +9,7 @@ #define NT_RPCSERVER_H_ #include "llvm/DenseMap.h" +#include "support/mutex.h" #include "CallbackManager.h" #include "Handle.h" @@ -60,7 +61,7 @@ class RpcServerThread RpcIdPair lookup_uid{local_id, call_uid}; callback(data); { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); auto i = m_response_map.find(lookup_uid); if (i != m_response_map.end()) { // post an empty response and erase it diff --git a/src/main/native/cpp/Storage.cpp b/src/main/native/cpp/Storage.cpp index 8adc914..17284ed 100644 --- a/src/main/native/cpp/Storage.cpp +++ b/src/main/native/cpp/Storage.cpp @@ -30,7 +30,7 @@ Storage::~Storage() { } void Storage::SetDispatcher(IDispatcher* dispatcher, bool server) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); m_dispatcher = dispatcher; m_server = server; } @@ -38,7 +38,7 @@ void Storage::SetDispatcher(IDispatcher* dispatcher, bool server) { void Storage::ClearDispatcher() { m_dispatcher = nullptr; } NT_Type Storage::GetMessageEntryType(unsigned int id) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); if (id >= m_idmap.size()) return NT_UNASSIGNED; Entry* entry = m_idmap[id]; if (!entry || !entry->value) return NT_UNASSIGNED; @@ -86,7 +86,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, void Storage::ProcessIncomingEntryAssign(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); unsigned int id = msg->id(); StringRef name = msg->str(); Entry* entry; @@ -211,7 +211,7 @@ void Storage::ProcessIncomingEntryAssign(std::shared_ptr msg, void Storage::ProcessIncomingEntryUpdate(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); unsigned int id = msg->id(); if (id >= m_idmap.size() || !m_idmap[id]) { // ignore arbitrary entry updates; @@ -248,7 +248,7 @@ void Storage::ProcessIncomingEntryUpdate(std::shared_ptr msg, void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); unsigned int id = msg->id(); if (id >= m_idmap.size() || !m_idmap[id]) { // ignore arbitrary entry updates; @@ -272,7 +272,7 @@ void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr msg, void Storage::ProcessIncomingEntryDelete(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); unsigned int id = msg->id(); if (id >= m_idmap.size() || !m_idmap[id]) { // ignore arbitrary entry updates; @@ -296,7 +296,7 @@ void Storage::ProcessIncomingEntryDelete(std::shared_ptr msg, void Storage::ProcessIncomingClearEntries(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); // update local DeleteAllEntriesImpl(false); @@ -312,7 +312,7 @@ void Storage::ProcessIncomingClearEntries(std::shared_ptr msg, void Storage::ProcessIncomingExecuteRpc( std::shared_ptr msg, INetworkConnection* conn, std::weak_ptr conn_weak) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (!m_server) return; // only process on server unsigned int id = msg->id(); if (id >= m_idmap.size() || !m_idmap[id]) { @@ -351,7 +351,7 @@ void Storage::ProcessIncomingExecuteRpc( void Storage::ProcessIncomingRpcResponse(std::shared_ptr msg, INetworkConnection* conn) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (m_server) return; // only process on client unsigned int id = msg->id(); if (id >= m_idmap.size() || !m_idmap[id]) { @@ -374,7 +374,7 @@ void Storage::ProcessIncomingRpcResponse(std::shared_ptr msg, void Storage::GetInitialAssignments( INetworkConnection& conn, std::vector>* msgs) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); conn.set_state(INetworkConnection::kSynchronized); for (auto& i : m_entries) { Entry* entry = i.getValue(); @@ -388,7 +388,7 @@ void Storage::GetInitialAssignments( void Storage::ApplyInitialAssignments( INetworkConnection& conn, llvm::ArrayRef> msgs, bool new_server, std::vector>* out_msgs) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (m_server) return; // should not do this on server conn.set_state(INetworkConnection::kSynchronized); @@ -476,14 +476,14 @@ void Storage::ApplyInitialAssignments( } std::shared_ptr Storage::GetEntryValue(StringRef name) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return nullptr; return i->getValue()->value; } std::shared_ptr Storage::GetEntryValue(unsigned int local_id) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); if (local_id >= m_localmap.size()) return nullptr; return m_localmap[local_id]->value; } @@ -492,7 +492,7 @@ bool Storage::SetDefaultEntryValue(StringRef name, std::shared_ptr value) { if (name.empty()) return false; if (!value) return false; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); Entry* entry = GetOrNew(name); // we return early if value already exists; if types match return true @@ -505,7 +505,7 @@ bool Storage::SetDefaultEntryValue(StringRef name, bool Storage::SetDefaultEntryValue(unsigned int local_id, std::shared_ptr value) { if (!value) return false; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return false; Entry* entry = m_localmap[local_id].get(); @@ -519,7 +519,7 @@ bool Storage::SetDefaultEntryValue(unsigned int local_id, bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { if (name.empty()) return true; if (!value) return true; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); Entry* entry = GetOrNew(name); if (entry->value && entry->value->type() != value->type()) @@ -532,7 +532,7 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { bool Storage::SetEntryValue(unsigned int local_id, std::shared_ptr value) { if (!value) return true; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return true; Entry* entry = m_localmap[local_id].get(); @@ -544,7 +544,7 @@ bool Storage::SetEntryValue(unsigned int local_id, } void Storage::SetEntryValueImpl(Entry* entry, std::shared_ptr value, - std::unique_lock& lock, + std::unique_lock& lock, bool local) { if (!value) return; auto old_value = entry->value; @@ -595,7 +595,7 @@ void Storage::SetEntryValueImpl(Entry* entry, std::shared_ptr value, void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { if (name.empty()) return; if (!value) return; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); Entry* entry = GetOrNew(name); SetEntryValueImpl(entry, value, lock, true); @@ -604,7 +604,7 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { void Storage::SetEntryTypeValue(unsigned int local_id, std::shared_ptr value) { if (!value) return; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return; Entry* entry = m_localmap[local_id].get(); if (!entry) return; @@ -614,20 +614,20 @@ void Storage::SetEntryTypeValue(unsigned int local_id, void Storage::SetEntryFlags(StringRef name, unsigned int flags) { if (name.empty()) return; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return; SetEntryFlagsImpl(i->getValue(), flags, lock, true); } void Storage::SetEntryFlags(unsigned int id_local, unsigned int flags) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (id_local >= m_localmap.size()) return; SetEntryFlagsImpl(m_localmap[id_local].get(), flags, lock, true); } void Storage::SetEntryFlagsImpl(Entry* entry, unsigned int flags, - std::unique_lock& lock, + std::unique_lock& lock, bool local) { if (!entry->value || entry->flags == flags) return; @@ -654,32 +654,32 @@ void Storage::SetEntryFlagsImpl(Entry* entry, unsigned int flags, } unsigned int Storage::GetEntryFlags(StringRef name) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return 0; return i->getValue()->flags; } unsigned int Storage::GetEntryFlags(unsigned int local_id) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); if (local_id >= m_localmap.size()) return 0; return m_localmap[local_id]->flags; } void Storage::DeleteEntry(StringRef name) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return; DeleteEntryImpl(i->getValue(), lock, true); } void Storage::DeleteEntry(unsigned int local_id) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return; DeleteEntryImpl(m_localmap[local_id].get(), lock, true); } -void Storage::DeleteEntryImpl(Entry* entry, std::unique_lock& lock, +void Storage::DeleteEntryImpl(Entry* entry, std::unique_lock& lock, bool local) { unsigned int id = entry->id; @@ -745,7 +745,7 @@ void Storage::DeleteAllEntriesImpl(bool local) { } void Storage::DeleteAllEntries() { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (m_entries.empty()) return; DeleteAllEntriesImpl(true); @@ -769,13 +769,13 @@ Storage::Entry* Storage::GetOrNew(StringRef name) { unsigned int Storage::GetEntry(StringRef name) { if (name.empty()) return UINT_MAX; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); return GetOrNew(name)->local_id; } std::vector Storage::GetEntries(StringRef prefix, unsigned int types) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); std::vector ids; for (auto& i : m_entries) { Entry* entry = i.getValue(); @@ -794,7 +794,7 @@ EntryInfo Storage::GetEntryInfo(int inst, unsigned int local_id) const { info.flags = 0; info.last_change = 0; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return info; Entry* entry = m_localmap[local_id].get(); if (!entry->value) return info; @@ -808,13 +808,13 @@ EntryInfo Storage::GetEntryInfo(int inst, unsigned int local_id) const { } std::string Storage::GetEntryName(unsigned int local_id) const { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return std::string{}; return m_localmap[local_id]->name; } NT_Type Storage::GetEntryType(unsigned int local_id) const { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return NT_UNASSIGNED; Entry* entry = m_localmap[local_id].get(); if (!entry->value) return NT_UNASSIGNED; @@ -822,7 +822,7 @@ NT_Type Storage::GetEntryType(unsigned int local_id) const { } unsigned long long Storage::GetEntryLastChange(unsigned int local_id) const { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return 0; Entry* entry = m_localmap[local_id].get(); if (!entry->value) return 0; @@ -831,7 +831,7 @@ unsigned long long Storage::GetEntryLastChange(unsigned int local_id) const { std::vector Storage::GetEntryInfo(int inst, StringRef prefix, unsigned int types) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); std::vector infos; for (auto& i : m_entries) { Entry* entry = i.getValue(); @@ -853,7 +853,7 @@ unsigned int Storage::AddListener( StringRef prefix, std::function callback, unsigned int flags) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); unsigned int uid = m_notifier.Add(callback, prefix, flags); // perform immediate notifications if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) { @@ -871,7 +871,7 @@ unsigned int Storage::AddListener( unsigned int local_id, std::function callback, unsigned int flags) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); unsigned int uid = m_notifier.Add(callback, local_id, flags); // perform immediate notifications if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0 && @@ -887,7 +887,7 @@ unsigned int Storage::AddListener( unsigned int Storage::AddPolledListener(unsigned int poller, StringRef prefix, unsigned int flags) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); unsigned int uid = m_notifier.AddPolled(poller, prefix, flags); // perform immediate notifications if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) { @@ -905,7 +905,7 @@ unsigned int Storage::AddPolledListener(unsigned int poller, StringRef prefix, unsigned int Storage::AddPolledListener(unsigned int poller, unsigned int local_id, unsigned int flags) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); unsigned int uid = m_notifier.AddPolled(poller, local_id, flags); // perform immediate notifications if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0 && @@ -926,7 +926,7 @@ bool Storage::GetPersistentEntries( const { // copy values out of storage as quickly as possible so lock isn't held { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); // for periodic, don't re-save unless something has changed if (periodic && !m_persistent_dirty) return false; m_persistent_dirty = false; @@ -954,7 +954,7 @@ bool Storage::GetEntries( const { // copy values out of storage as quickly as possible so lock isn't held { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); entries->reserve(m_entries.size()); for (auto& i : m_entries) { Entry* entry = i.getValue(); @@ -975,7 +975,7 @@ bool Storage::GetEntries( void Storage::CreateRpc(unsigned int local_id, StringRef def, unsigned int rpc_uid) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return; Entry* entry = m_localmap[local_id].get(); @@ -1013,7 +1013,7 @@ void Storage::CreateRpc(unsigned int local_id, StringRef def, } unsigned int Storage::CallRpc(unsigned int local_id, StringRef params) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); if (local_id >= m_localmap.size()) return 0; Entry* entry = m_localmap[local_id].get(); @@ -1040,7 +1040,7 @@ unsigned int Storage::CallRpc(unsigned int local_id, StringRef params) { unsigned int call_uid = msg->seq_num_uid(); m_rpc_server.ProcessRpc(local_id, call_uid, name, msg->str(), conn_info, [=](StringRef result) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); m_rpc_results.insert(std::make_pair( RpcIdPair{local_id, call_uid}, result)); m_rpc_results_cond.notify_all(); @@ -1063,7 +1063,7 @@ bool Storage::GetRpcResult(unsigned int local_id, unsigned int call_uid, bool Storage::GetRpcResult(unsigned int local_id, unsigned int call_uid, std::string* result, double timeout, bool* timed_out) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); RpcIdPair call_pair{local_id, call_uid}; @@ -1115,7 +1115,7 @@ bool Storage::GetRpcResult(unsigned int local_id, unsigned int call_uid, } void Storage::CancelRpcResult(unsigned int local_id, unsigned int call_uid) { - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); // safe to erase even if id does not exist m_rpc_blocking_calls.erase(RpcIdPair{local_id, call_uid}); m_rpc_results_cond.notify_all(); diff --git a/src/main/native/cpp/Storage.h b/src/main/native/cpp/Storage.h index 9bf4201..625bd92 100644 --- a/src/main/native/cpp/Storage.h +++ b/src/main/native/cpp/Storage.h @@ -9,15 +9,15 @@ #define NT_STORAGE_H_ #include -#include #include #include #include -#include #include "llvm/DenseMap.h" #include "llvm/SmallSet.h" #include "llvm/StringMap.h" +#include "support/condition_variable.h" +#include "support/mutex.h" #include "Message.h" #include "ntcore_cpp.h" #include "SequenceNumber.h" @@ -196,7 +196,7 @@ class Storage : public IStorage { typedef llvm::DenseMap RpcResultMap; typedef llvm::SmallSet RpcBlockingCallSet; - mutable std::mutex m_mutex; + mutable wpi::mutex m_mutex; EntriesMap m_entries; IdMap m_idmap; LocalMap m_localmap; @@ -207,7 +207,7 @@ class Storage : public IStorage { // condition variable and termination flag for blocking on a RPC result std::atomic_bool m_terminating; - std::condition_variable m_rpc_results_cond; + wpi::condition_variable m_rpc_results_cond; // configured by dispatcher at startup IDispatcher* m_dispatcher = nullptr; @@ -241,10 +241,10 @@ class Storage : public IStorage { std::vector>>* entries) const; void SetEntryValueImpl(Entry* entry, std::shared_ptr value, - std::unique_lock& lock, bool local); + std::unique_lock& lock, bool local); void SetEntryFlagsImpl(Entry* entry, unsigned int flags, - std::unique_lock& lock, bool local); - void DeleteEntryImpl(Entry* entry, std::unique_lock& lock, + std::unique_lock& lock, bool local); + void DeleteEntryImpl(Entry* entry, std::unique_lock& lock, bool local); // Must be called with m_mutex held diff --git a/src/main/native/cpp/Storage_load.cpp b/src/main/native/cpp/Storage_load.cpp index 16076af..b1e1955 100644 --- a/src/main/native/cpp/Storage_load.cpp +++ b/src/main/native/cpp/Storage_load.cpp @@ -372,7 +372,7 @@ bool Storage::LoadEntries( // copy values into storage as quickly as possible so lock isn't held std::vector> msgs; - std::unique_lock lock(m_mutex); + std::unique_lock lock(m_mutex); for (auto& i : entries) { Entry* entry = GetOrNew(i.first); auto old_value = entry->value; diff --git a/src/main/native/cpp/networktables/NetworkTable.cpp b/src/main/native/cpp/networktables/NetworkTable.cpp index 0f58726..07361bb 100644 --- a/src/main/native/cpp/networktables/NetworkTable.cpp +++ b/src/main/native/cpp/networktables/NetworkTable.cpp @@ -136,7 +136,7 @@ NetworkTableInstance NetworkTable::GetInstance() const { } NetworkTableEntry NetworkTable::GetEntry(llvm::StringRef key) const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); NT_Entry& entry = m_entries[key]; if (entry == 0) { llvm::SmallString<128> path(m_path); @@ -194,7 +194,7 @@ void NetworkTable::AddTableListener(ITableListener* listener, void NetworkTable::AddTableListenerEx(ITableListener* listener, unsigned int flags) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; std::size_t prefix_len = path.size(); @@ -218,7 +218,7 @@ void NetworkTable::AddTableListener(StringRef key, ITableListener* listener, void NetworkTable::AddTableListenerEx(StringRef key, ITableListener* listener, unsigned int flags) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; std::size_t prefix_len = path.size(); @@ -240,7 +240,7 @@ void NetworkTable::AddSubTableListener(ITableListener* listener) { void NetworkTable::AddSubTableListener(ITableListener* listener, bool localNotify) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; std::size_t prefix_len = path.size(); @@ -268,7 +268,7 @@ void NetworkTable::AddSubTableListener(ITableListener* listener, } void NetworkTable::RemoveTableListener(ITableListener* listener) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); auto matches_begin = std::remove_if(m_listeners.begin(), m_listeners.end(), [=](const Listener& x) { return x.first == listener; }); @@ -302,7 +302,7 @@ std::vector NetworkTable::GetKeys(int types) const { llvm::SmallString<128> path(m_path); path += PATH_SEPARATOR_CHAR; auto infos = GetEntryInfo(m_inst, path, types); - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); for (auto& info : infos) { auto relative_key = StringRef(info.name).substr(path.size()); if (relative_key.find(PATH_SEPARATOR_CHAR) != StringRef::npos) continue; diff --git a/src/main/native/cpp/ntcore_cpp.cpp b/src/main/native/cpp/ntcore_cpp.cpp index 6e13f64..4b93e36 100644 --- a/src/main/native/cpp/ntcore_cpp.cpp +++ b/src/main/native/cpp/ntcore_cpp.cpp @@ -954,9 +954,9 @@ const char* LoadEntries( void SetLogger(LogFunc func, unsigned int min_level) { auto ii = InstanceImpl::GetDefault(); - static std::mutex mutex; + static wpi::mutex mutex; static unsigned int logger = 0; - std::lock_guard lock(mutex); + std::lock_guard lock(mutex); if (logger != 0) ii->logger_impl.Remove(logger); logger = ii->logger_impl.Add( [=](const LogMessage& msg) { diff --git a/src/main/native/include/networktables/NetworkTable.h b/src/main/native/include/networktables/NetworkTable.h index b867187..aa563d0 100644 --- a/src/main/native/include/networktables/NetworkTable.h +++ b/src/main/native/include/networktables/NetworkTable.h @@ -9,13 +9,13 @@ #define NETWORKTABLE_H_ #include -#include #include #include "llvm/StringMap.h" #include "networktables/NetworkTableEntry.h" #include "networktables/TableEntryListener.h" #include "networktables/TableListener.h" +#include "support/mutex.h" #include "ntcore_c.h" #include "tables/ITable.h" @@ -38,7 +38,7 @@ class NetworkTable final : public ITable { private: NT_Inst m_inst; std::string m_path; - mutable std::mutex m_mutex; + mutable wpi::mutex m_mutex; mutable llvm::StringMap m_entries; typedef std::pair Listener; std::vector m_listeners;