[opt] rtmp: PullSession支持配置是否复用接收message时的内存块

pull/197/head
q191201771 3 years ago
parent b8a551ad2c
commit f4a0e16d7f

@ -36,6 +36,7 @@ func main() {
%s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220
%s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220,rtmp://127.0.0.1/live/test330
`, os.Args[0], os.Args[0])
base.OsExitAndWaitPressIfWindows(1)
}
ol := strings.Split(*o, ",")

@ -214,6 +214,7 @@ func (t *Tunnel) Start() (ret ErrorCode) {
t.pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = pullTimeoutMs
//option.ReuseReadMessageBufferFlag = false
}).WithOnReadRtmpAvMsg(func(msg base.RtmpMsg) {
m := msg.Clone()
t.rtmpMsgQ <- m

@ -25,8 +25,10 @@ import (
// 读取chunk并合并chunk生成message返回给上层
//
type ChunkComposer struct {
peerChunkSize uint32
csid2stream map[int]*Stream
peerChunkSize uint32
reuseBufferFlag bool // TODO(chef): [fix] RtmpTypeIdAggregateMessage时reuseBufferFlag==false的处理 202206
csid2stream map[int]*Stream
}
func NewChunkComposer() *ChunkComposer {
@ -36,6 +38,10 @@ func NewChunkComposer() *ChunkComposer {
}
}
func (c *ChunkComposer) SetReuseBufferFlag(val bool) {
c.reuseBufferFlag = val
}
func (c *ChunkComposer) SetPeerChunkSize(val uint32) {
c.peerChunkSize = val
}
@ -254,7 +260,11 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
return err
}
stream.msg.Reset()
if c.reuseBufferFlag {
stream.msg.Reset()
} else {
stream.msg.ResetAndFree()
}
}
}

@ -19,22 +19,27 @@ type PullSession struct {
}
type PullSessionOption struct {
// PullTimeoutMs
//
// 从调用Pull函数到接收音视频数据的前一步也即收到服务端返回的rtmp play对应结果的信令的超时时间
// 如果为0则没有超时时间
//
PullTimeoutMs int
ReadAvTimeoutMs int
ReadBufSize int // io层读取音视频数据时的缓冲大小如果为0则没有缓冲
HandshakeComplexFlag bool
PeerWinAckSize int
ReadAvTimeoutMs int
ReadBufSize int // io层读取音视频数据时的缓冲大小如果为0则没有缓冲
HandshakeComplexFlag bool
PeerWinAckSize int
ReuseReadMessageBufferFlag bool // 接收Message时是否复用内存块
}
var defaultPullSessionOption = PullSessionOption{
PullTimeoutMs: 10000,
ReadAvTimeoutMs: 0,
ReadBufSize: 0,
HandshakeComplexFlag: false,
PeerWinAckSize: 0,
PullTimeoutMs: 10000,
ReadAvTimeoutMs: 0,
ReadBufSize: 0,
HandshakeComplexFlag: false,
PeerWinAckSize: 0,
ReuseReadMessageBufferFlag: true,
}
type ModPullSessionOption func(option *PullSessionOption)
@ -52,6 +57,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
option.ReadBufSize = opt.ReadBufSize
option.HandshakeComplexFlag = opt.HandshakeComplexFlag
option.PeerWinAckSize = opt.PeerWinAckSize
option.ReuseReadMessageBufferFlag = opt.ReuseReadMessageBufferFlag
}),
}
}

@ -77,17 +77,20 @@ type ClientSessionOption struct {
HandshakeComplexFlag bool // 握手是否使用复杂模式
PeerWinAckSize int
ReuseReadMessageBufferFlag bool // 接收Message时是否重用内存块
}
var defaultClientSessOption = ClientSessionOption{
DoTimeoutMs: 10000,
ReadAvTimeoutMs: 0,
WriteAvTimeoutMs: 0,
ReadBufSize: 0,
WriteBufSize: 0,
WriteChanSize: 0,
HandshakeComplexFlag: false,
PeerWinAckSize: 0,
DoTimeoutMs: 10000,
ReadAvTimeoutMs: 0,
WriteAvTimeoutMs: 0,
ReadBufSize: 0,
WriteBufSize: 0,
WriteChanSize: 0,
HandshakeComplexFlag: false,
PeerWinAckSize: 0,
ReuseReadMessageBufferFlag: true,
}
type ModClientSessionOption func(option *ClientSessionOption)

@ -54,6 +54,7 @@ func (stream *Stream) toAvMsg() base.RtmpMsg {
// ----- StreamMsg -----------------------------------------------------------------------------------------------------
type StreamMsg struct {
// TODO(chef): [refactor] 考虑外部(chunk_composer)不要直接访问buff封装一层 202206
buff *nazabytes.Buffer
}
@ -78,6 +79,10 @@ func (msg *StreamMsg) Reset() {
msg.buff.Reset()
}
func (msg *StreamMsg) ResetAndFree() {
msg.buff.ResetAndFree()
}
func (msg *StreamMsg) peekStringWithType() (string, error) {
str, _, err := Amf0.ReadString(msg.buff.Bytes())
return str, err

Loading…
Cancel
Save