-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
165 lines (138 loc) · 3.38 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Package orbyte is a lightweight & safe (buffer-writer | general object) pool.
package orbyte
import (
"runtime"
"sync"
"sync/atomic"
"github.com/RomiChan/syncx"
)
// Pool lightweight general pool.
type Pool[T any] struct {
countin int32
countout int32
// 64 bit align
outlim int32
inlim int32
// 64 bit align
pool sync.Pool
dupmap syncx.Map[*Item[T], struct{}]
pooler Pooler[T]
noputbak bool
issync bool
}
// NewPool make a new pool from custom pooler.
func NewPool[T any](pooler Pooler[T]) *Pool[T] {
p := new(Pool[T])
p.pooler = pooler
p.pool.New = func() any {
return &Item[T]{pool: p}
}
// default limit
p.outlim = 4096
p.inlim = 4096
return p
}
// SetNoPutBack make it panic on every use-after-destroy.
//
// Enable this to detect coding errors.
func (pool *Pool[T]) SetNoPutBack(on bool) {
pool.noputbak = on
}
// SetSyncItem make it panic on every read-write conflict.
//
// Enable this to detect coding errors.
func (pool *Pool[T]) SetSyncItem(on bool) {
pool.issync = on
}
// LimitOutput will automatically set new item no-autodestroy
// if countout > outlim.
func (pool *Pool[T]) LimitOutput(n int32) {
if n <= 0 {
panic("n must > 0")
}
pool.outlim = n
}
// LimitInputwill automatically set new item no-autodestroy
// if countout > inlim.
func (pool *Pool[T]) LimitInput(n int32) {
if n <= 0 {
panic("n must > 0")
}
pool.inlim = n
}
func (pool *Pool[T]) incin() {
atomic.AddInt32(&pool.countin, 1)
}
func (pool *Pool[T]) decin() {
atomic.AddInt32(&pool.countin, -1)
}
func (pool *Pool[T]) incout() {
atomic.AddInt32(&pool.countout, 1)
}
func (pool *Pool[T]) decout() {
atomic.AddInt32(&pool.countout, -1)
}
func (pool *Pool[T]) newempty() *Item[T] {
item := pool.pool.Get().(*Item[T])
isrecycled := item.stat.hasdestroyed()
if isrecycled {
pool.decin()
pool.dupmap.Delete(item)
}
item.stat = status(0)
isfull := atomic.LoadInt32(&pool.countin) > pool.inlim ||
atomic.LoadInt32(&pool.countout) > pool.outlim
if isfull {
// no out log, no reuse
return item
}
pool.incout()
return item.setautodestroy()
}
func (pool *Pool[T]) put(item *Item[T]) {
runtime.SetFinalizer(item, nil)
item.cfg = nil
item.stat.setdestroyed(true)
if pool.noputbak ||
atomic.LoadInt32(&pool.countin) > pool.inlim {
return
}
_, exist := pool.dupmap.LoadOrStore(item, struct{}{})
if exist {
panic("duplicated put")
}
pool.pool.Put(item)
pool.decout()
pool.incin()
}
// New call this to generate an item.
func (pool *Pool[T]) New(config any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.stat.setbuffered(true)
item.val = pool.pooler.New(config, item.val)
return item
}
// InvolveItem[T any] involve external object into pool.
//
// After that, you must only use the object through Item.
func (pool *Pool[T]) Involve(config, obj any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.stat.setbuffered(true)
item.val = pool.pooler.Parse(obj, item.val)
return item
}
// ParseItem[T any] safely convert obj into pool item without copy.
//
// You can still use the original object elsewhere.
func (pool *Pool[T]) Parse(config, obj any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.val = pool.pooler.Parse(obj, item.val)
return item
}
// CountItems returns total item count outside and inside.
func (pool *Pool[T]) CountItems() (outside, inside int32) {
return atomic.LoadInt32(&pool.countout), atomic.LoadInt32(&pool.countin)
}