diff --git a/app/demo/pullrtmp2pushrtmp/main.go b/app/demo/pullrtmp2pushrtmp/main.go index b507f98..0983d71 100644 --- a/app/demo/pullrtmp2pushrtmp/main.go +++ b/app/demo/pullrtmp2pushrtmp/main.go @@ -36,6 +36,7 @@ func main() { %s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220 %s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220,rtmp://127.0.0.1/live/test330 `, os.Args[0], os.Args[0]) + base.OsExitAndWaitPressIfWindows(1) } ol := strings.Split(*o, ",") diff --git a/app/demo/pullrtmp2pushrtmp/tunnel.go b/app/demo/pullrtmp2pushrtmp/tunnel.go index 2551a3e..1acc289 100644 --- a/app/demo/pullrtmp2pushrtmp/tunnel.go +++ b/app/demo/pullrtmp2pushrtmp/tunnel.go @@ -214,6 +214,7 @@ func (t *Tunnel) Start() (ret ErrorCode) { t.pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { option.PullTimeoutMs = pullTimeoutMs + //option.ReuseReadMessageBufferFlag = false }).WithOnReadRtmpAvMsg(func(msg base.RtmpMsg) { m := msg.Clone() t.rtmpMsgQ <- m diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index c579e91..aeedd67 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -25,8 +25,10 @@ import ( // 读取chunk,并合并chunk,生成message返回给上层 // type ChunkComposer struct { - peerChunkSize uint32 - csid2stream map[int]*Stream + peerChunkSize uint32 + reuseBufferFlag bool // TODO(chef): [fix] RtmpTypeIdAggregateMessage时,reuseBufferFlag==false的处理 202206 + + csid2stream map[int]*Stream } func NewChunkComposer() *ChunkComposer { @@ -36,6 +38,10 @@ func NewChunkComposer() *ChunkComposer { } } +func (c *ChunkComposer) SetReuseBufferFlag(val bool) { + c.reuseBufferFlag = val +} + func (c *ChunkComposer) SetPeerChunkSize(val uint32) { c.peerChunkSize = val } @@ -254,7 +260,11 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { return err } - stream.msg.Reset() + if c.reuseBufferFlag { + stream.msg.Reset() + } else { + stream.msg.ResetAndFree() + } } } diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index e1a5efb..3f8d734 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -19,22 +19,27 @@ type PullSession struct { } type PullSessionOption struct { + // PullTimeoutMs + // // 从调用Pull函数,到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令的超时时间 // 如果为0,则没有超时时间 + // PullTimeoutMs int - ReadAvTimeoutMs int - ReadBufSize int // io层读取音视频数据时的缓冲大小,如果为0,则没有缓冲 - HandshakeComplexFlag bool - PeerWinAckSize int + ReadAvTimeoutMs int + ReadBufSize int // io层读取音视频数据时的缓冲大小,如果为0,则没有缓冲 + HandshakeComplexFlag bool + PeerWinAckSize int + ReuseReadMessageBufferFlag bool // 接收Message时,是否复用内存块 } var defaultPullSessionOption = PullSessionOption{ - PullTimeoutMs: 10000, - ReadAvTimeoutMs: 0, - ReadBufSize: 0, - HandshakeComplexFlag: false, - PeerWinAckSize: 0, + PullTimeoutMs: 10000, + ReadAvTimeoutMs: 0, + ReadBufSize: 0, + HandshakeComplexFlag: false, + PeerWinAckSize: 0, + ReuseReadMessageBufferFlag: true, } type ModPullSessionOption func(option *PullSessionOption) @@ -52,6 +57,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { option.ReadBufSize = opt.ReadBufSize option.HandshakeComplexFlag = opt.HandshakeComplexFlag option.PeerWinAckSize = opt.PeerWinAckSize + option.ReuseReadMessageBufferFlag = opt.ReuseReadMessageBufferFlag }), } } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index e18b2c0..c9b370d 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -77,17 +77,20 @@ type ClientSessionOption struct { HandshakeComplexFlag bool // 握手是否使用复杂模式 PeerWinAckSize int + + ReuseReadMessageBufferFlag bool // 接收Message时,是否重用内存块 } var defaultClientSessOption = ClientSessionOption{ - DoTimeoutMs: 10000, - ReadAvTimeoutMs: 0, - WriteAvTimeoutMs: 0, - ReadBufSize: 0, - WriteBufSize: 0, - WriteChanSize: 0, - HandshakeComplexFlag: false, - PeerWinAckSize: 0, + DoTimeoutMs: 10000, + ReadAvTimeoutMs: 0, + WriteAvTimeoutMs: 0, + ReadBufSize: 0, + WriteBufSize: 0, + WriteChanSize: 0, + HandshakeComplexFlag: false, + PeerWinAckSize: 0, + ReuseReadMessageBufferFlag: true, } type ModClientSessionOption func(option *ClientSessionOption) diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index 6faea61..ce595a2 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -54,6 +54,7 @@ func (stream *Stream) toAvMsg() base.RtmpMsg { // ----- StreamMsg ----------------------------------------------------------------------------------------------------- type StreamMsg struct { + // TODO(chef): [refactor] 考虑外部(chunk_composer)不要直接访问buff,封装一层 202206 buff *nazabytes.Buffer } @@ -78,6 +79,10 @@ func (msg *StreamMsg) Reset() { msg.buff.Reset() } +func (msg *StreamMsg) ResetAndFree() { + msg.buff.ResetAndFree() +} + func (msg *StreamMsg) peekStringWithType() (string, error) { str, _, err := Amf0.ReadString(msg.buff.Bytes()) return str, err