|
|
|
@ -38,13 +38,15 @@ type Connection interface {
|
|
|
|
|
|
|
|
|
|
// 如果使用了 bufio 写缓冲,则将缓冲中的数据发送出去
|
|
|
|
|
// 如果使用了 channel 异步发送,则阻塞等待,直到之前 channel 中的数据全部发送完毕
|
|
|
|
|
// 一般在 Close 前,想要将剩余数据发送完毕时调用
|
|
|
|
|
Flush() error
|
|
|
|
|
|
|
|
|
|
// 阻塞直到连接主动或被动关闭
|
|
|
|
|
// 阻塞直到连接关闭或发生错误
|
|
|
|
|
// @return 返回 nil 则是本端主动调用 Close 关闭
|
|
|
|
|
Done() <-chan error
|
|
|
|
|
|
|
|
|
|
// TODO chef: 这几个接口是否不提供
|
|
|
|
|
// Mod类型函数不加锁,需要调用方保证不发生竞态调用
|
|
|
|
|
ModWriteChanSize(n int)
|
|
|
|
|
ModWriteBufSize(n int)
|
|
|
|
|
ModReadTimeoutMS(n int)
|
|
|
|
@ -60,7 +62,7 @@ type Config struct {
|
|
|
|
|
ReadTimeoutMS int
|
|
|
|
|
WriteTimeoutMS int
|
|
|
|
|
|
|
|
|
|
// 如果不过0,则写使用 channel 将数据发送到后台协程中发送
|
|
|
|
|
// 如果不为0,则写使用 channel 将数据发送到后台协程中发送
|
|
|
|
|
WChanSize int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -70,7 +72,6 @@ const (
|
|
|
|
|
_ wMsgT = iota
|
|
|
|
|
wMsgTWrite
|
|
|
|
|
wMsgTFlush
|
|
|
|
|
wMsgTClose // TODO chef: 没有使用
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type wmsg struct {
|
|
|
|
@ -114,9 +115,6 @@ type connection struct {
|
|
|
|
|
closeOnce sync.Once
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mod类型函数不加锁
|
|
|
|
|
|
|
|
|
|
// 由调用方保证不和写操作并发执行
|
|
|
|
|
func (c *connection) ModWriteChanSize(n int) {
|
|
|
|
|
if c.config.WChanSize > 0 {
|
|
|
|
|
panic(connectionErr)
|
|
|
|
@ -155,13 +153,13 @@ func (c *connection) ReadAtLeast(buf []byte, min int) (n int, err error) {
|
|
|
|
|
if c.config.ReadTimeoutMS > 0 {
|
|
|
|
|
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.config.ReadTimeoutMS) * time.Millisecond))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
n, err = io.ReadAtLeast(c.r, buf, min)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return n, err
|
|
|
|
@ -176,13 +174,13 @@ func (c *connection) ReadLine() (line []byte, isPrefix bool, err error) {
|
|
|
|
|
if c.config.ReadTimeoutMS > 0 {
|
|
|
|
|
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.config.ReadTimeoutMS) * time.Millisecond))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return nil, false, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
line, isPrefix, err = bufioReader.ReadLine()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return line, isPrefix, err
|
|
|
|
@ -199,13 +197,13 @@ func (c *connection) Read(b []byte) (n int, err error) {
|
|
|
|
|
if c.config.ReadTimeoutMS > 0 {
|
|
|
|
|
err = c.SetReadDeadline(time.Now().Add(time.Duration(c.config.ReadTimeoutMS) * time.Millisecond))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
n, err = c.r.Read(b)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return n, err
|
|
|
|
@ -223,13 +221,13 @@ func (c *connection) write(b []byte) (n int, err error) {
|
|
|
|
|
if c.config.WriteTimeoutMS > 0 {
|
|
|
|
|
err = c.SetWriteDeadline(time.Now().Add(time.Duration(c.config.WriteTimeoutMS) * time.Millisecond))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
n, err = c.w.Write(b)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return n, err
|
|
|
|
@ -245,18 +243,16 @@ func (c *connection) runWriteLoop() {
|
|
|
|
|
switch msg.t {
|
|
|
|
|
case wMsgTWrite:
|
|
|
|
|
if _, err := c.write(msg.b); err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case wMsgTFlush:
|
|
|
|
|
if err := c.flush(); err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.flushDoneChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
c.flushDoneChan <- struct{}{}
|
|
|
|
|
case wMsgTClose:
|
|
|
|
|
// TODO chef: 是否需要
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -278,12 +274,12 @@ func (c *connection) flush() error {
|
|
|
|
|
if c.config.WriteTimeoutMS > 0 {
|
|
|
|
|
err := c.SetWriteDeadline(time.Now().Add(time.Duration(c.config.WriteTimeoutMS) * time.Millisecond))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err := w.Flush(); err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -292,13 +288,13 @@ func (c *connection) flush() error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *connection) Close() error {
|
|
|
|
|
log.Debugf("Close.")
|
|
|
|
|
log.Debugf("nezha connection Close. conn=%p", c)
|
|
|
|
|
c.close(nil)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *connection) close(err error) {
|
|
|
|
|
log.Debugf("close. err=%v", err)
|
|
|
|
|
log.Debugf("nezha connection close. err=%v, conn=%p", err, c)
|
|
|
|
|
c.closeOnce.Do(func() {
|
|
|
|
|
if c.config.WChanSize > 0 {
|
|
|
|
|
c.exitChan <- struct{}{}
|
|
|
|
@ -310,15 +306,6 @@ func (c *connection) close(err error) {
|
|
|
|
|
|
|
|
|
|
func (c *connection) Done() <-chan error {
|
|
|
|
|
return c.doneChan
|
|
|
|
|
//err := <-c.doneChan
|
|
|
|
|
//log.Debugf("Done. err=%v", err)
|
|
|
|
|
//if err != nil {
|
|
|
|
|
// c.close(err)
|
|
|
|
|
//}
|
|
|
|
|
//
|
|
|
|
|
//ch := make(chan error, 1)
|
|
|
|
|
//ch <- err
|
|
|
|
|
//return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *connection) LocalAddr() net.Addr {
|
|
|
|
@ -332,7 +319,7 @@ func (c *connection) RemoteAddr() net.Addr {
|
|
|
|
|
func (c *connection) SetDeadline(t time.Time) error {
|
|
|
|
|
err := c.Conn.SetDeadline(t)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
@ -341,7 +328,7 @@ func (c *connection) SetDeadline(t time.Time) error {
|
|
|
|
|
func (c *connection) SetReadDeadline(t time.Time) error {
|
|
|
|
|
err := c.Conn.SetReadDeadline(t)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
@ -350,7 +337,7 @@ func (c *connection) SetReadDeadline(t time.Time) error {
|
|
|
|
|
func (c *connection) SetWriteDeadline(t time.Time) error {
|
|
|
|
|
err := c.Conn.SetWriteDeadline(t)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debugf("error=%v", err)
|
|
|
|
|
log.Debugf("nezha connection. error=%v, conn=%p", err, c)
|
|
|
|
|
c.close(err)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|