diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index a0c4c52..593a509 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -21,6 +21,7 @@ const ( RTMPTypeIDBandwidth uint8 = 6 RTMPTypeIDCommandMessageAMF3 uint8 = 17 RTMPTypeIDCommandMessageAMF0 uint8 = 20 + RTMPTypeIDAggregateMessage uint8 = 22 // user control message type RTMPUserControlStreamBegin uint8 = 0 diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 8cb75f8..3a00a7f 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -37,7 +37,7 @@ func (c *ChunkComposer) SetPeerChunkSize(val uint32) { c.peerChunkSize = val } -type OnCompleteMessage func(stream *Stream) error +type OnCompleteMessage func(stream *Stream, c *ChunkComposer) error // @param cb 回调结束后,内存块会被 ChunkComposer 再次使用 func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { @@ -168,7 +168,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { absTsFlag = false //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c) - if err := cb(stream); err != nil { + if err := cb(stream, c); err != nil { return err } stream.msg.clear() diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index e5d679b..b8fd053 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -314,7 +314,7 @@ func (s *ClientSession) runReadLoop() { _ = s.chunkComposer.RunLoop(s.conn, s.doMsg) } -func (s *ClientSession) doMsg(stream *Stream) error { +func (s *ClientSession) doMsg(stream *Stream, c *ChunkComposer) error { switch stream.header.MsgTypeID { case base.RTMPTypeIDWinAckSize: fallthrough @@ -338,6 +338,8 @@ func (s *ClientSession) doMsg(stream *Stream) error { fallthrough case base.RTMPTypeIDVideo: s.onReadRTMPAVMsg(stream.toAVMsg()) + case base.RTMPTypeIDAggregateMessage: + s.doAggregateMessage(stream, c) default: nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) panic(0) @@ -478,7 +480,49 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { } return nil } +func (s *ClientSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error { + var aggreTagHeader *TagHeader = new(TagHeader) + aggregateStream := NewStream() + for { + if stream.msg.b < stream.msg.e { + aggregateStream.header.CSID = stream.header.CSID + aggregateStream.header.MsgStreamID = stream.header.MsgStreamID + aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11] + aggreTagHeader.Type = aggreTagHeaderBuf[0] + aggregateStream.header.MsgTypeID = aggreTagHeader.Type + if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo { + aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:]) + aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:]) + if aggreTagHeaderBuf[7] > 0 { + aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24) + } + aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:]) + aggregateStream.timestamp = aggreTagHeader.Timestamp + aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp + aggregateStream.header.MsgLen = aggreTagHeader.DataSize + + aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen] + aggregateStream.msg.b = 0 + aggregateStream.msg.e = aggregateStream.header.MsgLen + stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen + preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:]) + if preTagLen != aggregateStream.header.MsgLen { + nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey) + return ErrRTMP + } + stream.msg.b += 4 + s.onReadRTMPAVMsg(aggregateStream.toAVMsg()) + } else { + nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey) + return ErrRTMP + } + } else { + return nil + } + } + return nil +} func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { if stream.msg.len() < 4 { return ErrRTMP diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index 23d7f08..edcf962 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -85,7 +85,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin var objs []ObjectPair objs = append(objs, ObjectPair{Key: "app", Value: appName}) - objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"}) + //objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"}) var flashVer string if isPush { flashVer = fmt.Sprintf("FMLE/3.0 (compatible; %s)", base.LALRTMPPushSessionConnectVersion) @@ -93,6 +93,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin flashVer = "LNX 9,0,124,2" } objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer}) + objs = append(objs, ObjectPair{Key: "fpad", Value: false}) objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL}) _ = AMF0.WriteObject(packer.b, objs) raw := packer.b.Bytes() diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index d0726c3..c1f5f2b 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -196,7 +196,7 @@ func (s *ServerSession) handshake() error { return nil } -func (s *ServerSession) doMsg(stream *Stream) error { +func (s *ServerSession) doMsg(stream *Stream, c *ChunkComposer) error { //log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header) switch stream.header.MsgTypeID { case base.RTMPTypeIDSetChunkSize: @@ -218,6 +218,8 @@ func (s *ServerSession) doMsg(stream *Stream) error { return ErrRTMP } s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg()) + case base.RTMPTypeIDAggregateMessage: + s.doAggregateMessage(stream, c) default: nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.uniqueKey, stream.header.MsgTypeID, stream.toDebugString()) @@ -325,7 +327,49 @@ func (s *ServerSession) doCommandAFM3Message(stream *Stream) error { stream.msg.consumed(1) return s.doCommandMessage(stream) } +func (s *ServerSession) doAggregateMessage(stream *Stream, c *ChunkComposer) error { + var aggreTagHeader *TagHeader = new(TagHeader) + aggregateStream := NewStream() + for { + if stream.msg.b < stream.msg.e { + aggregateStream.header.CSID = stream.header.CSID + aggregateStream.header.MsgStreamID = stream.header.MsgStreamID + aggreTagHeaderBuf := stream.msg.buf[stream.msg.b : stream.msg.b+11] + aggreTagHeader.Type = aggreTagHeaderBuf[0] + aggregateStream.header.MsgTypeID = aggreTagHeader.Type + if aggreTagHeader.Type == base.RTMPTypeIDAudio || aggreTagHeader.Type == base.RTMPTypeIDVideo { + aggreTagHeader.DataSize = bele.BEUint24(aggreTagHeaderBuf[1:]) + aggreTagHeader.Timestamp = bele.BEUint24(aggreTagHeaderBuf[4:]) + if aggreTagHeaderBuf[7] > 0 { + aggreTagHeader.Timestamp += uint32(aggreTagHeaderBuf[7] << 24) + } + aggreTagHeader.StreamID = bele.BEUint24(aggreTagHeaderBuf[8:]) + aggregateStream.timestamp = aggreTagHeader.Timestamp + aggregateStream.header.TimestampAbs = aggreTagHeader.Timestamp + aggregateStream.header.MsgLen = aggreTagHeader.DataSize + + aggregateStream.msg.buf = stream.msg.buf[stream.msg.b+11 : stream.msg.b+11+aggregateStream.header.MsgLen] + aggregateStream.msg.b = 0 + aggregateStream.msg.e = aggregateStream.header.MsgLen + stream.msg.b = stream.msg.b + 11 + aggregateStream.header.MsgLen + preTagLen := bele.BEUint32(stream.msg.buf[stream.msg.b:]) + if preTagLen != aggregateStream.header.MsgLen { + nazalog.Errorf("[%s] AggregateMessage preTagLen Error.", s.uniqueKey) + return ErrRTMP + } + stream.msg.b += 4 + s.avObserver.OnReadRTMPAVMsg(aggregateStream.toAVMsg()) + } else { + nazalog.Errorf("[%s] AggregateMessage audio/video TypeID Error.", s.uniqueKey) + return ErrRTMP + } + } else { + return nil + } + } + return nil +} func (s *ServerSession) doConnect(tid int, stream *Stream) error { val, err := stream.msg.readObjectWithType() if err != nil {