Skip to content
This repository has been archived by the owner on Sep 21, 2020. It is now read-only.

Commit

Permalink
Move immediate entry notification logic into Storage.
Browse files Browse the repository at this point in the history
This prevents a race condition that could result in out of order
notifications.
  • Loading branch information
PeterJohnson committed Oct 1, 2017
1 parent 10982e0 commit e4a8bff
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 44 deletions.
8 changes: 4 additions & 4 deletions src/main/native/cpp/EntryNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ class EntryNotifier
bool local_notifiers() const override;

unsigned int Add(std::function<void(const EntryNotification& event)> callback,
llvm::StringRef prefix, unsigned int flags);
llvm::StringRef prefix, unsigned int flags) override;
unsigned int Add(std::function<void(const EntryNotification& event)> callback,
unsigned int local_id, unsigned int flags);
unsigned int local_id, unsigned int flags) override;
unsigned int AddPolled(unsigned int poller_uid, llvm::StringRef prefix,
unsigned int flags);
unsigned int flags) override;
unsigned int AddPolled(unsigned int poller_uid, unsigned int local_id,
unsigned int flags);
unsigned int flags) override;

void NotifyEntry(unsigned int local_id, StringRef name,
std::shared_ptr<Value> value, unsigned int flags,
Expand Down
13 changes: 13 additions & 0 deletions src/main/native/cpp/IEntryNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ class IEntryNotifier {
IEntryNotifier& operator=(const IEntryNotifier&) = delete;
virtual ~IEntryNotifier() = default;
virtual bool local_notifiers() const = 0;

virtual unsigned int Add(
std::function<void(const EntryNotification& event)> callback,
llvm::StringRef prefix, unsigned int flags) = 0;
virtual unsigned int Add(
std::function<void(const EntryNotification& event)> callback,
unsigned int local_id, unsigned int flags) = 0;
virtual unsigned int AddPolled(unsigned int poller_uid,
llvm::StringRef prefix,
unsigned int flags) = 0;
virtual unsigned int AddPolled(unsigned int poller_uid, unsigned int local_id,
unsigned int flags) = 0;

virtual void NotifyEntry(unsigned int local_id, StringRef name,
std::shared_ptr<Value> value, unsigned int flags,
unsigned int only_listener = UINT_MAX) = 0;
Expand Down
71 changes: 71 additions & 0 deletions src/main/native/cpp/Storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,77 @@ std::vector<EntryInfo> Storage::GetEntryInfo(int inst, StringRef prefix,
return infos;
}

unsigned int Storage::AddListener(
StringRef prefix,
std::function<void(const EntryNotification& event)> callback,
unsigned int flags) const {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_notifier.Add(callback, prefix, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
Entry* entry = i.getValue();
m_notifier.NotifyEntry(entry->local_id, i.getKey(), entry->value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
return uid;
}

unsigned int Storage::AddListener(
unsigned int local_id,
std::function<void(const EntryNotification& event)> callback,
unsigned int flags) const {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_notifier.Add(callback, local_id, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0 &&
local_id < m_localmap.size()) {
Entry* entry = m_localmap[local_id].get();
// if no value, don't notify
if (entry->value) {
m_notifier.NotifyEntry(local_id, entry->name, entry->value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
return uid;
}

unsigned int Storage::AddPolledListener(unsigned int poller, StringRef prefix,
unsigned int flags) const {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_notifier.AddPolled(poller, prefix, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
Entry* entry = i.getValue();
m_notifier.NotifyEntry(entry->local_id, i.getKey(), entry->value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
return uid;
}

unsigned int Storage::AddPolledListener(unsigned int poller,
unsigned int local_id,
unsigned int flags) const {
std::lock_guard<std::mutex> lock(m_mutex);
unsigned int uid = m_notifier.AddPolled(poller, local_id, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0 &&
local_id < m_localmap.size()) {
Entry* entry = m_localmap[local_id].get();
// if no value, don't notify
if (entry->value) {
m_notifier.NotifyEntry(local_id, entry->name, entry->value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
return uid;
}

bool Storage::GetPersistentEntries(
bool periodic,
std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
Expand Down
14 changes: 14 additions & 0 deletions src/main/native/cpp/Storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ class Storage : public IStorage {
std::vector<EntryInfo> GetEntryInfo(int inst, StringRef prefix,
unsigned int types);

unsigned int AddListener(
StringRef prefix,
std::function<void(const EntryNotification& event)> callback,
unsigned int flags) const;
unsigned int AddListener(
unsigned int local_id,
std::function<void(const EntryNotification& event)> callback,
unsigned int flags) const;

unsigned int AddPolledListener(unsigned int poller_uid, StringRef prefix,
unsigned int flags) const;
unsigned int AddPolledListener(unsigned int poller_uid, unsigned int local_id,
unsigned int flags) const;

// Index-only
unsigned int GetEntry(StringRef name);
std::vector<unsigned int> GetEntries(StringRef prefix, unsigned int types);
Expand Down
44 changes: 4 additions & 40 deletions src/main/native/cpp/ntcore_cpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,7 @@ NT_EntryListener AddEntryListener(
auto ii = InstanceImpl::Get(i);
if (i < 0 || !ii) return 0;

unsigned int uid = ii->entry_notifier.Add(callback, prefix, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
for (auto& i : ii->storage.GetEntries(prefix, 0)) {
ii->entry_notifier.NotifyEntry(i, ii->storage.GetEntryName(i),
ii->storage.GetEntryValue(i),
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
unsigned int uid = ii->storage.AddListener(prefix, callback, flags);
return Handle(i, uid, Handle::kEntryListener);
}

Expand All @@ -279,17 +271,7 @@ NT_EntryListener AddEntryListener(
auto ii = InstanceImpl::Get(i);
if (id < 0 || !ii) return 0;

unsigned int uid = ii->entry_notifier.Add(callback, id, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
auto name = ii->storage.GetEntryName(id);
auto value = ii->storage.GetEntryValue(id);
// if no name or value, don't notify
if (!name.empty() && value) {
ii->entry_notifier.NotifyEntry(id, name, value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
unsigned int uid = ii->storage.AddListener(id, callback, flags);
return Handle(i, uid, Handle::kEntryListener);
}

Expand Down Expand Up @@ -319,15 +301,7 @@ NT_EntryListener AddPolledEntryListener(NT_EntryListenerPoller poller,
auto ii = InstanceImpl::Get(i);
if (id < 0 || !ii) return 0;

unsigned int uid = ii->entry_notifier.AddPolled(id, prefix, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
for (auto& i : ii->storage.GetEntries(prefix, 0)) {
ii->entry_notifier.NotifyEntry(i, ii->storage.GetEntryName(i),
ii->storage.GetEntryValue(i),
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
unsigned int uid = ii->storage.AddPolledListener(id, prefix, flags);
return Handle(i, uid, Handle::kEntryListener);
}

Expand All @@ -344,17 +318,7 @@ NT_EntryListener AddPolledEntryListener(NT_EntryListenerPoller poller,
if (p_id < 0) return 0;
if (handle.GetInst() != phandle.GetInst()) return 0;

unsigned int uid = ii->entry_notifier.AddPolled(p_id, entry, flags);
// perform immediate notifications
if ((flags & NT_NOTIFY_IMMEDIATE) != 0 && (flags & NT_NOTIFY_NEW) != 0) {
auto name = ii->storage.GetEntryName(id);
auto value = ii->storage.GetEntryValue(id);
// if no name or value, don't notify
if (!name.empty() && value) {
ii->entry_notifier.NotifyEntry(id, name, value,
NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW, uid);
}
}
unsigned int uid = ii->storage.AddPolledListener(p_id, id, flags);
return Handle(i, uid, Handle::kEntryListener);
}

Expand Down
14 changes: 14 additions & 0 deletions src/test/native/cpp/MockEntryNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ namespace nt {
class MockEntryNotifier : public IEntryNotifier {
public:
MOCK_CONST_METHOD0(local_notifiers, bool());
MOCK_METHOD3(
Add,
unsigned int(std::function<void(const EntryNotification& event)> callback,
llvm::StringRef prefix, unsigned int flags));
MOCK_METHOD3(
Add,
unsigned int(std::function<void(const EntryNotification& event)> callback,
unsigned int local_id, unsigned int flags));
MOCK_METHOD3(AddPolled,
unsigned int(unsigned int poller_uid, llvm::StringRef prefix,
unsigned int flags));
MOCK_METHOD3(AddPolled,
unsigned int(unsigned int poller_uid, unsigned int local_id,
unsigned int flags));
MOCK_METHOD5(NotifyEntry,
void(unsigned int local_id, StringRef name,
std::shared_ptr<Value> value, unsigned int flags,
Expand Down

0 comments on commit e4a8bff

Please sign in to comment.