From debc01a19343a504c3dd5f44063d7b6501611237 Mon Sep 17 00:00:00 2001 From: huanghaoyuan Date: Tue, 28 Mar 2023 20:40:50 +0800 Subject: [PATCH] common: no longer reuse worker Signed-off-by: huanghaoyuan --- core/backup_context.go | 5 ++- internal/common/workerpool.go | 40 ++++++++++++++---------- internal/common/workerpool_test.go | 50 ++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 18 deletions(-) create mode 100644 internal/common/workerpool_test.go diff --git a/core/backup_context.go b/core/backup_context.go index efa173b3..0e7b1df6 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -615,7 +615,10 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp } } - wp := common.NewWorkerPool(ctx, WORKER_NUM, RPS) + wp, err := common.NewWorkerPool(ctx, WORKER_NUM, RPS) + if err != nil { + return backupInfo, err + } wp.Start() for _, segment := range segmentBackupInfos { start := time.Now().Unix() diff --git a/internal/common/workerpool.go b/internal/common/workerpool.go index 97e5425e..73286a1b 100644 --- a/internal/common/workerpool.go +++ b/internal/common/workerpool.go @@ -2,6 +2,8 @@ package common import ( "context" + "errors" + "fmt" "time" "golang.org/x/sync/errgroup" @@ -21,36 +23,40 @@ type WorkerPool struct { type Job func(ctx context.Context) error // NewWorkerPool build a worker pool, rps 0 is unlimited -func NewWorkerPool(ctx context.Context, workerNum int, rps int32) *WorkerPool { +func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, error) { + if workerNum <= 0 { + return nil, errors.New("workerpool: worker num can not less than 0") + } g, subCtx := errgroup.WithContext(ctx) + // Including the main worker + g.SetLimit(workerNum + 1) var lim *rate.Limiter if rps != 0 { lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1) } - return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx} -} - -func (p *WorkerPool) Start() { - for i := 0; i < p.workerNum; i++ { - p.g.Go(p.work) - } + return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}, nil } +func (p *WorkerPool) Start() { p.g.Go(p.work) } func (p *WorkerPool) work() error { - for job := range p.job { - if p.lim != nil { - if err := p.lim.Wait(p.subCtx); err != nil { - return err + for j := range p.job { + job := j + p.g.Go(func() error { + if p.lim != nil { + if err := p.lim.Wait(p.subCtx); err != nil { + return fmt.Errorf("workerpool: wait token %w", err) + } } - } - if err := job(p.subCtx); err != nil { - return err - } - } + if err := job(p.subCtx); err != nil { + return fmt.Errorf("workerpool: execute job %w", err) + } + return nil + }) + } return nil } diff --git a/internal/common/workerpool_test.go b/internal/common/workerpool_test.go new file mode 100644 index 00000000..99bb5537 --- /dev/null +++ b/internal/common/workerpool_test.go @@ -0,0 +1,50 @@ +package common + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func Test0Worker(t *testing.T) { + _, err := NewWorkerPool(context.Background(), 0, 0) + assert.NotNil(t, err) +} + +func TestRunTaskNoErr(t *testing.T) { + wp, err := NewWorkerPool(context.Background(), 3, 10) + assert.Nil(t, err) + + wp.Start() + + var v atomic.Int64 + for i := 0; i < 10; i++ { + wp.Submit(func(ctx context.Context) error { + v.Add(1) + return nil + }) + } + + wp.Done() + assert.Nil(t, wp.Wait()) + assert.Equal(t, int64(10), v.Load()) +} + +func TestRunTaskReturnErr(t *testing.T) { + wp, err := NewWorkerPool(context.Background(), 10, 10) + assert.Nil(t, err) + + wp.Start() + + for i := 0; i < 100; i++ { + wp.Submit(func(ctx context.Context) error { + return errors.New("some err") + }) + } + + wp.Done() + assert.NotNil(t, wp.Wait()) +}