diff --git a/pkg/nazamath/wrap_round_test.go b/pkg/nazamath/wrap_round_test.go new file mode 100644 index 0000000..f49f658 --- /dev/null +++ b/pkg/nazamath/wrap_round_test.go @@ -0,0 +1 @@ +package nazamath diff --git a/pkg/taskpool/interface.go b/pkg/taskpool/interface.go index afe80a9..5fa1268 100644 --- a/pkg/taskpool/interface.go +++ b/pkg/taskpool/interface.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -// 非阻塞协程池,协程数量可动态增长,可配置最大协程并发数量,可手动释放空闲的协程 +// Package taskpool 非阻塞协程池,协程数量可动态增长,可配置最大协程并发数量,可手动释放空闲的协程 package taskpool import ( @@ -16,6 +16,23 @@ import ( // TODO // - channel 通信替换成其他方式是否有可能提高性能 +// DisposeType +// +// 任务分为3种状态: +// +// 1. 已添加(到Pool中)),正在执行 +// 2. 已添加,但是还没有被执行 +// 3. 还没有添加 +// +// DisposeTypeAsap: 1会执行,2和3不会 +// DisposeTypeRunAllBlockTask: 1和2会执行,3不会 +type DisposeType int + +const ( + DisposeTypeAsap DisposeType = iota + 1 + DisposeTypeRunAllBlockTask +) + var ErrTaskPool = errors.New("naza.taskpool: fxxk") type TaskFn func(param ...interface{}) @@ -38,11 +55,14 @@ type Pool interface { // Go(task TaskFn, param ...interface{}) - // 获取当前的状态,注意,只是一个瞬时值 + // GetCurrentStatus 获取当前的状态,注意,只是一个瞬时值 GetCurrentStatus() Status - // 关闭池内所有的空闲协程 + // KillIdleWorkers 关闭池内所有的空闲协程 KillIdleWorkers() + + // Dispose 完全释放池内资源,包括所有协程 + Dispose(t DisposeType) } type Option struct { diff --git a/pkg/taskpool/pool.go b/pkg/taskpool/pool.go index b1602eb..0098946 100644 --- a/pkg/taskpool/pool.go +++ b/pkg/taskpool/pool.go @@ -13,17 +13,20 @@ import ( ) type taskWrapper struct { - taskFn TaskFn - param []interface{} + taskFn TaskFn + param []interface{} + disposeFlag bool } type pool struct { maxWorkerNum int - m sync.Mutex - totalWorkerNum int + m sync.Mutex + //totalWorkerNum int idleWorkerList []*worker blockTaskList []taskWrapper + allWorkerList []*worker + disposeFlag bool } func newPool(option Option) *pool { @@ -37,12 +40,18 @@ func newPool(option Option) *pool { } func (p *pool) Go(task TaskFn, param ...interface{}) { + p.m.Lock() + defer p.m.Unlock() + if p.disposeFlag { + return + } + tw := taskWrapper{ taskFn: task, param: param, } var w *worker - p.m.Lock() + if len(p.idleWorkerList) != 0 { // 还有空闲worker @@ -53,7 +62,7 @@ func (p *pool) Go(task TaskFn, param ...interface{}) { // 无空闲worker if p.maxWorkerNum == 0 || - (p.maxWorkerNum > 0 && p.totalWorkerNum < p.maxWorkerNum) { + (p.maxWorkerNum > 0 && len(p.allWorkerList) < p.maxWorkerNum) { // 无最大worker限制,或还未达到限制 p.newWorkerWithTask(tw) @@ -63,24 +72,47 @@ func (p *pool) Go(task TaskFn, param ...interface{}) { p.blockTaskList = append(p.blockTaskList, tw) } } - p.m.Unlock() } func (p *pool) KillIdleWorkers() { p.m.Lock() - p.totalWorkerNum = p.totalWorkerNum - len(p.idleWorkerList) + defer p.m.Unlock() + if p.disposeFlag { + return + } + + for i := range p.idleWorkerList { + p.idleWorkerList[i].Stop() + } + p.idleWorkerList = p.idleWorkerList[0:0] +} + +func (p *pool) Dispose(t DisposeType) { + p.m.Lock() + defer p.m.Unlock() + if p.disposeFlag { + return + } + + p.disposeFlag = true + + if t == DisposeTypeAsap { + p.blockTaskList = nil + } else if t == DisposeTypeRunAllBlockTask { + // noop + } + for i := range p.idleWorkerList { p.idleWorkerList[i].Stop() } p.idleWorkerList = p.idleWorkerList[0:0] - p.m.Unlock() } func (p *pool) GetCurrentStatus() Status { p.m.Lock() defer p.m.Unlock() return Status{ - TotalWorkerNum: p.totalWorkerNum, + TotalWorkerNum: len(p.allWorkerList), IdleWorkerNum: len(p.idleWorkerList), BlockTaskNum: len(p.blockTaskList), } @@ -90,7 +122,7 @@ func (p *pool) newWorker() *worker { w := NewWorker(p) w.Start() p.idleWorkerList = append(p.idleWorkerList, w) - p.totalWorkerNum++ + p.allWorkerList = append(p.allWorkerList, w) return w } @@ -98,19 +130,35 @@ func (p *pool) newWorkerWithTask(task taskWrapper) { w := NewWorker(p) w.Start() w.Go(task) - p.totalWorkerNum++ + p.allWorkerList = append(p.allWorkerList, w) } func (p *pool) onIdle(w *worker) { p.m.Lock() + defer p.m.Unlock() if len(p.blockTaskList) == 0 { // 没有等待执行的任务 + if p.disposeFlag { + w.Stop() + return + } + p.idleWorkerList = append(p.idleWorkerList, w) } else { t := p.blockTaskList[0] p.blockTaskList = p.blockTaskList[1:] w.Go(t) } - p.m.Unlock() +} + +func (p *pool) onDispose(w *worker) { + p.m.Lock() + defer p.m.Unlock() + for i := range p.allWorkerList { + if p.allWorkerList[i] == w { + p.allWorkerList = append(p.allWorkerList[0:i], p.allWorkerList[i+1:]...) + break + } + } } diff --git a/pkg/taskpool/taskpool_test.go b/pkg/taskpool/taskpool_test.go index d9e978f..6208361 100644 --- a/pkg/taskpool/taskpool_test.go +++ b/pkg/taskpool/taskpool_test.go @@ -9,6 +9,7 @@ package taskpool_test import ( + "github.com/q191201771/naza/pkg/nazaatomic" "sync" "sync/atomic" "testing" @@ -164,3 +165,87 @@ func TestCorner(t *testing.T) { }) assert.Equal(t, taskpool.ErrTaskPool, err) } + +func TestPool_Dispose2(t *testing.T) { + // 测试 DisposeTypeRunAllBlockTask + + tp, _ := taskpool.NewPool(func(option *taskpool.Option) { + option.InitWorkerNum = 2 + option.MaxWorkerNum = 2 + }) + + var count nazaatomic.Int32 + for i := 0; i < 10; i++ { + tp.Go(func(param ...interface{}) { + ii := param[0].(int) + time.Sleep(time.Duration(10) * time.Millisecond) + nazalog.Debugf("%d", ii) + count.Increment() + }, i) + } + + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + tp.Dispose(taskpool.DisposeTypeRunAllBlockTask) + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + time.Sleep(200 * time.Millisecond) + assert.Equal(t, 10, int(count.Load())) + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + + // 测试空闲情况dispose + { + tpp, _ := taskpool.NewPool(func(option *taskpool.Option) { + option.InitWorkerNum = 2 + }) + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + tpp.Dispose(taskpool.DisposeTypeAsap) + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + } +} + +func TestPool_Dispose(t *testing.T) { + tp, _ := taskpool.NewPool(func(option *taskpool.Option) { + option.InitWorkerNum = 1 + option.MaxWorkerNum = 1 + }) + + var v nazaatomic.Int32 + + // 任务1在dispose之前已经被执行的任务,但是由于自身的sleep导致没有执行完,从而导致任务2,3在dispose时处于阻塞状态,还没有被执行 + // 也因此,任务2,,3在dispose后不再执行 + tp.Go(func(param ...interface{}) { + nazalog.Debugf("> task 1") + time.Sleep(100 * time.Millisecond) + nazalog.Debugf("< task 1") + v.Add(1) + }) + + tp.Go(func(param ...interface{}) { + nazalog.Debugf("> task 2") + time.Sleep(300 * time.Millisecond) + nazalog.Debugf("< task 2") + v.Add(2) + }) + + tp.Go(func(param ...interface{}) { + nazalog.Debugf("> task 3") + time.Sleep(500 * time.Millisecond) + nazalog.Debugf("< task 3") + v.Add(4) + }) + + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + time.Sleep(50 * time.Millisecond) + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + tp.Dispose(taskpool.DisposeTypeAsap) + + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + time.Sleep(400 * time.Millisecond) + + tp.Dispose(taskpool.DisposeTypeAsap) + tp.Go(func(param ...interface{}) { + }) + tp.KillIdleWorkers() + nazalog.Debugf("%+v", tp.GetCurrentStatus()) + + assert.Equal(t, 1, int(v.Load())) +} diff --git a/pkg/taskpool/worker.go b/pkg/taskpool/worker.go index 7c9eca7..1ca5d2b 100644 --- a/pkg/taskpool/worker.go +++ b/pkg/taskpool/worker.go @@ -23,8 +23,9 @@ func NewWorker(p *pool) *worker { func (w *worker) Start() { go func() { for { - task, ok := <-w.taskChan - if !ok { + task := <-w.taskChan + if task.disposeFlag { + w.p.onDispose(w) break } task.taskFn(task.param...) @@ -34,7 +35,9 @@ func (w *worker) Start() { } func (w *worker) Stop() { - close(w.taskChan) + w.taskChan <- taskWrapper{ + disposeFlag: true, + } } func (w *worker) Go(t taskWrapper) {