From a29ed02c897f0b7b5b5f659cc399dc046c283c85 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 12 Oct 2019 18:38:12 +0800 Subject: [PATCH] package taskpool: bugfix, KillIdleWorkers --- .gitignore | 1 + build.sh | 1 + build_linux.sh | 11 ++---- demo/taskpool/main.go | 66 +++++++++++++++++++++++++++++++++++ pkg/taskpool/interface.go | 4 +++ pkg/taskpool/pool.go | 5 ++- pkg/taskpool/taskpool_test.go | 44 +++++++++++++++++------ 7 files changed, 113 insertions(+), 19 deletions(-) create mode 100644 demo/taskpool/main.go diff --git a/.gitignore b/.gitignore index f71bc1f..e5c86d8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ tmp /TODO.md /pkg/tag +/pkg/taskpool/spinlock.go diff --git a/build.sh b/build.sh index c45558d..6c357c5 100755 --- a/build.sh +++ b/build.sh @@ -25,5 +25,6 @@ LDFlags=" \ cd ${ROOT_DIR}/demo/add_blog_license && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/add_blog_license && cd ${ROOT_DIR}/demo/add_go_license && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/add_go_license && +cd ${ROOT_DIR}/demo/taskpool && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/taskpool && ls -lrt ${ROOT_DIR}/bin && echo 'build done.' diff --git a/build_linux.sh b/build_linux.sh index 50fd136..b77dd45 100755 --- a/build_linux.sh +++ b/build_linux.sh @@ -2,11 +2,6 @@ set -x -ROOT_DIR=`pwd` -echo ${ROOT_DIR}/bin - -if [ ! -d ${ROOT_DIR}/bin ]; then - mkdir bin -fi - -cd ${ROOT_DIR}/demo/connstat && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/connstat_linux +export GOOS=linux +export GOARCH=amd64 +sh build.sh diff --git a/demo/taskpool/main.go b/demo/taskpool/main.go new file mode 100644 index 0000000..1092243 --- /dev/null +++ b/demo/taskpool/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/taskpool" + "sync" + "time" +) + +var ( + taskNum = 1000 * 10000 + initWorkerNum = 1 //1000 * 20 //1000 * 10 +) + +func originGo() { + nazalog.Debug("> BenchmarkOriginGo") + var wg sync.WaitGroup + for j := 0; j < 1; j++ { + wg.Add(taskNum) + for i := 0; i < taskNum; i++ { + go func() { + time.Sleep(10 * time.Millisecond) + wg.Done() + }() + } + wg.Wait() + } + nazalog.Debug("< BenchmarkOriginGo") +} + +func taskPool() { + nazalog.Debug("> BenchmarkTaskPool") + var wg sync.WaitGroup + var ps []taskpool.Pool + var poolNum = 1 + for i := 0; i < poolNum; i++ { + p, _ := taskpool.NewPool(func(option *taskpool.Option) { + option.InitWorkerNum = initWorkerNum + }) + ps = append(ps, p) + } + + for j := 0; j < 1; j++ { + //b.StartTimer() + wg.Add(taskNum) + for i := 0; i < taskNum; i++ { + ps[i % poolNum].Go(func() { + time.Sleep(10 * time.Millisecond) + wg.Done() + }) + } + wg.Wait() + //b.StopTimer() + //idle, busy := p.Status() + //nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的,因为只是业务方的任务代码执行完了,可能还没回收到idle队列中 + //p.KillIdleWorkers() + //idle, busy = p.Status() + //nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy) + } + nazalog.Debug("< BenchmarkTaskPool") +} + +func main() { + taskPool() + //originGo() +} diff --git a/pkg/taskpool/interface.go b/pkg/taskpool/interface.go index a774142..f7f1dd7 100644 --- a/pkg/taskpool/interface.go +++ b/pkg/taskpool/interface.go @@ -6,6 +6,10 @@ // // Author: Chef (191201771@qq.com) +// TODO +// 1. 尝试替换掉 list.List +// 2. channel 通信是否能替换成其他方式 + package taskpool import ( diff --git a/pkg/taskpool/pool.go b/pkg/taskpool/pool.go index 18f4d9f..a48fcd2 100644 --- a/pkg/taskpool/pool.go +++ b/pkg/taskpool/pool.go @@ -18,6 +18,7 @@ type pool struct { idleWorkerNum int32 busyWorkerNum int32 + //m SpinLock m sync.Mutex idleWorkerList *list.List } @@ -44,9 +45,11 @@ func (p *pool) Go(task Task) { func (p *pool) KillIdleWorkers() { p.m.Lock() n := p.idleWorkerList.Len() - for e := p.idleWorkerList.Front(); e != nil; e = e.Next() { + var next *list.Element + for e := p.idleWorkerList.Front(); e != nil; e = next { w := e.Value.(*Worker) w.Stop() + next = e.Next() p.idleWorkerList.Remove(e) } atomic.AddInt32(&p.idleWorkerNum, int32(-n)) diff --git a/pkg/taskpool/taskpool_test.go b/pkg/taskpool/taskpool_test.go index 6b3946b..150e43a 100644 --- a/pkg/taskpool/taskpool_test.go +++ b/pkg/taskpool/taskpool_test.go @@ -18,45 +18,56 @@ import ( var ( taskNum = 1000 * 1000 - initWorkerNum = 1000 + initWorkerNum = 1 //1000 * 20 //1000 * 10 ) func BenchmarkOriginGo(b *testing.B) { + nazalog.Debug("> BenchmarkOriginGo") var wg sync.WaitGroup - for j := 0; j < b.N; j++ { + for j := 0; j < 1; j++ { wg.Add(taskNum) for i := 0; i < taskNum; i++ { go func() { - time.Sleep(1 * time.Millisecond) + time.Sleep(10 * time.Millisecond) wg.Done() }() } wg.Wait() } + nazalog.Debug("< BenchmarkOriginGo") } func BenchmarkTaskPool(b *testing.B) { + nazalog.Debug("> BenchmarkTaskPool") var wg sync.WaitGroup p, _ := NewPool(func(option *Option) { option.InitWorkerNum = initWorkerNum }) + //var ps []Pool + //var poolNum = 1 + //for i := 0; i < poolNum; i++ { + // ps = append(ps, p) + //} b.ResetTimer() - for j := 0; j < b.N; j++ { + for j := 0; j < 1; j++ { + //b.StartTimer() wg.Add(taskNum) for i := 0; i < taskNum; i++ { p.Go(func() { - time.Sleep(1 * time.Millisecond) + time.Sleep(10 * time.Millisecond) wg.Done() }) } wg.Wait() - idle, busy := p.Status() - nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的,因为只是业务方的任务代码执行完了,可能还没回收到idle队列中 - p.KillIdleWorkers() - idle, busy = p.Status() - nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy) + //b.StopTimer() + //idle, busy := p.Status() + //nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的,因为只是业务方的任务代码执行完了,可能还没回收到idle队列中 + //p.KillIdleWorkers() + //idle, busy = p.Status() + //nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy) } + nazalog.Debug("< BenchmarkTaskPool") } func TestTaskPool(t *testing.T) { @@ -85,6 +96,19 @@ func TestTaskPool(t *testing.T) { p.KillIdleWorkers() idle, busy = p.Status() nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy) + + time.Sleep(100 * time.Millisecond) + + wg.Add(n) + for i := 0; i < n; i++ { + p.Go(func() { + time.Sleep(10 * time.Millisecond) + wg.Done() + }) + } + wg.Wait() + idle, busy = p.Status() + nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的,因为只是业务方的任务代码执行完了,可能还没回收到idle队列中 } func TestGlobal(t *testing.T) {