[feat] taskpool: 增加Dispose方法,用于完全释放池内资源,包括所有协程

pull/12/head
q191201771 2 years ago
parent 6b9d23078a
commit 4fac4a3516

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
// 非阻塞协程池,协程数量可动态增长,可配置最大协程并发数量,可手动释放空闲的协程
// Package taskpool 非阻塞协程池,协程数量可动态增长,可配置最大协程并发数量,可手动释放空闲的协程
package taskpool
import (
@ -16,6 +16,23 @@ import (
// TODO
// - channel 通信替换成其他方式是否有可能提高性能
// DisposeType
//
// 任务分为3种状态:
//
// 1. 已添加到Pool中)),正在执行
// 2. 已添加,但是还没有被执行
// 3. 还没有添加
//
// DisposeTypeAsap: 1会执行2和3不会
// DisposeTypeRunAllBlockTask: 1和2会执行3不会
type DisposeType int
const (
DisposeTypeAsap DisposeType = iota + 1
DisposeTypeRunAllBlockTask
)
var ErrTaskPool = errors.New("naza.taskpool: fxxk")
type TaskFn func(param ...interface{})
@ -38,11 +55,14 @@ type Pool interface {
//
Go(task TaskFn, param ...interface{})
// 获取当前的状态,注意,只是一个瞬时值
// GetCurrentStatus 获取当前的状态,注意,只是一个瞬时值
GetCurrentStatus() Status
// 关闭池内所有的空闲协程
// KillIdleWorkers 关闭池内所有的空闲协程
KillIdleWorkers()
// Dispose 完全释放池内资源,包括所有协程
Dispose(t DisposeType)
}
type Option struct {

@ -15,15 +15,18 @@ import (
type taskWrapper struct {
taskFn TaskFn
param []interface{}
disposeFlag bool
}
type pool struct {
maxWorkerNum int
m sync.Mutex
totalWorkerNum int
//totalWorkerNum int
idleWorkerList []*worker
blockTaskList []taskWrapper
allWorkerList []*worker
disposeFlag bool
}
func newPool(option Option) *pool {
@ -37,12 +40,18 @@ func newPool(option Option) *pool {
}
func (p *pool) Go(task TaskFn, param ...interface{}) {
p.m.Lock()
defer p.m.Unlock()
if p.disposeFlag {
return
}
tw := taskWrapper{
taskFn: task,
param: param,
}
var w *worker
p.m.Lock()
if len(p.idleWorkerList) != 0 {
// 还有空闲worker
@ -53,7 +62,7 @@ func (p *pool) Go(task TaskFn, param ...interface{}) {
// 无空闲worker
if p.maxWorkerNum == 0 ||
(p.maxWorkerNum > 0 && p.totalWorkerNum < p.maxWorkerNum) {
(p.maxWorkerNum > 0 && len(p.allWorkerList) < p.maxWorkerNum) {
// 无最大worker限制或还未达到限制
p.newWorkerWithTask(tw)
@ -63,24 +72,47 @@ func (p *pool) Go(task TaskFn, param ...interface{}) {
p.blockTaskList = append(p.blockTaskList, tw)
}
}
p.m.Unlock()
}
func (p *pool) KillIdleWorkers() {
p.m.Lock()
p.totalWorkerNum = p.totalWorkerNum - len(p.idleWorkerList)
defer p.m.Unlock()
if p.disposeFlag {
return
}
for i := range p.idleWorkerList {
p.idleWorkerList[i].Stop()
}
p.idleWorkerList = p.idleWorkerList[0:0]
}
func (p *pool) Dispose(t DisposeType) {
p.m.Lock()
defer p.m.Unlock()
if p.disposeFlag {
return
}
p.disposeFlag = true
if t == DisposeTypeAsap {
p.blockTaskList = nil
} else if t == DisposeTypeRunAllBlockTask {
// noop
}
for i := range p.idleWorkerList {
p.idleWorkerList[i].Stop()
}
p.idleWorkerList = p.idleWorkerList[0:0]
p.m.Unlock()
}
func (p *pool) GetCurrentStatus() Status {
p.m.Lock()
defer p.m.Unlock()
return Status{
TotalWorkerNum: p.totalWorkerNum,
TotalWorkerNum: len(p.allWorkerList),
IdleWorkerNum: len(p.idleWorkerList),
BlockTaskNum: len(p.blockTaskList),
}
@ -90,7 +122,7 @@ func (p *pool) newWorker() *worker {
w := NewWorker(p)
w.Start()
p.idleWorkerList = append(p.idleWorkerList, w)
p.totalWorkerNum++
p.allWorkerList = append(p.allWorkerList, w)
return w
}
@ -98,19 +130,35 @@ func (p *pool) newWorkerWithTask(task taskWrapper) {
w := NewWorker(p)
w.Start()
w.Go(task)
p.totalWorkerNum++
p.allWorkerList = append(p.allWorkerList, w)
}
func (p *pool) onIdle(w *worker) {
p.m.Lock()
defer p.m.Unlock()
if len(p.blockTaskList) == 0 {
// 没有等待执行的任务
if p.disposeFlag {
w.Stop()
return
}
p.idleWorkerList = append(p.idleWorkerList, w)
} else {
t := p.blockTaskList[0]
p.blockTaskList = p.blockTaskList[1:]
w.Go(t)
}
p.m.Unlock()
}
func (p *pool) onDispose(w *worker) {
p.m.Lock()
defer p.m.Unlock()
for i := range p.allWorkerList {
if p.allWorkerList[i] == w {
p.allWorkerList = append(p.allWorkerList[0:i], p.allWorkerList[i+1:]...)
break
}
}
}

@ -9,6 +9,7 @@
package taskpool_test
import (
"github.com/q191201771/naza/pkg/nazaatomic"
"sync"
"sync/atomic"
"testing"
@ -164,3 +165,87 @@ func TestCorner(t *testing.T) {
})
assert.Equal(t, taskpool.ErrTaskPool, err)
}
func TestPool_Dispose2(t *testing.T) {
// 测试 DisposeTypeRunAllBlockTask
tp, _ := taskpool.NewPool(func(option *taskpool.Option) {
option.InitWorkerNum = 2
option.MaxWorkerNum = 2
})
var count nazaatomic.Int32
for i := 0; i < 10; i++ {
tp.Go(func(param ...interface{}) {
ii := param[0].(int)
time.Sleep(time.Duration(10) * time.Millisecond)
nazalog.Debugf("%d", ii)
count.Increment()
}, i)
}
nazalog.Debugf("%+v", tp.GetCurrentStatus())
tp.Dispose(taskpool.DisposeTypeRunAllBlockTask)
nazalog.Debugf("%+v", tp.GetCurrentStatus())
time.Sleep(200 * time.Millisecond)
assert.Equal(t, 10, int(count.Load()))
nazalog.Debugf("%+v", tp.GetCurrentStatus())
// 测试空闲情况dispose
{
tpp, _ := taskpool.NewPool(func(option *taskpool.Option) {
option.InitWorkerNum = 2
})
nazalog.Debugf("%+v", tp.GetCurrentStatus())
tpp.Dispose(taskpool.DisposeTypeAsap)
nazalog.Debugf("%+v", tp.GetCurrentStatus())
}
}
func TestPool_Dispose(t *testing.T) {
tp, _ := taskpool.NewPool(func(option *taskpool.Option) {
option.InitWorkerNum = 1
option.MaxWorkerNum = 1
})
var v nazaatomic.Int32
// 任务1在dispose之前已经被执行的任务但是由于自身的sleep导致没有执行完从而导致任务23在dispose时处于阻塞状态还没有被执行
// 也因此任务2,3在dispose后不再执行
tp.Go(func(param ...interface{}) {
nazalog.Debugf("> task 1")
time.Sleep(100 * time.Millisecond)
nazalog.Debugf("< task 1")
v.Add(1)
})
tp.Go(func(param ...interface{}) {
nazalog.Debugf("> task 2")
time.Sleep(300 * time.Millisecond)
nazalog.Debugf("< task 2")
v.Add(2)
})
tp.Go(func(param ...interface{}) {
nazalog.Debugf("> task 3")
time.Sleep(500 * time.Millisecond)
nazalog.Debugf("< task 3")
v.Add(4)
})
nazalog.Debugf("%+v", tp.GetCurrentStatus())
time.Sleep(50 * time.Millisecond)
nazalog.Debugf("%+v", tp.GetCurrentStatus())
tp.Dispose(taskpool.DisposeTypeAsap)
nazalog.Debugf("%+v", tp.GetCurrentStatus())
time.Sleep(400 * time.Millisecond)
tp.Dispose(taskpool.DisposeTypeAsap)
tp.Go(func(param ...interface{}) {
})
tp.KillIdleWorkers()
nazalog.Debugf("%+v", tp.GetCurrentStatus())
assert.Equal(t, 1, int(v.Load()))
}

@ -23,8 +23,9 @@ func NewWorker(p *pool) *worker {
func (w *worker) Start() {
go func() {
for {
task, ok := <-w.taskChan
if !ok {
task := <-w.taskChan
if task.disposeFlag {
w.p.onDispose(w)
break
}
task.taskFn(task.param...)
@ -34,7 +35,9 @@ func (w *worker) Start() {
}
func (w *worker) Stop() {
close(w.taskChan)
w.taskChan <- taskWrapper{
disposeFlag: true,
}
}
func (w *worker) Go(t taskWrapper) {

Loading…
Cancel
Save