1. 删除 package bufferpool 2. package slicebytepool 添加单元测试

pull/2/head
q191201771 5 years ago
parent 36ddcfdb64
commit b9f6905392

@ -1,141 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"bytes"
"flag"
"fmt"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"sync"
"sync/atomic"
"time"
"github.com/q191201771/naza/pkg/bufferpool"
"github.com/q191201771/naza/pkg/nazalog"
)
var bp bufferpool.BufferPool
//var count int32
var doneCount uint32
var gorutineNum = 1000
var loopNum = 1000
var sleepMSec = time.Duration(10) * time.Millisecond
func size() int {
return random(1, 256*1024)
//return 128 * 1024
//ss := []int{1000, 2000, 5000}
//////ss := []int{128, 1024, 4096, 16384}
//atomic.AddInt32(&count, 1)
//return ss[count % 3]
//count++
//if count > 128 * 1024 {
// count = 1
//}
//return count
}
func random(l, r int) int {
return l + (rand.Int() % (r - l))
}
func originFunc() {
var buf bytes.Buffer
size := size()
buf.Grow(size)
atomic.AddUint32(&doneCount, 1)
time.Sleep(sleepMSec)
}
func bufferPoolFunc() {
size := size()
buf := bp.Get(size)
buf.Grow(size)
time.Sleep(sleepMSec)
bp.Put(buf)
atomic.AddUint32(&doneCount, 1)
}
func main() {
strategy := parseFlag()
nazalog.Debugf("strategy: %d", strategy)
//cpufd, err := os.Create("/tmp/cpu.prof")
//nazalog.FatalIfErrorNotNil(err)
//pprof.StartCPUProfile(cpufd)
//defer pprof.StopCPUProfile()
rand.Seed(time.Now().Unix())
if strategy != 5 {
bp = bufferpool.NewBufferPool(bufferpool.Strategy(strategy))
}
go func() {
for {
if strategy != 5 {
nazalog.Debugf("time. done=%d, pool=%+v", atomic.LoadUint32(&doneCount), bp.RetrieveStatus())
time.Sleep(1 * time.Second)
} else {
nazalog.Debugf("time. done=%d", atomic.LoadUint32(&doneCount))
time.Sleep(1 * time.Second)
}
}
}()
var wg sync.WaitGroup
wg.Add(gorutineNum * loopNum)
nazalog.Debug("> loop.")
for i := 0; i < gorutineNum; i++ {
go func() {
if strategy != 5 {
for j := 0; j < loopNum; j++ {
bufferPoolFunc()
wg.Done()
}
} else {
for j := 0; j < loopNum; j++ {
originFunc()
wg.Done()
}
}
}()
}
wg.Wait()
memfd, err := os.Create(fmt.Sprintf("/tmp/mem%d.prof", strategy))
nazalog.FatalIfErrorNotNil(err)
pprof.WriteHeapProfile(memfd)
memfd.Close()
nazalog.Debug("> GC.")
runtime.GC()
nazalog.Debug("< GC.")
if strategy != 5 {
nazalog.Debugf("%+v", bp.RetrieveStatus())
}
nazalog.Debug("< loop.")
}
func parseFlag() int {
strategy := flag.Int("t", 0, "type: 1. single std pool 2. single slice pool 3. multi std pool 4. multi slice pool 5. origin")
flag.Parse()
if *strategy < 1 || *strategy > 5 {
flag.Usage()
os.Exit(1)
}
return *strategy
}

@ -1,112 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import (
"bytes"
"sync/atomic"
)
var (
minSize = 1024
maxSize = 1073741824
)
type bufferPool struct {
strategy Strategy
singleBucket Bucket
capToFreeBucket map[int]Bucket
status Status
}
func (bp *bufferPool) Get(size int) *bytes.Buffer {
atomic.AddInt64(&bp.status.getCount, 1)
var bucket Bucket
if bp.strategy == StrategyMultiStdPoolBucket || bp.strategy == StrategyMultiSlicePoolBucket {
ss := up2power(size)
if ss < minSize {
ss = minSize
}
bucket = bp.capToFreeBucket[ss]
} else {
bucket = bp.singleBucket
}
buf := bucket.Get()
if buf == nil {
return &bytes.Buffer{}
}
atomic.AddInt64(&bp.status.hitCount, 1)
atomic.AddInt64(&bp.status.sizeBytes, int64(-buf.Cap()))
return buf
}
func (bp *bufferPool) Put(buf *bytes.Buffer) {
c := buf.Cap()
atomic.AddInt64(&bp.status.putCount, 1)
atomic.AddInt64(&bp.status.sizeBytes, int64(c))
var bucket Bucket
if bp.strategy == StrategyMultiStdPoolBucket || bp.strategy == StrategyMultiSlicePoolBucket {
size := down2power(c)
if size < minSize {
size = minSize
}
bucket = bp.capToFreeBucket[size]
} else {
bucket = bp.singleBucket
}
bucket.Put(buf)
}
func (bp *bufferPool) RetrieveStatus() Status {
return Status{
getCount: atomic.LoadInt64(&bp.status.getCount),
putCount: atomic.LoadInt64(&bp.status.putCount),
hitCount: atomic.LoadInt64(&bp.status.hitCount),
sizeBytes: atomic.LoadInt64(&bp.status.sizeBytes),
}
}
// @return 范围为 [2, 4, 8, 16, ..., 1073741824]如果大于等于1073741824则直接返回n
func up2power(n int) int {
if n >= maxSize {
return n
}
var i uint32
for ; n > (2 << i); i++ {
}
return 2 << i
}
// @return 范围为 [2, 4, 8, 16, ..., 1073741824]
func down2power(n int) int {
if n < 2 {
return 2
} else if n >= maxSize {
return maxSize
}
var i uint32
for {
nn := 2 << i
if n > nn {
i++
} else if n == nn {
return n
} else if n < nn {
return 2 << (i - 1)
}
}
}

@ -1,95 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import (
"bytes"
"math/rand"
"testing"
"time"
)
var bp BufferPool
var count int32
func size() int {
return random(0, 128*1024)
//return 128 * 1024
//ss := []int{1000, 2000, 5000}
//////ss := []int{128, 1024, 4096, 16384}
//atomic.AddInt32(&count, 1)
//return ss[count % 3]
//count++
//if count > 128 * 1024 {
// count = 1
//}
//return count
}
func random(l, r int) int {
return l + (rand.Int() % (r - l))
}
func originFunc() {
var buf bytes.Buffer
size := size()
buf.Grow(size)
}
func bufferPoolFunc() {
size := size()
buf := bp.Get(size)
buf.Grow(size)
bp.Put(buf)
}
func BenchmarkOrigin(b *testing.B) {
for i := 0; i < b.N; i++ {
originFunc()
}
}
func BenchmarkBufferPool(b *testing.B) {
bp = NewBufferPool(StrategyMultiStdPoolBucket)
b.ResetTimer()
for i := 0; i < b.N; i++ {
bufferPoolFunc()
}
//nazalog.Debugf("%+v", bp.RetrieveStatus())
}
//func BenchmarkOriginParallel(b *testing.B) {
// for i := 0; i < b.N; i++ {
// b.RunParallel(func(pb *testing.PB) {
// for pb.Next() {
// originFunc()
// }
// })
// }
//}
//
//func BenchmarkBufferPoolParallel(b *testing.B) {
// bp = NewBufferPool()
// b.ResetTimer()
// for i := 0; i < b.N; i++ {
// b.RunParallel(func(pb *testing.PB) {
// for pb.Next() {
// bufferPoolFunc()
// }
// })
// }
// //nazalog.Debugf("%+v", bp.RetrieveStatus())
//}
func init() {
rand.Seed(time.Now().Unix())
}

@ -1,33 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import "bytes"
var defaultPool BufferPool
func Get(size int) *bytes.Buffer {
return defaultPool.Get(size)
}
func Put(buf *bytes.Buffer) {
defaultPool.Put(buf)
}
func RetrieveStatus() Status {
return defaultPool.RetrieveStatus()
}
func Init(strategy Strategy) {
defaultPool = NewBufferPool(strategy)
}
func init() {
Init(StrategyMultiStdPoolBucket)
}

@ -1,82 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import "bytes"
type BufferPool interface {
// 获取一个预估容量为<size>的Buffer对象不同的策略返回的Buffer对象实际容量可能会有不同但是不会返回nil
Get(size int) *bytes.Buffer
// 将Buffer对象放回池中
Put(buf *bytes.Buffer)
// 获取池当前状态
RetrieveStatus() Status
}
type Status struct {
getCount int64 // 调用Get方法的次数
putCount int64 // 调用Put方法的次数
hitCount int64 // 调用Get方法时池内存在满足条件的空闲Buffer这种情况的计数
sizeBytes int64 // 池内所有空闲Buffer占用的内存大小单位字节
}
type Strategy int
const (
// 直接使用一个标准库中的sync.Pool
StrategySingleStdPoolBucket Strategy = iota + 1
// 直接使用一个切片存储所有的Buffer对象
StrategySingleSlicePoolBucket
// 按Buffer对象的容量哈希到不同的桶中每个桶是一个sync.Pool
StrategyMultiStdPoolBucket
// 按Buffer对象的容量哈希到不同的桶中每个桶是一个切片
StrategyMultiSlicePoolBucket
)
type Bucket interface {
// 桶内没有Buffer对象时返回nil
Get() *bytes.Buffer
Put(buf *bytes.Buffer)
}
func NewBufferPool(strategy Strategy) BufferPool {
var (
singleBucket Bucket
capToFreeBucket map[int]Bucket
)
switch strategy {
case StrategySingleStdPoolBucket:
singleBucket = NewStdPoolBucket()
case StrategySingleSlicePoolBucket:
singleBucket = NewSliceBucket()
case StrategyMultiStdPoolBucket:
capToFreeBucket = make(map[int]Bucket)
for i := minSize; i <= maxSize; i <<= 1 {
capToFreeBucket[i] = NewStdPoolBucket()
}
case StrategyMultiSlicePoolBucket:
capToFreeBucket = make(map[int]Bucket)
for i := minSize; i <= maxSize; i <<= 1 {
capToFreeBucket[i] = NewSliceBucket()
}
}
return &bufferPool{
strategy: strategy,
singleBucket: singleBucket,
capToFreeBucket: capToFreeBucket,
}
}

@ -1,43 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import (
"bytes"
"github.com/q191201771/naza/pkg/nazaatomic"
)
type SharedBuffer struct {
*bytes.Buffer
pool BufferPool
count *nazaatomic.Uint32
}
func NewSharedBufferDefault(size int) *SharedBuffer {
return NewSharedBuffer(defaultPool, size)
}
func NewSharedBuffer(pool BufferPool, size int) *SharedBuffer {
return &SharedBuffer{
Buffer: pool.Get(size),
count: new(nazaatomic.Uint32),
}
}
func (sb *SharedBuffer) Ref() *SharedBuffer {
sb.count.Increment()
return sb
}
func (sb *SharedBuffer) ReleaseIfNeeded() {
if sb.count.Decrement() == 0 {
sb.pool.Put(sb.Buffer)
}
}

@ -1,41 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import (
"bytes"
"sync"
)
type SliceBucket struct {
m sync.Mutex
core []*bytes.Buffer
}
func NewSliceBucket() *SliceBucket {
return new(SliceBucket)
}
func (b *SliceBucket) Get() *bytes.Buffer {
b.m.Lock()
defer b.m.Unlock()
if len(b.core) == 0 {
return nil
}
buf := b.core[len(b.core)-1]
b.core = b.core[:len(b.core)-1]
buf.Reset()
return buf
}
func (b *SliceBucket) Put(buf *bytes.Buffer) {
b.m.Lock()
defer b.m.Unlock()
b.core = append(b.core, buf)
}

@ -1,38 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package bufferpool
import (
"bytes"
"sync"
)
type StdPoolBucket struct {
core *sync.Pool
}
func NewStdPoolBucket() *StdPoolBucket {
return &StdPoolBucket{
core: new(sync.Pool),
}
}
func (b *StdPoolBucket) Get() *bytes.Buffer {
v := b.core.Get()
if v == nil {
return nil
}
vv := v.(*bytes.Buffer)
vv.Reset()
return vv
}
func (b *StdPoolBucket) Put(buf *bytes.Buffer) {
b.core.Put(buf)
}

@ -9,6 +9,7 @@
package slicebytepool
type SliceBytePool interface {
// 功能类似于 make([]byte, <size>)
Get(size int) []byte
Put(buf []byte)
@ -26,12 +27,15 @@ type Status struct {
type Strategy int
const (
// 底层桶使用sync.Pool内部的[]byte由sync.Pool决定何时释放
StrategyMultiStdPoolBucket = iota + 1
// 底层桶使用切片,内部的[]byte永远不会释放
StrategyMultiSlicePoolBucket
)
type Bucket interface {
// 桶内无满足条件的[]byte时返回nil
Get(size int) []byte
Put(buf []byte)

@ -6,52 +6,54 @@
//
// Author: Chef (191201771@qq.com)
package bufferpool
package slicebytepool
import (
"bytes"
"testing"
"github.com/q191201771/naza/pkg/assert"
)
func TestBufferPool(t *testing.T) {
// TODO chef: assert result
// benchmark 参见 naza/demo/slicebytepool
strategyList := []Strategy{
StrategySingleStdPoolBucket,
StrategySingleSlicePoolBucket,
StrategyMultiStdPoolBucket,
StrategyMultiSlicePoolBucket,
}
for _, s := range strategyList {
bp := NewBufferPool(s)
buf := &bytes.Buffer{}
bp.Get(128)
bp.Put(buf)
buf = bp.Get(128)
buf.Grow(4096)
bp.Put(buf)
buf = bp.Get(4096)
bp.Put(buf)
bp.RetrieveStatus()
func TestDefault(t *testing.T) {
Init(StrategyMultiSlicePoolBucket)
buf := Get(1000)
assert.Equal(t, 1000, len(buf))
Put(buf)
status := RetrieveStatus()
e := Status{
getCount: 1,
putCount: 1,
hitCount: 0,
sizeBytes: 1024,
}
assert.Equal(t, e, status)
}
func TestGlobal(t *testing.T) {
buf := Get(128)
Put(buf)
RetrieveStatus()
func TestMultiSlicePool(t *testing.T) {
p := NewSliceBytePool(StrategyMultiSlicePoolBucket)
assert.IsNotNil(t, p)
buf := p.Get(1)
assert.Equal(t, 1, len(buf))
buf = make([]byte, 1)
p.Put(buf)
buf = p.Get(1)
assert.Equal(t, 1, len(buf))
}
func TestSliceBucket(t *testing.T) {
sb := NewSliceBucket()
buf := sb.Get()
assert.Equal(t, nil, buf)
sb.Put(&bytes.Buffer{})
buf = sb.Get()
assert.IsNotNil(t, buf)
func TestMultiStdPool(t *testing.T) {
p := NewSliceBytePool(StrategyMultiStdPoolBucket)
assert.IsNotNil(t, p)
buf := p.Get(1000)
assert.Equal(t, 1000, len(buf))
p.Put(buf)
buf = p.Get(1000)
assert.Equal(t, 1000, len(buf))
for i := 0; i < 1000; i++ {
buf = p.Get(1000)
p.Put(buf)
}
}
func TestUp2power(t *testing.T) {
Loading…
Cancel
Save