[fix]修正RTMP aggregate 时间戳问题

pull/51/head
joestarzxh 4 years ago
parent efdd0eaa6b
commit 3fa57acfc7

@ -37,7 +37,7 @@ func (c *ChunkComposer) SetPeerChunkSize(val uint32) {
c.peerChunkSize = val c.peerChunkSize = val
} }
type OnCompleteMessage func(stream *Stream, c *ChunkComposer) error type OnCompleteMessage func(stream *Stream) error
// @param cb 回调结束后,内存块会被 ChunkComposer 再次使用 // @param cb 回调结束后,内存块会被 ChunkComposer 再次使用
func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
@ -167,10 +167,52 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
} }
absTsFlag = false absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c) //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c)
if stream.header.MsgTypeID == base.RTMPTypeIDAggregateMessage {
if err := cb(stream, c); err != nil { aggregateStream := NewStream()
return err var aggregateFirstTimestamp uint32 = 0
Aggregate:
for {
//aggregate里时间戳相对值
if stream.msg.b < stream.msg.e && stream.msg.e-stream.msg.b > 11 {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
//十一个字节TagHeader
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
stream.msg.b += 11
aggregateStream.header.MsgTypeID = aggreTagHeaderBuf[0]
aggregateStream.header.MsgLen = bele.BEUint24(aggreTagHeaderBuf[1:])
aggregateStream.timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggregateStream.timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
if stream.msg.b == 11 {
aggregateFirstTimestamp = aggregateStream.timestamp
}
aggregateStream.header.TimestampAbs = stream.header.TimestampAbs + aggregateStream.timestamp - aggregateFirstTimestamp
if stream.msg.e-stream.msg.b < aggregateStream.header.MsgLen+4 {
break Aggregate
}
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b : stream.msg.b+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b += aggregateStream.header.MsgLen
//4字节pre tag 长度
_ = bele.BEUint32(stream.msg.buf[stream.msg.b:]) //返回值忽略
stream.msg.b += 4
cb(aggregateStream)
} else {
break Aggregate
}
}
} else {
if err := cb(stream); err != nil {
return err
}
} }
stream.msg.clear() stream.msg.clear()
} }
if stream.msg.len() > stream.header.MsgLen { if stream.msg.len() > stream.header.MsgLen {

@ -314,7 +314,7 @@ func (s *ClientSession) runReadLoop() {
_ = s.chunkComposer.RunLoop(s.conn, s.doMsg) _ = s.chunkComposer.RunLoop(s.conn, s.doMsg)
} }
func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error { func (s *ClientSession) doMsg(stream *Stream) error {
switch stream.header.MsgTypeID { switch stream.header.MsgTypeID {
case base.RTMPTypeIDWinAckSize: case base.RTMPTypeIDWinAckSize:
fallthrough fallthrough
@ -338,8 +338,6 @@ func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error {
fallthrough fallthrough
case base.RTMPTypeIDVideo: case base.RTMPTypeIDVideo:
s.onReadRTMPAVMsg(stream.toAVMsg()) s.onReadRTMPAVMsg(stream.toAVMsg())
case base.RTMPTypeIDAggregateMessage:
s.doAggregateMessage(stream, c)
default: default:
nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString())
panic(0) panic(0)
@ -480,49 +478,6 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
} }
return nil return nil
} }
func (s *ClientSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error {
var aggreTagHeader *TagHeader = new(TagHeader)
aggregateStream := NewStream()
for {
if stream.msg.b < stream.msg.e {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
aggreTagHeader.Type = aggreTagHeaderBuf[0]
aggregateStream.header.MsgTypeID = aggreTagHeader.Type
if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo {
aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:])
aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:])
aggregateStream.timestamp = aggreTagHeader.Timestamp
aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp
aggregateStream.header.MsgLen = aggreTagHeader.DataSize
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen
preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:])
if preTagLen != aggregateStream.header.MsgLen {
nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey)
return ErrRTMP
}
stream.msg.b += 4
s.onReadRTMPAVMsg(aggregateStream.toAVMsg())
} else {
nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey)
return ErrRTMP
}
} else {
return nil
}
}
return nil
}
func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
if stream.msg.len() < 4 { if stream.msg.len() < 4 {
return ErrRTMP return ErrRTMP

@ -196,7 +196,7 @@ func (s *ServerSession) handshake() error {
return nil return nil
} }
func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error { func (s *ServerSession) doMsg(stream *Stream) error {
//log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header) //log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header)
switch stream.header.MsgTypeID { switch stream.header.MsgTypeID {
case base.RTMPTypeIDSetChunkSize: case base.RTMPTypeIDSetChunkSize:
@ -218,8 +218,6 @@ func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error {
return ErrRTMP return ErrRTMP
} }
s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg()) s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg())
case base.RTMPTypeIDAggregateMessage:
s.doAggregateMessage(stream, c)
default: default:
nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString())
@ -327,49 +325,7 @@ func (s *ServerSession) doCommandAFM3Message(stream *Stream) error {
stream.msg.consumed(1) stream.msg.consumed(1)
return s.doCommandMessage(stream) return s.doCommandMessage(stream)
} }
func (s *ServerSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error {
var aggreTagHeader *TagHeader = new(TagHeader)
aggregateStream := NewStream()
for {
if stream.msg.b < stream.msg.e {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
aggreTagHeader.Type = aggreTagHeaderBuf[0]
aggregateStream.header.MsgTypeID = aggreTagHeader.Type
if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo {
aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:])
aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:])
aggregateStream.timestamp = aggreTagHeader.Timestamp
aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp
aggregateStream.header.MsgLen = aggreTagHeader.DataSize
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen
preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:])
if preTagLen != aggregateStream.header.MsgLen {
nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey)
return ErrRTMP
}
stream.msg.b += 4
s.avObserver.OnReadRTMPAVMsg(aggregateStream.toAVMsg())
} else {
nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey)
return ErrRTMP
}
} else {
return nil
}
}
return nil
}
func (s *ServerSession) doConnect(tid int, stream *Stream) error { func (s *ServerSession) doConnect(tid int, stream *Stream) error {
val, err := stream.msg.readObjectWithType() val, err := stream.msg.readObjectWithType()
if err != nil { if err != nil {

@ -1,8 +0,0 @@
package rtmp
type TagHeader struct {
Type uint8 // type
DataSize uint32 // body大小不包含 header 和 prev tag size 字段 3byte
Timestamp uint32 // 绝对时间戳,单位毫秒 4byte
StreamID uint32 // always 0 3byte
}
Loading…
Cancel
Save