[fix] package rtmp: 接收rtmp数据时,同一个message的多个chunk混合使用fmt1,2时,可能出现时间戳多加的情况

pull/2/head
q191201771
parent 9e80735f1a
commit 99ab8df79a

@ -40,16 +40,19 @@ type CompleteMessageCB func(stream *Stream) error
// @param cb 回调结束后,内存块会被 ChunkComposer 再次使用
func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
bootstrap := make([]byte, 11)
absTsFlag := false
for {
// 5.3.1.1. Chunk Basic Header
// 读取fmt和csid
if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil {
return err
}
// 5.3.1.1. Chunk Basic Header
fmt := (bootstrap[0] >> 6) & 0x03
csid := int(bootstrap[0] & 0x3f)
// csid可能是变长的
switch csid {
case 0:
if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil {
@ -68,6 +71,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
stream := c.getOrCreateStream(csid)
// 5.3.1.2. Chunk Message Header
// 当前chunk的fmt不同Message Header包含的字段也不同是变长
switch fmt {
case 0:
if _, err := io.ReadAtLeast(reader, bootstrap[:11], 11); err != nil {
@ -76,6 +80,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
// 包头中为绝对时间戳
stream.header.Timestamp = bele.BEUint24(bootstrap)
stream.header.TimestampAbs = stream.header.Timestamp
absTsFlag = true
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
stream.header.MsgTypeID = bootstrap[6]
stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:]))
@ -87,7 +92,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
// 包头中为相对时间戳
stream.header.Timestamp = bele.BEUint24(bootstrap)
stream.header.TimestampAbs += stream.header.Timestamp
//stream.header.TimestampAbs += stream.header.Timestamp
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
stream.header.MsgTypeID = bootstrap[6]
@ -98,7 +103,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
// 包头中为相对时间戳
stream.header.Timestamp = bele.BEUint24(bootstrap)
stream.header.TimestampAbs += stream.header.Timestamp
//stream.header.TimestampAbs += stream.header.Timestamp
case 3:
// noop
@ -107,7 +112,9 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
// 5.3.1.3 Extended Timestamp
// 使用ffmpeg推流时发现时间戳超过3字节最大值后即使是fmt3(即包头大小为0)依然存在ext ts字段
// 所以这里我将 `==` 的判断改成了 `>=`
// TODO chef: 测试其他客户端和ext ts相关的表现
// TODO chef:
// - 测试其他客户端和ext ts相关的表现
// - 这部分可能还有问题需要根据具体的case调整
//if stream.header.Timestamp == maxTimestampInMessageHeader {
if stream.header.Timestamp >= maxTimestampInMessageHeader {
if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil {
@ -126,7 +133,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
}
//stream.header.CSID = csid
//log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
//nazalog.Debugf("ChunkComposer chunk fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
var neededSize uint32
if stream.header.MsgLen <= c.peerChunkSize {
@ -152,6 +159,13 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
stream.header.CSID = csid
if !absTsFlag {
// 这么处理相当于取最后一个chunk的时间戳差值有的协议栈是取的第一个正常来说都可以
stream.header.TimestampAbs += stream.header.Timestamp
}
absTsFlag = false
//nazalog.Debugf("ChunkComposer message fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
if err := cb(stream); err != nil {
return err
}
@ -171,3 +185,14 @@ func (c *ChunkComposer) getOrCreateStream(csid int) *Stream {
}
return stream
}
// 临时存放一些rtmp推流case在这便于理解以及修改后回归用
//
// 场景ffmpeg推送test.flv至lals
// 关注点message超过chunk时fmt和timestamp的值
//
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:143 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:520} csid:6 len:143 ts:520
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560
// ChunkComposer chunk fmt:3 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560
// 此处应只给上层返回一次也即一个message时间戳应该是560
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:904 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:600} csid:6 len:904 ts:600

@ -193,7 +193,7 @@ func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) {
// s2
// make digest to s2 suffix position
random1528(s2)
replyOffs := s2Len - keyLen
makeDigestWithoutCenterPart(s2, replyOffs, s2key, s2[replyOffs:])
}
@ -287,4 +287,3 @@ func init() {
random1528Buf[i] = bs[i%bsl]
}
}

@ -17,10 +17,11 @@ import (
const initMsgLen = 4096
// TODO chef: 将Timestamp字段隐藏不对外暴露
type Header struct {
CSID int
MsgLen uint32 // 不包含header的大小
Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。
Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。上层不应该使用这个字段而应该使用TimestampAbs
MsgTypeID uint8 // 8 audio 9 video 18 metadata
MsgStreamID int

Loading…
Cancel
Save