[fix] 确保rtsp sub拉流从关键帧开始发送数据,避免因此引起的花屏

pull/104/head
q191201771 4 years ago
parent d11074958d
commit 4cea1b8cf5

@ -5,6 +5,7 @@
// 号是否与代码中声明的一致
"rtmp": {
"enable": true, //. 是否开启rtmp服务的监听
// 注意配置文件中控制各协议类型的enable开关都应该按需打开避免造成不必要的协议转换的开销
"addr": ":19350", //. RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2, //. RTMP拉流的GOP缓存数量加速流打开时间但是可能增加延时
"merge_write_size": 8192 //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿
@ -25,7 +26,7 @@
},
"hls": {
"enable": true, //. 是否开启HLS服务的监听
"enable_https": true, //. 是否开启HTTPS-FLV监听
"enable_https": true, //. 是否开启HTTPS-FLV监听
"url_pattern": "/hls/", //. 拉流url路由地址默认值`/hls/`,对应:
// - `/hls/{streamName}.m3u8`
// `/hls/{streamName}/playlist.m3u8`

@ -55,17 +55,21 @@ var NaluTypeMapping = map[uint8]string{
// ISO_IEC_23008-2_2013.pdf
// Table 7-1 NAL unit type codes and NAL unit type classes
const (
NaluTypeSliceTrailN uint8 = 0 // 0x0
NaluTypeSliceTrailR uint8 = 1 // 0x01
NaluTypeSliceTrailN uint8 = 0 // 0x0
NaluTypeSliceTrailR uint8 = 1 // 0x01
// 19, 20, 21都是关键帧
// TODO(chef): 161718也是关键帧吗
NaluTypeSliceIdr uint8 = 19 // 0x13
NaluTypeSliceIdrNlp uint8 = 20 // 0x14
NaluTypeSliceCranut uint8 = 21 // 0x15
NaluTypeVps uint8 = 32 // 0x20
NaluTypeSps uint8 = 33 // 0x21
NaluTypePps uint8 = 34 // 0x22
NaluTypeAud uint8 = 35 // 0x23
NaluTypeSei uint8 = 39 // 0x27
NaluTypeSeiSuffix uint8 = 40 // 0x28
NaluTypeVps uint8 = 32 // 0x20
NaluTypeSps uint8 = 33 // 0x21
NaluTypePps uint8 = 34 // 0x22
NaluTypeAud uint8 = 35 // 0x23
NaluTypeSei uint8 = 39 // 0x27
NaluTypeSeiSuffix uint8 = 40 // 0x28
)
type Context struct {

@ -163,6 +163,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
//nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
} else {
//nazalog.Debugf("[%s] OnFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))
// 收到视频可能触发建立fragment的条件是
// 关键帧数据 &&
// ((没有收到过音频seq header) || -> 只有视频
@ -180,7 +181,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
return
}
//nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
//nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))
}
mpegts.PackTsPacket(frame, func(packet []byte) {

@ -81,7 +81,7 @@ type Group struct {
// mpegts使用
patpmt []byte
// rtsp使用
sdp []byte
sdpCtx *sdp.LogicContext
//
tickCount uint32
}
@ -305,7 +305,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
if config.RtspConfig.Enable {
group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer(
func(sdpCtx sdp.LogicContext) {
group.sdp = sdpCtx.RawSdp
group.sdpCtx = &sdpCtx
},
group.onRtpPacket,
)
@ -443,13 +443,13 @@ func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (o
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO(chef): 应该有等待机制,而不是直接关闭
if group.sdp == nil {
if group.sdpCtx == nil {
nazalog.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]",
group.UniqueKey, session.UniqueKey())
return false, nil
}
return true, group.sdp
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool {
@ -458,6 +458,9 @@ func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool {
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtspSubSessionSet[session] = struct{}{}
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
return true
}
@ -507,77 +510,6 @@ func (group *Group) HasOutSession() bool {
return group.hasOutSession()
}
func (group *Group) BroadcastByRtmpMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
}
// hls.Muxer
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(b); err != nil {
nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
// hls.Muxer
func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(rawFrame)
session.IsFresh = false
}
} else {
session.Write(rawFrame)
}
}
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(rawFrame); err != nil {
nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
}
// rtmp.PubSession or rtmp.PullSession
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.BroadcastByRtmpMsg(msg)
}
// rtsp.PubSession
func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.onRtpPacket(pkt)
}
// rtsp.PubSession
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.sdp = sdpCtx.RawSdp
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}
// rtsp.PubSession
func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
//nazalog.Tracef("[%s] > Group::OnAvPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp)
group.rtsp2RtmpRemuxer.OnAvPacket(pkt)
}
func (group *Group) StringifyDebugStats() string {
group.mutex.Lock()
subLen := len(group.rtmpSubSessionSet) + len(group.httpflvSubSessionSet) + len(group.httptsSubSessionSet) + len(group.rtspSubSessionSet)
@ -736,329 +668,110 @@ func (group *Group) delRtspSubSession(session *rtsp.SubSession) {
delete(group.rtspSubSessionSet, session)
}
// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了考虑统一往一个通用类型转
// @param msg 调用结束后内部不持有msg.Payload内存块
func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
var (
lcd LazyChunkDivider
lrm2ft LazyRtmpMsg2FlvTag
)
//nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7)))
// # hls
if config.HlsConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRtmpMessage(msg)
func (group *Group) stopPullIfNeeded() {
if group.pullProxy.pullSession != nil && !group.hasOutSession() {
nazalog.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
group.pullProxy.pullSession.Dispose()
}
}
// # rtsp
if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
func (group *Group) pullIfNeeded() {
if !group.pullEnable {
return
}
// # 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)
if currHeader.MsgLen != uint32(len(msg.Payload)) {
nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header)
if !group.hasOutSession() {
return
}
if group.hasInSession() {
return
}
// 正在回源中
if group.pullProxy.isPulling {
return
}
group.pullProxy.isPulling = true
// # 懒初始化rtmp chunk切片以及httpflv转换
lcd.Init(msg.Payload, &currHeader)
lrm2ft.Init(msg)
// # 广播。遍历所有 rtmp sub session转发数据
// ## 如果是新的 sub session发送已缓存的信息
for session := range group.rtmpSubSessionSet {
if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.rtmpGopCache.Metadata != nil {
nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.AacSeqHeader)
}
gopCount := group.rtmpGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = session.Write(item)
}
}
// 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 从而确保新加入的sub session不会发送这部分脏的数据
// 注意此处可能被调用多次但是只有第一次会实际flush缓存数据
group.rtmpBufWriter.Flush()
nazalog.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.pullUrl)
session.IsFresh = false
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = relayPullTimeoutMs
option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
})
err := pullSession.Pull(group.pullUrl, group.OnReadRtmpAvMsg)
if err != nil {
nazalog.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
return
}
if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {
// 有sub session在等待关键帧并且当前是关键帧
// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 并且修改这个sub session的标志
// 让rtmp buf writer来发送这个关键帧
group.rtmpBufWriter.Flush()
session.ShouldWaitVideoKeyFrame = false
res := group.AddRtmpPullSession(pullSession)
if res {
err = <-pullSession.WaitChan()
nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
} else {
pullSession.Dispose()
}
}()
}
func (group *Group) pushIfNeeded() {
// push转推功能没开
if !config.RelayPushConfig.Enable {
return
}
// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
group.rtmpBufWriter.Write(lcd.Get())
// 没有pub发布者
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
}
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
if config.RelayPushConfig.Enable {
for _, v := range group.url2PushProxy {
if v.pushSession == nil {
continue
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
}
if v.pushSession.IsFresh {
if group.rtmpGopCache.Metadata != nil {
_ = v.pushSession.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader)
}
for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = v.pushSession.Write(item)
}
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
}
v.isPushing = true
v.pushSession.IsFresh = false
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
if err != nil {
nazalog.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
}
_ = v.pushSession.Write(lcd.Get())
func (group *Group) hasPushSession() bool {
for _, item := range group.url2PushProxy {
if item.isPushing || item.pushSession != nil {
return true
}
}
// # 广播。遍历所有 httpflv sub session转发数据
for session := range group.httpflvSubSessionSet {
if session.IsFresh {
if group.httpflvGopCache.Metadata != nil {
session.Write(group.httpflvGopCache.Metadata)
}
if group.httpflvGopCache.VideoSeqHeader != nil {
session.Write(group.httpflvGopCache.VideoSeqHeader)
}
if group.httpflvGopCache.AacSeqHeader != nil {
session.Write(group.httpflvGopCache.AacSeqHeader)
}
gopCount := group.httpflvGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
}
for i := 0; i < gopCount; i++ {
for _, item := range group.httpflvGopCache.GetGopDataAt(i) {
session.Write(item)
}
}
session.IsFresh = false
}
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNalu() {
session.Write(lrm2ft.Get())
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(lrm2ft.Get())
}
}
// # 录制flv文件
if group.recordFlv != nil {
if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil {
nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}
// # 缓存关键信息以及gop
if config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, lcd.Get)
}
if config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
// # 记录stat
if group.stat.AudioCodec == "" {
if msg.IsAacSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAac
}
}
if group.stat.VideoCodec == "" {
if msg.IsAvcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecAvc
}
if msg.IsHevcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecHevc
}
}
if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 {
if msg.IsAvcKeySeqHeader() {
sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx avc.Context
err = avc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.Height)
group.stat.VideoWidth = int(ctx.Width)
}
}
}
if msg.IsHevcKeySeqHeader() {
_, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx hevc.Context
err = hevc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples)
group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples)
}
}
}
}
}
func (group *Group) onRtpPacket(pkt rtprtcp.RtpPacket) {
for s := range group.rtspSubSessionSet {
s.WriteRtpPacket(pkt)
}
}
func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}
func (group *Group) stopPullIfNeeded() {
if group.pullProxy.pullSession != nil && !group.hasOutSession() {
nazalog.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
group.pullProxy.pullSession.Dispose()
}
}
func (group *Group) pullIfNeeded() {
if !group.pullEnable {
return
}
if !group.hasOutSession() {
return
}
if group.hasInSession() {
return
}
// 正在回源中
if group.pullProxy.isPulling {
return
}
group.pullProxy.isPulling = true
nazalog.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.pullUrl)
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = relayPullTimeoutMs
option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
})
err := pullSession.Pull(group.pullUrl, group.OnReadRtmpAvMsg)
if err != nil {
nazalog.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
return
}
res := group.AddRtmpPullSession(pullSession)
if res {
err = <-pullSession.WaitChan()
nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
} else {
pullSession.Dispose()
}
}()
}
func (group *Group) pushIfNeeded() {
// push转推功能没开
if !config.RelayPushConfig.Enable {
return
}
// 没有pub发布者
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
}
v.isPushing = true
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
if err != nil {
nazalog.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
}
func (group *Group) hasPushSession() bool {
for _, item := range group.url2PushProxy {
if item.isPushing || item.pushSession != nil {
return true
}
}
return false
}
return false
}
func (group *Group) isTotalEmpty() bool {
return group.rtmpPubSession == nil &&
@ -1181,7 +894,7 @@ func (group *Group) delIn() {
group.patpmt = nil
group.sdp = nil
group.sdpCtx = nil
}
func (group *Group) disposeHlsMuxer() {
@ -1219,3 +932,333 @@ func (group *Group) disposeHlsMuxer() {
group.hlsMuxer = nil
}
}
// ---------------------------------------------------------------------------------------------------------------------
// 音视频数据转发、转封装的逻辑
// ---------------------------------------------------------------------------------------------------------------------
// rtmp.PubSession or rtmp.PullSession
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
}
// rtsp.PubSession
func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.onRtpPacket(pkt)
}
// rtsp.PubSession
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.sdpCtx = &sdpCtx
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}
// rtsp.PubSession
func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
//nazalog.Debugf("[%s] > Group::OnAvPacket. type=%s, ts=%d, len=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp, len(pkt.Payload))
group.rtsp2RtmpRemuxer.OnAvPacket(pkt)
}
// hls.Muxer
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(b); err != nil {
nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
// hls.Muxer
func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(rawFrame)
session.IsFresh = false
}
} else {
session.Write(rawFrame)
}
}
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(rawFrame); err != nil {
nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
}
// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了考虑统一往一个通用类型转
//
// rtmp.PubSession, rtmp.PullSession, rtsp2rtmpRemuxer
//
// @param msg 调用结束后内部不持有msg.Payload内存块
//
func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
var (
lcd LazyChunkDivider
lrm2ft LazyRtmpMsg2FlvTag
)
//nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7)))
// # hls
if config.HlsConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRtmpMessage(msg)
}
// # rtsp
if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
}
// # 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)
if currHeader.MsgLen != uint32(len(msg.Payload)) {
nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header)
}
// # 懒初始化rtmp chunk切片以及httpflv转换
lcd.Init(msg.Payload, &currHeader)
lrm2ft.Init(msg)
// # 广播。遍历所有 rtmp sub session转发数据
// ## 如果是新的 sub session发送已缓存的信息
for session := range group.rtmpSubSessionSet {
if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.rtmpGopCache.Metadata != nil {
nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.AacSeqHeader)
}
gopCount := group.rtmpGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = session.Write(item)
}
}
// 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 从而确保新加入的sub session不会发送这部分脏的数据
// 注意此处可能被调用多次但是只有第一次会实际flush缓存数据
group.rtmpBufWriter.Flush()
session.IsFresh = false
}
if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {
// 有sub session在等待关键帧并且当前是关键帧
// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 并且修改这个sub session的标志
// 让rtmp buf writer来发送这个关键帧
group.rtmpBufWriter.Flush()
session.ShouldWaitVideoKeyFrame = false
}
}
// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
group.rtmpBufWriter.Write(lcd.Get())
}
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
if config.RelayPushConfig.Enable {
for _, v := range group.url2PushProxy {
if v.pushSession == nil {
continue
}
if v.pushSession.IsFresh {
if group.rtmpGopCache.Metadata != nil {
_ = v.pushSession.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader)
}
for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = v.pushSession.Write(item)
}
}
v.pushSession.IsFresh = false
}
_ = v.pushSession.Write(lcd.Get())
}
}
// # 广播。遍历所有 httpflv sub session转发数据
for session := range group.httpflvSubSessionSet {
if session.IsFresh {
if group.httpflvGopCache.Metadata != nil {
session.Write(group.httpflvGopCache.Metadata)
}
if group.httpflvGopCache.VideoSeqHeader != nil {
session.Write(group.httpflvGopCache.VideoSeqHeader)
}
if group.httpflvGopCache.AacSeqHeader != nil {
session.Write(group.httpflvGopCache.AacSeqHeader)
}
gopCount := group.httpflvGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
}
for i := 0; i < gopCount; i++ {
for _, item := range group.httpflvGopCache.GetGopDataAt(i) {
session.Write(item)
}
}
session.IsFresh = false
}
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNalu() {
session.Write(lrm2ft.Get())
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(lrm2ft.Get())
}
}
// # 录制flv文件
if group.recordFlv != nil {
if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil {
nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}
// # 缓存关键信息以及gop
if config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, lcd.Get)
}
if config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
// # 记录stat
if group.stat.AudioCodec == "" {
if msg.IsAacSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAac
}
}
if group.stat.VideoCodec == "" {
if msg.IsAvcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecAvc
}
if msg.IsHevcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecHevc
}
}
if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 {
if msg.IsAvcKeySeqHeader() {
sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx avc.Context
err = avc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.Height)
group.stat.VideoWidth = int(ctx.Width)
}
}
}
if msg.IsHevcKeySeqHeader() {
_, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx hevc.Context
err = hevc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples)
group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples)
}
}
}
}
}
// rtsp.PubSession, rtmp2RtspRemuxer
//
func (group *Group) onRtpPacket(pkt rtprtcp.RtpPacket) {
// 音频直接发送
if group.sdpCtx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) {
for s := range group.rtspSubSessionSet {
s.WriteRtpPacket(pkt)
}
return
}
var (
boundary bool
boundaryChecked bool // 保证只检查0次或1次减少性能开销
)
for s := range group.rtspSubSessionSet {
if !s.ShouldWaitVideoKeyFrame {
s.WriteRtpPacket(pkt)
continue
}
if !boundaryChecked {
switch group.sdpCtx.GetVideoPayloadTypeBase() {
case base.AvPacketPtAvc:
boundary = rtprtcp.IsAvcBoundary(pkt)
case base.AvPacketPtHevc:
boundary = rtprtcp.IsHevcBoundary(pkt)
default:
// 注意不是avc和hevc时直接发送
boundary = true
}
boundaryChecked = true
}
if boundary {
s.WriteRtpPacket(pkt)
s.ShouldWaitVideoKeyFrame = false
}
}
}
func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}

@ -189,7 +189,7 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
r.clearVideoSeqHeader()
}
} else {
if t == hevc.NaluTypeSliceIdr || t == hevc.NaluTypeSliceIdrNlp {
if t == hevc.NaluTypeSliceIdr || t == hevc.NaluTypeSliceIdrNlp || t == hevc.NaluTypeSliceCranut {
payload[0] = base.RtmpHevcKeyFrame
} else {
payload[0] = base.RtmpHevcInterFrame

@ -34,6 +34,7 @@ const (
NaluTypeAvcStapa = 24 // one packet, multiple nals
NaluTypeAvcFua = 28
// TODO(chef): hevc有stapa格式吗
NaluTypeHevcFua = 49
)

@ -8,7 +8,13 @@
package rtprtcp
import "github.com/q191201771/naza/pkg/bele"
import (
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// -----------------------------------
// rfc3550 5.1 RTP Fixed Header Fields
@ -118,3 +124,85 @@ func ParseRtpPacket(b []byte) (pkt RtpPacket, err error) {
copy(pkt.Raw, b)
return
}
func (p *RtpPacket) Body() []byte {
if p.Header.payloadOffset == 0 {
nazalog.Warnf("CHEFNOTICEME. payloadOffset=%d", p.Header.payloadOffset)
p.Header.payloadOffset = RtpFixedHeaderLength
}
return p.Raw[p.Header.payloadOffset:]
}
// @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc否则直接返回false
//
func IsAvcHevcBoundary(pkt RtpPacket, pt base.AvPacketPt) bool {
switch pt {
case base.AvPacketPtAvc:
return IsAvcBoundary(pkt)
case base.AvPacketPtHevc:
return IsHevcBoundary(pkt)
}
return false
}
func IsAvcBoundary(pkt RtpPacket) bool {
boundaryNaluTypes := map[uint8]struct{}{
avc.NaluTypeSps: {},
avc.NaluTypePps: {},
avc.NaluTypeIdrSlice: {},
}
b := pkt.Body()
outerNaluType := avc.ParseNaluType(b[0])
if _, ok := boundaryNaluTypes[outerNaluType]; ok {
return true
}
if outerNaluType == NaluTypeAvcStapa {
t := avc.ParseNaluType(b[3])
if _, ok := boundaryNaluTypes[t]; ok {
return true
}
}
if outerNaluType == NaluTypeAvcFua {
t := avc.ParseNaluType(b[1])
if _, ok := boundaryNaluTypes[t]; ok {
if b[1]&0x80 != 0 {
return true
}
}
}
return false
}
func IsHevcBoundary(pkt RtpPacket) bool {
boundaryNaluTypes := map[uint8]struct{}{
hevc.NaluTypeVps: {},
hevc.NaluTypeSps: {},
hevc.NaluTypePps: {},
hevc.NaluTypeSliceIdr: {},
hevc.NaluTypeSliceIdrNlp: {},
hevc.NaluTypeSliceCranut: {},
}
b := pkt.Body()
outerNaluType := hevc.ParseNaluType(b[0])
if _, ok := boundaryNaluTypes[outerNaluType]; ok {
return true
}
if outerNaluType == NaluTypeHevcFua {
t := b[2] & 0x3F // 注意这里是后6位不是中间6位
if _, ok := boundaryNaluTypes[t]; ok {
if b[2]&0x80 != 0 {
return true
}
}
}
return false
}

@ -291,7 +291,11 @@ func calcPositionIfNeededHevc(pkt *RtpPacket) {
fallthrough
case hevc.NaluTypeSliceTrailR:
fallthrough
case hevc.NaluTypeSliceIdr:
fallthrough
case hevc.NaluTypeSliceIdrNlp:
fallthrough
case hevc.NaluTypeSliceCranut:
pkt.positionType = PositionTypeSingle
return
case NaluTypeHevcFua:

@ -23,6 +23,8 @@ type SubSession struct {
urlCtx base.UrlContext
cmdSession *ServerCommandSession
baseOutSession *BaseOutSession
ShouldWaitVideoKeyFrame bool
}
func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *SubSession {
@ -31,6 +33,8 @@ func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Su
uniqueKey: uk,
urlCtx: urlCtx,
cmdSession: cmdSession,
ShouldWaitVideoKeyFrame: true,
}
baseOutSession := NewBaseOutSession(uk, s)
s.baseOutSession = baseOutSession

Loading…
Cancel
Save