-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool_test.go
67 lines (63 loc) · 1.67 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package gpool
import (
"context"
"fmt"
"testing"
"time"
)
func TestNewWorkerPool(t *testing.T) {
numJobs := 1000
handlerA := func(ctx context.Context, i int) (int, error) {
time.Sleep(10 * time.Millisecond)
return i * 2, nil
}
handlerB := func(ctx context.Context, s string) (string, error) {
time.Sleep(10 * time.Millisecond)
return "Hello " + s, nil
}
procA := NewProcessor[int, int](handlerA)
procB := NewBufferedProcessor[string, string](1, handlerB)
pool := NewPool(context.Background(), 100, procA, procB)
aTasks := make([]*Task[int, int], numJobs)
bTasks := make([]*Task[string, string], numJobs)
for i := 0; i < numJobs; i++ {
aTasks[i] = procA.Submit(i)
bTasks[i] = procB.Submit(fmt.Sprintf("World %d", i))
}
for i := 0; i < numJobs; i++ {
outA, _ := aTasks[i].Await().Unwrap()
if outA != i*2 {
t.Errorf("Expected %d, got %d", i*2, outA)
}
outB, _ := bTasks[i].Await().Unwrap()
if outB != fmt.Sprintf("Hello World %d", i) {
t.Errorf("Expected %s, got %s", fmt.Sprintf("Hello World %d", i), outB)
}
}
// add a new processor
procC := NewProcessor[int, int](func(ctx context.Context, i int) (int, error) {
time.Sleep(10 * time.Millisecond)
return i * 3, nil
})
pool.AddProcessors(procC)
cTasks := make([]*Task[int, int], numJobs)
for i := 0; i < numJobs; i++ {
cTasks[i] = procC.Submit(i)
}
TaskWaitAll(cTasks)
for i, task := range cTasks {
out, _ := task.Result().Unwrap()
fmt.Println(out)
if out != i*3 {
t.Errorf("Expected %d, got %d", i*3, out)
}
}
for i := 0; i < numJobs; i++ {
out, _ := cTasks[i].Await().Unwrap()
fmt.Println(out)
if out != i*3 {
t.Errorf("Expected %d, got %d", i*3, out)
}
}
pool.Stop()
}