diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index 14a33fe..3b9787f 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -82,7 +82,7 @@ func main() { var observer Observer pullSession := rtsp.NewPullSession(&observer, func(option *rtsp.PullSessionOption) { - option.PullTimeoutMs = 5000 + option.PullTimeoutMs = 10000 option.OverTcp = overTcp != 0 }) diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index 8b1a49c..1815e72 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -32,8 +32,8 @@ func main() { inUrl, outUrl, overTcp := parseFlag() pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = 5000 - option.WriteAvTimeoutMs = 5000 + option.PushTimeoutMs = 10000 + option.WriteAvTimeoutMs = 10000 }) err := pushSession.Push(outUrl) @@ -45,7 +45,7 @@ func main() { nazalog.Assert(nil, err) }) pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) { - option.PullTimeoutMs = 5000 + option.PullTimeoutMs = 10000 option.OverTcp = overTcp != 0 }) diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go index cca4a2c..d14ff5e 100644 --- a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -62,7 +62,7 @@ func NewRtspTunnel(pullUrl string, pushUrl string, pullOverTcp bool, pushOverTcp // @return: 如果为nil,表示任务启动成功,此时数据已经在后台转发 func (r *RtspTunnel) Start() error { r.pullSession = rtsp.NewPullSession(r, func(option *rtsp.PullSessionOption) { - option.PullTimeoutMs = 5000 + option.PullTimeoutMs = 10000 option.OverTcp = r.pullOverTcp }) if err := r.pullSession.Pull(r.pullUrl); err != nil { @@ -73,7 +73,7 @@ func (r *RtspTunnel) Start() error { nazalog.Debugf("[%s] start pull succ. sdp=%s", r.uniqueKey, string(sdpCtx.RawSdp)) r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { - option.PushTimeoutMs = 5000 + option.PushTimeoutMs = 10000 option.OverTcp = r.pushOverTcp }) if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil { diff --git a/app/demo/pushrtmp/pushrtmp.go b/app/demo/pushrtmp/pushrtmp.go index 6b254f0..46489a0 100644 --- a/app/demo/pushrtmp/pushrtmp.go +++ b/app/demo/pushrtmp/pushrtmp.go @@ -90,7 +90,7 @@ func main() { func push(tags []httpflv.Tag, url string, isRecursive bool) { ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = 5000 + option.PushTimeoutMs = 10000 option.WriteAvTimeoutMs = 10000 option.WriteBufSize = 0 option.WriteChanSize = 0 diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index 623ada3..bf3f7a0 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -23,6 +23,11 @@ import ( // TODO(chef): 检查所有 interface是否以I开头 202207 // TODO(chef): 增加 gb28181.PubSession 202207 +// pub: rtmp, rtsp, ps, ICustomizePubSessionContext +// sub: rtmp, rtsp, httpflv, httpts, hls, ICustomizeHookSessionContext +// pull: rtmp, rtsp, httpflv +// push: rtmp, rtsp + var ( _ base.ISession = &rtmp.ServerSession{} _ base.ISession = &rtsp.PubSession{} diff --git a/pkg/logic/group__.go b/pkg/logic/group__.go index 570160f..dc4e18d 100644 --- a/pkg/logic/group__.go +++ b/pkg/logic/group__.go @@ -435,8 +435,6 @@ func (group *Group) OutSessionNum() int { // --------------------------------------------------------------------------------------------------------------------- // disposeInactiveSessions 关闭不活跃的session -// -// TODO chef: [refactor] 梳理和naza.Connection超时重复部分 func (group *Group) disposeInactiveSessions(tickCount uint32) { if group.psPubSession != nil { if group.psPubTimeoutSec == 0 { @@ -456,9 +454,9 @@ func (group *Group) disposeInactiveSessions(tickCount uint32) { } } - // 以下都是以 checkSessionAliveIntervalSec 为间隔的清理逻辑 + // 以下都是以 CheckSessionAliveIntervalSec 为间隔的清理逻辑 - if tickCount%checkSessionAliveIntervalSec != 0 { + if tickCount%CheckSessionAliveIntervalSec != 0 { return } diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go index 3f53231..c4d1182 100644 --- a/pkg/logic/group__relay_pull.go +++ b/pkg/logic/group__relay_pull.go @@ -69,13 +69,6 @@ type pullProxy struct { // initRelayPullByConfig 根据配置文件中的静态回源配置来初始化回源设置 func (group *Group) initRelayPullByConfig() { - // 注意,这是配置文件中静态回源的配置值,不是HTTP-API的默认值 - const ( - staticRelayPullTimeoutMs = 5000 // - staticRelayPullRetryNum = base.PullRetryNumForever - staticRelayPullAutoStopPullAfterNoOutMs = base.AutoStopPullAfterNoOutMsImmediately - ) - enable := group.config.StaticRelayPullConfig.Enable addr := group.config.StaticRelayPullConfig.Addr appName := group.appName @@ -93,7 +86,7 @@ func (group *Group) initRelayPullByConfig() { group.pullProxy.pullUrl = pullUrl group.pullProxy.staticRelayPullEnable = enable - group.pullProxy.pullTimeoutMs = staticRelayPullTimeoutMs + group.pullProxy.pullTimeoutMs = StaticRelayPullTimeoutMs group.pullProxy.pullRetryNum = staticRelayPullRetryNum group.pullProxy.autoStopPullAfterNoOutMs = staticRelayPullAutoStopPullAfterNoOutMs } diff --git a/pkg/logic/group__relay_push.go b/pkg/logic/group__relay_push.go index 191cc21..6bd717c 100644 --- a/pkg/logic/group__relay_push.go +++ b/pkg/logic/group__relay_push.go @@ -97,8 +97,8 @@ func (group *Group) startPushIfNeeded() { go func(u, u2 string) { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = relayPushTimeoutMs - option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs + option.PushTimeoutMs = RelayPushTimeoutMs + option.WriteAvTimeoutMs = RelayPushWriteAvTimeoutMs }) err := pushSession.Push(u2) if err != nil { diff --git a/pkg/logic/http_api.go b/pkg/logic/http_api.go index 43c01ce..a22bb7a 100644 --- a/pkg/logic/http_api.go +++ b/pkg/logic/http_api.go @@ -131,7 +131,7 @@ func (h *HttpApiServer) ctrlStartRelayPullHandler(w http.ResponseWriter, req *ht } if !j.Exist("pull_timeout_ms") { - info.PullTimeoutMs = 5000 + info.PullTimeoutMs = DefaultApiCtrlStartRelayPullReqPullTimeoutMs } if !j.Exist("pull_retry_num") { info.PullRetryNum = base.PullRetryNumNever @@ -200,7 +200,7 @@ func (h *HttpApiServer) ctrlStartRtpPubHandler(w http.ResponseWriter, req *http. } if !j.Exist("timeout_ms") { - info.TimeoutMs = 60000 + info.TimeoutMs = DefaultApiCtrlStartRtpPubReqTimeoutMs } // 不存在时默认0值的,不需要手动写了 //if !j.Exist("port") { diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 011a53d..bd851ce 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -8,25 +8,79 @@ package logic -import "github.com/q191201771/naza/pkg/nazalog" +import ( + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazalog" +) var Log = nazalog.GetGlobalLogger() var ( - relayPushTimeoutMs = 5000 - relayPushWriteAvTimeoutMs = 5000 - // calcSessionStatIntervalSec 计算所有session收发码率的时间间隔 + // 所有session超时管理整理如下: // - calcSessionStatIntervalSec uint32 = 5 + // (1.) 第一种方式,是上层判断 + // + // (1.1.) CheckSessionAliveIntervalSec + // - rtmp pub, rtsp pub, + // - rtmp pull, rtsp pull, + // - rtmp sub, rtsp sub, httpflv sub, httpts sub, + // - rtmp push, + // + // (1.2.) HTTP-API参数 ApiCtrlStartRtpPubReq.TimeoutMs + // - ps pub, + // + // (1.3.) 无 + // - customize pub, + // - hls sub, + // + // (2.) 第二种方式,是session自身提供的超时功能: + // + // (2.x.) rtmp pub, rtmp sub: 底层naza connection,并且设置了超时 rtmp.serverSessionReadAvTimeoutMs rtmp.serverSessionWriteAvTimeoutMs + // (2.x.) rtsp pub, rtsp sub: cmd以及tcp模式时底层naza connection,但是没有设置超时(udp使用 nazanet.UdpConnection), + // (2.x.) rtmp pull, rtsp pull: HTTP-API参数 ApiCtrlStartRelayPullReq.PullTimeoutMs 静态回源时 StaticRelayPullTimeoutMs + // (2.x.) httpflv sub, httpts sub: httpflv.SubSessionWriteTimeoutMs , httpts.SubSessionWriteTimeoutMs + // (2.x.) rtmp push: RelayPushTimeoutMs, RelayPushWriteAvTimeoutMs, + // (2.x.) 无: ps pub, customize pub, + // (2.x.) hls sub: 配置文件中配置项 sub_session_timeout_ms + // + // (3.) client类型session默认超时: + // - rtmp push: rtmp.PushSessionOption.PushTimeoutMs WriteAvTimeoutMs + // - rtsp push: rtsp.PushSessionOption.PushTimeoutMs + // - rtmp pull: rtmp.PullSessionOption.PullTimeoutMs ReadAvTimeoutMs + // - rtsp pull: rtsp.PullSessionOption.PullTimeoutMs + // - httpflv pull: httpflv.PullSessionOption.PullTimeoutMs ReadTimeoutMs - // checkSessionAliveIntervalSec + // CheckSessionAliveIntervalSec // - // - 对于输入型session,检查一定时间内,是否没有收到数据 - // - 对于输出型session,检查一定时间内,是否没有发送数据 - // 注意,这里既检查socket发送阻塞,又检查上层没有给session喂数据 + // 检查session是否有数据传输的时间间隔,该间隔内没有数据传输的session将被关闭。 // - checkSessionAliveIntervalSec uint32 = 10 + // 对于输入型session,检查一定时间内,是否没有收到数据。 + // + // 对于输出型session,检查一定时间内,是否没有发送数据。 + // 注意,socket阻塞无法发送和上层没有向该session喂入数据都算没有发送数据。 + // + CheckSessionAliveIntervalSec uint32 = 10 + + RelayPushTimeoutMs = 10000 + RelayPushWriteAvTimeoutMs = 10000 + + StaticRelayPullTimeoutMs = 10000 + + DefaultApiCtrlStartRtpPubReqTimeoutMs = 60000 + DefaultApiCtrlStartRelayPullReqPullTimeoutMs = 10000 +) + +// 注意,这是配置文件中静态回源的配置值,不是HTTP-API的默认值 +const ( + staticRelayPullRetryNum = base.PullRetryNumForever + staticRelayPullAutoStopPullAfterNoOutMs = base.AutoStopPullAfterNoOutMsImmediately +) + +var ( + // calcSessionStatIntervalSec 计算所有session收发码率的时间间隔 + // + calcSessionStatIntervalSec uint32 = 5 ) const (