package taskpool: bugfix, KillIdleWorkers

pull/2/head
q191201771 5 years ago
parent 5b2c117a5a
commit a29ed02c89

1
.gitignore vendored

@ -10,3 +10,4 @@ tmp
/TODO.md
/pkg/tag
/pkg/taskpool/spinlock.go

@ -25,5 +25,6 @@ LDFlags=" \
cd ${ROOT_DIR}/demo/add_blog_license && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/add_blog_license &&
cd ${ROOT_DIR}/demo/add_go_license && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/add_go_license &&
cd ${ROOT_DIR}/demo/taskpool && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/taskpool &&
ls -lrt ${ROOT_DIR}/bin &&
echo 'build done.'

@ -2,11 +2,6 @@
set -x
ROOT_DIR=`pwd`
echo ${ROOT_DIR}/bin
if [ ! -d ${ROOT_DIR}/bin ]; then
mkdir bin
fi
cd ${ROOT_DIR}/demo/connstat && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/connstat_linux
export GOOS=linux
export GOARCH=amd64
sh build.sh

@ -0,0 +1,66 @@
package main
import (
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/taskpool"
"sync"
"time"
)
var (
taskNum = 1000 * 10000
initWorkerNum = 1 //1000 * 20 //1000 * 10
)
func originGo() {
nazalog.Debug("> BenchmarkOriginGo")
var wg sync.WaitGroup
for j := 0; j < 1; j++ {
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
wg.Wait()
}
nazalog.Debug("< BenchmarkOriginGo")
}
func taskPool() {
nazalog.Debug("> BenchmarkTaskPool")
var wg sync.WaitGroup
var ps []taskpool.Pool
var poolNum = 1
for i := 0; i < poolNum; i++ {
p, _ := taskpool.NewPool(func(option *taskpool.Option) {
option.InitWorkerNum = initWorkerNum
})
ps = append(ps, p)
}
for j := 0; j < 1; j++ {
//b.StartTimer()
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
ps[i % poolNum].Go(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
wg.Wait()
//b.StopTimer()
//idle, busy := p.Status()
//nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的因为只是业务方的任务代码执行完了可能还没回收到idle队列中
//p.KillIdleWorkers()
//idle, busy = p.Status()
//nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy)
}
nazalog.Debug("< BenchmarkTaskPool")
}
func main() {
taskPool()
//originGo()
}

@ -6,6 +6,10 @@
//
// Author: Chef (191201771@qq.com)
// TODO
// 1. 尝试替换掉 list.List
// 2. channel 通信是否能替换成其他方式
package taskpool
import (

@ -18,6 +18,7 @@ type pool struct {
idleWorkerNum int32
busyWorkerNum int32
//m SpinLock
m sync.Mutex
idleWorkerList *list.List
}
@ -44,9 +45,11 @@ func (p *pool) Go(task Task) {
func (p *pool) KillIdleWorkers() {
p.m.Lock()
n := p.idleWorkerList.Len()
for e := p.idleWorkerList.Front(); e != nil; e = e.Next() {
var next *list.Element
for e := p.idleWorkerList.Front(); e != nil; e = next {
w := e.Value.(*Worker)
w.Stop()
next = e.Next()
p.idleWorkerList.Remove(e)
}
atomic.AddInt32(&p.idleWorkerNum, int32(-n))

@ -18,45 +18,56 @@ import (
var (
taskNum = 1000 * 1000
initWorkerNum = 1000
initWorkerNum = 1 //1000 * 20 //1000 * 10
)
func BenchmarkOriginGo(b *testing.B) {
nazalog.Debug("> BenchmarkOriginGo")
var wg sync.WaitGroup
for j := 0; j < b.N; j++ {
for j := 0; j < 1; j++ {
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
go func() {
time.Sleep(1 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
wg.Wait()
}
nazalog.Debug("< BenchmarkOriginGo")
}
func BenchmarkTaskPool(b *testing.B) {
nazalog.Debug("> BenchmarkTaskPool")
var wg sync.WaitGroup
p, _ := NewPool(func(option *Option) {
option.InitWorkerNum = initWorkerNum
})
//var ps []Pool
//var poolNum = 1
//for i := 0; i < poolNum; i++ {
// ps = append(ps, p)
//}
b.ResetTimer()
for j := 0; j < b.N; j++ {
for j := 0; j < 1; j++ {
//b.StartTimer()
wg.Add(taskNum)
for i := 0; i < taskNum; i++ {
p.Go(func() {
time.Sleep(1 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
wg.Wait()
idle, busy := p.Status()
nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的因为只是业务方的任务代码执行完了可能还没回收到idle队列中
p.KillIdleWorkers()
idle, busy = p.Status()
nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy)
//b.StopTimer()
//idle, busy := p.Status()
//nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的因为只是业务方的任务代码执行完了可能还没回收到idle队列中
//p.KillIdleWorkers()
//idle, busy = p.Status()
//nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy)
}
nazalog.Debug("< BenchmarkTaskPool")
}
func TestTaskPool(t *testing.T) {
@ -85,6 +96,19 @@ func TestTaskPool(t *testing.T) {
p.KillIdleWorkers()
idle, busy = p.Status()
nazalog.Debugf("killed, worker num. idle=%d, busy=%d", idle, busy)
time.Sleep(100 * time.Millisecond)
wg.Add(n)
for i := 0; i < n; i++ {
p.Go(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
wg.Wait()
idle, busy = p.Status()
nazalog.Debugf("done, worker num. idle=%d, busy=%d", idle, busy) // 此时还有个别busy也是正常的因为只是业务方的任务代码执行完了可能还没回收到idle队列中
}
func TestGlobal(t *testing.T) {

Loading…
Cancel
Save