From 0f3da700ec8cb90068dda7a8f091aefd9076be07 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Tue, 24 Sep 2019 14:05:39 +0800 Subject: [PATCH] =?UTF-8?q?package=20connection:=201.=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=20wChanSize=20=E9=85=8D=E7=BD=AE=EF=BC=8C=E7=94=A8=E4=BA=8E?= =?UTF-8?q?=E6=8C=87=E5=AE=9A=E6=98=AF=E5=90=A6=E4=BD=BF=E7=94=A8=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=8F=91=E9=80=81=202.=20=E6=B7=BB=E5=8A=A0=20Flush?= =?UTF-8?q?=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/assert/assert.go | 2 +- pkg/connection/connection.go | 98 ++++++++++++++++++++++++++++++++++-- pkg/log/log.go | 2 +- 3 files changed, 97 insertions(+), 5 deletions(-) diff --git a/pkg/assert/assert.go b/pkg/assert/assert.go index 6d6f693..bf03e44 100644 --- a/pkg/assert/assert.go +++ b/pkg/assert/assert.go @@ -1,4 +1,4 @@ -// Package assert 提供了单元测试时的断言功能,减少一些模板代码 +// package assert 提供了单元测试时的断言功能,减少一些模板代码 // // 代码参考了 https://github.com/stretchr/testify // diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go index bf83975..30c5b9c 100644 --- a/pkg/connection/connection.go +++ b/pkg/connection/connection.go @@ -18,26 +18,57 @@ var connectionErr = errors.New("connection: fxxk") type Connection interface { // 包含 interface net.Conn 的所有方法 + // Read + // Write + // Close + // LocalAddr + // RemoteAddr + // SetDeadline + // SetReadDeadline + // SetWriteDeadline net.Conn ReadAtLeast(buf []byte, min int) (n int, err error) ReadLine() (line []byte, isPrefix bool, err error) + // TODO chef: 这个接口是否不提供 Printf(fmt string, v ...interface{}) (n int, err error) + // 如果使用了 bufio 写缓冲,则将缓冲中的数据发送出去 + // 如果使用了 channel 异步发送,则阻塞等待,直到之前 channel 中的数据全部发送完毕 + Flush() error + + // TODO chef: 这几个接口是否不提供 ModWriteBufSize(n int) ModReadTimeoutMS(n int) ModWriteTimeoutMS(n int) } type Config struct { - // 如果不为0,则之后每次读/写使用 buffer 缓冲 + // 如果不为0,则之后每次读/写使用 bufio 的缓冲 ReadBufSize int WriteBufSize int // 如果不为0,则之后每次读/写都带超时 ReadTimeoutMS int WriteTimeoutMS int + + // 如果不过0,则写使用 channel 将数据发送到后台协程中发送 + WChanSize int +} + +type wMsgT int + +const ( + _ wMsgT = iota + wMsgTWrite + wMsgTFlush + wMsgTClose +) + +type wmsg struct { + t wMsgT + b []byte } func New(conn net.Conn, config Config) Connection { @@ -53,6 +84,11 @@ func New(conn net.Conn, config Config) Connection { } else { c.w = conn } + if config.WChanSize > 0 { + c.wChan = make(chan wmsg, config.WChanSize) + c.flushDoneChan = make(chan struct{}, 1) + go c.runWriteLoop() + } c.config = config return &c } @@ -61,6 +97,8 @@ type connection struct { Conn net.Conn r io.Reader w io.Writer + wChan chan wmsg + flushDoneChan chan struct{} config Config } @@ -93,6 +131,7 @@ func (c *connection) ModWriteTimeoutMS(n int) { func (c *connection) ReadAtLeast(buf []byte, min int) (n int, err error) { if c.config.ReadTimeoutMS > 0 { + // TODO chef: 超时的错误返回 _ = c.Conn.SetReadDeadline(time.Now().Add(time.Duration(c.config.ReadTimeoutMS) * time.Millisecond)) } return io.ReadAtLeast(c.r, buf, min) @@ -125,17 +164,69 @@ func (c *connection) Read(b []byte) (n int, err error) { } func (c *connection) Write(b []byte) (n int, err error) { + if c.config.WChanSize > 0 { + c.wChan <- wmsg{} + return len(b), nil + } + return c.write(b) +} + +func (c *connection) write(b []byte) (n int, err error) { if c.config.WriteTimeoutMS > 0 { _ = c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(c.config.WriteTimeoutMS) * time.Millisecond)) } return c.w.Write(b) } -func (c *connection) Close() error { +func (c *connection) runWriteLoop() { + for { + msg, ok := <- c.wChan + if !ok { + return + } + switch msg.t { + case wMsgTWrite: + if _, err := c.write(msg.b); err != nil { + _ = c.Close() + } + case wMsgTFlush: + c.flush() + c.flushDoneChan <- struct{}{} + case wMsgTClose: + // TODO chef: 是否需要 + } + } +} + +func (c *connection) Flush() error { + if c.config.WChanSize > 0 { + c.wChan <- wmsg{t:wMsgTFlush} + <- c.flushDoneChan + return nil + } + + return c.flush() +} + +func (c *connection) flush() error { w, ok := c.w.(*bufio.Writer) if ok { - w.Flush() + if c.config.WriteTimeoutMS > 0 { + _ = c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(c.config.WriteTimeoutMS) * time.Millisecond)) + } + if err := w.Flush(); err != nil { + _ = c.Close() + } } + return nil +} + +// 调用方需保证不和 Write 接口并发调用 +func (c *connection) Close() error { + if c.config.WChanSize > 0 { + close(c.wChan) + } + return c.Conn.Close() } @@ -154,6 +245,7 @@ func (c *connection) SetDeadline(t time.Time) error { func (c *connection) SetReadDeadline(t time.Time) error { return c.Conn.SetReadDeadline(t) } + func (c *connection) SetWriteDeadline(t time.Time) error { return c.Conn.SetWriteDeadline(t) } diff --git a/pkg/log/log.go b/pkg/log/log.go index 4aff64f..f2d59fa 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -57,7 +57,7 @@ type Config struct { type Level uint8 const ( - _ = iota + _ Level = iota LevelDebug LevelInfo LevelWarn