1. [refactor] 重构logic.Group 2. [test] 修复CI环境的innertest

pull/134/head
q191201771 3 years ago
parent c3314bc7d2
commit 8c0d272fad

@ -13,6 +13,21 @@ import (
"strings"
)
// group中session Dispose表现记录
//
// Dispose结束后回调OnDel:
// rtmp.ServerSession(包含pub和sub) 1
// rtsp.PubSession和rtsp.SubSession 1
// rtmp.PullSession 2
// httpflv.SubSession 3
// httpts.SubSession 3
//
//
// 情况1: 协议正常走完回调OnAdd在自身server的RunLoop结束后回调OnDel
// 情况2: 在group中pull阻塞结束后手动回调OnDel
// 情况3: 在logic中sub RunLoop结束后手动回调OnDel
//
// IsUseClosedConnectionError 当connection处于这些情况时就不需要再Close了
// TODO(chef): 临时放这
// TODO(chef): 目前暂时没有使用因为connection支持多次调用Close
@ -81,7 +96,9 @@ type IClientSessionLifecycle interface {
type IServerSessionLifecycle interface {
// RunLoop 开启session的事件循环阻塞直到session结束
//
RunLoop() error
// 注意rtsp的 pub和sub没有RunLoopRunLoop是在cmd session上所以暂时把这个函数从接口去除
//
//RunLoop() error
// Dispose 主动关闭session时调用
//

@ -65,12 +65,10 @@ var (
// IServerSession
var (
_ base.IServerSession = &rtmp.ServerSession{}
_ base.IServerSession = &rtsp.PubSession{}
_ base.IServerSession = &rtsp.SubSession{}
_ base.IServerSession = &httpflv.SubSession{}
_ base.IServerSession = &httpts.SubSession{}
// 这两个比较特殊它们没有RunLoop函数RunLoop在rtsp.ServerCommandSession上
//_ base.IServerSession = &rtsp.PubSession{}
//_ base.IServerSession = &rtsp.SubSession{}
)
// IClientSessionLifecycle: 所有Client Session都满足
@ -91,12 +89,11 @@ var (
var (
// server session
_ base.IServerSessionLifecycle = &rtmp.ServerSession{}
_ base.IServerSessionLifecycle = &rtsp.PubSession{}
_ base.IServerSessionLifecycle = &rtsp.SubSession{}
_ base.IServerSessionLifecycle = &httpflv.SubSession{}
_ base.IServerSessionLifecycle = &httpts.SubSession{}
// 这两个比较特殊它们没有RunLoop函数RunLoop在rtsp.ServerCommandSession上
//_ base.IServerSessionLifecycle = &rtsp.PubSession{}
//_ base.IServerSessionLifecycle = &rtsp.SubSession{}
// other
_ base.IServerSessionLifecycle = &base.HttpSubSession{}
_ base.IServerSessionLifecycle = &rtsp.ServerCommandSession{}

@ -10,14 +10,15 @@ package innertest
import (
"fmt"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"io/ioutil"
"net/http"
"os"
"testing"
"time"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/naza/pkg/filebatch"
@ -109,11 +110,11 @@ func Entry(tt *testing.T) {
mode = 0
entry()
//mode = 1
//entry()
//
//mode = 2
//entry()
mode = 1
entry()
mode = 2
entry()
}
func entry() {
@ -130,7 +131,7 @@ func entry() {
rtmpPullTagCount.Store(0)
httptsSize.Store(0)
hls.Clock = mock.NewFakeClock()
hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.Local))
hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.UTC))
httpts.SubSessionWriteChanSize = 0
var err error
@ -388,8 +389,8 @@ func getHttpts() ([]byte, error) {
defer resp.Body.Close()
var buf nazabytes.Buffer
buf.ReserveBytes(goldenHttptsLenList[mode])
for {
buf.ReserveBytes(10000)
n, err := resp.Body.Read(buf.WritableBytes())
if n > 0 {
buf.Flush(n)
@ -464,17 +465,17 @@ var goldenPlaylistM3u8List = []string{
#EXT-X-MEDIA-SEQUENCE:2
#EXTINF:3.333,
innertest-1642346665000-2.ts
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642346665000-3.ts
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:2.621,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
@ -484,17 +485,17 @@ innertest-1642346665000-7.ts
#EXT-X-MEDIA-SEQUENCE:4
#EXTINF:3.088,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.088,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:3.089,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:3.088,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXTINF:3.088,
innertest-1642346665000-8.ts
innertest-1642375465000-8.ts
#EXTINF:2.113,
innertest-1642346665000-9.ts
innertest-1642375465000-9.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
@ -504,17 +505,17 @@ innertest-1642346665000-9.ts
#EXT-X-MEDIA-SEQUENCE:2
#EXTINF:3.333,
innertest-1642346665000-2.ts
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642346665000-3.ts
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:2.600,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
}
@ -527,21 +528,21 @@ var goldenRecordM3u8List = []string{
#EXT-X-DISCONTINUITY
#EXTINF:4.000,
innertest-1642346665000-0.ts
innertest-1642375465000-0.ts
#EXTINF:4.000,
innertest-1642346665000-1.ts
innertest-1642375465000-1.ts
#EXTINF:3.333,
innertest-1642346665000-2.ts
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642346665000-3.ts
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:2.621,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
@ -551,25 +552,25 @@ innertest-1642346665000-7.ts
#EXT-X-DISCONTINUITY
#EXTINF:3.088,
innertest-1642346665000-0.ts
innertest-1642375465000-0.ts
#EXTINF:3.088,
innertest-1642346665000-1.ts
innertest-1642375465000-1.ts
#EXTINF:3.089,
innertest-1642346665000-2.ts
innertest-1642375465000-2.ts
#EXTINF:3.088,
innertest-1642346665000-3.ts
innertest-1642375465000-3.ts
#EXTINF:3.088,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.088,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:3.089,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:3.088,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXTINF:3.088,
innertest-1642346665000-8.ts
innertest-1642375465000-8.ts
#EXTINF:2.113,
innertest-1642346665000-9.ts
innertest-1642375465000-9.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
@ -579,21 +580,21 @@ innertest-1642346665000-9.ts
#EXT-X-DISCONTINUITY
#EXTINF:4.000,
innertest-1642346665000-0.ts
innertest-1642375465000-0.ts
#EXTINF:4.000,
innertest-1642346665000-1.ts
innertest-1642375465000-1.ts
#EXTINF:3.333,
innertest-1642346665000-2.ts
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642346665000-3.ts
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642346665000-4.ts
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642346665000-5.ts
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642346665000-6.ts
innertest-1642375465000-6.ts
#EXTINF:2.600,
innertest-1642346665000-7.ts
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
}

@ -322,6 +322,9 @@ func LoadConfAndInitLog(confFile string) *Config {
return config
}
// ---------------------------------------------------------------------------------------------------------------------
func mergeCommonHttpAddrConfig(dst, src *CommonHttpAddrConfig) {
if dst.HttpListenAddr == "" && src.HttpListenAddr != "" {
dst.HttpListenAddr = src.HttpListenAddr

@ -10,7 +10,6 @@ package logic
import (
"encoding/json"
"fmt"
"strings"
"sync"
@ -74,34 +73,11 @@ type Group struct {
rtmpMergeWriter *base.MergeWriter // TODO(chef): 后面可以在业务层加一个定时Flush
//
stat base.StatGroup
//
tickCount uint32
}
type pullProxy struct {
isPulling bool
pullSession *rtmp.PullSession
}
type pushProxy struct {
isPushing bool
pushSession *rtmp.PushSession
}
func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group {
uk := base.GenUkGroup()
url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试
if config.RelayPushConfig.Enable {
for _, addr := range config.RelayPushConfig.AddrList {
pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
url2PushProxy[pushUrl] = &pushProxy{
isPushing: false,
pushSession: nil,
}
}
}
g := &Group{
UniqueKey: uk,
appName: appName,
@ -118,17 +94,11 @@ func NewGroup(appName string, streamName string, config *Config, observer GroupO
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum),
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum),
pushEnable: config.RelayPushConfig.Enable,
url2PushProxy: url2PushProxy,
pullProxy: &pullProxy{},
}
// 根据配置文件中的静态回源配置来初始化回源设置
var pullUrl string
if config.RelayPullConfig.Enable {
pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", config.RelayPullConfig.Addr, appName, streamName)
}
g.setPullUrl(config.RelayPullConfig.Enable, pullUrl)
g.initRelayPush()
g.initRelayPull()
if config.RtmpConfig.MergeWriteSize > 0 {
g.rtmpMergeWriter = base.NewMergeWriter(g.writev2RtmpSubSessions, config.RtmpConfig.MergeWriteSize)
@ -142,107 +112,30 @@ func (group *Group) RunLoop() {
<-group.exitChan
}
// Tick TODO chef: 传入时间
// 目前每秒触发一次
func (group *Group) Tick() {
// Tick 定时器
//
// @param tickCount 当前时间单位秒。注意不一定是Unix时间戳可以是从0开始+1秒递增的时间
//
func (group *Group) Tick(tickCount uint32) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.stopPullIfNeeded()
group.pullIfNeeded()
// 还有pub推流没在push就触发push
group.pushIfNeeded()
// TODO chef:
// 梳理和naza.Connection超时重复部分
group.startPushIfNeeded()
// TODO(chef): 所有dispose后是否需要做打扫处理(比如设置nil以及主动调del)还是都在后面的del回调函数中统一处理
// 定时关闭没有数据的session
if group.tickCount%checkSessionAliveIntervalSec == 0 {
if group.rtmpPubSession != nil {
if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey())
group.rtmpPubSession.Dispose()
group.rtmp2RtspRemuxer = nil
}
}
if group.rtspPubSession != nil {
if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey())
group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RtmpRemuxer = nil
}
}
if group.pullProxy.pullSession != nil {
if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey())
group.pullProxy.pullSession.Dispose()
group.delRtmpPullSession(group.pullProxy.pullSession)
}
}
for session := range group.rtmpSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
group.delRtmpSubSession(session)
}
}
for session := range group.httpflvSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
group.delHttpflvSubSession(session)
}
}
for session := range group.httptsSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
group.delHttptsSubSession(session)
}
}
for session := range group.rtspSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
group.delRtspSubSession(session)
}
}
if tickCount%checkSessionAliveIntervalSec == 0 {
group.disposeInactiveSessions()
}
// 定时计算session bitrate
if group.tickCount%calcSessionStatIntervalSec == 0 {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec)
}
if group.rtspPubSession != nil {
group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec)
}
if group.pullProxy.pullSession != nil {
group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.rtmpSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.httpflvSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.httptsSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.rtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
if tickCount%calcSessionStatIntervalSec == 0 {
group.updateAllSessionStat()
}
group.tickCount++
}
// Dispose 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率降低心智负担。
// 注意Dispose后不应再使用这个对象。
// 值得一提如果是从其他协程回调回来的消息在使用Group中的资源前要判断资源是否存在以及可用。
//
// Dispose ...
func (group *Group) Dispose() {
Log.Infof("[%s] lifecycle dispose group.", group.UniqueKey)
group.exitChan <- struct{}{}
@ -252,13 +145,9 @@ func (group *Group) Dispose() {
if group.rtmpPubSession != nil {
group.rtmpPubSession.Dispose()
group.rtmpPubSession = nil
group.rtmp2RtspRemuxer = nil
}
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RtmpRemuxer = nil
}
for session := range group.rtmpSubSessionSet {
@ -276,163 +165,11 @@ func (group *Group) Dispose() {
}
group.httptsSubSessionSet = nil
group.disposeHlsMuxer()
if group.pushEnable {
for _, v := range group.url2PushProxy {
if v.pushSession != nil {
v.pushSession.Dispose()
}
}
group.url2PushProxy = nil
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了
// 也即我们假定上行肯定是以关键帧为开始进行视频发送假设不是那么我们按上行的流正常发而不过滤掉关键帧前面的不包含关键帧的非完整GOP
// TODO(chef):
// 1. 需要仔细考虑单音频无视频的流的情况
// 2. 这里不修改标志让这个session继续等关键帧也可以
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
session.WriteFlvHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推流过,就不需要等待关键帧了
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
// AddHttptsSubSession TODO chef:
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httptsSubSessionSet[session] = struct{}{}
group.pullIfNeeded()
}
func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO(chef): 应该有等待机制,而不是直接关闭
if group.sdpCtx == nil {
Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]",
group.UniqueKey, session.UniqueKey())
return false, nil
}
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtspSubSessionSet[session] = struct{}{}
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
// TODO(chef): rtsp sub也应该判断是否需要静态pull回源
}
func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = session
}
}
func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtmpSubSession(session)
}
func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttpflvSubSession(session)
}
func (group *Group) DelHttptsSubSession(session *httpts.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttptsSubSession(session)
}
func (group *Group) DelRtspSubSession(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.rtspSubSessionSet, session)
}
func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = nil
group.url2PushProxy[url].isPushing = false
}
group.delIn()
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.isTotalEmpty()
}
func (group *Group) HasInSession() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hasInSession()
}
func (group *Group) HasOutSession() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hasOutSession()
}
func (group *Group) IsHlsMuxerAlive() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hlsMuxer != nil
}
func (group *Group) StringifyDebugStats(maxsub int) string {
b, _ := json.Marshal(group.GetStat(maxsub))
return string(b)
@ -499,13 +236,11 @@ func (group *Group) KickOutSession(sessionId string) bool {
if strings.HasPrefix(sessionId, base.UkPreRtmpServerSession) {
if group.rtmpPubSession != nil {
group.rtmpPubSession.Dispose()
group.rtmp2RtspRemuxer = nil
return true
}
} else if strings.HasPrefix(sessionId, base.UkPreRtspPubSession) {
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
group.rtsp2RtmpRemuxer = nil
return true
}
} else if strings.HasPrefix(sessionId, base.UkPreFlvSubSession) {
@ -524,7 +259,12 @@ func (group *Group) KickOutSession(sessionId string) bool {
}
}
} else if strings.HasPrefix(sessionId, base.UkPreRtspSubSession) {
// TODO chef: impl me
for s := range group.rtspSubSessionSet {
if s.UniqueKey() == sessionId {
s.Dispose()
return true
}
}
} else {
Log.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionId)
}
@ -532,87 +272,101 @@ func (group *Group) KickOutSession(sessionId string) bool {
return false
}
// StartPull 外部命令主动触发pull拉流
//
// 当前调用时机:
// 1. 比如http api
//
func (group *Group) StartPull(url string) {
func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
group.setPullUrl(true, url)
group.pullIfNeeded()
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtmpSubSessionSet, session)
return group.isTotalEmpty()
}
func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httpflvSubSessionSet, session)
func (group *Group) HasInSession() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hasInSession()
}
func (group *Group) delHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httptsSubSessionSet, session)
func (group *Group) HasOutSession() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hasOutSession()
}
func (group *Group) delRtspSubSession(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtspSubSessionSet, session)
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) pushIfNeeded() {
// push转推功能没开
if !group.pushEnable {
return
// disposeInactiveSessions 关闭不活跃的session
//
// TODO(chef): [fix] Push是否需要检查
// TODO chef: [refactor] 梳理和naza.Connection超时重复部分
//
func (group *Group) disposeInactiveSessions() {
if group.rtmpPubSession != nil {
if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey())
group.rtmpPubSession.Dispose()
}
}
// 没有pub发布者
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
if group.rtspPubSession != nil {
if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey())
group.rtspPubSession.Dispose()
}
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
if group.pullProxy.pullSession != nil {
if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey())
group.pullProxy.pullSession.Dispose()
}
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
for session := range group.rtmpSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
v.isPushing = true
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
for session := range group.rtspSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
if err != nil {
Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
for session := range group.httpflvSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
}
for session := range group.httptsSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
}
}
// updateAllSessionStat 更新所有session的状态
//
// TODO(chef): [fix] Push是否需要更新
//
func (group *Group) updateAllSessionStat() {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec)
}
if group.rtspPubSession != nil {
group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec)
}
if group.pullProxy.pullSession != nil {
group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.rtmpSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.httpflvSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.httptsSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.rtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
}
@ -650,98 +404,12 @@ func (group *Group) isTotalEmpty() bool {
!group.hasPushSession()
}
func (group *Group) disposeHlsMuxer() {
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())
group.hlsMuxer = nil
func (group *Group) inSessionUniqueKey() string {
if group.rtmpPubSession != nil {
return group.rtmpPubSession.UniqueKey()
}
}
// ----- relay pull ----------------------------------------------------------------------------------------------------
func (group *Group) isPullEnable() bool {
return group.pullEnable
}
func (group *Group) setPullUrl(enable bool, url string) {
group.pullEnable = enable
group.pullUrl = url
}
func (group *Group) getPullUrl() string {
return group.pullUrl
}
func (group *Group) setPullingFlag(flag bool) {
group.pullProxy.isPulling = flag
}
func (group *Group) getPullingFlag() bool {
return group.pullProxy.isPulling
}
// 判断是否需要pull从远端拉流至本地如果需要则触发pull
//
// 当前调用时机:
// 1. 添加新sub session
// 2. 外部命令比如http api
// 3. 定时器比如pull的连接断了通过定时器可以重启触发pull
//
func (group *Group) pullIfNeeded() {
if !group.isPullEnable() {
return
}
// 如果没有从本地拉流的就不需要pull了
if !group.hasOutSession() {
return
}
// 如果本地已经有输入型的流就不需要pull了
if group.hasInSession() {
return
}
// 已经在pull中就不需要pull了
if group.getPullingFlag() {
return
}
group.setPullingFlag(true)
Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl())
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = relayPullTimeoutMs
option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
})
// TODO(chef): 处理数据回调是否应该等待Add成功之后。避免竞态条件中途加入了其他in session
err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg)
if err != nil {
Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
return
}
res := group.AddRtmpPullSession(pullSession)
if res {
err = <-pullSession.WaitChan()
Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
} else {
pullSession.Dispose()
}
}()
}
// 判断是否需要停止pull
//
// 当前调用时机:
// 1. 定时器定时检查
//
func (group *Group) stopPullIfNeeded() {
// 没有输出型的流了
if group.pullProxy.pullSession != nil && !group.hasOutSession() {
Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
group.pullProxy.pullSession.Dispose()
if group.rtspPubSession != nil {
return group.rtspPubSession.UniqueKey()
}
return ""
}

@ -9,14 +9,9 @@
package logic
import (
"fmt"
"path/filepath"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
@ -139,9 +134,6 @@ func (group *Group) delRtspPubSession(session *rtsp.PubSession) {
return
}
_ = group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RtmpRemuxer = nil
group.delIn()
}
@ -158,65 +150,21 @@ func (group *Group) delRtmpPullSession(session *rtmp.PullSession) {
// addIn 有pub或pull的输入型session加入时需要调用该函数
//
func (group *Group) addIn() {
// 是否push转推
group.pushIfNeeded()
// 是否启动hls
if group.config.HlsConfig.Enable {
if group.hlsMuxer != nil {
Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
}
enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
now := time.Now().Unix()
// 是否录制成flv文件
group.startPushIfNeeded()
group.startHlsIfNeeded()
group.startRecordFlvIfNeeded(now)
// 是否录制成ts文件
group.startRecordTsIfNeeded(now)
group.startRecordMpegtsIfNeeded(now)
}
// delIn 有pub或pull的输入型session离开时需要调用该函数
//
func (group *Group) delIn() {
// 停止hls
if group.config.HlsConfig.Enable && group.hlsMuxer != nil {
group.disposeHlsMuxer()
}
// 停止转推
if group.pushEnable {
for _, v := range group.url2PushProxy {
if v.pushSession != nil {
v.pushSession.Dispose()
}
v.pushSession = nil
}
}
// 停止flv录制
if group.config.RecordConfig.EnableFlv {
if group.recordFlv != nil {
if err := group.recordFlv.Dispose(); err != nil {
Log.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
}
group.recordFlv = nil
}
}
// 停止ts录制
if group.config.RecordConfig.EnableMpegts {
if group.recordMpegts != nil {
if err := group.recordMpegts.Dispose(); err != nil {
Log.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
}
group.recordMpegts = nil
}
}
group.stopPushIfNeeded()
group.stopHlsIfNeeded()
group.stopRecordFlvIfNeeded()
group.stopRecordMpegtsIfNeeded()
group.rtmpPubSession = nil
group.rtspPubSession = nil
@ -228,70 +176,3 @@ func (group *Group) delIn() {
group.patpmt = nil
group.sdpCtx = nil
}
// ---------------------------------------------------------------------------------------------------------------------
// startRecordFlvIfNeeded 是否开启flv录制
//
func (group *Group) startRecordFlvIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableFlv {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename)
// 如果已经在录制,则先关闭
// TODO(chef): 正常的逻辑是否会走到这?
if group.recordFlv != nil {
Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFlv.Name())
_ = group.recordFlv.Dispose()
}
// 初始化录制
group.recordFlv = &httpflv.FlvFileWriter{}
if err := group.recordFlv.Open(filenameWithPath); err != nil {
Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
if err := group.recordFlv.WriteFlvHeader(); err != nil {
Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
}
func (group *Group) startRecordTsIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableMpegts {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename)
// 如果已经在录制,则先关闭
if group.recordMpegts != nil {
Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMpegts.Name())
_ = group.recordMpegts.Dispose()
}
group.recordMpegts = &mpegts.FileWriter{}
if err := group.recordMpegts.Create(filenameWithPath); err != nil {
Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordMpegts = nil
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) inSessionUniqueKey() string {
if group.rtmpPubSession != nil {
return group.rtmpPubSession.UniqueKey()
}
if group.rtspPubSession != nil {
return group.rtspPubSession.UniqueKey()
}
return ""
}

@ -0,0 +1,135 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
)
func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了
// 也即我们假定上行肯定是以关键帧为开始进行视频发送假设不是那么我们按上行的流正常发而不过滤掉关键帧前面的不包含关键帧的非完整GOP
// TODO(chef):
// 1. 需要仔细考虑单音频无视频的流的情况
// 2. 这里不修改标志让这个session继续等关键帧也可以
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
session.WriteFlvHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推流过,就不需要等待关键帧了
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
// AddHttptsSubSession TODO chef:
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httptsSubSessionSet[session] = struct{}{}
group.pullIfNeeded()
}
func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO(chef): 应该有等待机制,而不是直接关闭
if group.sdpCtx == nil {
Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]",
group.UniqueKey, session.UniqueKey())
return false, nil
}
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtspSubSessionSet[session] = struct{}{}
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
// TODO(chef): rtsp sub也应该判断是否需要静态pull回源
}
func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtmpSubSession(session)
}
func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttpflvSubSession(session)
}
func (group *Group) DelHttptsSubSession(session *httpts.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttptsSubSession(session)
}
func (group *Group) DelRtspSubSession(session *rtsp.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtspSubSession(session)
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtmpSubSessionSet, session)
}
func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httpflvSubSessionSet, session)
}
func (group *Group) delHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httptsSubSessionSet, session)
}
func (group *Group) delRtspSubSession(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtspSubSessionSet, session)
}

@ -0,0 +1,57 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"path/filepath"
"github.com/q191201771/lal/pkg/httpflv"
)
// startRecordFlvIfNeeded 必要时开启flv录制
//
func (group *Group) startRecordFlvIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableFlv {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename)
// 如果已经在录制,则先关闭
// TODO(chef): 正常的逻辑是否会走到这?
if group.recordFlv != nil {
Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFlv.Name())
_ = group.recordFlv.Dispose()
}
// 初始化录制
group.recordFlv = &httpflv.FlvFileWriter{}
if err := group.recordFlv.Open(filenameWithPath); err != nil {
Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
if err := group.recordFlv.WriteFlvHeader(); err != nil {
Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
}
func (group *Group) stopRecordFlvIfNeeded() {
if !group.config.RecordConfig.EnableFlv {
return
}
if group.recordFlv != nil {
_ = group.recordFlv.Dispose()
group.recordFlv = nil
}
}

@ -0,0 +1,43 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import "github.com/q191201771/lal/pkg/hls"
func (group *Group) IsHlsMuxerAlive() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hlsMuxer != nil
}
// startHlsIfNeeded 必要时启动hls
//
func (group *Group) startHlsIfNeeded() {
// TODO(chef): [refactor] ts依赖hls
if !group.config.HlsConfig.Enable {
return
}
if group.hlsMuxer != nil {
Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
}
enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
func (group *Group) stopHlsIfNeeded() {
if !group.config.HlsConfig.Enable {
return
}
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())
group.hlsMuxer = nil
}
}

@ -0,0 +1,50 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"path/filepath"
"github.com/q191201771/lal/pkg/mpegts"
)
// startRecordMpegtsIfNeeded 必要时开启ts录制
//
func (group *Group) startRecordMpegtsIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableMpegts {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename)
// 如果已经在录制,则先关闭
if group.recordMpegts != nil {
Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMpegts.Name())
_ = group.recordMpegts.Dispose()
}
group.recordMpegts = &mpegts.FileWriter{}
if err := group.recordMpegts.Create(filenameWithPath); err != nil {
Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordMpegts = nil
}
}
func (group *Group) stopRecordMpegtsIfNeeded() {
if !group.config.RecordConfig.EnableMpegts {
return
}
if group.recordMpegts != nil {
_ = group.recordMpegts.Dispose()
group.recordMpegts = nil
}
}

@ -0,0 +1,133 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
// StartPull 外部命令主动触发pull拉流
//
// 当前调用时机:
// 1. 比如http api
//
func (group *Group) StartPull(url string) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.setPullUrl(true, url)
group.pullIfNeeded()
}
// ---------------------------------------------------------------------------------------------------------------------
type pullProxy struct {
isPulling bool
pullSession *rtmp.PullSession
}
func (group *Group) initRelayPull() {
enable := group.config.RelayPullConfig.Enable
addr := group.config.RelayPullConfig.Addr
appName := group.appName
streamName := group.streamName
// 根据配置文件中的静态回源配置来初始化回源设置
var pullUrl string
if enable {
pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
}
group.setPullUrl(enable, pullUrl)
}
func (group *Group) isPullEnable() bool {
return group.pullEnable
}
func (group *Group) setPullUrl(enable bool, url string) {
group.pullEnable = enable
group.pullUrl = url
}
func (group *Group) getPullUrl() string {
return group.pullUrl
}
func (group *Group) setPullingFlag(flag bool) {
group.pullProxy.isPulling = flag
}
func (group *Group) getPullingFlag() bool {
return group.pullProxy.isPulling
}
// 判断是否需要pull从远端拉流至本地如果需要则触发pull
//
// 当前调用时机:
// 1. 添加新sub session
// 2. 外部命令比如http api
// 3. 定时器比如pull的连接断了通过定时器可以重启触发pull
//
func (group *Group) pullIfNeeded() {
if !group.isPullEnable() {
return
}
// 如果没有从本地拉流的就不需要pull了
if !group.hasOutSession() {
return
}
// 如果本地已经有输入型的流就不需要pull了
if group.hasInSession() {
return
}
// 已经在pull中就不需要pull了
if group.getPullingFlag() {
return
}
group.setPullingFlag(true)
Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl())
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = relayPullTimeoutMs
option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
})
// TODO(chef): 处理数据回调是否应该等待Add成功之后。避免竞态条件中途加入了其他in session
err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg)
if err != nil {
Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
return
}
res := group.AddRtmpPullSession(pullSession)
if res {
err = <-pullSession.WaitChan()
Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
} else {
pullSession.Dispose()
}
}()
}
// 判断是否需要停止pull
//
// 当前调用时机:
// 1. 定时器定时检查
//
func (group *Group) stopPullIfNeeded() {
// 没有输出型的流了
if group.pullProxy.pullSession != nil && !group.hasOutSession() {
Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
group.pullProxy.pullSession.Dispose()
}
}

@ -0,0 +1,123 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = session
}
}
func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = nil
group.url2PushProxy[url].isPushing = false
}
}
type pushProxy struct {
isPushing bool
pushSession *rtmp.PushSession
}
func (group *Group) initRelayPush() {
enable := group.config.RelayPushConfig.Enable
addrList := group.config.RelayPushConfig.AddrList
appName := group.appName
streamName := group.streamName
url2PushProxy := make(map[string]*pushProxy)
if enable {
for _, addr := range addrList {
pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
url2PushProxy[pushUrl] = &pushProxy{
isPushing: false,
pushSession: nil,
}
}
}
group.pushEnable = group.config.RelayPushConfig.Enable
group.url2PushProxy = url2PushProxy
}
// startPushIfNeeded 必要时进行replay push转推
//
func (group *Group) startPushIfNeeded() {
// push转推功能没开
if !group.pushEnable {
return
}
// 没有pub发布者
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
}
v.isPushing = true
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
if err != nil {
Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
}
func (group *Group) stopPushIfNeeded() {
if !group.pushEnable {
return
}
for _, v := range group.url2PushProxy {
if v.pushSession != nil {
v.pushSession.Dispose()
}
v.pushSession = nil
}
}

@ -96,8 +96,7 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
}
if sm.config.RtmpConfig.Enable {
// TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer
sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr)
sm.rtmpServer = rtmp.NewServer(sm.config.RtmpConfig.Addr, sm)
}
if sm.config.RtspConfig.Enable {
sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm)
@ -222,13 +221,13 @@ func (sm *ServerManager) RunLoop() error {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
var count uint32
var tickCount uint32
for {
select {
case <-sm.exitChan:
return nil
case <-t.C:
count++
tickCount++
sm.mutex.Lock()
@ -240,13 +239,13 @@ func (sm *ServerManager) RunLoop() error {
return false
}
group.Tick()
group.Tick(tickCount)
return true
})
// 定时打印一些group相关的debug日志
if sm.config.DebugConfig.LogGroupIntervalSec > 0 &&
count%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 {
tickCount%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 {
groupNum := sm.groupManager.Len()
Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum)
if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 {
@ -264,7 +263,7 @@ func (sm *ServerManager) RunLoop() error {
sm.mutex.Unlock()
// 定时通过http notify发送group相关的信息
if uis != 0 && (count%uis) == 0 {
if uis != 0 && (tickCount%uis) == 0 {
updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.StatAllGroup()
sm.option.NotifyHandler.OnUpdate(updateInfo)
@ -653,7 +652,6 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error {
}
func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
// TODO chef: impl me
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
@ -815,7 +813,3 @@ func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request)
sm.hlsServerHandler.ServeHTTP(writer, req)
}
func (sm *ServerManager) runWebPprof(addr string) {
}

@ -13,10 +13,13 @@ import "github.com/q191201771/naza/pkg/nazalog"
var Log = nazalog.GetGlobalLogger()
var (
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000
// calcSessionStatIntervalSec 计算所有session收发码率的时间间隔
//
calcSessionStatIntervalSec uint32 = 5
// checkSessionAliveIntervalSec

@ -34,15 +34,15 @@ type ServerObserver interface {
}
type Server struct {
observer ServerObserver
addr string
observer ServerObserver
ln net.Listener
}
func NewServer(observer ServerObserver, addr string) *Server {
func NewServer(addr string, observer ServerObserver) *Server {
return &Server{
observer: observer,
addr: addr,
observer: observer,
}
}

Loading…
Cancel
Save