Skip to content

Commit

Permalink
Merge pull request #115 from huanghaoyuanhhy/fix_workerpool
Browse files Browse the repository at this point in the history
common: no longer reuse worker
  • Loading branch information
yelusion2 authored Apr 3, 2023
2 parents 8c05260 + debc01a commit 2567604
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 18 deletions.
5 changes: 4 additions & 1 deletion core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,10 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
}
}

wp := common.NewWorkerPool(ctx, WORKER_NUM, RPS)
wp, err := common.NewWorkerPool(ctx, WORKER_NUM, RPS)
if err != nil {
return backupInfo, err
}
wp.Start()
for _, segment := range segmentBackupInfos {
start := time.Now().Unix()
Expand Down
40 changes: 23 additions & 17 deletions internal/common/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package common

import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/sync/errgroup"
Expand All @@ -21,36 +23,40 @@ type WorkerPool struct {
// Job is the unit of work submitted to the pool. It receives the pool's
// sub-context so it can observe cancellation when a sibling job fails.
type Job func(ctx context.Context) error

// NewWorkerPool build a worker pool, rps 0 is unlimited
func NewWorkerPool(ctx context.Context, workerNum int, rps int32) *WorkerPool {
func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, error) {
if workerNum <= 0 {
return nil, errors.New("workerpool: worker num can not less than 0")
}
g, subCtx := errgroup.WithContext(ctx)
// Including the main worker
g.SetLimit(workerNum + 1)

var lim *rate.Limiter
if rps != 0 {
lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1)
}

return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}
}

func (p *WorkerPool) Start() {
for i := 0; i < p.workerNum; i++ {
p.g.Go(p.work)
}
return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}, nil
}

func (p *WorkerPool) Start() { p.g.Go(p.work) }
func (p *WorkerPool) work() error {
for job := range p.job {
if p.lim != nil {
if err := p.lim.Wait(p.subCtx); err != nil {
return err
for j := range p.job {
job := j
p.g.Go(func() error {
if p.lim != nil {
if err := p.lim.Wait(p.subCtx); err != nil {
return fmt.Errorf("workerpool: wait token %w", err)
}
}
}

if err := job(p.subCtx); err != nil {
return err
}
}
if err := job(p.subCtx); err != nil {
return fmt.Errorf("workerpool: execute job %w", err)
}

return nil
})
}
return nil
}

Expand Down
50 changes: 50 additions & 0 deletions internal/common/workerpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package common

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

// Test0Worker ensures the constructor rejects a non-positive worker count.
func Test0Worker(t *testing.T) {
	_, err := NewWorkerPool(context.Background(), 0, 0)
	assert.Error(t, err)
}

// TestRunTaskNoErr checks that every submitted job executes exactly once
// and that the pool shuts down without error.
func TestRunTaskNoErr(t *testing.T) {
	pool, err := NewWorkerPool(context.Background(), 3, 10)
	assert.NoError(t, err)

	pool.Start()

	const jobCount = 10
	var counter atomic.Int64
	for i := 0; i < jobCount; i++ {
		pool.Submit(func(ctx context.Context) error {
			counter.Add(1)
			return nil
		})
	}

	pool.Done()
	assert.NoError(t, pool.Wait())
	assert.Equal(t, int64(jobCount), counter.Load())
}

// TestRunTaskReturnErr checks that a failing job's error is surfaced by Wait.
func TestRunTaskReturnErr(t *testing.T) {
	pool, err := NewWorkerPool(context.Background(), 10, 10)
	assert.NoError(t, err)

	pool.Start()

	for i := 0; i < 100; i++ {
		pool.Submit(func(ctx context.Context) error {
			return errors.New("some err")
		})
	}

	pool.Done()
	assert.Error(t, pool.Wait())
}

0 comments on commit 2567604

Please sign in to comment.