From a41dc6d8eebe7f67054e27ba8110a13d9f57e8b6 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 23 May 2021 00:45:54 +0800 Subject: [PATCH] messages: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. #86 [fix] gop缓存设置为0时,可能花屏 2. #84 [fix] rtmp merge write开启时,可能给新加入的sub session发送错误的数据 --- conf/ConfigBrief.md | 7 ++-- pkg/base/bufwriter.go | 12 ++---- pkg/httpflv/server_sub_session.go | 10 +++-- pkg/logic/gop_cache.go | 62 ++++++++++++++++++++++++++---- pkg/logic/group.go | 63 +++++++++++++++++++++++++++---- pkg/rtmp/server_session.go | 16 ++++---- 6 files changed, 134 insertions(+), 36 deletions(-) diff --git a/conf/ConfigBrief.md b/conf/ConfigBrief.md index 36bccda..cf80403 100644 --- a/conf/ConfigBrief.md +++ b/conf/ConfigBrief.md @@ -4,9 +4,10 @@ "conf_version": "0.2.2", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本 // 号是否与代码中声明的一致 "rtmp": { - "enable": true, //. 是否开启rtmp服务的监听 - "addr": ":19350", //. RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 - "gop_num": 2 //. RTMP拉流的GOP缓存数量,加速秒开 + "enable": true, //. 是否开启rtmp服务的监听 + "addr": ":19350", //. RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 + "gop_num": 2, //. RTMP拉流的GOP缓存数量,加速流打开时间,但是可能增加延时 + "merge_write_size": 8192 //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿 }, "default_http": { //. http监听相关的默认配置,如果hls, httpflv, httpts中没有单独配置以下配置项, // 则使用default_http中的配置 diff --git a/pkg/base/bufwriter.go b/pkg/base/bufwriter.go index 178f52f..5c8470d 100644 --- a/pkg/base/bufwriter.go +++ b/pkg/base/bufwriter.go @@ -85,7 +85,10 @@ func (b *bufWriter) Write(p []byte) { } func (b *bufWriter) Flush() { - b.flush() + if b.n == 0 { + return + } + b.wr(b.buf[:b.n]) b.mallocOnePiece(b.defaultSize) } @@ -104,13 +107,6 @@ func (b *bufWriter) append(p []byte) { b.n += len(p) } -func (b *bufWriter) flush() { - if b.n == 0 { - return - } - b.wr(b.buf[:b.n]) -} - func (dw *directWriter) Write(p []byte) { dw.wr(p) } diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 6c123e9..3bf0f73 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -21,14 +21,15 @@ import ( var flvHTTPResponseHeader []byte type SubSession struct { - *base.HTTPSubSession // 直接使用它提供的函数 - IsFresh bool + *base.HTTPSubSession // 直接使用它提供的函数 + IsFresh bool + ShouldWaitVideoKeyFrame bool } func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession { uk := base.GenUKFLVSubSession() s := &SubSession{ - base.NewHTTPSubSession(base.HTTPSubSessionOption{ + HTTPSubSession: base.NewHTTPSubSession(base.HTTPSubSessionOption{ Conn: conn, ConnModOption: func(option *connection.Option) { option.WriteChanSize = SubSessionWriteChanSize @@ -40,7 +41,8 @@ func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, webs IsWebSocket: isWebSocket, WebSocketKey: websocketKey, }), - true, + IsFresh: true, + ShouldWaitVideoKeyFrame: true, } nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s diff --git a/pkg/logic/gop_cache.go b/pkg/logic/gop_cache.go index 24adcbd..1b0c133 100644 --- a/pkg/logic/gop_cache.go +++ b/pkg/logic/gop_cache.go @@ -54,18 +54,63 @@ func (l *LazyRTMPMsg2FLVTag) Get() []byte { return l.tag } +// --------------------------------------------------------------------------------------------------------------------- + +// 提供两个功能: +// 1. 缓存Metadata, VideoSeqHeader, AACSeqHeader +// 2. 缓存音视频GOP数据 +// +// 以下,只讨论GOPCache的第2点功能 +// +// 音频和视频都会缓存 +// +// GOPCache也可能不缓存GOP数据,见NewGOPCache函数的gopNum参数说明 +// +// 以下,我们只讨论gopNum > 0(也即gopSize > 1)的情况 +// +// GOPCache为空时,只有输入了关键帧,才能开启GOP缓存,非关键帧以及音频数据不会被缓存 +// 因此,单音频的流是ok的,相当于不缓存任何数据 +// +// GOPCache不为空时,输入关键帧触发生成新的GOP元素,其他情况则往最后一个GOP元素一直追加 +// +// first用于读取第一个GOP(可能不完整),last的前一个用于写入当前GOP +// +// 最近不完整的GOP也会被缓存,见NewGOPCache函数的gopNum参数说明 +// +// ----- +// gopNum = 1 +// gopSize = 2 +// +// first | first | first | 在后面两个状态间转换,就不画了 +// | | | | | | +// 0 1 | 0 1 | 0 1 | +// * * | * * | * * | +// | | | | | | +// last | last | last | +// | | | +// (empty) | (full) | (full) | +// GetGOPCount: 0 | 1 | 1 | +// ----- +// +// type GOPCache struct { - t string - uniqueKey string + t string + uniqueKey string + Metadata []byte VideoSeqHeader []byte AACSeqHeader []byte - gopRing []GOP - gopRingFirst int - gopRingLast int - gopSize int + + gopRing []GOP + gopRingFirst int + gopRingLast int + gopSize int } +// @param gopNum: gop缓存大小 +// 如果为0,则不缓存音频数据,也即GOP缓存功能不生效 +// 如果>0,则缓存个完整GOP,另外还可能有半个最近不完整的GOP +// func NewGOPCache(t string, uniqueKey string, gopNum int) *GOPCache { return &GOPCache{ t: t, @@ -99,7 +144,6 @@ func (gc *GOPCache) Feed(msg base.RTMPMsg, lg LazyGet) { } } - // 这个size的判断去掉也行 if gc.gopSize > 1 { if msg.IsVideoKeyNALU() { gc.feedNewGOP(msg, lg()) @@ -109,6 +153,7 @@ func (gc *GOPCache) Feed(msg base.RTMPMsg, lg LazyGet) { } } +// 获取GOP数量,注意,最后一个可能是不完整的 func (gc *GOPCache) GetGOPCount() int { return (gc.gopRingLast + gc.gopSize - gc.gopRingFirst) % gc.gopSize } @@ -128,12 +173,15 @@ func (gc *GOPCache) Clear() { gc.gopRingFirst = 0 } +// 往最后一个GOP元素追加一个msg +// 注意,如果GOPCache为空,则不缓存msg func (gc *GOPCache) feedLastGOP(msg base.RTMPMsg, b []byte) { if !gc.isGOPRingEmpty() { gc.gopRing[(gc.gopRingLast-1+gc.gopSize)%gc.gopSize].Feed(msg, b) } } +// 生成一个最新的GOP元素,并往里追加一个msg func (gc *GOPCache) feedNewGOP(msg base.RTMPMsg, b []byte) { if gc.isGOPRingFull() { gc.gopRingFirst = (gc.gopRingFirst + 1) % gc.gopSize diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 8451ad1..3c370ce 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -364,6 +364,14 @@ func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { group.mutex.Lock() defer group.mutex.Unlock() group.rtmpSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了 + // 也即我们假定上行肯定是以关键帧为开始进行视频发送,假设不是,那么我们按上行的流正常发,而不过滤掉关键帧前面的不包含关键帧的非完整GOP + // TODO(chef): + // 1. 需要仔细考虑单音频无视频的流的情况 + // 2. 这里不修改标志,让这个session继续等关键帧也可以 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } group.pullIfNeeded() } @@ -382,6 +390,10 @@ func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) { group.mutex.Lock() defer group.mutex.Unlock() group.httpflvSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推流过,就不需要等待关键帧了 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } group.pullIfNeeded() } @@ -753,25 +765,46 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) { if session.IsFresh { // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 if group.rtmpGopCache.Metadata != nil { - //nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey()) _ = session.Write(group.rtmpGopCache.Metadata) } if group.rtmpGopCache.VideoSeqHeader != nil { - //nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey()) _ = session.Write(group.rtmpGopCache.VideoSeqHeader) } if group.rtmpGopCache.AACSeqHeader != nil { - //nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey()) _ = session.Write(group.rtmpGopCache.AACSeqHeader) } - for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ { + gopCount := group.rtmpGopCache.GetGOPCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + + nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount) + } + for i := 0; i < gopCount; i++ { for _, item := range group.rtmpGopCache.GetGOPDataAt(i) { _ = session.Write(item) } } + // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 从而确保新加入的sub session不会发送这部分脏的数据 + // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 + group.rtmpBufWriter.Flush() + session.IsFresh = false } + + if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNALU() { + // 有sub session在等待关键帧,并且当前是关键帧 + // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 并且修改这个sub session的标志 + // 让rtmp buf writer来发送这个关键帧 + group.rtmpBufWriter.Flush() + session.ShouldWaitVideoKeyFrame = false + } } // ## 3.2. 转发本次数据 if len(group.rtmpSubSessionSet) > 0 { @@ -820,7 +853,12 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) { if group.httpflvGopCache.AACSeqHeader != nil { session.Write(group.httpflvGopCache.AACSeqHeader) } - for i := 0; i < group.httpflvGopCache.GetGOPCount(); i++ { + gopCount := group.httpflvGopCache.GetGOPCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + } + for i := 0; i < gopCount; i++ { for _, item := range group.httpflvGopCache.GetGOPDataAt(i) { session.Write(item) } @@ -829,7 +867,15 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) { session.IsFresh = false } - session.Write(lrm2ft.Get()) + // 是否在等待关键帧 + if session.ShouldWaitVideoKeyFrame { + if msg.IsVideoKeyNALU() { + session.Write(lrm2ft.Get()) + session.ShouldWaitVideoKeyFrame = false + } + } else { + session.Write(lrm2ft.Get()) + } } // # 5. 录制flv文件 @@ -853,7 +899,7 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) { group.stat.AudioCodec = base.AudioCodecAAC } } - if group.stat.AudioCodec == "" { + if group.stat.VideoCodec == "" { if msg.IsAVCKeySeqHeader() { group.stat.VideoCodec = base.VideoCodecAVC } @@ -889,6 +935,9 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) { func (group *Group) write2RTMPSubSessions(b []byte) { for session := range group.rtmpSubSessionSet { + if session.IsFresh || session.ShouldWaitVideoKeyFrame { + continue + } _ = session.Write(b) } } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index d0726c3..28fc988 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -70,7 +70,8 @@ type ServerSession struct { avObserver PubSessionObserver // only for SubSession - IsFresh bool + IsFresh bool + ShouldWaitVideoKeyFrame bool } func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession { @@ -85,12 +86,13 @@ func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSess StartTime: time.Now().Format("2006-01-02 15:04:05.999"), RemoteAddr: conn.RemoteAddr().String(), }, - uniqueKey: uk, - observer: observer, - t: ServerSessionTypeUnknown, - chunkComposer: NewChunkComposer(), - packer: NewMessagePacker(), - IsFresh: true, + uniqueKey: uk, + observer: observer, + t: ServerSessionTypeUnknown, + chunkComposer: NewChunkComposer(), + packer: NewMessagePacker(), + IsFresh: true, + ShouldWaitVideoKeyFrame: true, } nazalog.Infof("[%s] lifecycle new rtmp ServerSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s