pull/53/head
q191201771
commit 736eb0c1b4

@ -21,6 +21,7 @@ const (
RTMPTypeIDBandwidth uint8 = 6
RTMPTypeIDCommandMessageAMF3 uint8 = 17
RTMPTypeIDCommandMessageAMF0 uint8 = 20
RTMPTypeIDAggregateMessage uint8 = 22
// user control message type
RTMPUserControlStreamBegin uint8 = 0

@ -41,6 +41,7 @@ type OnCompleteMessage func(stream *Stream) error
// @param cb 回调结束后,内存块会被 ChunkComposer 再次使用
func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
var aggregateStream *Stream
bootstrap := make([]byte, 11)
absTsFlag := false
@ -166,12 +167,69 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
stream.header.TimestampAbs += stream.timestamp
}
absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c)
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, ctimestamp=%d, c=%p",
// fmt, csid, stream.header, stream.timestamp, c)
if err := cb(stream); err != nil {
return err
if stream.header.MsgTypeID == base.RTMPTypeIDAggregateMessage {
firstSubMessage := true
baseTimestamp := uint32(0)
// 懒初始化
if aggregateStream == nil {
aggregateStream = NewStream()
}
aggregateStream.header.CSID = stream.header.CSID
for stream.msg.len() != 0 {
// 读取sub message的头
if stream.msg.len() < 11 {
return ErrRTMP
}
aggregateStream.header.MsgTypeID = stream.msg.buf[stream.msg.b]
stream.msg.consumed(1)
aggregateStream.header.MsgLen = bele.BEUint24(stream.msg.buf[stream.msg.b:])
stream.msg.consumed(3)
aggregateStream.timestamp = bele.BEUint24(stream.msg.buf[stream.msg.b:])
stream.msg.consumed(3)
aggregateStream.timestamp += uint32(stream.msg.buf[stream.msg.b]) << 24
stream.msg.consumed(1)
aggregateStream.header.MsgStreamID = int(bele.BEUint24(stream.msg.buf[stream.msg.b:]))
stream.msg.consumed(3)
// 计算时间戳
if firstSubMessage {
baseTimestamp = aggregateStream.timestamp
firstSubMessage = false
}
aggregateStream.header.TimestampAbs = stream.header.TimestampAbs + aggregateStream.timestamp - baseTimestamp
// message包体
if stream.msg.len() < aggregateStream.header.MsgLen {
return ErrRTMP
}
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b : stream.msg.b+aggregateStream.header.MsgLen]
//aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.consumed(aggregateStream.header.MsgLen)
// sub message回调给上层
if err := cb(aggregateStream); err != nil {
return err
}
// 跳过prev size字段
if stream.msg.len() < 4 {
return ErrRTMP
}
stream.msg.consumed(4)
}
} else {
if err := cb(stream); err != nil {
return err
}
stream.msg.clear()
}
stream.msg.clear()
}
if stream.msg.len() > stream.header.MsgLen {
log.Panicf("stream msg len should not greater than len field in header. stream.msg.len=%d, len.in.header=%d", stream.msg.len(), stream.header.MsgLen)

@ -478,7 +478,6 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
}
return nil
}
func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
if stream.msg.len() < 4 {
return ErrRTMP

@ -93,6 +93,8 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
flashVer = "LNX 9,0,124,2"
}
objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer})
// fpad True if proxy is being used.
objs = append(objs, ObjectPair{Key: "fpad", Value: false})
objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL})
_ = AMF0.WriteObject(packer.b, objs)
raw := packer.b.Bytes()

Loading…
Cancel
Save