[feat]rtmp 支持Aggregate Message

pull/51/head
joestarzxh 4 years ago
parent 6c4d7767ee
commit 55a690a989

@ -21,6 +21,7 @@ const (
RTMPTypeIDBandwidth uint8 = 6
RTMPTypeIDCommandMessageAMF3 uint8 = 17
RTMPTypeIDCommandMessageAMF0 uint8 = 20
RTMPTypeIDAggregateMessage uint8 = 22
// user control message type
RTMPUserControlStreamBegin uint8 = 0

@ -37,7 +37,7 @@ func (c *ChunkComposer) SetPeerChunkSize(val uint32) {
c.peerChunkSize = val
}
type OnCompleteMessage func(stream *Stream) error
type OnCompleteMessage func(stream *Stream, c *ChunkComposer) error
// @param cb 回调结束后,内存块会被 ChunkComposer 再次使用
func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
@ -168,7 +168,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c)
if err := cb(stream); err != nil {
if err := cb(stream, c); err != nil {
return err
}
stream.msg.clear()

@ -314,7 +314,7 @@ func (s *ClientSession) runReadLoop() {
_ = s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
func (s *ClientSession) doMsg(stream *Stream) error {
func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error {
switch stream.header.MsgTypeID {
case base.RTMPTypeIDWinAckSize:
fallthrough
@ -338,6 +338,8 @@ func (s *ClientSession) doMsg(stream *Stream) error {
fallthrough
case base.RTMPTypeIDVideo:
s.onReadRTMPAVMsg(stream.toAVMsg())
case base.RTMPTypeIDAggregateMessage:
s.doAggregateMessage(stream, c)
default:
nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString())
panic(0)
@ -478,7 +480,49 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
}
return nil
}
func (s *ClientSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error {
var aggreTagHeader *TagHeader = new(TagHeader)
aggregateStream := NewStream()
for {
if stream.msg.b < stream.msg.e {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
aggreTagHeader.Type = aggreTagHeaderBuf[0]
aggregateStream.header.MsgTypeID = aggreTagHeader.Type
if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo {
aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:])
aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:])
aggregateStream.timestamp = aggreTagHeader.Timestamp
aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp
aggregateStream.header.MsgLen = aggreTagHeader.DataSize
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen
preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:])
if preTagLen != aggregateStream.header.MsgLen {
nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey)
return ErrRTMP
}
stream.msg.b += 4
s.onReadRTMPAVMsg(aggregateStream.toAVMsg())
} else {
nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey)
return ErrRTMP
}
} else {
return nil
}
}
return nil
}
func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
if stream.msg.len() < 4 {
return ErrRTMP

@ -85,7 +85,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
var objs []ObjectPair
objs = append(objs, ObjectPair{Key: "app", Value: appName})
objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"})
//objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"})
var flashVer string
if isPush {
flashVer = fmt.Sprintf("FMLE/3.0 (compatible; %s)", base.LALRTMPPushSessionConnectVersion)
@ -93,6 +93,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
flashVer = "LNX 9,0,124,2"
}
objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer})
objs = append(objs, ObjectPair{Key: "fpad", Value: false})
objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL})
_ = AMF0.WriteObject(packer.b, objs)
raw := packer.b.Bytes()

@ -196,7 +196,7 @@ func (s *ServerSession) handshake() error {
return nil
}
func (s *ServerSession) doMsg(stream *Stream) error {
func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error {
//log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header)
switch stream.header.MsgTypeID {
case base.RTMPTypeIDSetChunkSize:
@ -218,6 +218,8 @@ func (s *ServerSession) doMsg(stream *Stream) error {
return ErrRTMP
}
s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg())
case base.RTMPTypeIDAggregateMessage:
s.doAggregateMessage(stream, c)
default:
nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString())
@ -325,7 +327,49 @@ func (s *ServerSession) doCommandAFM3Message(stream *Stream) error {
stream.msg.consumed(1)
return s.doCommandMessage(stream)
}
func (s *ServerSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error {
var aggreTagHeader *TagHeader = new(TagHeader)
aggregateStream := NewStream()
for {
if stream.msg.b < stream.msg.e {
aggregateStream.header.CSID = stream.header.CSID
aggregateStream.header.MsgStreamID = stream.header.MsgStreamID
aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11]
aggreTagHeader.Type = aggreTagHeaderBuf[0]
aggregateStream.header.MsgTypeID = aggreTagHeader.Type
if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo {
aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:])
aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:])
if aggreTagHeaderBuf[7] > 0 {
aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24)
}
aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:])
aggregateStream.timestamp = aggreTagHeader.Timestamp
aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp
aggregateStream.header.MsgLen = aggreTagHeader.DataSize
aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen]
aggregateStream.msg.b = 0
aggregateStream.msg.e = aggregateStream.header.MsgLen
stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen
preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:])
if preTagLen != aggregateStream.header.MsgLen {
nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey)
return ErrRTMP
}
stream.msg.b += 4
s.avObserver.OnReadRTMPAVMsg(aggregateStream.toAVMsg())
} else {
nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey)
return ErrRTMP
}
} else {
return nil
}
}
return nil
}
func (s *ServerSession) doConnect(tid int, stream *Stream) error {
val, err := stream.msg.readObjectWithType()
if err != nil {

Loading…
Cancel
Save