[refactor] 整理所有超时相关的代码

pull/286/head
q191201771 2 years ago
parent 5620db9654
commit 6fe6629848

@ -82,7 +82,7 @@ func main() {
var observer Observer var observer Observer
pullSession := rtsp.NewPullSession(&observer, func(option *rtsp.PullSessionOption) { pullSession := rtsp.NewPullSession(&observer, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 5000 option.PullTimeoutMs = 10000
option.OverTcp = overTcp != 0 option.OverTcp = overTcp != 0
}) })

@ -32,8 +32,8 @@ func main() {
inUrl, outUrl, overTcp := parseFlag() inUrl, outUrl, overTcp := parseFlag()
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = 5000 option.PushTimeoutMs = 10000
option.WriteAvTimeoutMs = 5000 option.WriteAvTimeoutMs = 10000
}) })
err := pushSession.Push(outUrl) err := pushSession.Push(outUrl)
@ -45,7 +45,7 @@ func main() {
nazalog.Assert(nil, err) nazalog.Assert(nil, err)
}) })
pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) { pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 5000 option.PullTimeoutMs = 10000
option.OverTcp = overTcp != 0 option.OverTcp = overTcp != 0
}) })

@ -62,7 +62,7 @@ func NewRtspTunnel(pullUrl string, pushUrl string, pullOverTcp bool, pushOverTcp
// @return: 如果为nil表示任务启动成功此时数据已经在后台转发 // @return: 如果为nil表示任务启动成功此时数据已经在后台转发
func (r *RtspTunnel) Start() error { func (r *RtspTunnel) Start() error {
r.pullSession = rtsp.NewPullSession(r, func(option *rtsp.PullSessionOption) { r.pullSession = rtsp.NewPullSession(r, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 5000 option.PullTimeoutMs = 10000
option.OverTcp = r.pullOverTcp option.OverTcp = r.pullOverTcp
}) })
if err := r.pullSession.Pull(r.pullUrl); err != nil { if err := r.pullSession.Pull(r.pullUrl); err != nil {
@ -73,7 +73,7 @@ func (r *RtspTunnel) Start() error {
nazalog.Debugf("[%s] start pull succ. sdp=%s", r.uniqueKey, string(sdpCtx.RawSdp)) nazalog.Debugf("[%s] start pull succ. sdp=%s", r.uniqueKey, string(sdpCtx.RawSdp))
r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) {
option.PushTimeoutMs = 5000 option.PushTimeoutMs = 10000
option.OverTcp = r.pushOverTcp option.OverTcp = r.pushOverTcp
}) })
if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil { if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil {

@ -90,7 +90,7 @@ func main() {
func push(tags []httpflv.Tag, url string, isRecursive bool) { func push(tags []httpflv.Tag, url string, isRecursive bool) {
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = 5000 option.PushTimeoutMs = 10000
option.WriteAvTimeoutMs = 10000 option.WriteAvTimeoutMs = 10000
option.WriteBufSize = 0 option.WriteBufSize = 0
option.WriteChanSize = 0 option.WriteChanSize = 0

@ -23,6 +23,11 @@ import (
// TODO(chef): 检查所有 interface是否以I开头 202207 // TODO(chef): 检查所有 interface是否以I开头 202207
// TODO(chef): 增加 gb28181.PubSession 202207 // TODO(chef): 增加 gb28181.PubSession 202207
// pub: rtmp, rtsp, ps, ICustomizePubSessionContext
// sub: rtmp, rtsp, httpflv, httpts, hls, ICustomizeHookSessionContext
// pull: rtmp, rtsp, httpflv
// push: rtmp, rtsp
var ( var (
_ base.ISession = &rtmp.ServerSession{} _ base.ISession = &rtmp.ServerSession{}
_ base.ISession = &rtsp.PubSession{} _ base.ISession = &rtsp.PubSession{}

@ -435,8 +435,6 @@ func (group *Group) OutSessionNum() int {
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// disposeInactiveSessions 关闭不活跃的session // disposeInactiveSessions 关闭不活跃的session
//
// TODO chef: [refactor] 梳理和naza.Connection超时重复部分
func (group *Group) disposeInactiveSessions(tickCount uint32) { func (group *Group) disposeInactiveSessions(tickCount uint32) {
if group.psPubSession != nil { if group.psPubSession != nil {
if group.psPubTimeoutSec == 0 { if group.psPubTimeoutSec == 0 {
@ -456,9 +454,9 @@ func (group *Group) disposeInactiveSessions(tickCount uint32) {
} }
} }
// 以下都是以 checkSessionAliveIntervalSec 为间隔的清理逻辑 // 以下都是以 CheckSessionAliveIntervalSec 为间隔的清理逻辑
if tickCount%checkSessionAliveIntervalSec != 0 { if tickCount%CheckSessionAliveIntervalSec != 0 {
return return
} }

@ -69,13 +69,6 @@ type pullProxy struct {
// initRelayPullByConfig 根据配置文件中的静态回源配置来初始化回源设置 // initRelayPullByConfig 根据配置文件中的静态回源配置来初始化回源设置
func (group *Group) initRelayPullByConfig() { func (group *Group) initRelayPullByConfig() {
// 注意这是配置文件中静态回源的配置值不是HTTP-API的默认值
const (
staticRelayPullTimeoutMs = 5000 //
staticRelayPullRetryNum = base.PullRetryNumForever
staticRelayPullAutoStopPullAfterNoOutMs = base.AutoStopPullAfterNoOutMsImmediately
)
enable := group.config.StaticRelayPullConfig.Enable enable := group.config.StaticRelayPullConfig.Enable
addr := group.config.StaticRelayPullConfig.Addr addr := group.config.StaticRelayPullConfig.Addr
appName := group.appName appName := group.appName
@ -93,7 +86,7 @@ func (group *Group) initRelayPullByConfig() {
group.pullProxy.pullUrl = pullUrl group.pullProxy.pullUrl = pullUrl
group.pullProxy.staticRelayPullEnable = enable group.pullProxy.staticRelayPullEnable = enable
group.pullProxy.pullTimeoutMs = staticRelayPullTimeoutMs group.pullProxy.pullTimeoutMs = StaticRelayPullTimeoutMs
group.pullProxy.pullRetryNum = staticRelayPullRetryNum group.pullProxy.pullRetryNum = staticRelayPullRetryNum
group.pullProxy.autoStopPullAfterNoOutMs = staticRelayPullAutoStopPullAfterNoOutMs group.pullProxy.autoStopPullAfterNoOutMs = staticRelayPullAutoStopPullAfterNoOutMs
} }

@ -97,8 +97,8 @@ func (group *Group) startPushIfNeeded() {
go func(u, u2 string) { go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs option.PushTimeoutMs = RelayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs option.WriteAvTimeoutMs = RelayPushWriteAvTimeoutMs
}) })
err := pushSession.Push(u2) err := pushSession.Push(u2)
if err != nil { if err != nil {

@ -131,7 +131,7 @@ func (h *HttpApiServer) ctrlStartRelayPullHandler(w http.ResponseWriter, req *ht
} }
if !j.Exist("pull_timeout_ms") { if !j.Exist("pull_timeout_ms") {
info.PullTimeoutMs = 5000 info.PullTimeoutMs = DefaultApiCtrlStartRelayPullReqPullTimeoutMs
} }
if !j.Exist("pull_retry_num") { if !j.Exist("pull_retry_num") {
info.PullRetryNum = base.PullRetryNumNever info.PullRetryNum = base.PullRetryNumNever
@ -200,7 +200,7 @@ func (h *HttpApiServer) ctrlStartRtpPubHandler(w http.ResponseWriter, req *http.
} }
if !j.Exist("timeout_ms") { if !j.Exist("timeout_ms") {
info.TimeoutMs = 60000 info.TimeoutMs = DefaultApiCtrlStartRtpPubReqTimeoutMs
} }
// 不存在时默认0值的不需要手动写了 // 不存在时默认0值的不需要手动写了
//if !j.Exist("port") { //if !j.Exist("port") {

@ -8,25 +8,79 @@
package logic package logic
import "github.com/q191201771/naza/pkg/nazalog" import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)
var Log = nazalog.GetGlobalLogger() var Log = nazalog.GetGlobalLogger()
var ( var (
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
// calcSessionStatIntervalSec 计算所有session收发码率的时间间隔 // 所有session超时管理整理如下
// //
calcSessionStatIntervalSec uint32 = 5 // (1.) 第一种方式,是上层判断
//
// (1.1.) CheckSessionAliveIntervalSec
// - rtmp pub, rtsp pub,
// - rtmp pull, rtsp pull,
// - rtmp sub, rtsp sub, httpflv sub, httpts sub,
// - rtmp push,
//
// (1.2.) HTTP-API参数 ApiCtrlStartRtpPubReq.TimeoutMs
// - ps pub,
//
// (1.3.) 无
// - customize pub,
// - hls sub,
//
// (2.) 第二种方式是session自身提供的超时功能
//
// (2.x.) rtmp pub, rtmp sub: 底层naza connection并且设置了超时 rtmp.serverSessionReadAvTimeoutMs rtmp.serverSessionWriteAvTimeoutMs
// (2.x.) rtsp pub, rtsp sub: cmd以及tcp模式时底层naza connection但是没有设置超时(udp使用 nazanet.UdpConnection),
// (2.x.) rtmp pull, rtsp pull: HTTP-API参数 ApiCtrlStartRelayPullReq.PullTimeoutMs 静态回源时 StaticRelayPullTimeoutMs
// (2.x.) httpflv sub, httpts sub: httpflv.SubSessionWriteTimeoutMs , httpts.SubSessionWriteTimeoutMs
// (2.x.) rtmp push: RelayPushTimeoutMs, RelayPushWriteAvTimeoutMs,
// (2.x.) 无: ps pub, customize pub,
// (2.x.) hls sub: 配置文件中配置项 sub_session_timeout_ms
//
// (3.) client类型session默认超时
// - rtmp push: rtmp.PushSessionOption.PushTimeoutMs WriteAvTimeoutMs
// - rtsp push: rtsp.PushSessionOption.PushTimeoutMs
// - rtmp pull: rtmp.PullSessionOption.PullTimeoutMs ReadAvTimeoutMs
// - rtsp pull: rtsp.PullSessionOption.PullTimeoutMs
// - httpflv pull: httpflv.PullSessionOption.PullTimeoutMs ReadTimeoutMs
// checkSessionAliveIntervalSec // CheckSessionAliveIntervalSec
// //
// - 对于输入型session检查一定时间内是否没有收到数据 // 检查session是否有数据传输的时间间隔该间隔内没有数据传输的session将被关闭。
// - 对于输出型session检查一定时间内是否没有发送数据
// 注意这里既检查socket发送阻塞又检查上层没有给session喂数据
// //
checkSessionAliveIntervalSec uint32 = 10 // 对于输入型session检查一定时间内是否没有收到数据。
//
// 对于输出型session检查一定时间内是否没有发送数据。
// 注意socket阻塞无法发送和上层没有向该session喂入数据都算没有发送数据。
//
CheckSessionAliveIntervalSec uint32 = 10
RelayPushTimeoutMs = 10000
RelayPushWriteAvTimeoutMs = 10000
StaticRelayPullTimeoutMs = 10000
DefaultApiCtrlStartRtpPubReqTimeoutMs = 60000
DefaultApiCtrlStartRelayPullReqPullTimeoutMs = 10000
)
// 注意这是配置文件中静态回源的配置值不是HTTP-API的默认值
const (
staticRelayPullRetryNum = base.PullRetryNumForever
staticRelayPullAutoStopPullAfterNoOutMs = base.AutoStopPullAfterNoOutMsImmediately
)
var (
// calcSessionStatIntervalSec 计算所有session收发码率的时间间隔
//
calcSessionStatIntervalSec uint32 = 5
) )
const ( const (

Loading…
Cancel
Save