[refactor] 重构已支持静音叠加功能的session(rtmp pub, customize pub)的代码 #160

pull/237/head
q191201771 2 years ago
parent 44cc464836
commit aeb318d2d0

@ -45,22 +45,22 @@ import (
// ---------------------------------------------------------------------------------------------------------------------
// 输入流到输出流的转换路径关系
//
// customizePubSession.WithOnRtmpMsg -> [dummyAudioFilter] -> OnReadRtmpAvMsg -> rtmp2RtspRemuxer -> rtsp
// -> rtmp
// -> http-flv, ts, hls
// customizePubSession.WithOnRtmpMsg -> OnReadRtmpAvMsg(enter Lock) -> [dummyAudioFilter] -> broadcastByRtmpMsg -> rtmp, http-flv
// -> rtmp2RtspRemuxer -> rtsp
// -> rtmp2MpegtsRemuxer -> ts, hls
//
// ---------------------------------------------------------------------------------------------------------------------
// rtmpPubSession 和customizePubSession一样省略
// rtmpPubSession和customizePubSession一样省略
//
// ---------------------------------------------------------------------------------------------------------------------
// rtspPubSession -> OnRtpPacket -> rtsp
// -> OnAvPacket -> rtsp2RtmpRemuxer -> onRtmpMsgFromRemux -> broadcastByRtmpMsg -> rtmp
// -> http-flv, ts, hls
// rtspPubSession -> OnRtpPacket(enter Lock) -> rtsp
// -> OnAvPacket(enter Lock) -> rtsp2RtmpRemuxer -> onRtmpMsgFromRemux -> [dummyAudioFilter] -> broadcastByRtmpMsg -> rtmp, http-flv
// -> rtmp2MpegtsRemuxer -> ts, hls
//
// ---------------------------------------------------------------------------------------------------------------------
// psPubSession -> OnAvPacketFromPsPubSession -> rtsp2RtmpRemuxer -> onRtmpMsgFromRemux -> broadcastByRtmpMsg -> rtmp2RtspRemuxer -> rtsp
// -> rtmp
// -> http-flv, ts, hls
// psPubSession -> OnAvPacketFromPsPubSession(enter Lock) -> rtsp2RtmpRemuxer -> onRtmpMsgFromRemux -> [dummyAudioFilter] -> broadcastByRtmpMsg -> ...
// -> ...
// -> ...
type IGroupObserver interface {
CleanupHlsIfNeeded(appName string, streamName string, path string)
@ -170,7 +170,6 @@ func (group *Group) RunLoop() {
// Tick 定时器
//
// @param tickCount 当前时间单位秒。注意不一定是Unix时间戳可以是从0开始+1秒递增的时间
//
func (group *Group) Tick(tickCount uint32) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -402,7 +401,6 @@ func (group *Group) OutSessionNum() int {
// disposeInactiveSessions 关闭不活跃的session
//
// TODO chef: [refactor] 梳理和naza.Connection超时重复部分
//
func (group *Group) disposeInactiveSessions(tickCount uint32) {
if group.psPubSession != nil {
if group.psPubTimeoutSec == 0 {
@ -485,7 +483,6 @@ func (group *Group) disposeInactiveSessions(tickCount uint32) {
}
// updateAllSessionStat 更新所有session的状态
//
func (group *Group) updateAllSessionStat() {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec)
@ -549,13 +546,11 @@ func (group *Group) hasInSession() bool {
}
// hasOutSession 是否还有out往外发送音视频数据的session
//
func (group *Group) hasOutSession() bool {
return group.hasSubSession() || group.hasPushSession()
}
// isTotalEmpty 当前group是否完全没有流了
//
func (group *Group) isTotalEmpty() bool {
return !group.hasInSession() && !group.hasOutSession()
}

@ -35,11 +35,14 @@ import (
//
// 输入rtmp数据.
// 来自 rtmp.ServerSession(Pub), rtmp.PullSession, CustomizePubSessionContext(remux.AvPacket2RtmpRemuxer), (remux.DummyAudioFilter) 的回调.
//
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
if group.dummyAudioFilter != nil {
group.dummyAudioFilter.Feed(msg)
} else {
group.broadcastByRtmpMsg(msg)
}
}
// ---------------------------------------------------------------------------------------------------------------------
@ -48,7 +51,6 @@ func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
//
// 输入rtsp(rtp)和rtp合帧之后的数据.
// 来自 rtsp.PubSession 的回调.
//
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -87,7 +89,6 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) {
// OnAvPacketFromPsPubSession
//
// 来自 gb28181.PubSession 的回调.
//
func (group *Group) OnAvPacketFromPsPubSession(pkt *base.AvPacket) {
// TODO(chef): [refactor] 统一所有回调AvPacket和*AvPacket 202208
@ -107,7 +108,6 @@ func (group *Group) OnAvPacketFromPsPubSession(pkt *base.AvPacket) {
//
// 输入mpegts数据.
// 来自 remux.Rtmp2MpegtsRemuxer 的回调.
//
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
@ -133,9 +133,12 @@ func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary
//
// 输入rtmp数据.
// 来自 remux.AvPacket2RtmpRemuxer 的回调.
//
func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
group.broadcastByRtmpMsg(msg)
if group.dummyAudioFilter != nil {
group.dummyAudioFilter.Feed(msg)
} else {
group.broadcastByRtmpMsg(msg)
}
}
// ---------------------------------------------------------------------------------------------------------------------
@ -144,7 +147,6 @@ func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
//
// 输入rtsp(rtp)数据.
// 来自 remux.Rtmp2RtspRemuxer 的回调.
//
func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx
group.feedWaitRtspSubSessions()
@ -160,7 +162,6 @@ func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) {
// OnFragmentOpen
//
// 来自 hls.Muxer 的回调
//
func (group *Group) OnFragmentOpen() {
group.rtmp2MpegtsRemuxer.FlushAudio()
}
@ -172,7 +173,6 @@ func (group *Group) OnFragmentOpen() {
// 使用rtmp类型的数据做为输入广播给各协议的输出
//
// @param msg 调用结束后内部不持有msg.Payload内存块
//
func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
//Log.Debugf("> broadcastByRtmpMsg. %s", msg.DebugString())

@ -39,11 +39,10 @@ func (group *Group) AddCustomizePubSession(streamName string) (ICustomizePubSess
)
}
group.customizePubSession.WithOnRtmpMsg(group.OnReadRtmpAvMsg)
if group.config.RtmpConfig.AddDummyAudioEnable {
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg)
group.customizePubSession.WithOnRtmpMsg(group.dummyAudioFilter.OnReadRtmpAvMsg)
} else {
group.customizePubSession.WithOnRtmpMsg(group.OnReadRtmpAvMsg)
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.broadcastByRtmpMsg)
}
return group.customizePubSession, nil
@ -71,16 +70,10 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
)
}
// TODO(chef): feat 为其他输入流也添加假音频。比如rtmp pull以及rtsp
// TODO(chef): refactor 可考虑抽象出一个输入流的配置块
// TODO(chef): refactor 考虑放入addIn中
session.SetPubSessionObserver(group)
if group.config.RtmpConfig.AddDummyAudioEnable {
// TODO(chef): 从整体控制和锁关系来说应该让pub的数据回调到group中进锁后再让数据流入filter
// TODO(chef): 这里用OnReadRtmpAvMsg正确吗是否会重复进锁
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg)
session.SetPubSessionObserver(group.dummyAudioFilter)
} else {
session.SetPubSessionObserver(group)
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.broadcastByRtmpMsg)
}
return nil
@ -104,6 +97,10 @@ func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error {
group.rtsp2RtmpRemuxer = remux.NewAvPacket2RtmpRemuxer().WithOnRtmpMsg(group.onRtmpMsgFromRemux)
session.SetObserver(group)
//if group.config.RtmpConfig.AddDummyAudioEnable {
// group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, )
//}
return nil
}
@ -362,7 +359,6 @@ func (group *Group) delPullSession(session base.IObject) {
// ---------------------------------------------------------------------------------------------------------------------
// addIn 有pub或pull的输入型session加入时需要调用该函数
//
func (group *Group) addIn() {
now := time.Now().Unix()
@ -377,7 +373,6 @@ func (group *Group) addIn() {
}
// delIn 有pub或pull的输入型session离开时需要调用该函数
//
func (group *Group) delIn() {
// 注意remuxer放前面使得有机会将内部缓存的数据吐出来
if group.rtmp2MpegtsRemuxer != nil {

@ -112,12 +112,12 @@ func (s *SimpleGroupManager) Len() int {
// ---------------------------------------------------------------------------------------------------------------------
//
// 背景:
// - 有的协议需要结合appName和streamName作为流唯一标识比如rtmphttpflvhttpts
// - 有的协议不需要appName只使用streamName作为流唯一标识比如rtsp
// - 有的协议需要结合appName和streamName作为流唯一标识比如rtmphttpflvhttpts
// - 有的协议不需要appName只使用streamName作为流唯一标识比如rtsp
//
// 目标:
// - 有appName的协议需要参考appName。
// - 没appName的协议需要和有appName的协议互通。
// - 有appName的协议需要参考appName。
// - 没appName的协议需要和有appName的协议互通。
//
// 注意:
// - 当以上两种类型的协议混用时系统使用者应避免第二种协议的streamName在第一种协议中存在相同的streamName但是appName不止一个这种情况下内部无法知道该如何对应。

Loading…
Cancel
Save