From 99ab8df79ac2e2bc1c588455d457123701ee0aba Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 24 May 2020 00:24:42 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20package=20rtmp:=20=E6=8E=A5=E6=94=B6rtm?= =?UTF-8?q?p=E6=95=B0=E6=8D=AE=E6=97=B6=EF=BC=8C=E5=90=8C=E4=B8=80?= =?UTF-8?q?=E4=B8=AAmessage=E7=9A=84=E5=A4=9A=E4=B8=AAchunk=E6=B7=B7?= =?UTF-8?q?=E5=90=88=E4=BD=BF=E7=94=A8fmt1=EF=BC=8C2=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E5=87=BA=E7=8E=B0=E6=97=B6=E9=97=B4=E6=88=B3?= =?UTF-8?q?=E5=A4=9A=E5=8A=A0=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rtmp/chunk_composer.go | 35 ++++++++++++++++++++++++++++++----- pkg/rtmp/handshake.go | 3 +-- pkg/rtmp/stream.go | 3 ++- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 624a69d..2a2ad32 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -40,16 +40,19 @@ type CompleteMessageCB func(stream *Stream) error // @param cb 回调结束后,内存块会被 ChunkComposer 再次使用 func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { bootstrap := make([]byte, 11) + absTsFlag := false for { + // 5.3.1.1. Chunk Basic Header + // 读取fmt和csid if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil { return err } - // 5.3.1.1. Chunk Basic Header fmt := (bootstrap[0] >> 6) & 0x03 csid := int(bootstrap[0] & 0x3f) + // csid可能是变长的 switch csid { case 0: if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil { @@ -68,6 +71,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { stream := c.getOrCreateStream(csid) // 5.3.1.2. Chunk Message Header + // 当前chunk的fmt不同,Message Header包含的字段也不同,是变长 switch fmt { case 0: if _, err := io.ReadAtLeast(reader, bootstrap[:11], 11); err != nil { @@ -76,6 +80,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 包头中为绝对时间戳 stream.header.Timestamp = bele.BEUint24(bootstrap) stream.header.TimestampAbs = stream.header.Timestamp + absTsFlag = true stream.header.MsgLen = bele.BEUint24(bootstrap[3:]) stream.header.MsgTypeID = bootstrap[6] stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:])) @@ -87,7 +92,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } // 包头中为相对时间戳 stream.header.Timestamp = bele.BEUint24(bootstrap) - stream.header.TimestampAbs += stream.header.Timestamp + //stream.header.TimestampAbs += stream.header.Timestamp stream.header.MsgLen = bele.BEUint24(bootstrap[3:]) stream.header.MsgTypeID = bootstrap[6] @@ -98,7 +103,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } // 包头中为相对时间戳 stream.header.Timestamp = bele.BEUint24(bootstrap) - stream.header.TimestampAbs += stream.header.Timestamp + //stream.header.TimestampAbs += stream.header.Timestamp case 3: // noop @@ -107,7 +112,9 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 5.3.1.3 Extended Timestamp // 使用ffmpeg推流时,发现时间戳超过3字节最大值后,即使是fmt3(即包头大小为0),依然存在ext ts字段 // 所以这里我将 `==` 的判断改成了 `>=` - // TODO chef: 测试其他客户端和ext ts相关的表现 + // TODO chef: + // - 测试其他客户端和ext ts相关的表现 + // - 这部分可能还有问题,需要根据具体的case调整 //if stream.header.Timestamp == maxTimestampInMessageHeader { if stream.header.Timestamp >= maxTimestampInMessageHeader { if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil { @@ -126,7 +133,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } } //stream.header.CSID = csid - //log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs) + //nazalog.Debugf("ChunkComposer chunk fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs) var neededSize uint32 if stream.header.MsgLen <= c.peerChunkSize { @@ -152,6 +159,13 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } stream.header.CSID = csid + if !absTsFlag { + // 这么处理相当于取最后一个chunk的时间戳差值,有的协议栈是取的第一个,正常来说都可以 + stream.header.TimestampAbs += stream.header.Timestamp + } + absTsFlag = false + //nazalog.Debugf("ChunkComposer message fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs) + if err := cb(stream); err != nil { return err } @@ -171,3 +185,14 @@ func (c *ChunkComposer) getOrCreateStream(csid int) *Stream { } return stream } + +// 临时存放一些rtmp推流case在这,便于理解,以及修改后,回归用 +// +// 场景:ffmpeg推送test.flv至lals +// 关注点:message超过chunk时,fmt和timestamp的值 +// +// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:143 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:520} csid:6 len:143 ts:520 +// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560 +// ChunkComposer chunk fmt:3 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560 +// 此处应只给上层返回一次,也即一个message,时间戳应该是560 +// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:904 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:600} csid:6 len:904 ts:600 diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index dbfcd17..4b681ae 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -193,7 +193,7 @@ func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) { // s2 // make digest to s2 suffix position random1528(s2) - + replyOffs := s2Len - keyLen makeDigestWithoutCenterPart(s2, replyOffs, s2key, s2[replyOffs:]) } @@ -287,4 +287,3 @@ func init() { random1528Buf[i] = bs[i%bsl] } } - diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index a1ec366..b7ae5cc 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -17,10 +17,11 @@ import ( const initMsgLen = 4096 +// TODO chef: 将Timestamp字段隐藏,不对外暴露 type Header struct { CSID int MsgLen uint32 // 不包含header的大小 - Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。 + Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。上层不应该使用这个字段,而应该使用TimestampAbs MsgTypeID uint8 // 8 audio 9 video 18 metadata MsgStreamID int