diff --git a/conf/node2.conf.json b/conf/node2.conf.json index 471655d..06fa827 100644 --- a/conf/node2.conf.json +++ b/conf/node2.conf.json @@ -22,8 +22,8 @@ "gop_num": 0 }, "hls": { - "enable": true, - "enable_https": true, + "enable": false, + "enable_https": false, "url_pattern": "/hls/", "out_path": "./lal_record/hls/", "fragment_duration_ms": 3000, @@ -54,7 +54,7 @@ "addr_list":[ ] }, - "relay_pull": { + "static_relay_pull": { "enable": false, "addr": "" }, diff --git a/pkg/base/http_api_t.go b/pkg/base/http_api_t.go index 46920a3..c6328fc 100644 --- a/pkg/base/http_api_t.go +++ b/pkg/base/http_api_t.go @@ -8,9 +8,9 @@ package base -// 文档见: https://pengrl.com/p/20100/ +// 文档见: https://pengrl.com/lal/#/HTTPAPI -const HttpApiVersion = "v0.2.0" +const HttpApiVersion = "v0.3.0" const ( ErrorCodeSucc = 0 @@ -64,7 +64,8 @@ type ApiStatGroup struct { type ApiCtrlStartRelayPull struct { HttpResponseBasic Data struct { - SessionId string `json:"session_id"` + StreamName string `json:"stream_name"` + SessionId string `json:"session_id"` } `json:"data"` } @@ -85,7 +86,7 @@ type ApiCtrlStartRelayPullReq struct { AutoStopPullAfterNoOutMs int `json:"auto_stop_pull_after_no_out_ms"` } -type ApiCtrlKickOutSession struct { +type ApiCtrlKickSession struct { StreamName string `json:"stream_name"` SessionId string `json:"session_id"` } diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 2a7f412..484b934 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -355,10 +355,6 @@ func getAllHttpApi(addr string) { var b []byte var err error - b, err = httpGet(fmt.Sprintf("http://%s/api/list", addr)) - Log.Assert(nil, err) - Log.Debugf("%s", string(b)) - b, err = httpGet(fmt.Sprintf("http://%s/api/stat/lal_info", addr)) Log.Assert(nil, err) Log.Debugf("%s", string(b)) @@ -376,8 +372,8 @@ func getAllHttpApi(addr string) { Log.Assert(nil, err) Log.Debugf("%s", string(b)) - var ackos base.ApiCtrlKickOutSession - b, err = httpPost(fmt.Sprintf("http://%s/api/ctrl/kick_out_session", addr), &ackos) + var ackos base.ApiCtrlKickSession + b, err = httpPost(fmt.Sprintf("http://%s/api/ctrl/kick_session", addr), &ackos) Log.Assert(nil, err) Log.Debugf("%s", string(b)) } diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 2901047..3a21eef 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -232,19 +232,27 @@ func (group *Group) GetStat(maxsub int) base.StatGroup { return group.stat } -func (group *Group) KickOutSession(sessionId string) bool { +func (group *Group) KickSession(sessionId string) bool { group.mutex.Lock() defer group.mutex.Unlock() Log.Infof("[%s] kick out session. session id=%s", group.UniqueKey, sessionId) if strings.HasPrefix(sessionId, base.UkPreRtmpServerSession) { - if group.rtmpPubSession != nil { + if group.rtmpPubSession != nil && group.rtmpPubSession.UniqueKey() == sessionId { group.rtmpPubSession.Dispose() return true } + for s := range group.rtmpSubSessionSet { + if s.UniqueKey() == sessionId { + s.Dispose() + return true + } + } + } else if strings.HasPrefix(sessionId, base.UkPreRtmpPullSession) || strings.HasPrefix(sessionId, base.UkPreRtspPullSession) { + return group.kickPull(sessionId) } else if strings.HasPrefix(sessionId, base.UkPreRtspPubSession) { - if group.rtspPubSession != nil { + if group.rtspPubSession != nil && group.rtspPubSession.UniqueKey() == sessionId { group.rtspPubSession.Dispose() return true } @@ -271,16 +279,16 @@ func (group *Group) KickOutSession(sessionId string) bool { } } } else { - Log.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionId) + Log.Errorf("[%s] kick session while session id format invalid. %s", group.UniqueKey, sessionId) } return false } -func (group *Group) IsTotalEmpty() bool { +func (group *Group) IsInactive() bool { group.mutex.Lock() defer group.mutex.Unlock() - return group.isTotalEmpty() + return group.isTotalEmpty() && !group.isPullModuleAlive() } func (group *Group) HasInSession() bool { diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go index 4b119dd..af2ac7b 100644 --- a/pkg/logic/group__relay_pull.go +++ b/pkg/logic/group__relay_pull.go @@ -12,6 +12,7 @@ import ( "fmt" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/rtsp" + "github.com/q191201771/naza/pkg/nazalog" "strings" "time" @@ -38,6 +39,9 @@ func (group *Group) StartPull(info base.ApiCtrlStartRelayPullReq) (string, error // @return 如果PullSession存在,返回它的unique key // func (group *Group) StopPull() string { + group.mutex.Lock() + defer group.mutex.Unlock() + group.pullProxy.apiEnable = false return group.stopPull() } @@ -60,7 +64,8 @@ type pullProxy struct { rtspSession *rtsp.PullSession } -// 根据配置文件中的静态回源配置来初始化回源设置 +// initRelayPullByConfig 根据配置文件中的静态回源配置来初始化回源设置 +// func (group *Group) initRelayPullByConfig() { const ( staticRelayPullTimeoutMs = 5000 // @@ -73,7 +78,9 @@ func (group *Group) initRelayPullByConfig() { appName := group.appName streamName := group.streamName - group.pullProxy = &pullProxy{} + group.pullProxy = &pullProxy{ + lastHasOutTs: time.Now().UnixNano() / 1e6, + } var pullUrl string if enable { @@ -149,6 +156,24 @@ func (group *Group) pullSessionUniqueKey() string { return "" } +func (group *Group) isPullModuleAlive() bool { + return group.hasPullSession() || group.pullProxy.isSessionPulling || !group.shouldAutoStopPull() +} + +// kickPull +// +// @return 返回true,表示找到对应的session,并关闭 +// +func (group *Group) kickPull(sessionId string) bool { + if (group.pullProxy.rtmpSession != nil && group.pullProxy.rtmpSession.UniqueKey() == sessionId) || + (group.pullProxy.rtspSession != nil && group.pullProxy.rtspSession.UniqueKey() == sessionId) { + group.pullProxy.apiEnable = false + group.stopPull() + return true + } + return false +} + // 判断是否需要pull从远端拉流至本地,如果需要,则触发pull // // 当前调用时机: @@ -162,7 +187,7 @@ func (group *Group) pullIfNeeded() (string, error) { } // 如果没有从本地拉流的,就不需要pull了 - if group.ShouldAutoStop() { + if group.shouldAutoStopPull() { return "", nil } @@ -254,35 +279,35 @@ func (group *Group) pullIfNeeded() (string, error) { } func (group *Group) stopPull() string { - Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey) - // 关闭时,清空用于重试的计数 group.pullProxy.startCount = 0 if group.pullProxy.rtmpSession != nil { + Log.Infof("[%s] stop pull session.", group.UniqueKey) group.pullProxy.rtmpSession.Dispose() - return group.pullProxy.rtspSession.UniqueKey() + return group.pullProxy.rtmpSession.UniqueKey() } if group.pullProxy.rtspSession != nil { + Log.Infof("[%s] stop pull session.", group.UniqueKey) group.pullProxy.rtspSession.Dispose() - return group.pullProxy.rtmpSession.UniqueKey() + return group.pullProxy.rtspSession.UniqueKey() } return "" } func (group *Group) tickPullModule() { if group.hasSubSession() { - group.pullProxy.lastHasOutTs = time.Now().Unix() + group.pullProxy.lastHasOutTs = time.Now().UnixNano() / 1e6 } - if group.ShouldAutoStop() { + if group.shouldAutoStopPull() { group.stopPull() } else { group.pullIfNeeded() } } -func (group *Group) ShouldAutoStop() bool { +func (group *Group) shouldAutoStopPull() bool { if group.pullProxy.autoStopPullAfterNoOutMs < 0 { return false } else if group.pullProxy.autoStopPullAfterNoOutMs == 0 { @@ -291,6 +316,7 @@ func (group *Group) ShouldAutoStop() bool { if group.hasOutSession() { return false } - return time.Now().Unix()-group.pullProxy.lastHasOutTs >= int64(group.pullProxy.autoStopPullAfterNoOutMs) + nazalog.Debugf("%d %d %d", group.pullProxy.lastHasOutTs, time.Now().UnixNano(), group.pullProxy.autoStopPullAfterNoOutMs) + return group.pullProxy.lastHasOutTs != -1 && time.Now().UnixNano()/1e6-group.pullProxy.lastHasOutTs >= int64(group.pullProxy.autoStopPullAfterNoOutMs) } } diff --git a/pkg/logic/http_api.go b/pkg/logic/http_api.go index 3269af0..9096be2 100644 --- a/pkg/logic/http_api.go +++ b/pkg/logic/http_api.go @@ -50,7 +50,7 @@ func (h *HttpApiServer) RunLoop() error { mux.HandleFunc("/api/stat/all_group", h.statAllGroupHandler) mux.HandleFunc("/api/ctrl/start_relay_pull", h.ctrlStartRelayPullHandler) mux.HandleFunc("/api/ctrl/stop_relay_pull", h.ctrlStopRelayPullHandler) - mux.HandleFunc("/api/ctrl/kick_out_session", h.ctrlKickOutSessionHandler) + mux.HandleFunc("/api/ctrl/kick_session", h.ctrlKickSessionHandler) var srv http.Server srv.Handler = mux @@ -126,7 +126,7 @@ func (h *HttpApiServer) ctrlStartRelayPullHandler(w http.ResponseWriter, req *ht info.PullRetryNum = -1 } if !j.Exist("auto_stop_pull_after_no_out_ms") { - info.AutoStopPullAfterNoOutMs = 10000 + info.AutoStopPullAfterNoOutMs = -1 } return nil } @@ -141,17 +141,8 @@ func (h *HttpApiServer) ctrlStartRelayPullHandler(w http.ResponseWriter, req *ht } Log.Infof("http api start pull. req info=%+v", info) - var sessionId string - sessionId, err = h.sm.CtrlStartRelayPull(info) - if err != nil { - v.ErrorCode = base.ErrorCodeStartRelayPullFail - v.Desp = err.Error() - } else { - v.ErrorCode = base.ErrorCodeSucc - v.Desp = base.DespSucc - v.Data.SessionId = sessionId - } - feedback(v, w) + resp := h.sm.CtrlStartRelayPull(info) + feedback(resp, w) return } @@ -174,9 +165,9 @@ func (h *HttpApiServer) ctrlStopRelayPullHandler(w http.ResponseWriter, req *htt return } -func (h *HttpApiServer) ctrlKickOutSessionHandler(w http.ResponseWriter, req *http.Request) { +func (h *HttpApiServer) ctrlKickSessionHandler(w http.ResponseWriter, req *http.Request) { var v base.HttpResponseBasic - var info base.ApiCtrlKickOutSession + var info base.ApiCtrlKickSession err := nazahttp.UnmarshalRequestJsonBody(req, &info, "stream_name", "session_id") if err != nil { @@ -188,7 +179,7 @@ func (h *HttpApiServer) ctrlKickOutSessionHandler(w http.ResponseWriter, req *ht } Log.Infof("http api kick out session. req info=%+v", info) - resp := h.sm.CtrlKickOutSession(info) + resp := h.sm.CtrlKickSession(info) feedback(resp, w) return } diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go index 4e7591b..c30acd6 100644 --- a/pkg/logic/logic.go +++ b/pkg/logic/logic.go @@ -39,9 +39,9 @@ type ILalServer interface { StatLalInfo() base.LalInfo StatAllGroup() (sgs []base.StatGroup) StatGroup(streamName string) *base.StatGroup - CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (string, error) + CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) base.ApiCtrlStartRelayPull CtrlStopRelayPull(streamName string) base.ApiCtrlStopRelayPull - CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic + CtrlKickSession(info base.ApiCtrlKickSession) base.HttpResponseBasic } // NewLalServer 创建一个lal server diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 527ace1..f59e3c8 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -263,8 +263,8 @@ func (sm *ServerManager) RunLoop() error { // 关闭空闲的group sm.groupManager.Iterate(func(group *Group) bool { - if group.IsTotalEmpty() { - Log.Infof("erase empty group. [%s]", group.UniqueKey) + if group.IsInactive() { + Log.Infof("erase inactive group. [%s]", group.UniqueKey) group.Dispose() return false } @@ -367,7 +367,7 @@ func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup { return &ret } -func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (string, error) { +func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (ret base.ApiCtrlStartRelayPull) { sm.mutex.Lock() defer sm.mutex.Unlock() @@ -375,7 +375,9 @@ func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) if streamName == "" { ctx, err := base.ParseUrl(info.Url, -1) if err != nil { - return "", err + ret.ErrorCode = base.ErrorCodeStartRelayPullFail + ret.Desp = err.Error() + return } streamName = ctx.LastItemOfPath } @@ -383,7 +385,17 @@ func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) // 注意,如果group不存在,我们依然relay pull g := sm.getOrCreateGroup("", streamName) - return g.StartPull(info) + sessionId, err := g.StartPull(info) + if err != nil { + ret.ErrorCode = base.ErrorCodeStartRelayPullFail + ret.Desp = err.Error() + } else { + ret.ErrorCode = base.ErrorCodeSucc + ret.Desp = base.DespSucc + ret.Data.StreamName = streamName + ret.Data.SessionId = sessionId + } + return } // CtrlStopRelayPull @@ -413,11 +425,11 @@ func (sm *ServerManager) CtrlStopRelayPull(streamName string) (ret base.ApiCtrlS return } -// CtrlKickOutSession +// CtrlKickSession // // TODO(chef): refactor 不要返回http结果,返回error吧 // -func (sm *ServerManager) CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic { +func (sm *ServerManager) CtrlKickSession(info base.ApiCtrlKickSession) base.HttpResponseBasic { sm.mutex.Lock() defer sm.mutex.Unlock() g := sm.getGroup("", info.StreamName) @@ -427,7 +439,7 @@ func (sm *ServerManager) CtrlKickOutSession(info base.ApiCtrlKickOutSession) bas Desp: base.DespGroupNotFound, } } - if !g.KickOutSession(info.SessionId) { + if !g.KickSession(info.SessionId) { return base.HttpResponseBasic{ ErrorCode: base.ErrorCodeSessionNotFound, Desp: base.DespSessionNotFound,