-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask.go
128 lines (108 loc) · 2.9 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package throttler
import (
"context"
"errors"
"time"
)
// Executable interface that all tasks must implement.
type Executable[T any, R any] interface {
Execute(ctx context.Context, input T) (R, error)
}
// TaskOptions provides common configurations for tasks.
type TaskOptions struct {
ID string
Priority int
retryCount int
MaxRetries int
Backoff time.Duration
Timeout time.Duration
Context context.Context
}
// TaskResult encapsulates the output or error after task execution.
type TaskResult[R any] struct {
TaskID string
Result R
Error error
RetryCount int
CompletedAt time.Time
}
// Task wraps inputs, outputs, and execution logic for a generic task.
type TaskInfo[T any, R any] struct {
TaskOptions
input T
task Executable[T, R]
order int64
result chan TaskResult[R]
}
// TaskQueue implements a priority queue as a min-heap.
type TaskQueue[T any, R any] struct {
tasks []*TaskInfo[T, R]
}
// NewTaskQueue creates a new task queue.
func NewTaskQueue[T any, R any]() *TaskQueue[T, R] {
return &TaskQueue[T, R]{
tasks: make([]*TaskInfo[T, R], 0),
}
}
// Len returns the number of tasks in the queue.
func (q *TaskQueue[T, R]) Len() int {
return len(q.tasks)
}
// Insert adds a task into the queue and maintains heap properties.
func (q *TaskQueue[T, R]) Insert(task *TaskInfo[T, R]) {
q.tasks = append(q.tasks, task)
q.bubbleUp(len(q.tasks) - 1)
}
// Extract removes and returns the task with the highest priority.
func (q *TaskQueue[T, R]) Extract() (*TaskInfo[T, R], error) {
if q.Len() == 0 {
return nil, errors.New("task queue is empty")
}
// Swap root with last element.
n := len(q.tasks) - 1
q.swap(0, n)
// Remove the last element (smallest task).
task := q.tasks[n]
q.tasks = q.tasks[:n]
// Restore heap properties.
if len(q.tasks) > 0 {
q.bubbleDown(0)
}
return task, nil
}
// bubbleUp restores the heap property upwards from the given index.
func (q *TaskQueue[T, R]) bubbleUp(index int) {
parent := (index - 1) / 2
if parent >= 0 && q.less(index, parent) {
q.swap(index, parent)
q.bubbleUp(parent)
}
}
// bubbleDown restores the heap property downwards from the given index.
func (q *TaskQueue[T, R]) bubbleDown(index int) {
left := 2*index + 1
right := 2*index + 2
smallest := index
if left < len(q.tasks) && q.less(left, smallest) {
smallest = left
}
if right < len(q.tasks) && q.less(right, smallest) {
smallest = right
}
if smallest != index {
q.swap(index, smallest)
q.bubbleDown(smallest)
}
}
// less determines the priority between two tasks.
// smaller the number higher the priority
func (q *TaskQueue[T, R]) less(i, j int) bool {
if q.tasks[i].Priority == q.tasks[j].Priority {
return q.tasks[i].order < q.tasks[j].order
}
return q.tasks[i].Priority < q.tasks[j].Priority
}
// swap exchanges two tasks in the heap.
func (q *TaskQueue[T, R]) swap(i, j int) {
q.tasks[i], q.tasks[j] = q.tasks[j], q.tasks[i]
}