// Copyright 2019, Chef. All rights reserved. // https://github.com/q191201771/naza // // Use of this source code is governed by a MIT-style license // that can be found in the License file. // // Author: Chef (191201771@qq.com) // Package connection // // 对 net.Conn 接口的二次封装,目的有两个: // 1. 在流媒体传输这种特定的长连接场景下提供更方便、高性能的接口 // 2. 便于后续将TCPConn替换成其他传输协议 package connection import ( "bufio" "errors" "io" "net" "sync" "time" "github.com/q191201771/naza/pkg/unique" "github.com/q191201771/naza/pkg/nazaatomic" "github.com/q191201771/naza/pkg/nazalog" ) var ( ErrConnectionPanic = errors.New("naza.connection: using in a wrong way") ErrClosedAlready = errors.New("naza.connection: connection closed already") ErrWriteChanFull = errors.New("naza.connection: write channel full") ) type Connection interface { // ----- net.Conn interface ---------------------------------------------------------------------------------------- // // 注意,如果没有特别说明,函数的语义和 net.Conn 相同 // // Read ... Read(b []byte) (n int, err error) // Write // // @return n 发送成功的大小 // 注意,如果设置了 Option.WriteChanSize 做异步发送,那么`n`恒等于len(`b`) // Write(b []byte) (n int, err error) // Close 允许调用多次 // Close() error LocalAddr() net.Addr RemoteAddr() net.Addr SetDeadline(t time.Time) error SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error // ----------------------------------------------------------------------------------------------------------------- // Writev 发送多块不连续的内存块时使用 // // 当有多块不连续的内存块需要发送时,调用 Writev 在某些平台性能会优于以下做法: // 1. 多次调用Write // 2. 将多块内存块拷贝拼接成一块内存块后调用Write // 原因是减少了系统调用以及内存拷贝(还有可能有内存管理)的开销 // // 注意,如果需要发送的是一块连续的内存块,建议使用 Write 发送 // Writev(b net.Buffers) (n int, err error) ReadAtLeast(buf []byte, min int) (n int, err error) ReadLine() (line []byte, isPrefix bool, err error) // 只有设置了ReadBufSize才可以使用这个方法 // Flush // // 如果使用了bufio写缓冲,则将缓冲中的数据发送出去 // 如果使用了channel异步发送,则阻塞等待,直到之前channel中的数据全部发送完毕 // // 一般在Close前,想要将剩余数据发送完毕时调用 // Flush() error // Done 阻塞直到连接关闭或发生错误 // // 注意,向上层严格保证,消息发送后,后续Read,Write等调用都将失败 // // 注意,向上层严格保证,消息只发送一次 // // @return 返回nil则是本端主动调用Close关闭 // Done() <-chan error // ModWriteChanSize Modxxx // // TODO chef: 这几个接口是否不提供 // Mod类型函数不加锁,需要调用方保证不发生竞态调用 // // ModWriteChanSize 只允许在初始化时为0的前提下调用 ModWriteChanSize(n int) ModWriteBufSize(n int) ModReadTimeoutMs(n int) ModWriteTimeoutMs(n int) // GetStat 连接上读取和发送的字节总数。 // 注意,如果是异步发送,发送字节统计的是调用底层write的值,而非上层调用Connection发送的值 // 也即不包含Connection中的发送缓存部分,但是可能包含内核socket发送缓冲区的值。 GetStat() Stat } type Stat struct { ReadBytesSum uint64 WroteBytesSum uint64 } type StatAtomic struct { ReadBytesSum nazaatomic.Uint64 WroteBytesSum nazaatomic.Uint64 } type WriteChanFullBehavior int const ( WriteChanFullBehaviorReturnError WriteChanFullBehavior = iota + 1 WriteChanFullBehaviorBlock ) type Option struct { // 如果不为0,则之后每次读/写使用bufio的缓冲 ReadBufSize int WriteBufSize int // 如果不为0,则之后每次读/写都带超时 ReadTimeoutMs int WriteTimeoutMs int // 如果不为0,则写使用channel将数据发送到后台协程中发送 WriteChanSize int // 使用channel发送数据时,channel满了时Write函数的行为 // WriteChanFullBehaviorReturnError 返回错误 // WriteChanFullBehaviorBlock 阻塞直到向channel写入成功 WriteChanFullBehavior WriteChanFullBehavior } // 没有配置的属性,将按如下配置 var defaultOption = Option{ ReadBufSize: 0, WriteBufSize: 0, ReadTimeoutMs: 0, WriteTimeoutMs: 0, WriteChanSize: 0, WriteChanFullBehavior: WriteChanFullBehaviorReturnError, } type ModOption func(option *Option) func New(conn net.Conn, modOptions ...ModOption) Connection { c := new(connection) c.uniqueKey = uniqueGen.GenUniqueKey() c.doneChan = make(chan error, 1) c.Conn = conn c.option = defaultOption for _, fn := range modOptions { fn(&c.option) } if c.option.ReadBufSize > 0 { c.r = bufio.NewReaderSize(conn, c.option.ReadBufSize) } else { c.r = conn } if c.option.WriteBufSize > 0 { c.w = bufio.NewWriterSize(conn, c.option.WriteBufSize) } else { c.w = conn } if c.option.WriteChanSize > 0 { c.wChan = make(chan wMsg, c.option.WriteChanSize) c.flushDoneChan = make(chan struct{}, 1) c.exitChan = make(chan struct{}, 1) go c.runWriteLoop() } nazalog.Debugf("[%s] lifecycle new connection. net.Conn=%p, naza.Connection=%p", c.uniqueKey, conn, c) return c } type wMsgType int const ( _ wMsgType = iota wMsgTypeWrite wMsgTypeWritev wMsgTypeFlush ) type wMsg struct { t wMsgType b []byte bs net.Buffers } type connection struct { Conn net.Conn r io.Reader w io.Writer option Option uniqueKey string wChan chan wMsg flushDoneChan chan struct{} exitChan chan struct{} doneChan chan error closedFlag nazaatomic.Bool closeOnce sync.Once stat StatAtomic } var uniqueGen *unique.SingleGenerator func (c *connection) ModWriteChanSize(n int) { if c.option.WriteChanSize > 0 { panic(ErrConnectionPanic) } if n == 0 { return } c.option.WriteChanSize = n c.wChan = make(chan wMsg, n) c.flushDoneChan = make(chan struct{}, 1) c.exitChan = make(chan struct{}, 1) go c.runWriteLoop() } func (c *connection) ModWriteBufSize(n int) { if c.option.WriteBufSize > 0 { // 如果之前已经设置过写缓冲,直接 panic // 这里改成 flush 后替换成新缓冲也行,暂时没这个必要 panic(ErrConnectionPanic) } c.option.WriteBufSize = n c.w = bufio.NewWriterSize(c.Conn, n) } func (c *connection) ModReadTimeoutMs(n int) { if c.option.ReadTimeoutMs > 0 { panic(ErrConnectionPanic) } c.option.ReadTimeoutMs = n } func (c *connection) ModWriteTimeoutMs(n int) { if c.option.WriteTimeoutMs > 0 { panic(ErrConnectionPanic) } c.option.WriteTimeoutMs = n } func (c *connection) ReadAtLeast(buf []byte, min int) (n int, err error) { if c.option.ReadTimeoutMs > 0 { err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return 0, err } } n, err = io.ReadAtLeast(c.r, buf, min) if err != nil { c.close(err) } c.stat.ReadBytesSum.Add(uint64(n)) return n, err } // TODO chef: 测试 bufio 设置的大小 < 换行符位置时的情况 func (c *connection) ReadLine() (line []byte, isPrefix bool, err error) { bufioReader, ok := c.r.(*bufio.Reader) if !ok { // 目前只有使用了 bufio.Reader 时才能执行 ReadLine 操作 panic(ErrConnectionPanic) } if c.option.ReadTimeoutMs > 0 { err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return nil, false, err } } line, isPrefix, err = bufioReader.ReadLine() if err != nil { c.close(err) } c.stat.ReadBytesSum.Add(uint64(len(line))) return line, isPrefix, err } func (c *connection) Read(b []byte) (n int, err error) { if c.option.ReadTimeoutMs > 0 { err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return 0, err } } n, err = c.r.Read(b) if err != nil { c.close(err) } c.stat.ReadBytesSum.Add(uint64(n)) return n, err } func (c *connection) Write(b []byte) (n int, err error) { if c.closedFlag.Load() { return 0, ErrClosedAlready } if c.option.WriteChanSize > 0 { switch c.option.WriteChanFullBehavior { case WriteChanFullBehaviorBlock: c.wChan <- wMsg{t: wMsgTypeWrite, b: b} return len(b), nil case WriteChanFullBehaviorReturnError: select { case c.wChan <- wMsg{t: wMsgTypeWrite, b: b}: return len(b), nil default: return 0, ErrWriteChanFull } } } return c.write(b) } func (c *connection) Writev(b net.Buffers) (n int, err error) { if c.closedFlag.Load() { return 0, ErrClosedAlready } if c.option.WriteChanSize > 0 { for _, v := range b { n += len(v) } switch c.option.WriteChanFullBehavior { case WriteChanFullBehaviorBlock: c.wChan <- wMsg{t: wMsgTypeWritev, bs: b} return n, nil case WriteChanFullBehaviorReturnError: select { case c.wChan <- wMsg{t: wMsgTypeWritev, bs: b}: return n, nil default: return 0, ErrWriteChanFull } } } return c.writev(b) } func (c *connection) Flush() error { if c.closedFlag.Load() { return ErrClosedAlready } if c.option.WriteChanSize > 0 { c.wChan <- wMsg{t: wMsgTypeFlush} <-c.flushDoneChan return nil } return c.flush() } func (c *connection) Close() error { nazalog.Debugf("[%s] Close.", c.uniqueKey) c.close(nil) return nil } func (c *connection) Done() <-chan error { return c.doneChan } func (c *connection) LocalAddr() net.Addr { return c.Conn.LocalAddr() } func (c *connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } func (c *connection) SetDeadline(t time.Time) error { err := c.Conn.SetDeadline(t) if err != nil { c.close(err) } return err } func (c *connection) SetReadDeadline(t time.Time) error { err := c.Conn.SetReadDeadline(t) if err != nil { c.close(err) } return err } func (c *connection) SetWriteDeadline(t time.Time) error { err := c.Conn.SetWriteDeadline(t) if err != nil { c.close(err) } return err } func (c *connection) GetStat() (s Stat) { s.ReadBytesSum = c.stat.ReadBytesSum.Load() s.WroteBytesSum = c.stat.WroteBytesSum.Load() return } func (c *connection) write(b []byte) (n int, err error) { if c.option.WriteTimeoutMs > 0 { err = c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return 0, err } } n, err = c.w.Write(b) if err != nil { c.close(err) } c.stat.WroteBytesSum.Add(uint64(n)) return n, err } func (c *connection) writev(b net.Buffers) (n int, err error) { if c.option.WriteTimeoutMs > 0 { err = c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return 0, err } } var n64 int64 n64, err = b.WriteTo(c.w) if err != nil { c.close(err) } n = int(n64) c.stat.WroteBytesSum.Add(uint64(n)) return n, err } func (c *connection) runWriteLoop() { for { select { case <-c.exitChan: //nazalog.Debugf("[%s] recv exitChan and exit write loop", c.uniqueKey) return case msg := <-c.wChan: switch msg.t { case wMsgTypeWrite: if _, err := c.write(msg.b); err != nil { return } case wMsgTypeWritev: if _, err := c.writev(msg.bs); err != nil { return } case wMsgTypeFlush: if err := c.flush(); err != nil { c.flushDoneChan <- struct{}{} return } c.flushDoneChan <- struct{}{} } } } } func (c *connection) flush() error { w, ok := c.w.(*bufio.Writer) if ok { if c.option.WriteTimeoutMs > 0 { err := c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond)) if err != nil { c.close(err) return err } } if err := w.Flush(); err != nil { c.close(err) return err } } return nil } func (c *connection) close(err error) { c.closeOnce.Do(func() { nazalog.Debugf("[%s] close once. err=%+v", c.uniqueKey, err) c.closedFlag.Store(true) if c.option.WriteChanSize > 0 { c.exitChan <- struct{}{} } // 注意,先Close后再发送消息,保证消息发送前,已经Close掉了 _ = c.Conn.Close() c.doneChan <- err // 注意,如果使用了wChan,并不关闭它,避免竞态条件下connection继续使用它造成问题。让它随connection对象释放。 }) } func init() { uniqueGen = unique.NewSingleGenerator("NAZACONN") }