This repository has been archived by the owner on Feb 25, 2023. It is now read-only.

Commit

Add scale method, fix data race issues (#8)
* Add scale method, fix data race issues
peakle authored Mar 21, 2022
1 parent 07f5db4 commit 4417f88
Showing 4 changed files with 133 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -4,7 +4,7 @@ lint:
 
 test:
 	@echo "Running tests..."
-	@go test ./... -cover -short -count=1
+	@go test ./... -cover -short -count=1 -race
 
 ci-lint: install-linter lint

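The -race flag added above runs the test suite under Go's race detector, which is the tool that flags the kind of unsynchronized reads and writes this commit fixes. As a rough illustration (not code from this repository), a plain integer flag written by one goroutine and read by another is reported as a race, whereas routing every access through sync/atomic, the approach taken in pool.go below, is not:

// racesketch.go: illustrative only, not part of this repository.
// Run with "go run -race" (or "go test -race") to see the detector's
// report for the racy variants noted in the comments.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var closed int64 // shared flag, analogous to WorkerPool.isClosed

	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // writer
		defer wg.Done()
		atomic.StoreInt64(&closed, 1) // the racy variant would be: closed = 1
	}()

	go func() { // reader
		defer wg.Done()
		if atomic.LoadInt64(&closed) == 1 { // the racy variant would be: closed == 1
			fmt.Println("pool closed")
		}
	}()

	wg.Wait()
}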
77 changes: 45 additions & 32 deletions pool.go
@@ -14,7 +14,7 @@ type WorkerPool struct {
 	activeWorkers *int64
 	freeWorkers *int64
 	taskCount *int64
-	workersCapacity int64
+	workersCapacity *int64
 
 	process TaskProcessor
 	taskCh chan interface{}
@@ -23,7 +23,7 @@ type WorkerPool struct {
 	shutdownCtx context.Context
 	cancelFunc context.CancelFunc
 	wg *sync.WaitGroup
-	isClosed int32
+	isClosed *int64
 
 	logger logger
 }
@@ -53,19 +53,22 @@ func Create(ctx context.Context, processor TaskProcessor, opts ...Option) *WorkerPool
 		config.Capacity = 1
 	}
 	ctx, cancel := context.WithCancel(ctx)
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 
 	return &WorkerPool{
 		activeWorkers: ptrOfInt64(0),
 		freeWorkers: ptrOfInt64(0),
 		taskCount: ptrOfInt64(0),
-		workersCapacity: config.Capacity,
+		workersCapacity: ptrOfInt64(config.Capacity),
 		process: processor,
 		taskCh: make(chan interface{}, 2*config.Capacity),
 		cfg: &config,
 		shutdownCtx: ctx,
 		cancelFunc: cancel,
-		wg: &sync.WaitGroup{},
+		wg: wg,
 		logger: log.Default(),
+		isClosed: ptrOfInt64(0),
 	}
 }
 
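A note on the two lines added at the top of Create: the WaitGroup now starts with one extra count before any worker exists, and the new Close and CloseGracefully further down release it with wg.Add(-1). This looks like a sentinel-count pattern: the counter can never drop to zero while the pool is alive, however workers come and go, so wg.Wait() only returns once shutdown has explicitly removed the sentinel. A minimal self-contained sketch of that pattern, under this interpretation (all names are illustrative, not from the repository):

// sentinelsketch.go: illustrative sketch of a sentinel WaitGroup, not code from this repository.
package main

import (
	"fmt"
	"sync"
	"time"
)

type miniPool struct {
	wg *sync.WaitGroup
}

func newMiniPool() *miniPool {
	wg := &sync.WaitGroup{}
	wg.Add(1) // sentinel: keeps the counter above zero for the pool's lifetime
	return &miniPool{wg: wg}
}

func (p *miniPool) spawnWorker() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		time.Sleep(10 * time.Millisecond) // stand-in for real work
	}()
}

func (p *miniPool) close() {
	p.wg.Add(-1) // release the sentinel
	p.wg.Wait()  // then wait for any workers that are still running
	fmt.Println("pool closed")
}

func main() {
	p := newMiniPool()
	p.spawnWorker()
	p.close()
}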
@@ -76,7 +79,7 @@ func (wp *WorkerPool) SetLogger(logger logger) {
 
 // Submit - submit task to pool
 func (wp *WorkerPool) Submit(task interface{}) {
-	if wp.isClosed == 1 {
+	if atomic.LoadInt64(wp.isClosed) == 1 {
 		return
 	}
 
@@ -87,7 +90,7 @@ func (wp *WorkerPool) Submit(task interface{}) {
 
 // SubmitAsync - submit task to pool, for async better use this method
 func (wp *WorkerPool) SubmitAsync(task interface{}) {
-	if wp.isClosed == 1 {
+	if atomic.LoadInt64(wp.isClosed) == 1 {
 		return
 	}
 
@@ -113,8 +116,43 @@ func (wp *WorkerPool) Wait() {
 	}
 }
 
+// Close - close worker pool and release all resources, not processed tasks will be thrown away
+func (wp *WorkerPool) Close() {
+	atomic.StoreInt64(wp.isClosed, 1)
+	wp.cancelFunc()
+	wp.wg.Add(-1)
+	wp.wg.Wait()
+}
+
+// CloseGracefully - close worker pool and release all resources, wait until all task will be processed
+func (wp *WorkerPool) CloseGracefully() {
+	atomic.StoreInt64(wp.isClosed, 1)
+	closed := make(chan struct{})
+	go func() {
+		wp.Wait()
+		close(closed)
+	}()
+
+	select {
+	case <-closed:
+	case <-time.After(wp.cfg.GracefulTimeout):
+	}
+
+	wp.cancelFunc()
+
+	wp.wg.Add(-1)
+	wp.wg.Wait()
+}
+
+func (wp *WorkerPool) Scale(delta int64) {
+	atomic.AddInt64(wp.workersCapacity, delta)
+}
+
 func (wp *WorkerPool) retrieveWorker() {
-	if c := atomic.LoadInt64(wp.activeWorkers); c < wp.workersCapacity {
+	max := atomic.LoadInt64(wp.workersCapacity)
+	c := atomic.LoadInt64(wp.activeWorkers)
+
+	if c < max {
 		if atomic.CompareAndSwapInt64(wp.activeWorkers, c, c+1) {
 			wp.spawnWorker()
 		}
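The rewritten retrieveWorker above loads both counters through sync/atomic and only spawns a worker if a compare-and-swap on activeWorkers succeeds, so two goroutines racing through the capacity check cannot both claim the same slot; a failed CAS simply means another goroutine won and no extra worker is started. A standalone sketch of that claim pattern (names and numbers are illustrative, not from the repository):

// cassketch.go: illustrative sketch of a compare-and-swap "slot claim", not code from this repository.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var active int64
	const capacity = 3

	// claim reports whether the caller won a worker slot.
	claim := func() bool {
		c := atomic.LoadInt64(&active)
		if c >= capacity {
			return false // already at capacity
		}
		// Of several goroutines that read the same value of c,
		// only one can move active from c to c+1.
		return atomic.CompareAndSwapInt64(&active, c, c+1)
	}

	var wg sync.WaitGroup
	var claimed int64
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if claim() {
				atomic.AddInt64(&claimed, 1)
			}
		}()
	}
	wg.Wait()

	// No matter how many goroutines race, at most `capacity` claims succeed.
	fmt.Println("claimed:", atomic.LoadInt64(&claimed))
}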
@@ -167,31 +205,6 @@ func (wp *WorkerPool) spawnWorker() {
 	}()
 }
 
-// Close - close worker pool and release all resources, not processed tasks will be thrown away
-func (wp *WorkerPool) Close() {
-	atomic.StoreInt32(&wp.isClosed, 1)
-	wp.cancelFunc()
-	wp.wg.Wait()
-}
-
-// CloseGracefully - close worker pool and release all resources, wait until all task will be processed
-func (wp *WorkerPool) CloseGracefully() {
-	atomic.StoreInt32(&wp.isClosed, 1)
-	closed := make(chan struct{})
-	go func() {
-		wp.Wait()
-		close(closed)
-	}()
-
-	select {
-	case <-closed:
-	case <-time.After(wp.cfg.GracefulTimeout):
-	}
-
-	wp.cancelFunc()
-	wp.wg.Wait()
-}
-
 func ptrOfInt64(i int64) *int64 {
 	return &i
 }
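Taken together, the new surface of the pool looks roughly like the sketch below. It is written as an example test inside the bees package itself (the module's import path is not shown in this diff) and is hypothetical, not part of this commit; it only uses identifiers that appear elsewhere in the changed files: Create, WithCapacity, Submit, Wait, Scale and CloseGracefully.

// pool_scale_example_test.go: hypothetical example, not part of this commit.
package bees

import (
	"context"
	"fmt"
)

func ExampleWorkerPool_Scale() {
	pool := Create(context.Background(), func(ctx context.Context, task interface{}) {
		fmt.Println(task)
	}, WithCapacity(1))

	pool.Scale(4) // raise the worker capacity from 1 to 5 at runtime

	for i := 0; i < 5; i++ {
		pool.Submit(i)
	}
	pool.Wait() // as in pool_example_test.go, wait for the queue to drain

	pool.Scale(-4) // shrink the capacity back to 1
	pool.CloseGracefully()
}

Because several workers may run at once, the print order is nondeterministic, so the example deliberately carries no "// Output:" comment and is only compiled, not executed, by go test.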
1 change: 1 addition & 0 deletions pool_example_test.go
@@ -16,6 +16,7 @@ func Example() {
 	pool.Submit(1)
 	pool.Submit(2)
 	pool.Submit(3)
+	pool.Wait()
 	// Output:
 	// 1
 	// 2
97 changes: 86 additions & 11 deletions pool_test.go
@@ -2,6 +2,7 @@ package bees
 
 import (
 	"context"
+	"io"
 	"log"
 	"sync"
 	"sync/atomic"
@@ -93,6 +94,7 @@ func TestRecoverAfterPanicOnSingleWorker(t *testing.T) {
 		checkCh <- struct{}{}
 		panic("aaaaaa")
 	}, WithKeepAlive(time.Hour), WithJitter(10), WithCapacity(1))
+	pool.SetLogger(log.New(io.Discard, "", 0))
 
 	pool.Submit(task)
 	<-checkCh // check first execution
@@ -118,7 +120,7 @@ func TestWorkerExpiration(t *testing.T) {
 	}
 	time.Sleep(100 * time.Millisecond)
 
-	if *pool.activeWorkers != 0 || *pool.freeWorkers != 0 {
+	if atomic.LoadInt64(pool.activeWorkers) != 0 || atomic.LoadInt64(pool.freeWorkers) != 0 {
 		t.Fatalf("active workers found")
 	}
 }
@@ -167,6 +169,7 @@ func TestOnPanic(t *testing.T) {
 		WithJitter(1),
 		WithCapacity(testCount),
 	)
+	pool.SetLogger(log.New(io.Discard, "", 0))
 
 	stopper := ptrOfInt64(0)
 	var wg sync.WaitGroup
@@ -185,21 +188,23 @@
 	wg.Wait()
 	pool.Wait()
 
-	if *pool.taskCount != 0 || len(pool.taskCh) != 0 {
-		t.Fatalf("unconsistent task count: taskCount: %d, channel len: %d", *pool.taskCount, len(pool.taskCh))
+	taskCount := atomic.LoadInt64(pool.taskCount)
+	taskChLen := len(pool.taskCh)
+	if taskCount != 0 || taskChLen != 0 {
+		t.Fatalf("unconsistent task count: taskCount: %d, channel len: %d", taskCount, taskChLen)
 	}
 
 	pool.Close()
 
-	if *pool.activeWorkers != 0 {
-		t.Fatalf("unconsistent active workers count: expected zero, actual: %d", *pool.activeWorkers)
+	if aw := atomic.LoadInt64(pool.activeWorkers); aw != 0 {
+		t.Fatalf("unconsistent active workers count: expected zero, actual: %d", pool.activeWorkers)
 	}
 
-	if *pool.freeWorkers != 0 {
-		t.Fatalf("unconsistent free workers count: expected zero, actual: %d", *pool.freeWorkers)
+	if fw := atomic.LoadInt64(pool.freeWorkers); fw != 0 {
+		t.Fatalf("unconsistent free workers count: expected zero, actual: %d", fw)
 	}
 
-	if pool.isClosed != 1 {
+	if atomic.LoadInt64(pool.isClosed) != 1 {
 		t.Fatalf("isClosed must be one")
 	}
 }
@@ -222,8 +227,8 @@ func TestCloseGracefully(t *testing.T) {
 	}
 	pool.CloseGracefully()
 
-	if atomic.LoadInt64(counter) != 100 {
-		t.Fatalf("counter not equal: %d", *counter)
+	if c := atomic.LoadInt64(counter); c != 100 {
+		t.Fatalf("counter not equal: %d", c)
 	}
 }
 func TestCloseGracefullyByTimeout(t *testing.T) {
@@ -248,7 +253,77 @@ func TestCloseGracefullyByTimeout(t *testing.T) {
 	start := time.Now()
 	pool.CloseGracefully()
 
-	if end := time.Since(start); end > 6*time.Second {
+	if end := time.Since(start); end.Round(time.Second) > 6*time.Second {
 		t.Fatalf("too big wait time: %s", end)
 	}
 }
+
+func TestScale(t *testing.T) {
+	t.Parallel()
+
+	start := time.Now()
+	defer func() {
+		if f := time.Since(start); f.Round(time.Second) > 3*time.Second {
+			t.Fatalf("too long execution: %s", f)
+		}
+	}()
+
+	pool := Create(context.Background(), func(ctx context.Context, task interface{}) {
+		time.Sleep(time.Second)
+	}, WithCapacity(1))
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		pool.Submit(nil)
+	}()
+
+	wg.Wait()
+	pool.Wait()
+
+	if wCap := atomic.LoadInt64(pool.workersCapacity); wCap != 1 {
+		t.Fatalf("wrong capacity: %d", wCap)
+	}
+
+	pool.Scale(100)
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			go pool.Submit(nil)
+		}
+	}()
+
+	if wCap := atomic.LoadInt64(pool.workersCapacity); wCap != 101 {
+		t.Fatalf("wrong capacity: %d", wCap)
+	}
+
+	wg.Wait()
+	pool.Wait()
+
+	pool.Scale(-100)
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2; i++ {
+			go pool.Submit(nil)
+		}
+	}()
+
+	if wCap := atomic.LoadInt64(pool.workersCapacity); wCap != 1 {
+		t.Fatalf("wrong capacity: %d", wCap)
+	}
+
+	aw := atomic.LoadInt64(pool.activeWorkers)
+	fw := atomic.LoadInt64(pool.freeWorkers)
+	if fw < 0 && aw < 0 {
+		t.Fatalf("must be grater than zero, because some workers still alive after decrease worker count")
+	}
+
+	wg.Wait()
+	pool.Wait()
+}
