-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskqueue.go
81 lines (65 loc) · 1.24 KB
/
taskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package taskqueue
import (
"container/list"
"errors"
"sync"
"github.com/gogather/safemap"
)
// TaskQueue is a FIFO queue of tasks addressed by a string task ID.
//
// Task IDs are kept in a linked list (which provides the ordering) and the
// task payloads are kept in a map keyed by task ID. The embedded RWMutex is
// the lock the methods of this type take around accesses to both structures.
type TaskQueue struct {
// Embedded lock; its Lock/Unlock/RLock/RUnlock methods are promoted onto
// TaskQueue.
sync.RWMutex
// m stores the task payload for each task ID.
m *safemap.SafeMap
// l stores task IDs (as string values); new IDs are pushed to the back.
l *list.List
}
// New returns an empty TaskQueue with its payload map and ID list
// initialised and ready for use.
func New() *TaskQueue {
	queue := new(TaskQueue)
	queue.m = safemap.New()
	queue.l = list.New().Init()
	return queue
}
// Top removes the element at the front of the queue and returns it.
//
// The first result reports whether an element was available. When the queue
// is empty it returns (false, "", nil). Otherwise it returns true, the task
// ID at the front of the list, and the value stored for that ID in the map;
// both the list entry and the map entry are removed before returning.
func (tq *TaskQueue) Top() (bool, string, interface{}) {
	// Top mutates both the list and the map, so it must hold the exclusive
	// lock. A read lock would allow several callers to pop concurrently and
	// race on the underlying list.
	tq.Lock()
	defer tq.Unlock()

	e := tq.l.Front()
	if e == nil {
		return false, "", nil
	}

	taskID := e.Value.(string)
	// The second result of Get (presumably a presence flag) is deliberately
	// ignored: whatever value the map yields for the ID is returned as-is.
	value, _ := tq.m.Get(taskID)

	tq.l.Remove(e)
	tq.m.Remove(taskID)
	return true, taskID, value
}
// Add stores task under taskID and appends taskID to the back of the queue.
// The whole operation runs under the write lock.
func (tq *TaskQueue) Add(taskID string, task interface{}) {
	tq.Lock()
	defer tq.Unlock()

	tq.m.Put(taskID, task)
	tq.l.PushBack(taskID)
}
// Remove deletes the first queue entry whose ID equals taskID, together with
// its entry in the payload map. It walks the list from the front under the
// write lock and returns nil on success, or a "remove failed" error when no
// entry with that ID is found.
func (tq *TaskQueue) Remove(taskID string) error {
	tq.Lock()
	defer tq.Unlock()

	node := tq.l.Front()
	for node != nil {
		if id := node.Value.(string); id == taskID {
			tq.l.Remove(node)
			tq.m.Remove(taskID)
			return nil
		}
		node = node.Next()
	}
	return errors.New("remove failed")
}
// Length returns the number of task IDs currently in the queue.
//
// The list is read under the read lock so the call does not race with
// Add, Remove, or Top, which modify the list while holding the lock.
func (tq *TaskQueue) Length() int64 {
	tq.RLock()
	defer tq.RUnlock()

	return int64(tq.l.Len())
}