diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 3995ba0..8f8f5ef 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -643,7 +643,7 @@ func (s *ClientSession) doRespAcknowledgement(stream *Stream) error { s.recvLastAck = currStat.ReadBytesSum seqNum := s.seqNum + delta //当序列号溢出时,将其重置 - if seqNum > 0xf0000000 { + if seqNum > AckSeqMax { seqNum = delta } s.seqNum = seqNum diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index d76dcb9..d6c6716 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -91,6 +91,9 @@ type ServerSession struct { disposeOnce sync.Once DisposeByObserverFlag bool + + recvLastAck uint64 + seqNum uint32 } func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSession { @@ -204,6 +207,11 @@ func (s *ServerSession) handshake() error { } func (s *ServerSession) doMsg(stream *Stream) error { + if s.sessionStat.BaseType() == base.SessionBaseTypePubStr { + if err := s.doRespAcknowledgement(stream); err != nil { + return err + } + } //log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header) switch stream.header.MsgTypeId { case base.RtmpTypeIdSetChunkSize: @@ -345,6 +353,23 @@ func (s *ServerSession) doCommandAmf3Message(stream *Stream) error { stream.msg.Skip(1) return s.doCommandMessage(stream) } +func (s *ServerSession) doRespAcknowledgement(stream *Stream) error { + currStat := s.conn.GetStat() + delta := uint32(currStat.ReadBytesSum - s.recvLastAck) + //此次接收小于窗口大小一半,不处理 + if delta < uint32(windowAcknowledgementSize/2) { + return nil + } + s.recvLastAck = currStat.ReadBytesSum + seqNum := s.seqNum + delta + //当序列号溢出时,将其重置 + if seqNum > AckSeqMax { + seqNum = delta + } + s.seqNum = seqNum + //时间戳暂时先发0 + return s.packer.writeAcknowledgement(s.conn, seqNum) +} func (s *ServerSession) doConnect(tid int, stream *Stream) error { val, err := stream.msg.readObjectWithType() diff --git a/pkg/rtmp/var.go b/pkg/rtmp/var.go index 91f155a..5859639 100644 --- a/pkg/rtmp/var.go +++ b/pkg/rtmp/var.go @@ -46,3 +46,4 @@ var ( // 接收rtmp数据时,msg的初始内存块大小 // 注意,该值只影响性能,不影响功能(大小不够会自动扩容) const initMsgLen = 4096 +const AckSeqMax = 0xf0000000