[feat] package ratelimit: 新增漏桶LeakyBucket和令牌桶TokenBucket,把以前的RateLimit删了

pull/2/head
q191201771 5 years ago
parent 0012c33084
commit 637a4d066a

2
.gitignore vendored

@ -14,5 +14,7 @@ profile*.pdf
/pkg/tag
/pkg/nazatime
/demo/samefile
/demo/time
/demo/temp

@ -25,7 +25,7 @@ var licenseTmpl = `
> ****
![fccxy](https://pengrl.com/images/fccxy_qccode_and_sys.png)`
![fccxy](https://pengrl.com/images/fccxy_qccode_and_sys.jpg)`
func main() {
dir := parseFlag()

@ -0,0 +1,124 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit_test
import (
"testing"
"time"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/ratelimit"
)
var runExample = true
func TestLeakyBucket(t *testing.T) {
if !runExample {
return
}
lb := ratelimit.NewLeakyBucket(100)
for i := 0; i < 16; i++ {
go func(j int) {
for k := 0; k < 16; k++ {
err := lb.TryAquire()
if err == nil {
nazalog.Debugf("TryAquire succ. goroutine=%d, index=%d", j, k)
} else {
//nazalog.Debugf("TryAquire fail. goroutine=%d, index=%d, err=%v", j, k, err)
time.Sleep(time.Duration(j*k) * time.Millisecond)
}
//time.Sleep(time.Duration(j * k) * time.Millisecond)
}
}(i)
}
time.Sleep(2 * time.Second)
//2020/02/25 23:39:13.896802 DEBUG TryAquire succ. goroutine=10, index=5 - example_test.go:23
//2020/02/25 23:39:13.999801 DEBUG TryAquire succ. goroutine=7, index=8 - example_test.go:23
//2020/02/25 23:39:14.113313 DEBUG TryAquire succ. goroutine=11, index=8 - example_test.go:23
//2020/02/25 23:39:14.216280 DEBUG TryAquire succ. goroutine=11, index=10 - example_test.go:23
//2020/02/25 23:39:14.334289 DEBUG TryAquire succ. goroutine=8, index=12 - example_test.go:23
//2020/02/25 23:39:14.439413 DEBUG TryAquire succ. goroutine=8, index=14 - example_test.go:23
//2020/02/25 23:39:14.574673 DEBUG TryAquire succ. goroutine=14, index=11 - example_test.go:23
//2020/02/25 23:39:14.742899 DEBUG TryAquire succ. goroutine=14, index=13 - example_test.go:23
//2020/02/25 23:39:14.914085 DEBUG TryAquire succ. goroutine=12, index=14 - example_test.go:23
//2020/02/25 23:39:15.172121 DEBUG TryAquire succ. goroutine=15, index=14 - example_test.go:23
lb2 := ratelimit.NewLeakyBucket(100)
for i := 0; i < 4; i++ {
go func(j int) {
for k := 0; k < 4; k++ {
lb2.WaitUntilAquire()
nazalog.Debugf("< lb.WaitUntilAquire. goroutine=%d, index=%d", j, k)
}
}(i)
}
time.Sleep(2 * time.Second)
//2020/02/25 23:40:11.275685 DEBUG < lb.WaitUntilAquire. goroutine=3, index=0 - example_test.go:49
//2020/02/25 23:40:11.374789 DEBUG < lb.WaitUntilAquire. goroutine=0, index=0 - example_test.go:49
//2020/02/25 23:40:11.473445 DEBUG < lb.WaitUntilAquire. goroutine=1, index=0 - example_test.go:49
//2020/02/25 23:40:11.571714 DEBUG < lb.WaitUntilAquire. goroutine=2, index=0 - example_test.go:49
//2020/02/25 23:40:11.670913 DEBUG < lb.WaitUntilAquire. goroutine=3, index=1 - example_test.go:49
//2020/02/25 23:40:11.772003 DEBUG < lb.WaitUntilAquire. goroutine=0, index=1 - example_test.go:49
//2020/02/25 23:40:11.871239 DEBUG < lb.WaitUntilAquire. goroutine=1, index=1 - example_test.go:49
//2020/02/25 23:40:11.973307 DEBUG < lb.WaitUntilAquire. goroutine=2, index=1 - example_test.go:49
//2020/02/25 23:40:12.075015 DEBUG < lb.WaitUntilAquire. goroutine=3, index=2 - example_test.go:49
//2020/02/25 23:40:12.173357 DEBUG < lb.WaitUntilAquire. goroutine=0, index=2 - example_test.go:49
//2020/02/25 23:40:12.270387 DEBUG < lb.WaitUntilAquire. goroutine=1, index=2 - example_test.go:49
//2020/02/25 23:40:12.370509 DEBUG < lb.WaitUntilAquire. goroutine=2, index=2 - example_test.go:49
//2020/02/25 23:40:12.475001 DEBUG < lb.WaitUntilAquire. goroutine=3, index=3 - example_test.go:49
//2020/02/25 23:40:12.571062 DEBUG < lb.WaitUntilAquire. goroutine=0, index=3 - example_test.go:49
//2020/02/25 18:40:12.672385 DEBUG < lb.WaitUntilAquire. goroutine=1, index=3 - example_test.go:49
//2020/02/25 18:40:12.770939 DEBUG < lb.WaitUntilAquire. goroutine=2, index=3 - example_test.go:49
}
func TestTokenBucket(t *testing.T) {
if !runExample {
return
}
tb := ratelimit.NewTokenBucket(100, 1000, 10)
for i := 0; i < 4; i++ {
go func(j int) {
for k := 0; k < 4; k++ {
tb.WaitUntilAquire()
nazalog.Debugf("< tb.WaitUntilAquire. goroutine=%d, index=%d", j, k)
time.Sleep(100 * time.Millisecond)
}
}(i)
}
time.Sleep(2 * time.Second)
//2020/02/25 19:02:33.453207 DEBUG < tb.WaitUntilAquire. goroutine=2, index=0 - example_test.go:82
//2020/02/25 19:02:33.453302 DEBUG < tb.WaitUntilAquire. goroutine=1, index=0 - example_test.go:82
//2020/02/25 19:02:33.453414 DEBUG < tb.WaitUntilAquire. goroutine=0, index=0 - example_test.go:82
//2020/02/25 19:02:33.453535 DEBUG < tb.WaitUntilAquire. goroutine=3, index=0 - example_test.go:82
//2020/02/25 19:02:33.557602 DEBUG < tb.WaitUntilAquire. goroutine=2, index=1 - example_test.go:82
//2020/02/25 19:02:33.557881 DEBUG < tb.WaitUntilAquire. goroutine=0, index=1 - example_test.go:82
//2020/02/25 19:02:33.557968 DEBUG < tb.WaitUntilAquire. goroutine=1, index=1 - example_test.go:82
//2020/02/25 19:02:33.558043 DEBUG < tb.WaitUntilAquire. goroutine=3, index=1 - example_test.go:82
//2020/02/25 19:02:33.661411 DEBUG < tb.WaitUntilAquire. goroutine=0, index=2 - example_test.go:82
//2020/02/25 19:02:33.661495 DEBUG < tb.WaitUntilAquire. goroutine=2, index=2 - example_test.go:82
//2020/02/25 19:02:34.451624 DEBUG < tb.WaitUntilAquire. goroutine=2, index=3 - example_test.go:82
//2020/02/25 19:02:34.451715 DEBUG < tb.WaitUntilAquire. goroutine=0, index=3 - example_test.go:82
//2020/02/25 19:02:34.451787 DEBUG < tb.WaitUntilAquire. goroutine=3, index=2 - example_test.go:82
//2020/02/25 19:02:34.451841 DEBUG < tb.WaitUntilAquire. goroutine=1, index=2 - example_test.go:82
//2020/02/25 19:02:34.551910 DEBUG < tb.WaitUntilAquire. goroutine=3, index=3 - example_test.go:82
//2020/02/25 19:02:34.556604 DEBUG < tb.WaitUntilAquire. goroutine=1, index=3 - example_test.go:82
}
func TestRateLimiter(t *testing.T) {
tb := ratelimit.NewTokenBucket(1, 1, 1)
lb := ratelimit.NewLeakyBucket(1)
var rl ratelimit.RateLimiter
rl = tb
rl.WaitUntilAquire()
rl = lb
rl.TryAquire()
}

@ -0,0 +1,74 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit
import (
"errors"
"sync"
"time"
)
// TODO chef: 漏桶可以考虑增加一个接口,获取当前已排队的任务数或者说有可能获取到资源的时间点
var ErrResourceNotAvailable = errors.New("naza.ratelimit: resource not available")
// 漏桶
type LeakyBucket struct {
intervalMSec int
mu sync.Mutex
lastTick int
}
// @param intervalMSec 多长时间以上,允许获取到一个资源,单位毫秒
func NewLeakyBucket(intervalMSec int) *LeakyBucket {
return &LeakyBucket{
intervalMSec: intervalMSec,
// 注意,第一次获取资源,需要与创建对象时的时间点做比较
lastTick: int(time.Now().UnixNano() / 1e6),
}
}
// 尝试获取资源获取成功返回nil获取失败返回ErrResourceNotAvailable
// 如果获取失败,上层可自由选择多久后重试或丢弃本次任务
func (lb *LeakyBucket) TryAquire() error {
lb.mu.Lock()
defer lb.mu.Unlock()
nowMSec := int(time.Now().UnixNano() / 1e6)
// 距离上次获取成功时间超过了间隔阈值,返回成功
if nowMSec-lb.lastTick > lb.intervalMSec {
lb.lastTick = nowMSec
return nil
}
return ErrResourceNotAvailable
}
// 阻塞直到获取到资源
func (lb *LeakyBucket) WaitUntilAquire() {
lb.mu.Lock()
nowMSec := int(time.Now().UnixNano() / 1e6)
diff := nowMSec - lb.lastTick
if diff > lb.intervalMSec {
lb.lastTick = nowMSec
lb.mu.Unlock()
return
}
// 没有达到间隔我们更新lastTick再出锁使得其他想获取资源的协程以新的lastTick作为判断条件
lb.lastTick += lb.intervalMSec
lb.mu.Unlock()
// 我们不需要等整个interval间隔因为可能已经过去了一段时间了
// 注意diff是根据更新前的lastTick计算得到的
time.Sleep(time.Duration(lb.intervalMSec-diff) * time.Millisecond)
return
}

@ -0,0 +1,55 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit_test
import (
"testing"
"time"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/ratelimit"
)
func TestNewLeakyBucket(t *testing.T) {
ratelimit.NewLeakyBucket(10)
}
func TestLeakyBucket_TryAquire(t *testing.T) {
var (
lb *ratelimit.LeakyBucket
err error
)
lb = ratelimit.NewLeakyBucket(1)
time.Sleep(10 * time.Millisecond)
err = lb.TryAquire()
assert.Equal(t, nil, err)
time.Sleep(10 * time.Millisecond)
err = lb.TryAquire()
assert.Equal(t, nil, err)
lb = ratelimit.NewLeakyBucket(100)
err = lb.TryAquire()
assert.Equal(t, ratelimit.ErrResourceNotAvailable, err)
err = lb.TryAquire()
assert.Equal(t, ratelimit.ErrResourceNotAvailable, err)
}
func TestLeakyBucket_WaitUntilAquire(t *testing.T) {
var lb *ratelimit.LeakyBucket
lb = ratelimit.NewLeakyBucket(1)
lb.WaitUntilAquire()
time.Sleep(100 * time.Millisecond)
lb.WaitUntilAquire()
lb = ratelimit.NewLeakyBucket(200)
lb.WaitUntilAquire()
lb.WaitUntilAquire()
}

@ -1,4 +1,4 @@
// Copyright 2019, Chef. All rights reserved.
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
@ -8,63 +8,7 @@
package ratelimit
import (
"sync"
"time"
)
type RateLimit struct {
num int
option Option
neededWait time.Duration
mu sync.Mutex
last time.Time
}
type Option struct {
Duration time.Duration
}
var defaultOption = Option{
Duration: 1 * time.Second,
}
type ModOption func(option *Option)
func New(num int, modOptions ...ModOption) *RateLimit {
option := defaultOption
for _, fn := range modOptions {
fn(&option)
}
return &RateLimit{
num: num,
option: option,
neededWait: option.Duration / time.Duration(num),
}
}
func (rl *RateLimit) Wait() {
rl.mu.Lock()
now := time.Now()
if rl.last.IsZero() {
rl.last = now
rl.mu.Unlock()
return
}
diff := now.Sub(rl.last)
if diff > rl.neededWait {
rl.last = now
rl.mu.Unlock()
return
}
rl.last = rl.last.Add(rl.neededWait)
rl.mu.Unlock()
t := time.NewTimer(rl.neededWait - diff)
<-t.C
t.Stop()
return
type RateLimiter interface {
TryAquire() error
WaitUntilAquire()
}

@ -1,100 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit_test
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/ratelimit"
)
var (
duration = 100 * time.Millisecond
num = 50
)
func TestNew(t *testing.T) {
rl := ratelimit.New(num)
rl = ratelimit.New(num, func(option *ratelimit.Option) {
option.Duration = duration
})
rl.Wait()
}
func TestRateLimit_Wait(t *testing.T) {
rl := ratelimit.New(num, func(option *ratelimit.Option) {
option.Duration = duration
})
b := time.Now()
for i := 0; i < num; i++ {
rl.Wait()
}
nazalog.Debugf("cost:%v", time.Now().Sub(b))
}
func TestRateLimit_Wait2(t *testing.T) {
rl := ratelimit.New(num, func(option *ratelimit.Option) {
option.Duration = duration
})
b := time.Now()
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(ii int) {
rl.Wait()
wg.Done()
}(i)
}
wg.Wait()
nazalog.Debugf("cost:%v", time.Now().Sub(b))
}
func TestRateLimit_Wait3(t *testing.T) {
rand.Seed(time.Now().Unix())
rl := ratelimit.New(num, func(option *ratelimit.Option) {
option.Duration = duration
})
b := time.Now()
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(ii int) {
time.Sleep(time.Duration(rand.Int63n(int64(duration) / 2)))
rl.Wait()
wg.Done()
}(i)
}
wg.Wait()
nazalog.Debugf("cost:%v", time.Now().Sub(b))
}
func TestRateLimit_Wait4(t *testing.T) {
rl := ratelimit.New(num, func(option *ratelimit.Option) {
option.Duration = duration
})
b := time.Now()
for i := 0; i < num; i++ {
time.Sleep(time.Duration(int64(duration) * 2 / int64(num)))
rl.Wait()
}
nazalog.Debugf("cost:%v", time.Now().Sub(b))
}
func BenchmarkRateLimit_Wait(b *testing.B) {
for i := 0; i < b.N; i++ {
rl := ratelimit.New(num)
for i := 0; i < num; i++ {
rl.Wait()
}
}
}

@ -0,0 +1,123 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
)
var ErrTokenNotEnough = errors.New("naza.ratelimit: token not enough")
// 令牌桶
type TokenBucket struct {
capacity int
prodTokenInterval time.Duration
prodTokenNumEveryInterval int
disposeFlag int32
mu sync.Mutex
available int
cond *sync.Cond
}
// @param capacity: 桶容量大小
// @param prodTokenIntervalMSec: 生产令牌的时间间隔,单位毫秒
// @param prodTokenNumEveryInterval: 每次生产多少个令牌
func NewTokenBucket(capacity int, prodTokenIntervalMSec int, prodTokenNumEveryInterval int) *TokenBucket {
tb := &TokenBucket{
capacity: capacity,
prodTokenInterval: time.Duration(time.Duration(prodTokenIntervalMSec) * time.Millisecond),
prodTokenNumEveryInterval: prodTokenNumEveryInterval,
}
tb.cond = sync.NewCond(&tb.mu)
tb.asyncProdToken()
return tb
}
func (tb *TokenBucket) TryAquire() error {
return tb.TryAquireWithNum(1)
}
func (tb *TokenBucket) WaitUntilAquire() {
tb.WaitUntilAquireWithNum(1)
}
// 尝试获取相应数量的令牌获取成功返回nil获取失败返回ErrTokenNotEnough
// 如果获取失败,上层可自由选择多久后重试或丢弃本次任务
func (tb *TokenBucket) TryAquireWithNum(num int) error {
tb.checkAquireNum(num)
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.available >= num {
tb.available -= num
return nil
}
return ErrTokenNotEnough
}
// 阻塞直到获取到相应数量的令牌
func (tb *TokenBucket) WaitUntilAquireWithNum(num int) {
tb.checkAquireNum(num)
for {
tb.mu.Lock()
if tb.available >= num {
tb.available -= num
tb.mu.Unlock()
return
}
// 等待下次令牌生产时被唤醒
// wait的内部会将自身添加到事件监听队列中然后释放锁当接收到事件时内部会重新获取锁然后返回
tb.cond.Wait()
tb.mu.Unlock()
}
}
// 销毁令牌桶
func (tb *TokenBucket) Dispose() {
atomic.StoreInt32(&tb.disposeFlag, 1)
}
func (tb *TokenBucket) asyncProdToken() {
go func() {
t := time.NewTicker(tb.prodTokenInterval)
defer t.Stop()
for {
if atomic.LoadInt32(&tb.disposeFlag) == 1 {
break
}
select {
case <-t.C:
tb.mu.Lock()
tb.available += tb.prodTokenNumEveryInterval
if tb.available > tb.capacity {
tb.available = tb.capacity
}
// It is allowed but not required for the caller to hold c.L
// during the call.
tb.cond.Broadcast()
tb.mu.Unlock()
}
}
}()
}
func (tb *TokenBucket) checkAquireNum(num int) {
if num > tb.capacity {
panic(fmt.Sprintf("aquire num should not bigger than capacity. num=%d, capacity=%d", num, tb.capacity))
}
}

@ -0,0 +1,72 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package ratelimit_test
import (
"testing"
"time"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/ratelimit"
)
func TestNewTokenBucket(t *testing.T) {
ratelimit.NewTokenBucket(2000, 1000, 1000)
}
func TestTokenBucket_TryAquire(t *testing.T) {
var (
tb *ratelimit.TokenBucket
err error
)
tb = ratelimit.NewTokenBucket(2000, 1000, 1000)
err = tb.TryAquire()
assert.Equal(t, ratelimit.ErrTokenNotEnough, err)
err = tb.TryAquire()
assert.Equal(t, ratelimit.ErrTokenNotEnough, err)
tb = ratelimit.NewTokenBucket(2000, 1, 1000)
time.Sleep(10 * time.Millisecond)
err = tb.TryAquire()
assert.Equal(t, nil, err)
err = tb.TryAquire()
assert.Equal(t, nil, err)
}
func TestTokenBucket_WaitUntilAquire(t *testing.T) {
var tb *ratelimit.TokenBucket
tb = ratelimit.NewTokenBucket(2000, 1000, 1000)
tb.WaitUntilAquire()
tb.WaitUntilAquire()
}
func TestTokenBucket_Dispose(t *testing.T) {
var (
tb *ratelimit.TokenBucket
err error
)
tb = ratelimit.NewTokenBucket(2000, 1, 1000)
time.Sleep(10 * time.Millisecond)
err = tb.TryAquireWithNum(1)
assert.Equal(t, nil, err)
tb.WaitUntilAquireWithNum(1)
tb.Dispose()
}
func TestTokenBucket_panic(t *testing.T) {
defer func() {
nazalog.Debug(recover())
}()
tb := ratelimit.NewTokenBucket(1, 1, 1)
tb.TryAquireWithNum(100)
}
Loading…
Cancel
Save