-
Notifications
You must be signed in to change notification settings - Fork 0
/
sharedMemory.cpp
125 lines (102 loc) · 3.05 KB
/
sharedMemory.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <climits>
#include <condition_variable>
#include <fstream>
#define INF INT_MAX / 2
using namespace std;
mutex mu_;
condition_variable cv_;
enum class ret_status
{
SUCCESS,
FAILURE
};
class Logger {
public:
void operator()(std::string log_str, int log_int) {
//Below line will gurantee, that logfile will open once by all threads
std::call_once(flag_, [&]() { logfile_.open("pub_sub_log.txt"); });
// adopt_lock_t : assume the calling thread already has ownership of the mutex
std::lock_guard<std::mutex> glock(mtx_, adopt_lock);
logfile_ << "Thread id " << this_thread::get_id() << " :\t";
logfile_ << log_str << log_int << endl;
}
~Logger() {
logfile_.close();
}
private:
mutex mtx_;
once_flag flag_;
ofstream logfile_;
};
class Publish {
public:
ret_status operator()(std::queue<int> &msg_que, std::vector<int> data_bus, Logger &log) {
// After processing of Data Bus the termination condition for subscriber will trigger
data_bus.push_back(INF);
try {
for (auto data : data_bus) {
std::unique_lock<std::mutex> ulock(mu_, defer_lock); // defer_lock_t: do not acquire ownership of the mutex
ulock.lock();
msg_que.push(data);
log("Published:\t", data);
ulock.unlock();
// Notify All other threads, who are waiting on this thread
cv_.notify_all();
}
}
catch (exception e) {
cerr << e.what() << endl;
return ret_status::FAILURE;
}
return ret_status::SUCCESS;
}
};
class Subscribe {
public:
ret_status operator()(std::queue<int> &msg_que, Logger &log) {
int data = 0;
try {
while (true) {
unique_lock<mutex> ulock(mu_);
/**
* Waiting using the same cv_ variable.
* Also, to protect Spurious wake, we are using one extra condition in the lambda function.
*/
cv_.wait(ulock, [=]() { return !msg_que.empty(); });
data = msg_que.front();
msg_que.pop();
ulock.unlock();
if (data == INF) {
break;
}
log("Subscribed:\t", data);
}
}
catch (exception e) {
cerr << e.what() << endl;
return ret_status::FAILURE;
}
return ret_status::SUCCESS;
}
};
int main() {
try {
std::queue<int> msg_que; //Act as a shared memory
Publish pub;
Subscribe sub;
std::vector<int> data_bus = {5, 9, 6, 8, 99, 999, 158};
Logger log;
std::thread t1(pub, ref(msg_que), data_bus, ref(log));
std::thread t2(sub, ref(msg_que), ref(log));
t1.join();
t2.join();
}
catch (exception e) {
cerr << e.what() << endl;
}
return 0;
}