You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
naza/pkg/connection/connection.go

526 lines
12 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/naza
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
// Package connection
//
// 对 net.Conn 接口的二次封装,目的有两个:
// 1. 在流媒体传输这种特定的长连接场景下提供更方便、高性能的接口
// 2. 便于后续将TCPConn替换成其他传输协议
package connection
import (
"bufio"
"errors"
"io"
"net"
"sync"
"time"
"github.com/q191201771/naza/pkg/unique"
"github.com/q191201771/naza/pkg/nazaatomic"
"github.com/q191201771/naza/pkg/nazalog"
)
var (
ErrConnectionPanic = errors.New("naza.connection: using in a wrong way")
ErrClosedAlready = errors.New("naza.connection: connection closed already")
ErrWriteChanFull = errors.New("naza.connection: write channel full")
)
type Connection interface {
// ----- net.Conn interface ----------------------------------------------------------------------------------------
//
// 注意,如果没有特别说明,函数的语义和 net.Conn 相同
//
// Read ...
Read(b []byte) (n int, err error)
// Write
//
// @return n 发送成功的大小
// 注意,如果设置了 Option.WriteChanSize 做异步发送,那么`n`恒等于len(`b`)
//
Write(b []byte) (n int, err error)
// Close 允许调用多次
//
Close() error
LocalAddr() net.Addr
RemoteAddr() net.Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
// -----------------------------------------------------------------------------------------------------------------
// Writev 发送多块不连续的内存块时使用
//
// 当有多块不连续的内存块需要发送时,调用 Writev 在某些平台性能会优于以下做法:
// 1. 多次调用Write
// 2. 将多块内存块拷贝拼接成一块内存块后调用Write
// 原因是减少了系统调用以及内存拷贝(还有可能有内存管理)的开销
//
// 注意,如果需要发送的是一块连续的内存块,建议使用 Write 发送
//
Writev(b net.Buffers) (n int, err error)
ReadAtLeast(buf []byte, min int) (n int, err error)
ReadLine() (line []byte, isPrefix bool, err error) // 只有设置了ReadBufSize才可以使用这个方法
// Flush
//
// 如果使用了bufio写缓冲则将缓冲中的数据发送出去
// 如果使用了channel异步发送则阻塞等待直到之前channel中的数据全部发送完毕
//
// 一般在Close前想要将剩余数据发送完毕时调用
//
Flush() error
// Done 阻塞直到连接关闭或发生错误
//
// 注意向上层严格保证消息发送后后续ReadWrite等调用都将失败
//
// 注意,向上层严格保证,消息只发送一次
//
// @return 返回nil则是本端主动调用Close关闭
//
Done() <-chan error
// ModWriteChanSize Modxxx
//
// TODO chef: 这几个接口是否不提供
// Mod类型函数不加锁需要调用方保证不发生竞态调用
//
// ModWriteChanSize 只允许在初始化时为0的前提下调用
ModWriteChanSize(n int)
ModWriteBufSize(n int)
ModReadTimeoutMs(n int)
ModWriteTimeoutMs(n int)
// GetStat 连接上读取和发送的字节总数。
// 注意如果是异步发送发送字节统计的是调用底层write的值而非上层调用Connection发送的值
// 也即不包含Connection中的发送缓存部分但是可能包含内核socket发送缓冲区的值。
GetStat() Stat
}
type Stat struct {
ReadBytesSum uint64
WroteBytesSum uint64
}
type StatAtomic struct {
ReadBytesSum nazaatomic.Uint64
WroteBytesSum nazaatomic.Uint64
}
type WriteChanFullBehavior int
const (
WriteChanFullBehaviorReturnError WriteChanFullBehavior = iota + 1
WriteChanFullBehaviorBlock
)
type Option struct {
// 如果不为0则之后每次读/写使用bufio的缓冲
ReadBufSize int
WriteBufSize int
// 如果不为0则之后每次读/写都带超时
ReadTimeoutMs int
WriteTimeoutMs int
// 如果不为0则写使用channel将数据发送到后台协程中发送
WriteChanSize int
// 使用channel发送数据时channel满了时Write函数的行为
// WriteChanFullBehaviorReturnError 返回错误
// WriteChanFullBehaviorBlock 阻塞直到向channel写入成功
WriteChanFullBehavior WriteChanFullBehavior
}
// 没有配置的属性,将按如下配置
var defaultOption = Option{
ReadBufSize: 0,
WriteBufSize: 0,
ReadTimeoutMs: 0,
WriteTimeoutMs: 0,
WriteChanSize: 0,
WriteChanFullBehavior: WriteChanFullBehaviorReturnError,
}
type ModOption func(option *Option)
func New(conn net.Conn, modOptions ...ModOption) Connection {
c := new(connection)
c.uniqueKey = uniqueGen.GenUniqueKey()
c.doneChan = make(chan error, 1)
c.Conn = conn
c.option = defaultOption
for _, fn := range modOptions {
fn(&c.option)
}
if c.option.ReadBufSize > 0 {
c.r = bufio.NewReaderSize(conn, c.option.ReadBufSize)
} else {
c.r = conn
}
if c.option.WriteBufSize > 0 {
c.w = bufio.NewWriterSize(conn, c.option.WriteBufSize)
} else {
c.w = conn
}
if c.option.WriteChanSize > 0 {
c.wChan = make(chan wMsg, c.option.WriteChanSize)
c.flushDoneChan = make(chan struct{}, 1)
c.exitChan = make(chan struct{}, 1)
go c.runWriteLoop()
}
nazalog.Debugf("[%s] lifecycle new connection. net.Conn=%p, naza.Connection=%p", c.uniqueKey, conn, c)
return c
}
type wMsgType int
const (
_ wMsgType = iota
wMsgTypeWrite
wMsgTypeWritev
wMsgTypeFlush
)
type wMsg struct {
t wMsgType
b []byte
bs net.Buffers
}
type connection struct {
Conn net.Conn
r io.Reader
w io.Writer
option Option
uniqueKey string
wChan chan wMsg
flushDoneChan chan struct{}
exitChan chan struct{}
doneChan chan error
closedFlag nazaatomic.Bool
closeOnce sync.Once
stat StatAtomic
}
var uniqueGen *unique.SingleGenerator
func (c *connection) ModWriteChanSize(n int) {
if c.option.WriteChanSize > 0 {
panic(ErrConnectionPanic)
}
if n == 0 {
return
}
c.option.WriteChanSize = n
c.wChan = make(chan wMsg, n)
c.flushDoneChan = make(chan struct{}, 1)
c.exitChan = make(chan struct{}, 1)
go c.runWriteLoop()
}
func (c *connection) ModWriteBufSize(n int) {
if c.option.WriteBufSize > 0 {
// 如果之前已经设置过写缓冲,直接 panic
// 这里改成 flush 后替换成新缓冲也行,暂时没这个必要
panic(ErrConnectionPanic)
}
c.option.WriteBufSize = n
c.w = bufio.NewWriterSize(c.Conn, n)
}
func (c *connection) ModReadTimeoutMs(n int) {
if c.option.ReadTimeoutMs > 0 {
panic(ErrConnectionPanic)
}
c.option.ReadTimeoutMs = n
}
func (c *connection) ModWriteTimeoutMs(n int) {
if c.option.WriteTimeoutMs > 0 {
panic(ErrConnectionPanic)
}
c.option.WriteTimeoutMs = n
}
func (c *connection) ReadAtLeast(buf []byte, min int) (n int, err error) {
if c.option.ReadTimeoutMs > 0 {
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return 0, err
}
}
n, err = io.ReadAtLeast(c.r, buf, min)
if err != nil {
c.close(err)
}
c.stat.ReadBytesSum.Add(uint64(n))
return n, err
}
// TODO chef: 测试 bufio 设置的大小 < 换行符位置时的情况
func (c *connection) ReadLine() (line []byte, isPrefix bool, err error) {
bufioReader, ok := c.r.(*bufio.Reader)
if !ok {
// 目前只有使用了 bufio.Reader 时才能执行 ReadLine 操作
panic(ErrConnectionPanic)
}
if c.option.ReadTimeoutMs > 0 {
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return nil, false, err
}
}
line, isPrefix, err = bufioReader.ReadLine()
if err != nil {
c.close(err)
}
c.stat.ReadBytesSum.Add(uint64(len(line)))
return line, isPrefix, err
}
func (c *connection) Read(b []byte) (n int, err error) {
if c.option.ReadTimeoutMs > 0 {
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.option.ReadTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return 0, err
}
}
n, err = c.r.Read(b)
if err != nil {
c.close(err)
}
c.stat.ReadBytesSum.Add(uint64(n))
return n, err
}
func (c *connection) Write(b []byte) (n int, err error) {
if c.closedFlag.Load() {
return 0, ErrClosedAlready
}
if c.option.WriteChanSize > 0 {
switch c.option.WriteChanFullBehavior {
case WriteChanFullBehaviorBlock:
c.wChan <- wMsg{t: wMsgTypeWrite, b: b}
return len(b), nil
case WriteChanFullBehaviorReturnError:
select {
case c.wChan <- wMsg{t: wMsgTypeWrite, b: b}:
return len(b), nil
default:
return 0, ErrWriteChanFull
}
}
}
return c.write(b)
}
func (c *connection) Writev(b net.Buffers) (n int, err error) {
if c.closedFlag.Load() {
return 0, ErrClosedAlready
}
if c.option.WriteChanSize > 0 {
for _, v := range b {
n += len(v)
}
switch c.option.WriteChanFullBehavior {
case WriteChanFullBehaviorBlock:
c.wChan <- wMsg{t: wMsgTypeWritev, bs: b}
return n, nil
case WriteChanFullBehaviorReturnError:
select {
case c.wChan <- wMsg{t: wMsgTypeWritev, bs: b}:
return n, nil
default:
return 0, ErrWriteChanFull
}
}
}
return c.writev(b)
}
func (c *connection) Flush() error {
if c.closedFlag.Load() {
return ErrClosedAlready
}
if c.option.WriteChanSize > 0 {
c.wChan <- wMsg{t: wMsgTypeFlush}
<-c.flushDoneChan
return nil
}
return c.flush()
}
func (c *connection) Close() error {
nazalog.Debugf("[%s] Close.", c.uniqueKey)
c.close(nil)
return nil
}
func (c *connection) Done() <-chan error {
return c.doneChan
}
func (c *connection) LocalAddr() net.Addr {
return c.Conn.LocalAddr()
}
func (c *connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
func (c *connection) SetDeadline(t time.Time) error {
err := c.Conn.SetDeadline(t)
if err != nil {
c.close(err)
}
return err
}
func (c *connection) SetReadDeadline(t time.Time) error {
err := c.Conn.SetReadDeadline(t)
if err != nil {
c.close(err)
}
return err
}
func (c *connection) SetWriteDeadline(t time.Time) error {
err := c.Conn.SetWriteDeadline(t)
if err != nil {
c.close(err)
}
return err
}
func (c *connection) GetStat() (s Stat) {
s.ReadBytesSum = c.stat.ReadBytesSum.Load()
s.WroteBytesSum = c.stat.WroteBytesSum.Load()
return
}
func (c *connection) write(b []byte) (n int, err error) {
if c.option.WriteTimeoutMs > 0 {
err = c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return 0, err
}
}
n, err = c.w.Write(b)
if err != nil {
c.close(err)
}
c.stat.WroteBytesSum.Add(uint64(n))
return n, err
}
func (c *connection) writev(b net.Buffers) (n int, err error) {
if c.option.WriteTimeoutMs > 0 {
err = c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return 0, err
}
}
var n64 int64
n64, err = b.WriteTo(c.w)
if err != nil {
c.close(err)
}
n = int(n64)
c.stat.WroteBytesSum.Add(uint64(n))
return n, err
}
func (c *connection) runWriteLoop() {
for {
select {
case <-c.exitChan:
//nazalog.Debugf("[%s] recv exitChan and exit write loop", c.uniqueKey)
return
case msg := <-c.wChan:
switch msg.t {
case wMsgTypeWrite:
if _, err := c.write(msg.b); err != nil {
return
}
case wMsgTypeWritev:
if _, err := c.writev(msg.bs); err != nil {
return
}
case wMsgTypeFlush:
if err := c.flush(); err != nil {
c.flushDoneChan <- struct{}{}
return
}
c.flushDoneChan <- struct{}{}
}
}
}
}
func (c *connection) flush() error {
w, ok := c.w.(*bufio.Writer)
if ok {
if c.option.WriteTimeoutMs > 0 {
err := c.SetWriteDeadline(time.Now().Add(time.Duration(c.option.WriteTimeoutMs) * time.Millisecond))
if err != nil {
c.close(err)
return err
}
}
if err := w.Flush(); err != nil {
c.close(err)
return err
}
}
return nil
}
func (c *connection) close(err error) {
c.closeOnce.Do(func() {
nazalog.Debugf("[%s] close once. err=%+v", c.uniqueKey, err)
c.closedFlag.Store(true)
if c.option.WriteChanSize > 0 {
c.exitChan <- struct{}{}
}
// 注意先Close后再发送消息保证消息发送前已经Close掉了
_ = c.Conn.Close()
c.doneChan <- err
// 注意如果使用了wChan并不关闭它避免竞态条件下connection继续使用它造成问题。让它随connection对象释放。
})
}
func init() {
uniqueGen = unique.NewSingleGenerator("NAZACONN")
}