diff --git a/pkg/base/session.go b/pkg/base/session.go index d3d50d1..b40ab45 100644 --- a/pkg/base/session.go +++ b/pkg/base/session.go @@ -13,6 +13,21 @@ import ( "strings" ) +// group中,session Dispose表现记录 +// +// Dispose结束后回调OnDel: +// rtmp.ServerSession(包含pub和sub) 1 +// rtsp.PubSession和rtsp.SubSession 1 +// rtmp.PullSession 2 +// httpflv.SubSession 3 +// httpts.SubSession 3 +// +// +// 情况1: 协议正常走完回调OnAdd,在自身server的RunLoop结束后,回调OnDel +// 情况2: 在group中pull阻塞结束后,手动回调OnDel +// 情况3: 在logic中sub RunLoop结束后,手动回调OnDel +// + // IsUseClosedConnectionError 当connection处于这些情况时,就不需要再Close了 // TODO(chef): 临时放这 // TODO(chef): 目前暂时没有使用,因为connection支持多次调用Close @@ -81,7 +96,9 @@ type IClientSessionLifecycle interface { type IServerSessionLifecycle interface { // RunLoop 开启session的事件循环,阻塞直到session结束 // - RunLoop() error + // 注意,rtsp的 pub和sub没有RunLoop,RunLoop是在cmd session上,所以暂时把这个函数从接口去除 + // + //RunLoop() error // Dispose 主动关闭session时调用 // diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index 288e4cc..66dc455 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -65,12 +65,10 @@ var ( // IServerSession var ( _ base.IServerSession = &rtmp.ServerSession{} + _ base.IServerSession = &rtsp.PubSession{} + _ base.IServerSession = &rtsp.SubSession{} _ base.IServerSession = &httpflv.SubSession{} _ base.IServerSession = &httpts.SubSession{} - - // 这两个比较特殊,它们没有RunLoop函数,RunLoop在rtsp.ServerCommandSession上 - //_ base.IServerSession = &rtsp.PubSession{} - //_ base.IServerSession = &rtsp.SubSession{} ) // IClientSessionLifecycle: 所有Client Session都满足 @@ -91,12 +89,11 @@ var ( var ( // server session _ base.IServerSessionLifecycle = &rtmp.ServerSession{} + _ base.IServerSessionLifecycle = &rtsp.PubSession{} + _ base.IServerSessionLifecycle = &rtsp.SubSession{} _ base.IServerSessionLifecycle = &httpflv.SubSession{} _ base.IServerSessionLifecycle = &httpts.SubSession{} - // 这两个比较特殊,它们没有RunLoop函数,RunLoop在rtsp.ServerCommandSession上 - //_ base.IServerSessionLifecycle = &rtsp.PubSession{} - //_ base.IServerSessionLifecycle = &rtsp.SubSession{} // other _ base.IServerSessionLifecycle = &base.HttpSubSession{} _ base.IServerSessionLifecycle = &rtsp.ServerCommandSession{} diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 4854a5d..6fdc2a0 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -10,14 +10,15 @@ package innertest import ( "fmt" - "github.com/q191201771/naza/pkg/nazabytes" - "github.com/q191201771/naza/pkg/nazalog" "io/ioutil" "net/http" "os" "testing" "time" + "github.com/q191201771/naza/pkg/nazabytes" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/naza/pkg/filebatch" @@ -109,11 +110,11 @@ func Entry(tt *testing.T) { mode = 0 entry() - //mode = 1 - //entry() - // - //mode = 2 - //entry() + mode = 1 + entry() + + mode = 2 + entry() } func entry() { @@ -130,7 +131,7 @@ func entry() { rtmpPullTagCount.Store(0) httptsSize.Store(0) hls.Clock = mock.NewFakeClock() - hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.Local)) + hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.UTC)) httpts.SubSessionWriteChanSize = 0 var err error @@ -388,8 +389,8 @@ func getHttpts() ([]byte, error) { defer resp.Body.Close() var buf nazabytes.Buffer + buf.ReserveBytes(goldenHttptsLenList[mode]) for { - buf.ReserveBytes(10000) n, err := resp.Body.Read(buf.WritableBytes()) if n > 0 { buf.Flush(n) @@ -464,17 +465,17 @@ var goldenPlaylistM3u8List = []string{ #EXT-X-MEDIA-SEQUENCE:2 #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.621, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXT-X-ENDLIST `, `#EXTM3U @@ -484,17 +485,17 @@ innertest-1642346665000-7.ts #EXT-X-MEDIA-SEQUENCE:4 #EXTINF:3.088, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.088, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:3.089, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:3.088, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXTINF:3.088, -innertest-1642346665000-8.ts +innertest-1642375465000-8.ts #EXTINF:2.113, -innertest-1642346665000-9.ts +innertest-1642375465000-9.ts #EXT-X-ENDLIST `, `#EXTM3U @@ -504,17 +505,17 @@ innertest-1642346665000-9.ts #EXT-X-MEDIA-SEQUENCE:2 #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.600, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXT-X-ENDLIST `, } @@ -527,21 +528,21 @@ var goldenRecordM3u8List = []string{ #EXT-X-DISCONTINUITY #EXTINF:4.000, -innertest-1642346665000-0.ts +innertest-1642375465000-0.ts #EXTINF:4.000, -innertest-1642346665000-1.ts +innertest-1642375465000-1.ts #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.621, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXT-X-ENDLIST `, `#EXTM3U @@ -551,25 +552,25 @@ innertest-1642346665000-7.ts #EXT-X-DISCONTINUITY #EXTINF:3.088, -innertest-1642346665000-0.ts +innertest-1642375465000-0.ts #EXTINF:3.088, -innertest-1642346665000-1.ts +innertest-1642375465000-1.ts #EXTINF:3.089, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:3.088, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:3.088, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.088, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:3.089, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:3.088, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXTINF:3.088, -innertest-1642346665000-8.ts +innertest-1642375465000-8.ts #EXTINF:2.113, -innertest-1642346665000-9.ts +innertest-1642375465000-9.ts #EXT-X-ENDLIST `, `#EXTM3U @@ -579,21 +580,21 @@ innertest-1642346665000-9.ts #EXT-X-DISCONTINUITY #EXTINF:4.000, -innertest-1642346665000-0.ts +innertest-1642375465000-0.ts #EXTINF:4.000, -innertest-1642346665000-1.ts +innertest-1642375465000-1.ts #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.600, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts #EXT-X-ENDLIST `, } diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 5335a1c..d6fe0cc 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -322,6 +322,9 @@ func LoadConfAndInitLog(confFile string) *Config { return config } + +// --------------------------------------------------------------------------------------------------------------------- + func mergeCommonHttpAddrConfig(dst, src *CommonHttpAddrConfig) { if dst.HttpListenAddr == "" && src.HttpListenAddr != "" { dst.HttpListenAddr = src.HttpListenAddr diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 530fb97..66b9aef 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -10,7 +10,6 @@ package logic import ( "encoding/json" - "fmt" "strings" "sync" @@ -74,34 +73,11 @@ type Group struct { rtmpMergeWriter *base.MergeWriter // TODO(chef): 后面可以在业务层加一个定时Flush // stat base.StatGroup - // - tickCount uint32 -} - -type pullProxy struct { - isPulling bool - pullSession *rtmp.PullSession -} - -type pushProxy struct { - isPushing bool - pushSession *rtmp.PushSession } func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group { uk := base.GenUkGroup() - url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试 - if config.RelayPushConfig.Enable { - for _, addr := range config.RelayPushConfig.AddrList { - pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) - url2PushProxy[pushUrl] = &pushProxy{ - isPushing: false, - pushSession: nil, - } - } - } - g := &Group{ UniqueKey: uk, appName: appName, @@ -118,17 +94,11 @@ func NewGroup(appName string, streamName string, config *Config, observer GroupO rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}), rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum), httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum), - pushEnable: config.RelayPushConfig.Enable, - url2PushProxy: url2PushProxy, pullProxy: &pullProxy{}, } - // 根据配置文件中的静态回源配置来初始化回源设置 - var pullUrl string - if config.RelayPullConfig.Enable { - pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", config.RelayPullConfig.Addr, appName, streamName) - } - g.setPullUrl(config.RelayPullConfig.Enable, pullUrl) + g.initRelayPush() + g.initRelayPull() if config.RtmpConfig.MergeWriteSize > 0 { g.rtmpMergeWriter = base.NewMergeWriter(g.writev2RtmpSubSessions, config.RtmpConfig.MergeWriteSize) @@ -142,107 +112,30 @@ func (group *Group) RunLoop() { <-group.exitChan } -// Tick TODO chef: 传入时间 -// 目前每秒触发一次 -func (group *Group) Tick() { +// Tick 定时器 +// +// @param tickCount 当前时间,单位秒。注意,不一定是Unix时间戳,可以是从0开始+1秒递增的时间 +// +func (group *Group) Tick(tickCount uint32) { group.mutex.Lock() defer group.mutex.Unlock() group.stopPullIfNeeded() group.pullIfNeeded() - // 还有pub推流,没在push就触发push - group.pushIfNeeded() - - // TODO chef: - // 梳理和naza.Connection超时重复部分 + group.startPushIfNeeded() - // TODO(chef): 所有dispose后,是否需要做打扫处理(比如设置nil以及主动调del),还是都在后面的del回调函数中统一处理 // 定时关闭没有数据的session - if group.tickCount%checkSessionAliveIntervalSec == 0 { - if group.rtmpPubSession != nil { - if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey()) - group.rtmpPubSession.Dispose() - group.rtmp2RtspRemuxer = nil - } - } - if group.rtspPubSession != nil { - if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey()) - group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil - } - } - if group.pullProxy.pullSession != nil { - if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey()) - group.pullProxy.pullSession.Dispose() - group.delRtmpPullSession(group.pullProxy.pullSession) - } - } - for session := range group.rtmpSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delRtmpSubSession(session) - } - } - for session := range group.httpflvSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delHttpflvSubSession(session) - } - } - for session := range group.httptsSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delHttptsSubSession(session) - } - } - for session := range group.rtspSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delRtspSubSession(session) - } - } + if tickCount%checkSessionAliveIntervalSec == 0 { + group.disposeInactiveSessions() } // 定时计算session bitrate - if group.tickCount%calcSessionStatIntervalSec == 0 { - if group.rtmpPubSession != nil { - group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec) - } - if group.rtspPubSession != nil { - group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec) - } - if group.pullProxy.pullSession != nil { - group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.rtmpSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.httpflvSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.httptsSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.rtspSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } + if tickCount%calcSessionStatIntervalSec == 0 { + group.updateAllSessionStat() } - - group.tickCount++ } -// Dispose 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率,降低心智负担。 -// 注意,Dispose后,不应再使用这个对象。 -// 值得一提,如果是从其他协程回调回来的消息,在使用Group中的资源前,要判断资源是否存在以及可用。 -// +// Dispose ... func (group *Group) Dispose() { Log.Infof("[%s] lifecycle dispose group.", group.UniqueKey) group.exitChan <- struct{}{} @@ -252,13 +145,9 @@ func (group *Group) Dispose() { if group.rtmpPubSession != nil { group.rtmpPubSession.Dispose() - group.rtmpPubSession = nil - group.rtmp2RtspRemuxer = nil } if group.rtspPubSession != nil { group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil } for session := range group.rtmpSubSessionSet { @@ -276,163 +165,11 @@ func (group *Group) Dispose() { } group.httptsSubSessionSet = nil - group.disposeHlsMuxer() - - if group.pushEnable { - for _, v := range group.url2PushProxy { - if v.pushSession != nil { - v.pushSession.Dispose() - } - } - group.url2PushProxy = nil - } -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) { - Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - group.rtmpSubSessionSet[session] = struct{}{} - // 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了 - // 也即我们假定上行肯定是以关键帧为开始进行视频发送,假设不是,那么我们按上行的流正常发,而不过滤掉关键帧前面的不包含关键帧的非完整GOP - // TODO(chef): - // 1. 需要仔细考虑单音频无视频的流的情况 - // 2. 这里不修改标志,让这个session继续等关键帧也可以 - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - group.pullIfNeeded() -} - -func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) { - Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) - session.WriteHttpResponseHeader() - session.WriteFlvHeader() - - group.mutex.Lock() - defer group.mutex.Unlock() - group.httpflvSubSessionSet[session] = struct{}{} - // 加入时,如果上行还没有推流过,就不需要等待关键帧了 - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - group.pullIfNeeded() -} - -// AddHttptsSubSession TODO chef: -// 这里应该也要考虑触发hls muxer开启 -// 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub -func (group *Group) AddHttptsSubSession(session *httpts.SubSession) { - Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey()) - session.WriteHttpResponseHeader() - - group.mutex.Lock() - defer group.mutex.Unlock() - group.httptsSubSessionSet[session] = struct{}{} - - group.pullIfNeeded() -} - -func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { - group.mutex.Lock() - defer group.mutex.Unlock() - // TODO(chef): 应该有等待机制,而不是直接关闭 - if group.sdpCtx == nil { - Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]", - group.UniqueKey, session.UniqueKey()) - return false, nil - } - - return true, group.sdpCtx.RawSdp -} - -func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey()) - - group.mutex.Lock() - defer group.mutex.Unlock() - group.rtspSubSessionSet[session] = struct{}{} - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - // TODO(chef): rtsp sub也应该判断是否需要静态pull回源 -} - -func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) { - Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - if group.url2PushProxy != nil { - group.url2PushProxy[url].pushSession = session - } -} - -func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delRtmpSubSession(session) -} - -func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delHttpflvSubSession(session) -} - -func (group *Group) DelHttptsSubSession(session *httpts.SubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delHttptsSubSession(session) -} - -func (group *Group) DelRtspSubSession(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - delete(group.rtspSubSessionSet, session) -} - -func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) { - Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - if group.url2PushProxy != nil { - group.url2PushProxy[url].pushSession = nil - group.url2PushProxy[url].isPushing = false - } + group.delIn() } // --------------------------------------------------------------------------------------------------------------------- -func (group *Group) IsTotalEmpty() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.isTotalEmpty() -} - -func (group *Group) HasInSession() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hasInSession() -} - -func (group *Group) HasOutSession() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hasOutSession() -} - -func (group *Group) IsHlsMuxerAlive() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hlsMuxer != nil -} - func (group *Group) StringifyDebugStats(maxsub int) string { b, _ := json.Marshal(group.GetStat(maxsub)) return string(b) @@ -499,13 +236,11 @@ func (group *Group) KickOutSession(sessionId string) bool { if strings.HasPrefix(sessionId, base.UkPreRtmpServerSession) { if group.rtmpPubSession != nil { group.rtmpPubSession.Dispose() - group.rtmp2RtspRemuxer = nil return true } } else if strings.HasPrefix(sessionId, base.UkPreRtspPubSession) { if group.rtspPubSession != nil { group.rtspPubSession.Dispose() - group.rtsp2RtmpRemuxer = nil return true } } else if strings.HasPrefix(sessionId, base.UkPreFlvSubSession) { @@ -524,7 +259,12 @@ func (group *Group) KickOutSession(sessionId string) bool { } } } else if strings.HasPrefix(sessionId, base.UkPreRtspSubSession) { - // TODO chef: impl me + for s := range group.rtspSubSessionSet { + if s.UniqueKey() == sessionId { + s.Dispose() + return true + } + } } else { Log.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionId) } @@ -532,87 +272,101 @@ func (group *Group) KickOutSession(sessionId string) bool { return false } -// StartPull 外部命令主动触发pull拉流 -// -// 当前调用时机: -// 1. 比如http api -// -func (group *Group) StartPull(url string) { +func (group *Group) IsTotalEmpty() bool { group.mutex.Lock() defer group.mutex.Unlock() - - group.setPullUrl(true, url) - group.pullIfNeeded() -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) { - Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.rtmpSubSessionSet, session) + return group.isTotalEmpty() } -func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) { - Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.httpflvSubSessionSet, session) +func (group *Group) HasInSession() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hasInSession() } -func (group *Group) delHttptsSubSession(session *httpts.SubSession) { - Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.httptsSubSessionSet, session) +func (group *Group) HasOutSession() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hasOutSession() } -func (group *Group) delRtspSubSession(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.rtspSubSessionSet, session) -} +// --------------------------------------------------------------------------------------------------------------------- -func (group *Group) pushIfNeeded() { - // push转推功能没开 - if !group.pushEnable { - return +// disposeInactiveSessions 关闭不活跃的session +// +// TODO(chef): [fix] Push是否需要检查 +// TODO chef: [refactor] 梳理和naza.Connection超时重复部分 +// +func (group *Group) disposeInactiveSessions() { + if group.rtmpPubSession != nil { + if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey()) + group.rtmpPubSession.Dispose() + } } - // 没有pub发布者 - if group.rtmpPubSession == nil && group.rtspPubSession == nil { - return + if group.rtspPubSession != nil { + if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey()) + group.rtspPubSession.Dispose() + } } - - // relay push时携带rtmp pub的参数 - // TODO chef: 这个逻辑放这里不太好看 - var urlParam string - if group.rtmpPubSession != nil { - urlParam = group.rtmpPubSession.RawQuery() + if group.pullProxy.pullSession != nil { + if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey()) + group.pullProxy.pullSession.Dispose() + } } - - for url, v := range group.url2PushProxy { - // 正在转推中 - if v.isPushing { - continue + for session := range group.rtmpSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() } - v.isPushing = true - - urlWithParam := url - if urlParam != "" { - urlWithParam += "?" + urlParam + } + for session := range group.rtspSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() } - Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) - - go func(u, u2 string) { - pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = relayPushTimeoutMs - option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs - }) - err := pushSession.Push(u2) - if err != nil { - Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - return - } - group.AddRtmpPushSession(u, pushSession) - err = <-pushSession.WaitChan() - Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - }(url, urlWithParam) + } + for session := range group.httpflvSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() + } + } + for session := range group.httptsSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() + } + } +} + +// updateAllSessionStat 更新所有session的状态 +// +// TODO(chef): [fix] Push是否需要更新 +// +func (group *Group) updateAllSessionStat() { + if group.rtmpPubSession != nil { + group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec) + } + if group.rtspPubSession != nil { + group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec) + } + if group.pullProxy.pullSession != nil { + group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.rtmpSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.httpflvSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.httptsSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.rtspSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) } } @@ -650,98 +404,12 @@ func (group *Group) isTotalEmpty() bool { !group.hasPushSession() } -func (group *Group) disposeHlsMuxer() { - if group.hlsMuxer != nil { - group.hlsMuxer.Dispose() - - group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath()) - - group.hlsMuxer = nil +func (group *Group) inSessionUniqueKey() string { + if group.rtmpPubSession != nil { + return group.rtmpPubSession.UniqueKey() } -} - -// ----- relay pull ---------------------------------------------------------------------------------------------------- - -func (group *Group) isPullEnable() bool { - return group.pullEnable -} - -func (group *Group) setPullUrl(enable bool, url string) { - group.pullEnable = enable - group.pullUrl = url -} - -func (group *Group) getPullUrl() string { - return group.pullUrl -} - -func (group *Group) setPullingFlag(flag bool) { - group.pullProxy.isPulling = flag -} - -func (group *Group) getPullingFlag() bool { - return group.pullProxy.isPulling -} - -// 判断是否需要pull从远端拉流至本地,如果需要,则触发pull -// -// 当前调用时机: -// 1. 添加新sub session -// 2. 外部命令,比如http api -// 3. 定时器,比如pull的连接断了,通过定时器可以重启触发pull -// -func (group *Group) pullIfNeeded() { - if !group.isPullEnable() { - return - } - // 如果没有从本地拉流的,就不需要pull了 - if !group.hasOutSession() { - return - } - // 如果本地已经有输入型的流,就不需要pull了 - if group.hasInSession() { - return - } - // 已经在pull中,就不需要pull了 - if group.getPullingFlag() { - return - } - group.setPullingFlag(true) - - Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl()) - - go func() { - pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { - option.PullTimeoutMs = relayPullTimeoutMs - option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs - }) - // TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session - err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg) - if err != nil { - Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - return - } - res := group.AddRtmpPullSession(pullSession) - if res { - err = <-pullSession.WaitChan() - Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - } else { - pullSession.Dispose() - } - }() -} - -// 判断是否需要停止pull -// -// 当前调用时机: -// 1. 定时器定时检查 -// -func (group *Group) stopPullIfNeeded() { - // 没有输出型的流了 - if group.pullProxy.pullSession != nil && !group.hasOutSession() { - Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey) - group.pullProxy.pullSession.Dispose() + if group.rtspPubSession != nil { + return group.rtspPubSession.UniqueKey() } + return "" } diff --git a/pkg/logic/group__streaming.go b/pkg/logic/group__core_streaming.go similarity index 100% rename from pkg/logic/group__streaming.go rename to pkg/logic/group__core_streaming.go diff --git a/pkg/logic/group__s.go b/pkg/logic/group__in.go similarity index 56% rename from pkg/logic/group__s.go rename to pkg/logic/group__in.go index 3d55e17..a0cebf4 100644 --- a/pkg/logic/group__s.go +++ b/pkg/logic/group__in.go @@ -9,14 +9,9 @@ package logic import ( - "fmt" - "path/filepath" "time" "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/lal/pkg/httpflv" - "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtsp" @@ -139,9 +134,6 @@ func (group *Group) delRtspPubSession(session *rtsp.PubSession) { return } - _ = group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil group.delIn() } @@ -158,65 +150,21 @@ func (group *Group) delRtmpPullSession(session *rtmp.PullSession) { // addIn 有pub或pull的输入型session加入时,需要调用该函数 // func (group *Group) addIn() { - // 是否push转推 - group.pushIfNeeded() - - // 是否启动hls - if group.config.HlsConfig.Enable { - if group.hlsMuxer != nil { - Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) - } - enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps - group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group) - group.hlsMuxer.Start() - } - now := time.Now().Unix() - // 是否录制成flv文件 + group.startPushIfNeeded() + group.startHlsIfNeeded() group.startRecordFlvIfNeeded(now) - - // 是否录制成ts文件 - group.startRecordTsIfNeeded(now) + group.startRecordMpegtsIfNeeded(now) } // delIn 有pub或pull的输入型session离开时,需要调用该函数 // func (group *Group) delIn() { - // 停止hls - if group.config.HlsConfig.Enable && group.hlsMuxer != nil { - group.disposeHlsMuxer() - } - - // 停止转推 - if group.pushEnable { - for _, v := range group.url2PushProxy { - if v.pushSession != nil { - v.pushSession.Dispose() - } - v.pushSession = nil - } - } - - // 停止flv录制 - if group.config.RecordConfig.EnableFlv { - if group.recordFlv != nil { - if err := group.recordFlv.Dispose(); err != nil { - Log.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) - } - group.recordFlv = nil - } - } - - // 停止ts录制 - if group.config.RecordConfig.EnableMpegts { - if group.recordMpegts != nil { - if err := group.recordMpegts.Dispose(); err != nil { - Log.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) - } - group.recordMpegts = nil - } - } + group.stopPushIfNeeded() + group.stopHlsIfNeeded() + group.stopRecordFlvIfNeeded() + group.stopRecordMpegtsIfNeeded() group.rtmpPubSession = nil group.rtspPubSession = nil @@ -228,70 +176,3 @@ func (group *Group) delIn() { group.patpmt = nil group.sdpCtx = nil } - -// --------------------------------------------------------------------------------------------------------------------- - -// startRecordFlvIfNeeded 是否开启flv录制 -// -func (group *Group) startRecordFlvIfNeeded(nowUnix int64) { - if !group.config.RecordConfig.EnableFlv { - return - } - - // 构造文件名 - filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix) - filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename) - // 如果已经在录制,则先关闭 - // TODO(chef): 正常的逻辑是否会走到这? - if group.recordFlv != nil { - Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", - group.UniqueKey, filenameWithPath, group.recordFlv.Name()) - _ = group.recordFlv.Dispose() - } - // 初始化录制 - group.recordFlv = &httpflv.FlvFileWriter{} - if err := group.recordFlv.Open(filenameWithPath); err != nil { - Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordFlv = nil - } - if err := group.recordFlv.WriteFlvHeader(); err != nil { - Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordFlv = nil - } -} - -func (group *Group) startRecordTsIfNeeded(nowUnix int64) { - if !group.config.RecordConfig.EnableMpegts { - return - } - - // 构造文件名 - filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix) - filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename) - // 如果已经在录制,则先关闭 - if group.recordMpegts != nil { - Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", - group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) - _ = group.recordMpegts.Dispose() - } - group.recordMpegts = &mpegts.FileWriter{} - if err := group.recordMpegts.Create(filenameWithPath); err != nil { - Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordMpegts = nil - } -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) inSessionUniqueKey() string { - if group.rtmpPubSession != nil { - return group.rtmpPubSession.UniqueKey() - } - if group.rtspPubSession != nil { - return group.rtspPubSession.UniqueKey() - } - return "" -} diff --git a/pkg/logic/group__out_sub.go b/pkg/logic/group__out_sub.go new file mode 100644 index 0000000..a8875aa --- /dev/null +++ b/pkg/logic/group__out_sub.go @@ -0,0 +1,135 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtsp" +) + +func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) { + Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtmpSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了 + // 也即我们假定上行肯定是以关键帧为开始进行视频发送,假设不是,那么我们按上行的流正常发,而不过滤掉关键帧前面的不包含关键帧的非完整GOP + // TODO(chef): + // 1. 需要仔细考虑单音频无视频的流的情况 + // 2. 这里不修改标志,让这个session继续等关键帧也可以 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + group.pullIfNeeded() +} + +func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) { + Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) + session.WriteHttpResponseHeader() + session.WriteFlvHeader() + + group.mutex.Lock() + defer group.mutex.Unlock() + group.httpflvSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推流过,就不需要等待关键帧了 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + group.pullIfNeeded() +} + +// AddHttptsSubSession TODO chef: +// 这里应该也要考虑触发hls muxer开启 +// 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub +func (group *Group) AddHttptsSubSession(session *httpts.SubSession) { + Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey()) + session.WriteHttpResponseHeader() + + group.mutex.Lock() + defer group.mutex.Unlock() + group.httptsSubSessionSet[session] = struct{}{} + + group.pullIfNeeded() +} + +func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { + group.mutex.Lock() + defer group.mutex.Unlock() + // TODO(chef): 应该有等待机制,而不是直接关闭 + if group.sdpCtx == nil { + Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]", + group.UniqueKey, session.UniqueKey()) + return false, nil + } + + return true, group.sdpCtx.RawSdp +} + +func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) { + Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey()) + + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtspSubSessionSet[session] = struct{}{} + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + // TODO(chef): rtsp sub也应该判断是否需要静态pull回源 +} + +func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtmpSubSession(session) +} + +func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delHttpflvSubSession(session) +} + +func (group *Group) DelHttptsSubSession(session *httpts.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delHttptsSubSession(session) +} + +func (group *Group) DelRtspSubSession(session *rtsp.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtspSubSession(session) +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) { + Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.rtmpSubSessionSet, session) +} + +func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) { + Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.httpflvSubSessionSet, session) +} + +func (group *Group) delHttptsSubSession(session *httpts.SubSession) { + Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.httptsSubSessionSet, session) +} + +func (group *Group) delRtspSubSession(session *rtsp.SubSession) { + Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.rtspSubSessionSet, session) +} diff --git a/pkg/logic/group__record_flv.go b/pkg/logic/group__record_flv.go new file mode 100644 index 0000000..8d615e4 --- /dev/null +++ b/pkg/logic/group__record_flv.go @@ -0,0 +1,57 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + "path/filepath" + + "github.com/q191201771/lal/pkg/httpflv" +) + +// startRecordFlvIfNeeded 必要时开启flv录制 +// +func (group *Group) startRecordFlvIfNeeded(nowUnix int64) { + if !group.config.RecordConfig.EnableFlv { + return + } + + // 构造文件名 + filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix) + filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename) + // 如果已经在录制,则先关闭 + // TODO(chef): 正常的逻辑是否会走到这? + if group.recordFlv != nil { + Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordFlv.Name()) + _ = group.recordFlv.Dispose() + } + // 初始化录制 + group.recordFlv = &httpflv.FlvFileWriter{} + if err := group.recordFlv.Open(filenameWithPath); err != nil { + Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFlv = nil + } + if err := group.recordFlv.WriteFlvHeader(); err != nil { + Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFlv = nil + } +} + +func (group *Group) stopRecordFlvIfNeeded() { + if !group.config.RecordConfig.EnableFlv { + return + } + if group.recordFlv != nil { + _ = group.recordFlv.Dispose() + group.recordFlv = nil + } +} diff --git a/pkg/logic/group__record_hls.go b/pkg/logic/group__record_hls.go new file mode 100644 index 0000000..f8015d2 --- /dev/null +++ b/pkg/logic/group__record_hls.go @@ -0,0 +1,43 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import "github.com/q191201771/lal/pkg/hls" + +func (group *Group) IsHlsMuxerAlive() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hlsMuxer != nil +} + +// startHlsIfNeeded 必要时启动hls +// +func (group *Group) startHlsIfNeeded() { + // TODO(chef): [refactor] ts依赖hls + if !group.config.HlsConfig.Enable { + return + } + if group.hlsMuxer != nil { + Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) + } + enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps + group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group) + group.hlsMuxer.Start() +} + +func (group *Group) stopHlsIfNeeded() { + if !group.config.HlsConfig.Enable { + return + } + if group.hlsMuxer != nil { + group.hlsMuxer.Dispose() + group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath()) + group.hlsMuxer = nil + } +} diff --git a/pkg/logic/group__record_mpegts.go b/pkg/logic/group__record_mpegts.go new file mode 100644 index 0000000..ccc3e76 --- /dev/null +++ b/pkg/logic/group__record_mpegts.go @@ -0,0 +1,50 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + "path/filepath" + + "github.com/q191201771/lal/pkg/mpegts" +) + +// startRecordMpegtsIfNeeded 必要时开启ts录制 +// +func (group *Group) startRecordMpegtsIfNeeded(nowUnix int64) { + if !group.config.RecordConfig.EnableMpegts { + return + } + + // 构造文件名 + filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix) + filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename) + // 如果已经在录制,则先关闭 + if group.recordMpegts != nil { + Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) + _ = group.recordMpegts.Dispose() + } + group.recordMpegts = &mpegts.FileWriter{} + if err := group.recordMpegts.Create(filenameWithPath); err != nil { + Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordMpegts = nil + } +} + +func (group *Group) stopRecordMpegtsIfNeeded() { + if !group.config.RecordConfig.EnableMpegts { + return + } + if group.recordMpegts != nil { + _ = group.recordMpegts.Dispose() + group.recordMpegts = nil + } +} diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go new file mode 100644 index 0000000..d785ba7 --- /dev/null +++ b/pkg/logic/group__relay_pull.go @@ -0,0 +1,133 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + + "github.com/q191201771/lal/pkg/rtmp" +) + +// StartPull 外部命令主动触发pull拉流 +// +// 当前调用时机: +// 1. 比如http api +// +func (group *Group) StartPull(url string) { + group.mutex.Lock() + defer group.mutex.Unlock() + + group.setPullUrl(true, url) + group.pullIfNeeded() +} + +// --------------------------------------------------------------------------------------------------------------------- + +type pullProxy struct { + isPulling bool + pullSession *rtmp.PullSession +} + +func (group *Group) initRelayPull() { + enable := group.config.RelayPullConfig.Enable + addr := group.config.RelayPullConfig.Addr + appName := group.appName + streamName := group.streamName + + // 根据配置文件中的静态回源配置来初始化回源设置 + var pullUrl string + if enable { + pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) + } + group.setPullUrl(enable, pullUrl) +} + +func (group *Group) isPullEnable() bool { + return group.pullEnable +} + +func (group *Group) setPullUrl(enable bool, url string) { + group.pullEnable = enable + group.pullUrl = url +} + +func (group *Group) getPullUrl() string { + return group.pullUrl +} + +func (group *Group) setPullingFlag(flag bool) { + group.pullProxy.isPulling = flag +} + +func (group *Group) getPullingFlag() bool { + return group.pullProxy.isPulling +} + +// 判断是否需要pull从远端拉流至本地,如果需要,则触发pull +// +// 当前调用时机: +// 1. 添加新sub session +// 2. 外部命令,比如http api +// 3. 定时器,比如pull的连接断了,通过定时器可以重启触发pull +// +func (group *Group) pullIfNeeded() { + if !group.isPullEnable() { + return + } + // 如果没有从本地拉流的,就不需要pull了 + if !group.hasOutSession() { + return + } + // 如果本地已经有输入型的流,就不需要pull了 + if group.hasInSession() { + return + } + // 已经在pull中,就不需要pull了 + if group.getPullingFlag() { + return + } + group.setPullingFlag(true) + + Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl()) + + go func() { + pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { + option.PullTimeoutMs = relayPullTimeoutMs + option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs + }) + // TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session + err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg) + if err != nil { + Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + return + } + res := group.AddRtmpPullSession(pullSession) + if res { + err = <-pullSession.WaitChan() + Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + } else { + pullSession.Dispose() + } + }() +} + +// 判断是否需要停止pull +// +// 当前调用时机: +// 1. 定时器定时检查 +// +func (group *Group) stopPullIfNeeded() { + // 没有输出型的流了 + if group.pullProxy.pullSession != nil && !group.hasOutSession() { + Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey) + group.pullProxy.pullSession.Dispose() + } +} diff --git a/pkg/logic/group__relay_push.go b/pkg/logic/group__relay_push.go new file mode 100644 index 0000000..40c5f37 --- /dev/null +++ b/pkg/logic/group__relay_push.go @@ -0,0 +1,123 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + + "github.com/q191201771/lal/pkg/rtmp" +) + +func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) { + Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + if group.url2PushProxy != nil { + group.url2PushProxy[url].pushSession = session + } +} + +func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) { + Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + if group.url2PushProxy != nil { + group.url2PushProxy[url].pushSession = nil + group.url2PushProxy[url].isPushing = false + } +} + +type pushProxy struct { + isPushing bool + pushSession *rtmp.PushSession +} + +func (group *Group) initRelayPush() { + enable := group.config.RelayPushConfig.Enable + addrList := group.config.RelayPushConfig.AddrList + appName := group.appName + streamName := group.streamName + + url2PushProxy := make(map[string]*pushProxy) + if enable { + for _, addr := range addrList { + pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) + url2PushProxy[pushUrl] = &pushProxy{ + isPushing: false, + pushSession: nil, + } + } + } + + group.pushEnable = group.config.RelayPushConfig.Enable + group.url2PushProxy = url2PushProxy +} + +// startPushIfNeeded 必要时进行replay push转推 +// +func (group *Group) startPushIfNeeded() { + // push转推功能没开 + if !group.pushEnable { + return + } + // 没有pub发布者 + if group.rtmpPubSession == nil && group.rtspPubSession == nil { + return + } + + // relay push时携带rtmp pub的参数 + // TODO chef: 这个逻辑放这里不太好看 + var urlParam string + if group.rtmpPubSession != nil { + urlParam = group.rtmpPubSession.RawQuery() + } + + for url, v := range group.url2PushProxy { + // 正在转推中 + if v.isPushing { + continue + } + v.isPushing = true + + urlWithParam := url + if urlParam != "" { + urlWithParam += "?" + urlParam + } + Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) + + go func(u, u2 string) { + pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.PushTimeoutMs = relayPushTimeoutMs + option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs + }) + err := pushSession.Push(u2) + if err != nil { + Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + return + } + group.AddRtmpPushSession(u, pushSession) + err = <-pushSession.WaitChan() + Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + }(url, urlWithParam) + } +} + +func (group *Group) stopPushIfNeeded() { + if !group.pushEnable { + return + } + for _, v := range group.url2PushProxy { + if v.pushSession != nil { + v.pushSession.Dispose() + } + v.pushSession = nil + } +} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 4c5c0cc..ab45976 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -96,8 +96,7 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { } if sm.config.RtmpConfig.Enable { - // TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer - sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr) + sm.rtmpServer = rtmp.NewServer(sm.config.RtmpConfig.Addr, sm) } if sm.config.RtspConfig.Enable { sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm) @@ -222,13 +221,13 @@ func (sm *ServerManager) RunLoop() error { t := time.NewTicker(1 * time.Second) defer t.Stop() - var count uint32 + var tickCount uint32 for { select { case <-sm.exitChan: return nil case <-t.C: - count++ + tickCount++ sm.mutex.Lock() @@ -240,13 +239,13 @@ func (sm *ServerManager) RunLoop() error { return false } - group.Tick() + group.Tick(tickCount) return true }) // 定时打印一些group相关的debug日志 if sm.config.DebugConfig.LogGroupIntervalSec > 0 && - count%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 { + tickCount%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 { groupNum := sm.groupManager.Len() Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum) if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 { @@ -264,7 +263,7 @@ func (sm *ServerManager) RunLoop() error { sm.mutex.Unlock() // 定时通过http notify发送group相关的信息 - if uis != 0 && (count%uis) == 0 { + if uis != 0 && (tickCount%uis) == 0 { updateInfo.ServerId = sm.config.ServerId updateInfo.Groups = sm.StatAllGroup() sm.option.NotifyHandler.OnUpdate(updateInfo) @@ -653,7 +652,6 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error { } func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { - // TODO chef: impl me sm.mutex.Lock() defer sm.mutex.Unlock() group := sm.getGroup(session.AppName(), session.StreamName()) @@ -815,7 +813,3 @@ func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) sm.hlsServerHandler.ServeHTTP(writer, req) } - -func (sm *ServerManager) runWebPprof(addr string) { - -} diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 811dfae..a5ad96b 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -13,10 +13,13 @@ import "github.com/q191201771/naza/pkg/nazalog" var Log = nazalog.GetGlobalLogger() var ( - relayPushTimeoutMs = 5000 - relayPushWriteAvTimeoutMs = 5000 - relayPullTimeoutMs = 5000 - relayPullReadAvTimeoutMs = 5000 + relayPushTimeoutMs = 5000 + relayPushWriteAvTimeoutMs = 5000 + relayPullTimeoutMs = 5000 + relayPullReadAvTimeoutMs = 5000 + + // calcSessionStatIntervalSec 计算所有session收发码率的时间间隔 + // calcSessionStatIntervalSec uint32 = 5 // checkSessionAliveIntervalSec diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 3d2c366..9785621 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -34,15 +34,15 @@ type ServerObserver interface { } type Server struct { - observer ServerObserver addr string + observer ServerObserver ln net.Listener } -func NewServer(observer ServerObserver, addr string) *Server { +func NewServer(addr string, observer ServerObserver) *Server { return &Server{ - observer: observer, addr: addr, + observer: observer, } }