diff --git a/pkg/logic/group__core_streaming.go b/pkg/logic/group__core_streaming.go index 4bcc417..7db46f0 100644 --- a/pkg/logic/group__core_streaming.go +++ b/pkg/logic/group__core_streaming.go @@ -372,13 +372,17 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { // # 缓存关键信息,以及gop if group.config.RtmpConfig.Enable || group.config.RtmpConfig.RtmpsEnable { - group.rtmpGopCache.Feed(msg, lazyRtmpChunkDivider.GetEnsureWithoutSdf()) + if !group.rtmpGopCache.Feed(msg, lazyRtmpChunkDivider.GetEnsureWithoutSdf()) { + Log.Warnf("[%s] over frame number limit for a single gop in rtmp cache.", group.UniqueKey) + } if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata { group.rtmpGopCache.SetMetadata(lazyRtmpChunkDivider.GetEnsureWithSdf(), lazyRtmpChunkDivider.GetEnsureWithoutSdf()) } } if group.config.HttpflvConfig.Enable { - group.httpflvGopCache.Feed(msg, lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf()) + if !group.httpflvGopCache.Feed(msg, lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf()) { + Log.Warnf("[%s] over frame number limit for a single gop in http flv cache.", group.UniqueKey) + } if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata { // 注意,因为withSdf实际上用不上,而且我们也没实现,所以全部用without了 group.httpflvGopCache.SetMetadata(lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf(), lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf()) diff --git a/pkg/remux/gop_cache.go b/pkg/remux/gop_cache.go index fe5d3b9..60d4d28 100644 --- a/pkg/remux/gop_cache.go +++ b/pkg/remux/gop_cache.go @@ -96,24 +96,24 @@ func (gc *GopCache) SetMetadata(w []byte, wo []byte) { // Feed // // @param lg: 内部可能持有lg返回的内存块 -func (gc *GopCache) Feed(msg base.RtmpMsg, b []byte) { +func (gc *GopCache) Feed(msg base.RtmpMsg, b []byte) bool { // TODO(chef): [refactor] 重构lg两个参数这种方式 202207 switch msg.Header.MsgTypeId { case base.RtmpTypeIdMetadata: // noop - return + return true case base.RtmpTypeIdAudio: if msg.IsAacSeqHeader() { gc.AacSeqHeader = b Log.Debugf("[%s] cache %s aac seq header. size:%d", gc.uniqueKey, gc.t, len(gc.AacSeqHeader)) - return + return true } case base.RtmpTypeIdVideo: if msg.IsVideoKeySeqHeader() { gc.VideoSeqHeader = b Log.Debugf("[%s] cache %s video seq header. size:%d", gc.uniqueKey, gc.t, len(gc.VideoSeqHeader)) - return + return true } } @@ -121,9 +121,10 @@ func (gc *GopCache) Feed(msg base.RtmpMsg, b []byte) { if msg.IsVideoKeyNalu() { gc.feedNewGop(msg, b) } else { - gc.feedLastGop(msg, b) + return gc.feedLastGop(msg, b) } } + return true } // GetGopCount 获取GOP数量,注意,最后一个可能是不完整的 @@ -153,13 +154,16 @@ func (gc *GopCache) Clear() { // // 往最后一个GOP元素追加一个msg // 注意,如果GopCache为空,则不缓存msg -func (gc *GopCache) feedLastGop(msg base.RtmpMsg, b []byte) { +func (gc *GopCache) feedLastGop(msg base.RtmpMsg, b []byte) bool { if !gc.isGopRingEmpty() { gopPos := (gc.gopRingLast - 1 + gc.gopSize) % gc.gopSize if gc.gopRing[gopPos].len() <= gc.singleGopMaxFrameNum || gc.singleGopMaxFrameNum == 0 { gc.gopRing[gopPos].Feed(msg, b) + } else { + return false } } + return true } // feedNewGop