From 3fa57acfc7af246158c221fef290f8acf596253d Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Sat, 3 Apr 2021 16:05:56 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=E4=BF=AE=E6=AD=A3RTMP=20aggregate=20?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=88=B3=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rtmp/chunk_composer.go | 50 +++++++++++++++++++++++++++++++++++--- pkg/rtmp/client_session.go | 47 +---------------------------------- pkg/rtmp/server_session.go | 46 +---------------------------------- pkg/rtmp/tag.go | 8 ------ 4 files changed, 48 insertions(+), 103 deletions(-) delete mode 100644 pkg/rtmp/tag.go diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 3a00a7f..bbe63eb 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -37,7 +37,7 @@ func (c *ChunkComposer) SetPeerChunkSize(val uint32) { c.peerChunkSize = val } -type OnCompleteMessage func(stream *Stream, c *ChunkComposer) error +type OnCompleteMessage func(stream *Stream) error // @param cb 回调结束后,内存块会被 ChunkComposer 再次使用 func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { @@ -167,10 +167,52 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { } absTsFlag = false //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c) - - if err := cb(stream, c); err != nil { - return err + if stream.header.MsgTypeID == base.RTMPTypeIDAggregateMessage { + aggregateStream := NewStream() + var aggregateFirstTimestamp uint32 = 0 + Aggregate: + for { + //aggregate里时间戳相对值; + if stream.msg.b < stream.msg.e && stream.msg.e-stream.msg.b > 11 { + aggregateStream.header.CSID = stream.header.CSID + aggregateStream.header.MsgStreamID = stream.header.MsgStreamID + //十一个字节TagHeader + aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11] + stream.msg.b += 11 + aggregateStream.header.MsgTypeID = aggreTagHeaderBuf[0] + aggregateStream.header.MsgLen = bele.BEUint24(aggreTagHeaderBuf[1:]) + aggregateStream.timestamp = bele.BEUint24(aggreTagHeaderBuf[4:]) + if aggreTagHeaderBuf[7] > 0 { + aggregateStream.timestamp += uint32(aggreTagHeaderBuf[7] << 24) + } + if stream.msg.b == 11 { + aggregateFirstTimestamp = aggregateStream.timestamp + } + + aggregateStream.header.TimestampAbs = stream.header.TimestampAbs + aggregateStream.timestamp - aggregateFirstTimestamp + if stream.msg.e-stream.msg.b < aggregateStream.header.MsgLen+4 { + break Aggregate + } + aggregateStream.msg.buf = stream.msg.buf[stream.msg.b : stream.msg.b+aggregateStream.header.MsgLen] + aggregateStream.msg.b = 0 + aggregateStream.msg.e = aggregateStream.header.MsgLen + stream.msg.b += aggregateStream.header.MsgLen + //4字节pre tag 长度 + _ = bele.BEUint32(stream.msg.buf[stream.msg.b:]) //返回值忽略 + stream.msg.b += 4 + + cb(aggregateStream) + } else { + break Aggregate + } + + } + } else { + if err := cb(stream); err != nil { + return err + } } + stream.msg.clear() } if stream.msg.len() > stream.header.MsgLen { diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index b8fd053..e6d2074 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -314,7 +314,7 @@ func (s *ClientSession) runReadLoop() { _ = s.chunkComposer.RunLoop(s.conn, s.doMsg) } -func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error { +func (s *ClientSession) doMsg(stream *Stream) error { switch stream.header.MsgTypeID { case base.RTMPTypeIDWinAckSize: fallthrough @@ -338,8 +338,6 @@ func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error { fallthrough case base.RTMPTypeIDVideo: s.onReadRTMPAVMsg(stream.toAVMsg()) - case base.RTMPTypeIDAggregateMessage: - s.doAggregateMessage(stream, c) default: nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) panic(0) @@ -480,49 +478,6 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { } return nil } -func (s *ClientSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error { - var aggreTagHeader *TagHeader = new(TagHeader) - aggregateStream := NewStream() - for { - if stream.msg.b < stream.msg.e { - aggregateStream.header.CSID = stream.header.CSID - aggregateStream.header.MsgStreamID = stream.header.MsgStreamID - aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11] - aggreTagHeader.Type = aggreTagHeaderBuf[0] - aggregateStream.header.MsgTypeID = aggreTagHeader.Type - if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo { - aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:]) - aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:]) - if aggreTagHeaderBuf[7] > 0 { - aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24) - } - aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:]) - aggregateStream.timestamp = aggreTagHeader.Timestamp - aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp - aggregateStream.header.MsgLen = aggreTagHeader.DataSize - - aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen] - aggregateStream.msg.b = 0 - aggregateStream.msg.e = aggregateStream.header.MsgLen - stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen - preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:]) - if preTagLen != aggregateStream.header.MsgLen { - nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey) - return ErrRTMP - } - stream.msg.b += 4 - s.onReadRTMPAVMsg(aggregateStream.toAVMsg()) - } else { - nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey) - return ErrRTMP - } - } else { - return nil - } - - } - return nil -} func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { if stream.msg.len() < 4 { return ErrRTMP diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index c1f5f2b..d0726c3 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -196,7 +196,7 @@ func (s *ServerSession) handshake() error { return nil } -func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error { +func (s *ServerSession) doMsg(stream *Stream) error { //log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header) switch stream.header.MsgTypeID { case base.RTMPTypeIDSetChunkSize: @@ -218,8 +218,6 @@ func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error { return ErrRTMP } s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg()) - case base.RTMPTypeIDAggregateMessage: - s.doAggregateMessage(stream, c) default: nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) @@ -327,49 +325,7 @@ func (s *ServerSession) doCommandAFM3Message(stream *Stream) error { stream.msg.consumed(1) return s.doCommandMessage(stream) } -func (s *ServerSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error { - var aggreTagHeader *TagHeader = new(TagHeader) - aggregateStream := NewStream() - for { - if stream.msg.b < stream.msg.e { - aggregateStream.header.CSID = stream.header.CSID - aggregateStream.header.MsgStreamID = stream.header.MsgStreamID - aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11] - aggreTagHeader.Type = aggreTagHeaderBuf[0] - aggregateStream.header.MsgTypeID = aggreTagHeader.Type - if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo { - aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:]) - aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:]) - if aggreTagHeaderBuf[7] > 0 { - aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24) - } - aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:]) - aggregateStream.timestamp = aggreTagHeader.Timestamp - aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp - aggregateStream.header.MsgLen = aggreTagHeader.DataSize - - aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen] - aggregateStream.msg.b = 0 - aggregateStream.msg.e = aggregateStream.header.MsgLen - stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen - preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:]) - if preTagLen != aggregateStream.header.MsgLen { - nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey) - return ErrRTMP - } - stream.msg.b += 4 - s.avObserver.OnReadRTMPAVMsg(aggregateStream.toAVMsg()) - } else { - nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey) - return ErrRTMP - } - } else { - return nil - } - } - return nil -} func (s *ServerSession) doConnect(tid int, stream *Stream) error { val, err := stream.msg.readObjectWithType() if err != nil { diff --git a/pkg/rtmp/tag.go b/pkg/rtmp/tag.go deleted file mode 100644 index 0e0b130..0000000 --- a/pkg/rtmp/tag.go +++ /dev/null @@ -1,8 +0,0 @@ -package rtmp - -type TagHeader struct { - Type uint8 // type - DataSize uint32 // body大小,不包含 header 和 prev tag size 字段 3byte - Timestamp uint32 // 绝对时间戳,单位毫秒 4byte - StreamID uint32 // always 0 3byte -}