diff --git a/conf/ConfigBrief.md b/conf/ConfigBrief.md index cf80403..3c6dc2f 100644 --- a/conf/ConfigBrief.md +++ b/conf/ConfigBrief.md @@ -5,6 +5,7 @@ // 号是否与代码中声明的一致 "rtmp": { "enable": true, //. 是否开启rtmp服务的监听 + // 注意,配置文件中控制各协议类型的enable开关都应该按需打开,避免造成不必要的协议转换的开销 "addr": ":19350", //. RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 "gop_num": 2, //. RTMP拉流的GOP缓存数量,加速流打开时间,但是可能增加延时 "merge_write_size": 8192 //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿 @@ -25,7 +26,7 @@ }, "hls": { "enable": true, //. 是否开启HLS服务的监听 - "enable_https": true, //. 是否开启HTTPS-FLV监听 + "enable_https": true, //. 是否开启HTTPS-FLV监听 "url_pattern": "/hls/", //. 拉流url路由地址,默认值`/hls/`,对应: // - `/hls/{streamName}.m3u8` 或 // `/hls/{streamName}/playlist.m3u8` 或 diff --git a/pkg/hevc/hevc.go b/pkg/hevc/hevc.go index 9009fc2..8f7cc51 100644 --- a/pkg/hevc/hevc.go +++ b/pkg/hevc/hevc.go @@ -55,17 +55,21 @@ var NaluTypeMapping = map[uint8]string{ // ISO_IEC_23008-2_2013.pdf // Table 7-1 – NAL unit type codes and NAL unit type classes const ( - NaluTypeSliceTrailN uint8 = 0 // 0x0 - NaluTypeSliceTrailR uint8 = 1 // 0x01 + NaluTypeSliceTrailN uint8 = 0 // 0x0 + NaluTypeSliceTrailR uint8 = 1 // 0x01 + + // 19, 20, 21都是关键帧 + // TODO(chef): 16,17,18也是关键帧吗? NaluTypeSliceIdr uint8 = 19 // 0x13 NaluTypeSliceIdrNlp uint8 = 20 // 0x14 NaluTypeSliceCranut uint8 = 21 // 0x15 - NaluTypeVps uint8 = 32 // 0x20 - NaluTypeSps uint8 = 33 // 0x21 - NaluTypePps uint8 = 34 // 0x22 - NaluTypeAud uint8 = 35 // 0x23 - NaluTypeSei uint8 = 39 // 0x27 - NaluTypeSeiSuffix uint8 = 40 // 0x28 + + NaluTypeVps uint8 = 32 // 0x20 + NaluTypeSps uint8 = 33 // 0x21 + NaluTypePps uint8 = 34 // 0x22 + NaluTypeAud uint8 = 35 // 0x23 + NaluTypeSei uint8 = 39 // 0x27 + NaluTypeSeiSuffix uint8 = 40 // 0x28 ) type Context struct { diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index 123ef8b..31112ee 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -163,6 +163,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { //nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) } else { + //nazalog.Debugf("[%s] OnFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw)) // 收到视频,可能触发建立fragment的条件是: // 关键帧数据 && // ((没有收到过音频seq header) || -> 只有视频 @@ -180,7 +181,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { return } - //nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) + //nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw)) } mpegts.PackTsPacket(frame, func(packet []byte) { diff --git a/pkg/logic/group.go b/pkg/logic/group.go index cbccbb9..402e171 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -81,7 +81,7 @@ type Group struct { // mpegts使用 patpmt []byte // rtsp使用 - sdp []byte + sdpCtx *sdp.LogicContext // tickCount uint32 } @@ -305,7 +305,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool { if config.RtspConfig.Enable { group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer( func(sdpCtx sdp.LogicContext) { - group.sdp = sdpCtx.RawSdp + group.sdpCtx = &sdpCtx }, group.onRtpPacket, ) @@ -443,13 +443,13 @@ func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (o group.mutex.Lock() defer group.mutex.Unlock() // TODO(chef): 应该有等待机制,而不是直接关闭 - if group.sdp == nil { + if group.sdpCtx == nil { nazalog.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]", group.UniqueKey, session.UniqueKey()) return false, nil } - return true, group.sdp + return true, group.sdpCtx.RawSdp } func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool { @@ -458,6 +458,9 @@ func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool { group.mutex.Lock() defer group.mutex.Unlock() group.rtspSubSessionSet[session] = struct{}{} + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } return true } @@ -507,77 +510,6 @@ func (group *Group) HasOutSession() bool { return group.hasOutSession() } -func (group *Group) BroadcastByRtmpMsg(msg base.RtmpMsg) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.broadcastByRtmpMsg(msg) -} - -// hls.Muxer -func (group *Group) OnPatPmt(b []byte) { - group.patpmt = b - - if group.recordMpegts != nil { - if err := group.recordMpegts.Write(b); err != nil { - nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) - } - } -} - -// hls.Muxer -func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) { - // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 - - for session := range group.httptsSubSessionSet { - if session.IsFresh { - if boundary { - session.Write(group.patpmt) - session.Write(rawFrame) - session.IsFresh = false - } - } else { - session.Write(rawFrame) - } - } - - if group.recordMpegts != nil { - if err := group.recordMpegts.Write(rawFrame); err != nil { - nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err) - } - } -} - -// rtmp.PubSession or rtmp.PullSession -func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { - group.BroadcastByRtmpMsg(msg) -} - -// rtsp.PubSession -func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { - group.mutex.Lock() - defer group.mutex.Unlock() - - group.onRtpPacket(pkt) -} - -// rtsp.PubSession -func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { - group.mutex.Lock() - defer group.mutex.Unlock() - - group.sdp = sdpCtx.RawSdp - group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) -} - -// rtsp.PubSession -func (group *Group) OnAvPacket(pkt base.AvPacket) { - group.mutex.Lock() - defer group.mutex.Unlock() - //nazalog.Tracef("[%s] > Group::OnAvPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp) - - group.rtsp2RtmpRemuxer.OnAvPacket(pkt) -} - func (group *Group) StringifyDebugStats() string { group.mutex.Lock() subLen := len(group.rtmpSubSessionSet) + len(group.httpflvSubSessionSet) + len(group.httptsSubSessionSet) + len(group.rtspSubSessionSet) @@ -736,329 +668,110 @@ func (group *Group) delRtspSubSession(session *rtsp.SubSession) { delete(group.rtspSubSessionSet, session) } -// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了,考虑统一往一个通用类型转 -// @param msg 调用结束后,内部不持有msg.Payload内存块 -func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { - var ( - lcd LazyChunkDivider - lrm2ft LazyRtmpMsg2FlvTag - ) - - //nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7))) - - // # hls - if config.HlsConfig.Enable && group.hlsMuxer != nil { - group.hlsMuxer.FeedRtmpMessage(msg) +func (group *Group) stopPullIfNeeded() { + if group.pullProxy.pullSession != nil && !group.hasOutSession() { + nazalog.Infof("[%s] stop pull since no sub session.", group.UniqueKey) + group.pullProxy.pullSession.Dispose() } +} - // # rtsp - if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { - group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) +func (group *Group) pullIfNeeded() { + if !group.pullEnable { + return } - - // # 设置好用于发送的 rtmp 头部信息 - currHeader := remux.MakeDefaultRtmpHeader(msg.Header) - if currHeader.MsgLen != uint32(len(msg.Payload)) { - nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header) + if !group.hasOutSession() { + return } + if group.hasInSession() { + return + } + // 正在回源中 + if group.pullProxy.isPulling { + return + } + group.pullProxy.isPulling = true - // # 懒初始化rtmp chunk切片,以及httpflv转换 - lcd.Init(msg.Payload, &currHeader) - lrm2ft.Init(msg) - - // # 广播。遍历所有 rtmp sub session,转发数据 - // ## 如果是新的 sub session,发送已缓存的信息 - for session := range group.rtmpSubSessionSet { - if session.IsFresh { - // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 - if group.rtmpGopCache.Metadata != nil { - nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.Metadata) - } - if group.rtmpGopCache.VideoSeqHeader != nil { - nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.VideoSeqHeader) - } - if group.rtmpGopCache.AacSeqHeader != nil { - nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.AacSeqHeader) - } - gopCount := group.rtmpGopCache.GetGopCount() - if gopCount > 0 { - // GOP缓存中肯定包含了关键帧 - session.ShouldWaitVideoKeyFrame = false - - nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount) - } - for i := 0; i < gopCount; i++ { - for _, item := range group.rtmpGopCache.GetGopDataAt(i) { - _ = session.Write(item) - } - } - - // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session - // 从而确保新加入的sub session不会发送这部分脏的数据 - // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 - group.rtmpBufWriter.Flush() + nazalog.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.pullUrl) - session.IsFresh = false + go func() { + pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { + option.PullTimeoutMs = relayPullTimeoutMs + option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs + }) + err := pullSession.Pull(group.pullUrl, group.OnReadRtmpAvMsg) + if err != nil { + nazalog.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + return } - - if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() { - // 有sub session在等待关键帧,并且当前是关键帧 - // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session - // 并且修改这个sub session的标志 - // 让rtmp buf writer来发送这个关键帧 - group.rtmpBufWriter.Flush() - session.ShouldWaitVideoKeyFrame = false + res := group.AddRtmpPullSession(pullSession) + if res { + err = <-pullSession.WaitChan() + nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + } else { + pullSession.Dispose() } + }() +} + +func (group *Group) pushIfNeeded() { + // push转推功能没开 + if !config.RelayPushConfig.Enable { + return } - // ## 转发本次数据 - if len(group.rtmpSubSessionSet) > 0 { - group.rtmpBufWriter.Write(lcd.Get()) + // 没有pub发布者 + if group.rtmpPubSession == nil && group.rtspPubSession == nil { + return } - // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 - if config.RelayPushConfig.Enable { - for _, v := range group.url2PushProxy { - if v.pushSession == nil { - continue - } + // relay push时携带rtmp pub的参数 + // TODO chef: 这个逻辑放这里不太好看 + var urlParam string + if group.rtmpPubSession != nil { + urlParam = group.rtmpPubSession.RawQuery() + } - if v.pushSession.IsFresh { - if group.rtmpGopCache.Metadata != nil { - _ = v.pushSession.Write(group.rtmpGopCache.Metadata) - } - if group.rtmpGopCache.VideoSeqHeader != nil { - _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) - } - if group.rtmpGopCache.AacSeqHeader != nil { - _ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader) - } - for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ { - for _, item := range group.rtmpGopCache.GetGopDataAt(i) { - _ = v.pushSession.Write(item) - } - } + for url, v := range group.url2PushProxy { + // 正在转推中 + if v.isPushing { + continue + } + v.isPushing = true - v.pushSession.IsFresh = false + urlWithParam := url + if urlParam != "" { + urlWithParam += "?" + urlParam + } + nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) + + go func(u, u2 string) { + pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.PushTimeoutMs = relayPushTimeoutMs + option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs + }) + err := pushSession.Push(u2) + if err != nil { + nazalog.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + return } + group.AddRtmpPushSession(u, pushSession) + err = <-pushSession.WaitChan() + nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + }(url, urlWithParam) + } +} - _ = v.pushSession.Write(lcd.Get()) +func (group *Group) hasPushSession() bool { + for _, item := range group.url2PushProxy { + if item.isPushing || item.pushSession != nil { + return true } } - - // # 广播。遍历所有 httpflv sub session,转发数据 - for session := range group.httpflvSubSessionSet { - if session.IsFresh { - if group.httpflvGopCache.Metadata != nil { - session.Write(group.httpflvGopCache.Metadata) - } - if group.httpflvGopCache.VideoSeqHeader != nil { - session.Write(group.httpflvGopCache.VideoSeqHeader) - } - if group.httpflvGopCache.AacSeqHeader != nil { - session.Write(group.httpflvGopCache.AacSeqHeader) - } - gopCount := group.httpflvGopCache.GetGopCount() - if gopCount > 0 { - // GOP缓存中肯定包含了关键帧 - session.ShouldWaitVideoKeyFrame = false - } - for i := 0; i < gopCount; i++ { - for _, item := range group.httpflvGopCache.GetGopDataAt(i) { - session.Write(item) - } - } - - session.IsFresh = false - } - - // 是否在等待关键帧 - if session.ShouldWaitVideoKeyFrame { - if msg.IsVideoKeyNalu() { - session.Write(lrm2ft.Get()) - session.ShouldWaitVideoKeyFrame = false - } - } else { - session.Write(lrm2ft.Get()) - } - } - - // # 录制flv文件 - if group.recordFlv != nil { - if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil { - nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err) - } - } - - // # 缓存关键信息,以及gop - if config.RtmpConfig.Enable { - group.rtmpGopCache.Feed(msg, lcd.Get) - } - if config.HttpflvConfig.Enable { - group.httpflvGopCache.Feed(msg, lrm2ft.Get) - } - - // # 记录stat - if group.stat.AudioCodec == "" { - if msg.IsAacSeqHeader() { - group.stat.AudioCodec = base.AudioCodecAac - } - } - if group.stat.VideoCodec == "" { - if msg.IsAvcKeySeqHeader() { - group.stat.VideoCodec = base.VideoCodecAvc - } - if msg.IsHevcKeySeqHeader() { - group.stat.VideoCodec = base.VideoCodecHevc - } - } - if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 { - if msg.IsAvcKeySeqHeader() { - sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload) - if err == nil { - var ctx avc.Context - err = avc.ParseSps(sps, &ctx) - if err == nil { - group.stat.VideoHeight = int(ctx.Height) - group.stat.VideoWidth = int(ctx.Width) - } - } - } - if msg.IsHevcKeySeqHeader() { - _, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload) - if err == nil { - var ctx hevc.Context - err = hevc.ParseSps(sps, &ctx) - if err == nil { - group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples) - group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples) - } - } - } - } -} - -func (group *Group) onRtpPacket(pkt rtprtcp.RtpPacket) { - for s := range group.rtspSubSessionSet { - s.WriteRtpPacket(pkt) - } -} - -func (group *Group) write2RtmpSubSessions(b []byte) { - for session := range group.rtmpSubSessionSet { - if session.IsFresh || session.ShouldWaitVideoKeyFrame { - continue - } - _ = session.Write(b) - } -} - -func (group *Group) stopPullIfNeeded() { - if group.pullProxy.pullSession != nil && !group.hasOutSession() { - nazalog.Infof("[%s] stop pull since no sub session.", group.UniqueKey) - group.pullProxy.pullSession.Dispose() - } -} - -func (group *Group) pullIfNeeded() { - if !group.pullEnable { - return - } - if !group.hasOutSession() { - return - } - if group.hasInSession() { - return - } - // 正在回源中 - if group.pullProxy.isPulling { - return - } - group.pullProxy.isPulling = true - - nazalog.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.pullUrl) - - go func() { - pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { - option.PullTimeoutMs = relayPullTimeoutMs - option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs - }) - err := pullSession.Pull(group.pullUrl, group.OnReadRtmpAvMsg) - if err != nil { - nazalog.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - return - } - res := group.AddRtmpPullSession(pullSession) - if res { - err = <-pullSession.WaitChan() - nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - } else { - pullSession.Dispose() - } - }() -} - -func (group *Group) pushIfNeeded() { - // push转推功能没开 - if !config.RelayPushConfig.Enable { - return - } - // 没有pub发布者 - if group.rtmpPubSession == nil && group.rtspPubSession == nil { - return - } - - // relay push时携带rtmp pub的参数 - // TODO chef: 这个逻辑放这里不太好看 - var urlParam string - if group.rtmpPubSession != nil { - urlParam = group.rtmpPubSession.RawQuery() - } - - for url, v := range group.url2PushProxy { - // 正在转推中 - if v.isPushing { - continue - } - v.isPushing = true - - urlWithParam := url - if urlParam != "" { - urlWithParam += "?" + urlParam - } - nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) - - go func(u, u2 string) { - pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = relayPushTimeoutMs - option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs - }) - err := pushSession.Push(u2) - if err != nil { - nazalog.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - return - } - group.AddRtmpPushSession(u, pushSession) - err = <-pushSession.WaitChan() - nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - }(url, urlWithParam) - } -} - -func (group *Group) hasPushSession() bool { - for _, item := range group.url2PushProxy { - if item.isPushing || item.pushSession != nil { - return true - } - } - return false -} + return false +} func (group *Group) isTotalEmpty() bool { return group.rtmpPubSession == nil && @@ -1181,7 +894,7 @@ func (group *Group) delIn() { group.patpmt = nil - group.sdp = nil + group.sdpCtx = nil } func (group *Group) disposeHlsMuxer() { @@ -1219,3 +932,333 @@ func (group *Group) disposeHlsMuxer() { group.hlsMuxer = nil } } + +// --------------------------------------------------------------------------------------------------------------------- +// 音视频数据转发、转封装的逻辑 +// --------------------------------------------------------------------------------------------------------------------- + +// rtmp.PubSession or rtmp.PullSession +func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.broadcastByRtmpMsg(msg) +} + +// rtsp.PubSession +func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { + group.mutex.Lock() + defer group.mutex.Unlock() + + group.onRtpPacket(pkt) +} + +// rtsp.PubSession +func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { + group.mutex.Lock() + defer group.mutex.Unlock() + + group.sdpCtx = &sdpCtx + group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) +} + +// rtsp.PubSession +func (group *Group) OnAvPacket(pkt base.AvPacket) { + group.mutex.Lock() + defer group.mutex.Unlock() + //nazalog.Debugf("[%s] > Group::OnAvPacket. type=%s, ts=%d, len=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp, len(pkt.Payload)) + + group.rtsp2RtmpRemuxer.OnAvPacket(pkt) +} + +// hls.Muxer +func (group *Group) OnPatPmt(b []byte) { + group.patpmt = b + + if group.recordMpegts != nil { + if err := group.recordMpegts.Write(b); err != nil { + nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) + } + } +} + +// hls.Muxer +func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) { + // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 + + for session := range group.httptsSubSessionSet { + if session.IsFresh { + if boundary { + session.Write(group.patpmt) + session.Write(rawFrame) + session.IsFresh = false + } + } else { + session.Write(rawFrame) + } + } + + if group.recordMpegts != nil { + if err := group.recordMpegts.Write(rawFrame); err != nil { + nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err) + } + } +} + +// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了,考虑统一往一个通用类型转 +// +// rtmp.PubSession, rtmp.PullSession, rtsp2rtmpRemuxer +// +// @param msg 调用结束后,内部不持有msg.Payload内存块 +// +func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { + var ( + lcd LazyChunkDivider + lrm2ft LazyRtmpMsg2FlvTag + ) + + //nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7))) + + // # hls + if config.HlsConfig.Enable && group.hlsMuxer != nil { + group.hlsMuxer.FeedRtmpMessage(msg) + } + + // # rtsp + if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { + group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) + } + + // # 设置好用于发送的 rtmp 头部信息 + currHeader := remux.MakeDefaultRtmpHeader(msg.Header) + if currHeader.MsgLen != uint32(len(msg.Payload)) { + nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header) + } + + // # 懒初始化rtmp chunk切片,以及httpflv转换 + lcd.Init(msg.Payload, &currHeader) + lrm2ft.Init(msg) + + // # 广播。遍历所有 rtmp sub session,转发数据 + // ## 如果是新的 sub session,发送已缓存的信息 + for session := range group.rtmpSubSessionSet { + if session.IsFresh { + // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 + if group.rtmpGopCache.Metadata != nil { + nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.Metadata) + } + if group.rtmpGopCache.VideoSeqHeader != nil { + nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.VideoSeqHeader) + } + if group.rtmpGopCache.AacSeqHeader != nil { + nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.AacSeqHeader) + } + gopCount := group.rtmpGopCache.GetGopCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + + nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount) + } + for i := 0; i < gopCount; i++ { + for _, item := range group.rtmpGopCache.GetGopDataAt(i) { + _ = session.Write(item) + } + } + + // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 从而确保新加入的sub session不会发送这部分脏的数据 + // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 + group.rtmpBufWriter.Flush() + + session.IsFresh = false + } + + if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() { + // 有sub session在等待关键帧,并且当前是关键帧 + // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 并且修改这个sub session的标志 + // 让rtmp buf writer来发送这个关键帧 + group.rtmpBufWriter.Flush() + session.ShouldWaitVideoKeyFrame = false + } + } + // ## 转发本次数据 + if len(group.rtmpSubSessionSet) > 0 { + group.rtmpBufWriter.Write(lcd.Get()) + } + + // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 + if config.RelayPushConfig.Enable { + for _, v := range group.url2PushProxy { + if v.pushSession == nil { + continue + } + + if v.pushSession.IsFresh { + if group.rtmpGopCache.Metadata != nil { + _ = v.pushSession.Write(group.rtmpGopCache.Metadata) + } + if group.rtmpGopCache.VideoSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) + } + if group.rtmpGopCache.AacSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader) + } + for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ { + for _, item := range group.rtmpGopCache.GetGopDataAt(i) { + _ = v.pushSession.Write(item) + } + } + + v.pushSession.IsFresh = false + } + + _ = v.pushSession.Write(lcd.Get()) + } + } + + // # 广播。遍历所有 httpflv sub session,转发数据 + for session := range group.httpflvSubSessionSet { + if session.IsFresh { + if group.httpflvGopCache.Metadata != nil { + session.Write(group.httpflvGopCache.Metadata) + } + if group.httpflvGopCache.VideoSeqHeader != nil { + session.Write(group.httpflvGopCache.VideoSeqHeader) + } + if group.httpflvGopCache.AacSeqHeader != nil { + session.Write(group.httpflvGopCache.AacSeqHeader) + } + gopCount := group.httpflvGopCache.GetGopCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + } + for i := 0; i < gopCount; i++ { + for _, item := range group.httpflvGopCache.GetGopDataAt(i) { + session.Write(item) + } + } + + session.IsFresh = false + } + + // 是否在等待关键帧 + if session.ShouldWaitVideoKeyFrame { + if msg.IsVideoKeyNalu() { + session.Write(lrm2ft.Get()) + session.ShouldWaitVideoKeyFrame = false + } + } else { + session.Write(lrm2ft.Get()) + } + } + + // # 录制flv文件 + if group.recordFlv != nil { + if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil { + nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err) + } + } + + // # 缓存关键信息,以及gop + if config.RtmpConfig.Enable { + group.rtmpGopCache.Feed(msg, lcd.Get) + } + if config.HttpflvConfig.Enable { + group.httpflvGopCache.Feed(msg, lrm2ft.Get) + } + + // # 记录stat + if group.stat.AudioCodec == "" { + if msg.IsAacSeqHeader() { + group.stat.AudioCodec = base.AudioCodecAac + } + } + if group.stat.VideoCodec == "" { + if msg.IsAvcKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecAvc + } + if msg.IsHevcKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecHevc + } + } + if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 { + if msg.IsAvcKeySeqHeader() { + sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload) + if err == nil { + var ctx avc.Context + err = avc.ParseSps(sps, &ctx) + if err == nil { + group.stat.VideoHeight = int(ctx.Height) + group.stat.VideoWidth = int(ctx.Width) + } + } + } + if msg.IsHevcKeySeqHeader() { + _, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload) + if err == nil { + var ctx hevc.Context + err = hevc.ParseSps(sps, &ctx) + if err == nil { + group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples) + group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples) + } + } + } + } +} + +// rtsp.PubSession, rtmp2RtspRemuxer +// +func (group *Group) onRtpPacket(pkt rtprtcp.RtpPacket) { + // 音频直接发送 + if group.sdpCtx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) { + for s := range group.rtspSubSessionSet { + s.WriteRtpPacket(pkt) + } + return + } + + var ( + boundary bool + boundaryChecked bool // 保证只检查0次或1次,减少性能开销 + ) + + for s := range group.rtspSubSessionSet { + if !s.ShouldWaitVideoKeyFrame { + s.WriteRtpPacket(pkt) + continue + } + + if !boundaryChecked { + switch group.sdpCtx.GetVideoPayloadTypeBase() { + case base.AvPacketPtAvc: + boundary = rtprtcp.IsAvcBoundary(pkt) + case base.AvPacketPtHevc: + boundary = rtprtcp.IsHevcBoundary(pkt) + default: + // 注意,不是avc和hevc时,直接发送 + boundary = true + } + boundaryChecked = true + } + + if boundary { + s.WriteRtpPacket(pkt) + s.ShouldWaitVideoKeyFrame = false + } + } +} + +func (group *Group) write2RtmpSubSessions(b []byte) { + for session := range group.rtmpSubSessionSet { + if session.IsFresh || session.ShouldWaitVideoKeyFrame { + continue + } + _ = session.Write(b) + } +} diff --git a/pkg/remux/avpacket2rtmp.go b/pkg/remux/avpacket2rtmp.go index 24f6ca5..6c7e48c 100644 --- a/pkg/remux/avpacket2rtmp.go +++ b/pkg/remux/avpacket2rtmp.go @@ -189,7 +189,7 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) { r.clearVideoSeqHeader() } } else { - if t == hevc.NaluTypeSliceIdr || t == hevc.NaluTypeSliceIdrNlp { + if t == hevc.NaluTypeSliceIdr || t == hevc.NaluTypeSliceIdrNlp || t == hevc.NaluTypeSliceCranut { payload[0] = base.RtmpHevcKeyFrame } else { payload[0] = base.RtmpHevcInterFrame diff --git a/pkg/rtprtcp/rtp.go b/pkg/rtprtcp/rtp.go index c6fcd52..d4cb20e 100644 --- a/pkg/rtprtcp/rtp.go +++ b/pkg/rtprtcp/rtp.go @@ -34,6 +34,7 @@ const ( NaluTypeAvcStapa = 24 // one packet, multiple nals NaluTypeAvcFua = 28 + // TODO(chef): hevc有stapa格式吗 NaluTypeHevcFua = 49 ) diff --git a/pkg/rtprtcp/rtp_packet.go b/pkg/rtprtcp/rtp_packet.go index 16355c5..2d26962 100644 --- a/pkg/rtprtcp/rtp_packet.go +++ b/pkg/rtprtcp/rtp_packet.go @@ -8,7 +8,13 @@ package rtprtcp -import "github.com/q191201771/naza/pkg/bele" +import ( + "github.com/q191201771/lal/pkg/avc" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" + "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/nazalog" +) // ----------------------------------- // rfc3550 5.1 RTP Fixed Header Fields @@ -118,3 +124,85 @@ func ParseRtpPacket(b []byte) (pkt RtpPacket, err error) { copy(pkt.Raw, b) return } + +func (p *RtpPacket) Body() []byte { + if p.Header.payloadOffset == 0 { + nazalog.Warnf("CHEFNOTICEME. payloadOffset=%d", p.Header.payloadOffset) + p.Header.payloadOffset = RtpFixedHeaderLength + } + return p.Raw[p.Header.payloadOffset:] +} + +// @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc,否则直接返回false +// +func IsAvcHevcBoundary(pkt RtpPacket, pt base.AvPacketPt) bool { + switch pt { + case base.AvPacketPtAvc: + return IsAvcBoundary(pkt) + case base.AvPacketPtHevc: + return IsHevcBoundary(pkt) + } + return false +} + +func IsAvcBoundary(pkt RtpPacket) bool { + boundaryNaluTypes := map[uint8]struct{}{ + avc.NaluTypeSps: {}, + avc.NaluTypePps: {}, + avc.NaluTypeIdrSlice: {}, + } + + b := pkt.Body() + outerNaluType := avc.ParseNaluType(b[0]) + + if _, ok := boundaryNaluTypes[outerNaluType]; ok { + return true + } + + if outerNaluType == NaluTypeAvcStapa { + t := avc.ParseNaluType(b[3]) + if _, ok := boundaryNaluTypes[t]; ok { + return true + } + } + + if outerNaluType == NaluTypeAvcFua { + t := avc.ParseNaluType(b[1]) + if _, ok := boundaryNaluTypes[t]; ok { + if b[1]&0x80 != 0 { + return true + } + } + } + + return false +} + +func IsHevcBoundary(pkt RtpPacket) bool { + boundaryNaluTypes := map[uint8]struct{}{ + hevc.NaluTypeVps: {}, + hevc.NaluTypeSps: {}, + hevc.NaluTypePps: {}, + hevc.NaluTypeSliceIdr: {}, + hevc.NaluTypeSliceIdrNlp: {}, + hevc.NaluTypeSliceCranut: {}, + } + + b := pkt.Body() + outerNaluType := hevc.ParseNaluType(b[0]) + + if _, ok := boundaryNaluTypes[outerNaluType]; ok { + return true + } + + if outerNaluType == NaluTypeHevcFua { + t := b[2] & 0x3F // 注意,这里是后6位,不是中间6位 + if _, ok := boundaryNaluTypes[t]; ok { + if b[2]&0x80 != 0 { + return true + } + } + } + + return false +} diff --git a/pkg/rtprtcp/rtp_unpacker_avc_hevc.go b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go index 1f2bdc3..d593808 100644 --- a/pkg/rtprtcp/rtp_unpacker_avc_hevc.go +++ b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go @@ -291,7 +291,11 @@ func calcPositionIfNeededHevc(pkt *RtpPacket) { fallthrough case hevc.NaluTypeSliceTrailR: fallthrough + case hevc.NaluTypeSliceIdr: + fallthrough case hevc.NaluTypeSliceIdrNlp: + fallthrough + case hevc.NaluTypeSliceCranut: pkt.positionType = PositionTypeSingle return case NaluTypeHevcFua: diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index e340d6c..e2e1abc 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -23,6 +23,8 @@ type SubSession struct { urlCtx base.UrlContext cmdSession *ServerCommandSession baseOutSession *BaseOutSession + + ShouldWaitVideoKeyFrame bool } func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *SubSession { @@ -31,6 +33,8 @@ func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Su uniqueKey: uk, urlCtx: urlCtx, cmdSession: cmdSession, + + ShouldWaitVideoKeyFrame: true, } baseOutSession := NewBaseOutSession(uk, s) s.baseOutSession = baseOutSession