From 6904ae65a5757929388df3ea280d180eb4eeae7d Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 14 Nov 2020 14:11:52 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20=E6=96=B0=E5=A2=9Epackage=20defertaskt?= =?UTF-8?q?hread=EF=BC=8C=E7=94=A8=E4=BA=8E=E6=89=A7=E8=A1=8C=E5=BB=B6?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/connection/connection.go | 3 +++ pkg/defertaskthread/defertaskthread.go | 21 +++++++++++++++ pkg/defertaskthread/defertaskthread_test.go | 29 +++++++++++++++++++++ pkg/defertaskthread/global.go | 19 ++++++++++++++ pkg/defertaskthread/interface.go | 21 +++++++++++++++ 5 files changed, 93 insertions(+) create mode 100644 pkg/defertaskthread/defertaskthread.go create mode 100644 pkg/defertaskthread/defertaskthread_test.go create mode 100644 pkg/defertaskthread/global.go create mode 100644 pkg/defertaskthread/interface.go diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go index 0ff5537..ca00d23 100644 --- a/pkg/connection/connection.go +++ b/pkg/connection/connection.go @@ -64,6 +64,9 @@ type Connection interface { ModReadTimeoutMS(n int) ModWriteTimeoutMS(n int) + // 连接上读取和发送的字节总数。 + // 注意,如果是异步发送,发送字节统计的是调用底层write的值,而非上层调用Connection发送的值 + // 也即不包含Connection中的发送缓存部分,但是可能包含内核socket发送缓冲区的值。 GetStat() Stat } diff --git a/pkg/defertaskthread/defertaskthread.go b/pkg/defertaskthread/defertaskthread.go new file mode 100644 index 0000000..db0b5ee --- /dev/null +++ b/pkg/defertaskthread/defertaskthread.go @@ -0,0 +1,21 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/naza +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package defertaskthread + +import "time" + +type deferTaskThread struct { +} + +func (d *deferTaskThread) Go(deferMS int, task TaskFn, param ...interface{}) { + go func() { + time.Sleep(time.Duration(deferMS) * time.Millisecond) + task(param...) + }() +} diff --git a/pkg/defertaskthread/defertaskthread_test.go b/pkg/defertaskthread/defertaskthread_test.go new file mode 100644 index 0000000..8c16618 --- /dev/null +++ b/pkg/defertaskthread/defertaskthread_test.go @@ -0,0 +1,29 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/naza +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package defertaskthread_test + +import ( + "testing" + "time" + + "github.com/q191201771/naza/pkg/nazalog" + + "github.com/q191201771/naza/pkg/defertaskthread" +) + +func TestDeferTaskThread(t *testing.T) { + d := defertaskthread.NewDeferTaskThread() + for i := 0; i < 300; i += 50 { + d.Go(i, func(param ...interface{}) { + ii := param[0].(int) + nazalog.Debugf("running %d", ii) + }, i) + } + time.Sleep(300 * time.Millisecond) +} diff --git a/pkg/defertaskthread/global.go b/pkg/defertaskthread/global.go new file mode 100644 index 0000000..52c27de --- /dev/null +++ b/pkg/defertaskthread/global.go @@ -0,0 +1,19 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/naza +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package defertaskthread + +var thread DeferTaskThread + +func Go(deferMS int, task TaskFn, param ...interface{}) { + thread.Go(deferMS, task, param...) +} + +func init() { + thread = NewDeferTaskThread() +} diff --git a/pkg/defertaskthread/interface.go b/pkg/defertaskthread/interface.go new file mode 100644 index 0000000..31f9af4 --- /dev/null +++ b/pkg/defertaskthread/interface.go @@ -0,0 +1,21 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/naza +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package defertaskthread + +type TaskFn func(param ...interface{}) + +type DeferTaskThread interface { + // 注意,一个thread的多个task,本应该是串行执行的语义, + // 目前为了简单,让它们并行执行了,以后可能会发生变化 + Go(deferMS int, task TaskFn, param ...interface{}) +} + +func NewDeferTaskThread() DeferTaskThread { + return &deferTaskThread{} +}