forked from JustinTimperio/gpq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bpq.go
129 lines (109 loc) · 3.03 KB
/
bpq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package gpq
import (
"sync"
"sync/atomic"
)
// Bucket is one node of the doubly linked list of active buckets that
// BucketPriorityQueue maintains.
type Bucket struct {
	// BucketID identifies the bucket; BucketPriorityQueue.Add keeps the list
	// ordered by this value, lowest first.
	BucketID int64
	// Prev is the neighbouring node towards the head of the list, or nil when
	// this node is the head.
	Prev *Bucket
	// Next is the neighbouring node towards the tail of the list, or nil when
	// this node is the tail.
	Next *Bucket
}
// BucketPriorityQueue keeps track of the non-empty buckets in the GPQ, ordered
// by ascending bucket ID.
//
// It combines a hash set (BucketIDs) with a sorted doubly linked list
// (First/Last). Contains, Peek and Remove are O(1). Add walks the list from
// the front to find the insertion point, so it is O(n) in the number of
// active buckets.
//
// The 64-bit counters are deliberately declared first. ActiveBuckets and
// LastRemoved are updated through sync/atomic, and on 32-bit platforms (386,
// ARM, 32-bit MIPS) only the first word of an allocated struct is guaranteed
// to be 64-bit aligned. Placing them ahead of the pointer-sized fields keeps
// each of them on an 8-byte boundary; do not move them below the pointers.
type BucketPriorityQueue struct {
	// ActiveBuckets is the number of buckets currently in the queue. Add and
	// Remove update it atomically.
	ActiveBuckets int64
	// LastRemoved is the ID passed to the most recent Remove call that
	// actually removed a bucket. It is stored atomically.
	LastRemoved int64
	// ObjectsInQueue is not read or written by any method in this file.
	// NOTE(review): presumably maintained by the owning queue - confirm
	// against callers.
	ObjectsInQueue uint64
	// BucketIDs maps a bucket ID to its list node, giving O(1) membership
	// checks and O(1) unlinking.
	BucketIDs map[int64]*Bucket
	// First and Last are the lowest-ID and highest-ID nodes of the list; both
	// are nil while the queue is empty.
	First, Last *Bucket
	// mutex serialises access to BucketIDs and the linked list.
	mutex *sync.Mutex
}
// NewBucketPriorityQueue returns an empty, ready-to-use BucketPriorityQueue.
//
// The counters start at zero and the list pointers at nil (their zero
// values); only the ID map and the mutex need explicit initialisation.
func NewBucketPriorityQueue() *BucketPriorityQueue {
	pq := new(BucketPriorityQueue)
	pq.BucketIDs = make(map[int64]*Bucket)
	pq.mutex = new(sync.Mutex)
	return pq
}
// Len returns a pointer to the queue's live ActiveBuckets counter rather than
// a snapshot of it, and does so without taking the mutex.
//
// Add and Remove modify the counter with sync/atomic, so a caller that may run
// concurrently with them should read the value via atomic.LoadInt64 on the
// returned pointer instead of dereferencing it directly.
func (pq *BucketPriorityQueue) Len() *int64 {
	counter := &pq.ActiveBuckets
	return counter
}
// Peek reports the ID of the bucket at the head of the list without removing
// it. Because Add keeps the list in ascending order, this is the lowest
// active bucket ID.
//
// exists is false, and bucketID is 0, when the queue holds no buckets.
func (pq *BucketPriorityQueue) Peek() (bucketID int64, exists bool) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	if head := pq.First; head != nil {
		return head.BucketID, true
	}
	return 0, false
}
// Add registers bucketID as an active bucket, linking a new node into the
// list so that IDs stay in ascending order. Adding an ID that is already
// present is a no-op.
//
// The insertion point is found by walking the list from the front, so the
// cost grows linearly with the number of active buckets.
func (pq *BucketPriorityQueue) Add(bucketID int64) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	if _, present := pq.BucketIDs[bucketID]; present {
		return
	}

	node := &Bucket{BucketID: bucketID}

	// successor ends up as the first node whose ID is not below bucketID, or
	// nil when every node is smaller (or the list is empty).
	successor := pq.First
	for successor != nil && successor.BucketID < bucketID {
		successor = successor.Next
	}

	switch {
	case pq.First == nil:
		// Empty list: the new node is both ends.
		pq.First, pq.Last = node, node
	case successor == pq.First:
		// Smaller than everything present: becomes the new head.
		node.Next = pq.First
		pq.First.Prev = node
		pq.First = node
	case successor == nil:
		// Larger than everything present: becomes the new tail.
		node.Prev = pq.Last
		pq.Last.Next = node
		pq.Last = node
	default:
		// Splice in between successor and the node before it.
		predecessor := successor.Prev
		node.Prev, node.Next = predecessor, successor
		predecessor.Next = node
		successor.Prev = node
	}

	pq.BucketIDs[bucketID] = node
	atomic.AddInt64(&pq.ActiveBuckets, 1)
}
// Remove unlinks bucketID from the list of active buckets and forgets it.
// Removing an ID that is not present is a no-op and leaves LastRemoved
// untouched.
//
// The node is located through the BucketIDs map and spliced out via its
// neighbour pointers, so no list traversal is needed.
func (pq *BucketPriorityQueue) Remove(bucketID int64) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	target, found := pq.BucketIDs[bucketID]
	if !found {
		return
	}

	before, after := target.Prev, target.Next

	// Re-point whatever referenced target from the left: either the previous
	// node or, when target was the head, the queue's First pointer.
	if before == nil {
		pq.First = after
	} else {
		before.Next = after
	}

	// Same on the right-hand side, falling back to the Last pointer.
	if after == nil {
		pq.Last = before
	} else {
		after.Prev = before
	}

	delete(pq.BucketIDs, bucketID)
	atomic.AddInt64(&pq.ActiveBuckets, -1)
	atomic.StoreInt64(&pq.LastRemoved, bucketID)
}
// Contains reports whether bucketID is currently registered as an active
// bucket. The answer comes from a lookup in the BucketIDs map made while
// holding the mutex; only the presence of the key matters, not the value
// stored under it.
func (pq *BucketPriorityQueue) Contains(bucketID int64) bool {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	if _, present := pq.BucketIDs[bucketID]; present {
		return true
	}
	return false
}