Merge pull request #51 from joestarzxh/master

[feat] rtmp支持aggregate message
pull/53/head
yoko 4 years ago committed by GitHub
commit d343836d08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,7 @@ const (
RTMPTypeIDBandwidth uint8 = 6
RTMPTypeIDCommandMessageAMF3 uint8 = 17
RTMPTypeIDCommandMessageAMF0 uint8 = 20
RTMPTypeIDAggregateMessage uint8 = 22
// user control message type
RTMPUserControlStreamBegin uint8 = 0

@ -167,10 +167,52 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
}
absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c)
if err := cb(stream); err != nil {
return err
if stream.header.MsgTypeID == base.RTMPTypeIDAggregateMessage {
aggregateStream := NewStream()
var aggregateFirstTimestamp uint32 = 0
Aggregate:
for {
//aggregate里时间戳相对值
if stream.msg.b < stream.msg.e && stream.msg.e-stream.msg.b > 11 {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
//十一个字节TagHeader
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
stream.msg.b += 11
aggregateStream.header.MsgTypeID = aggreTagHeaderBuf[0]
aggregateStream.header.MsgLen = bele.BEUint24(aggreTagHeaderBuf[1:])
aggregateStream.timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggregateStream.timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
if stream.msg.b == 11 {
aggregateFirstTimestamp = aggregateStream.timestamp
}
aggregateStream.header.TimestampAbs = stream.header.TimestampAbs + aggregateStream.timestamp - aggregateFirstTimestamp
if stream.msg.e-stream.msg.b < aggregateStream.header.MsgLen+4 {
break Aggregate
}
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b : stream.msg.b+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b += aggregateStream.header.MsgLen
//4字节pre tag 长度
_ = bele.BEUint32(stream.msg.buf[stream.msg.b:]) //返回值忽略
stream.msg.b += 4
cb(aggregateStream)
} else {
break Aggregate
}
}
} else {
if err := cb(stream); err != nil {
return err
}
}
stream.msg.clear()
}
if stream.msg.len() > stream.header.MsgLen {

@ -478,7 +478,6 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
}
return nil
}
func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
if stream.msg.len() < 4 {
return ErrRTMP

@ -93,6 +93,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
flashVer = "LNX 9,0,124,2"
}
objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer})
objs = append(objs, ObjectPair{Key: "fpad", Value: false})
objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL})
_ = AMF0.WriteObject(packer.b, objs)
raw := packer.b.Bytes()

Loading…
Cancel
Save