|
|
|
@ -91,6 +91,9 @@ type ServerSession struct {
|
|
|
|
|
disposeOnce sync.Once
|
|
|
|
|
|
|
|
|
|
DisposeByObserverFlag bool
|
|
|
|
|
|
|
|
|
|
recvLastAck uint64
|
|
|
|
|
seqNum uint32
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSession {
|
|
|
|
@ -204,6 +207,11 @@ func (s *ServerSession) handshake() error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerSession) doMsg(stream *Stream) error {
|
|
|
|
|
if s.sessionStat.BaseType() == base.SessionBaseTypePubStr {
|
|
|
|
|
if err := s.doRespAcknowledgement(stream); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
|
|
|
|
|
switch stream.header.MsgTypeId {
|
|
|
|
|
case base.RtmpTypeIdSetChunkSize:
|
|
|
|
@ -345,6 +353,23 @@ func (s *ServerSession) doCommandAmf3Message(stream *Stream) error {
|
|
|
|
|
stream.msg.Skip(1)
|
|
|
|
|
return s.doCommandMessage(stream)
|
|
|
|
|
}
|
|
|
|
|
func (s *ServerSession) doRespAcknowledgement(stream *Stream) error {
|
|
|
|
|
currStat := s.conn.GetStat()
|
|
|
|
|
delta := uint32(currStat.ReadBytesSum - s.recvLastAck)
|
|
|
|
|
//此次接收小于窗口大小一半,不处理
|
|
|
|
|
if delta < uint32(windowAcknowledgementSize/2) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
s.recvLastAck = currStat.ReadBytesSum
|
|
|
|
|
seqNum := s.seqNum + delta
|
|
|
|
|
//当序列号溢出时,将其重置
|
|
|
|
|
if seqNum > AckSeqMax {
|
|
|
|
|
seqNum = delta
|
|
|
|
|
}
|
|
|
|
|
s.seqNum = seqNum
|
|
|
|
|
//时间戳暂时先发0
|
|
|
|
|
return s.packer.writeAcknowledgement(s.conn, seqNum)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ServerSession) doConnect(tid int, stream *Stream) error {
|
|
|
|
|
val, err := stream.msg.readObjectWithType()
|
|
|
|
|