-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathArrayBlockingQueue.h
137 lines (120 loc) · 3.79 KB
/
ArrayBlockingQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#ifndef CPPEXECUTOR_ARRAYBLOCKINGQUEUE_H
#define CPPEXECUTOR_ARRAYBLOCKINGQUEUE_H
#include "BlockingQueue.h"
#include <vector>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <chrono>
/**
 * @brief Bounded FIFO queue protected by one mutex and two condition variables.
 *
 * Producers block (or fail) while the queue holds max_size elements; consumers
 * block (or fail) while it is empty. close() sets a stop flag and wakes every
 * waiter so blocked threads can return.
 *
 * "No element" is reported by returning nullptr, so T must be constructible
 * from nullptr (e.g. a raw pointer, std::shared_ptr, or std::function).
 *
 * @tparam T element type stored in the queue.
 */
template<typename T>
class ArrayBlockingQueue : public BlockingQueue<T> {
private:
    size_t max_size;                              // capacity limit enforced by producers
    std::deque<T> items;                          // stored elements, front = oldest
    mutable std::mutex queue_mutex;               // mutable so const observers can lock it
    std::condition_variable not_empty_condition;  // signalled when an element is added
    std::condition_variable not_full_condition;   // signalled when an element is removed
    bool stop_;                                   // set by close(); guarded by queue_mutex

    /// True when no more elements may be added. Caller must hold queue_mutex.
    bool full_locked() const {
        return items.size() >= max_size;
    }

    /// Removes and returns the front element, then releases the lock and wakes
    /// one waiting producer. Caller must hold @p lock and the queue must be non-empty.
    T pop_front_locked(std::unique_lock<std::mutex> &lock) {
        T ret = std::move(items.front());
        items.pop_front();
        lock.unlock();  // notify outside the lock so the woken thread can proceed at once
        not_full_condition.notify_one();
        return ret;
    }

public:
    ArrayBlockingQueue() = delete;

    /// @param size maximum number of elements the queue may hold.
    explicit ArrayBlockingQueue(size_t size) : max_size(size), stop_(false) {
    }

    /**
     * @brief Appends @p e without blocking.
     * @return true if the element was stored, false if the queue was full.
     */
    bool offer(T e) override {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            if (full_locked()) return false;
            items.push_back(std::move(e));
        }
        not_empty_condition.notify_one();
        return true;
    }

    /**
     * @brief Appends @p e, waiting up to @p timeout milliseconds for free space.
     * @return true if the element was stored; false if the queue was still full
     *         when the timeout expired or when close() was called.
     */
    bool offer(T e, long timeout) override {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            // The predicate form re-checks capacity after every wakeup, so a spurious
            // wakeup or a competing producer can never push the queue past max_size.
            not_full_condition.wait_for(lock, std::chrono::milliseconds(timeout),
                                        [this] { return stop_ || !full_locked(); });
            if (full_locked()) return false;
            items.push_back(std::move(e));
        }
        not_empty_condition.notify_one();
        return true;
    }

    /**
     * @brief Removes and returns the front element without blocking.
     * @return the front element, or nullptr if the queue is empty.
     */
    T poll() override {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (items.empty()) return nullptr;
        return pop_front_locked(lock);
    }

    /**
     * @brief Removes and returns the front element, waiting up to @p timeout
     *        milliseconds for one to arrive.
     * @return the front element, or nullptr if the queue was still empty when the
     *         timeout expired or when close() was called.
     */
    T poll(long timeout) override {
        std::unique_lock<std::mutex> lock(queue_mutex);
        not_empty_condition.wait_for(lock, std::chrono::milliseconds(timeout),
                                     [this] { return stop_ || !items.empty(); });
        // Empty here means either the wait timed out or the queue was closed;
        // elements still queued at close time are handed out normally.
        if (items.empty()) return nullptr;
        return pop_front_locked(lock);
    }

    /**
     * @brief Returns a copy of the front element without removing it.
     * @return the front element, or nullptr if the queue is empty.
     */
    T peek() override {
        std::lock_guard<std::mutex> lock(queue_mutex);
        if (items.empty()) return nullptr;
        return items.front();
    }

    /**
     * @brief Appends @p e, blocking while the queue is full.
     *
     * If close() is called while the queue is still full, the call returns
     * WITHOUT storing @p e, because storing it would exceed the capacity limit.
     */
    void put(T e) override {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            not_full_condition.wait(lock, [this] { return stop_ || !full_locked(); });
            if (full_locked()) return;  // only reachable after close(): give up
            items.push_back(std::move(e));
        }
        not_empty_condition.notify_one();
    }

    /**
     * @brief Removes and returns the front element, blocking while the queue is empty.
     * @return the front element, or nullptr if close() was called while the
     *         queue was empty.
     */
    T take() override {
        std::unique_lock<std::mutex> lock(queue_mutex);
        not_empty_condition.wait(lock, [this] { return stop_ || !items.empty(); });
        if (items.empty()) return nullptr;  // woken by close() with nothing left
        return pop_front_locked(lock);
    }

    /// @return true if the queue currently holds no elements.
    bool empty() const override {
        std::lock_guard<std::mutex> lock(queue_mutex);
        return items.empty();
    }

    /// @return the number of elements that can still be added before the queue is full.
    int remainingCapacity() const override {
        std::lock_guard<std::mutex> lock(queue_mutex);
        if (full_locked()) return 0;
        return static_cast<int>(max_size - items.size());
    }

    /**
     * @brief Removes the element at position @p index (0 = front).
     * @return true if an element was removed, false if @p index is negative or
     *         not less than the current size.
     */
    bool removeAt(int index) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            if (index < 0 || static_cast<size_t>(index) >= items.size()) return false;
            // std::deque::erase takes an iterator, not a numeric position.
            items.erase(items.begin() + index);
        }
        not_full_condition.notify_one();
        return true;
    }

    /**
     * @brief Marks the queue as stopped and wakes every blocked producer and consumer.
     *
     * Elements already queued remain retrievable after this call.
     */
    void close() override {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            stop_ = true;
        }
        not_empty_condition.notify_all();
        not_full_condition.notify_all();
    }
};
#endif //CPPEXECUTOR_ARRAYBLOCKINGQUEUE_H