diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 4f73dc6..cc12f4d 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -39,8 +39,8 @@ "delete_threshold": 6, "cleanup_mode": 1, "use_memory_as_disk_flag": false, - "session_timeout_ms": 30000, - "session_hash_key": "something_secret" + "sub_session_timeout_ms": 30000, + "sub_session_hash_key": "q191201771" }, "httpts": { "enable": true, diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index b31ecc8..cc12f4d 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -38,7 +38,9 @@ "fragment_num": 6, "delete_threshold": 6, "cleanup_mode": 1, - "use_memory_as_disk_flag": false + "use_memory_as_disk_flag": false, + "sub_session_timeout_ms": 30000, + "sub_session_hash_key": "q191201771" }, "httpts": { "enable": true, diff --git a/pkg/base/error.go b/pkg/base/error.go index 6fb7265..64d213d 100644 --- a/pkg/base/error.go +++ b/pkg/base/error.go @@ -37,8 +37,6 @@ var ( ErrSessionNotStarted = errors.New("lal.base: session has not been started yet") ErrInvalidUrl = errors.New("lal.base: invalid url") - - ErrGroupNotFound = errors.New("lal.base: group not found") ) // ----- pkg/hevc ------------------------------------------------------------------------------------------------------ diff --git a/pkg/base/t_unique.go b/pkg/base/t_unique.go index 405079a..94ebe1b 100644 --- a/pkg/base/t_unique.go +++ b/pkg/base/t_unique.go @@ -23,7 +23,7 @@ const ( UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL" UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB" UkPrePsPubSession = SessionProtocolPsStr + SessionBaseTypePubStr // "PSPUB" - UkPreHlsSubSession = SessionProtocolHlsStr + SessionBaseTypeSubStr // "HLSSUB" + UkPreHlsSubSession = SessionProtocolHlsStr + SessionBaseTypeSubStr // "HLSSUB" UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层 diff --git a/pkg/base/url.go b/pkg/base/url.go index 4cd1476..0b189c6 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -47,7 +47,7 @@ type UrlContext struct { StdHost string // host or host:port HostWithPort string // 当原始url中不包含port时,填充scheme对应的默认port Host string // 不包含port - Port int // 当原始url中不包含port时,填充scheme对应的默认port + Port int // 当原始url中不包含port时,填充scheme对应的默认port //UrlPathContext PathWithRawQuery string // 注意,有前面的'/' diff --git a/pkg/hls/server_handler.go b/pkg/hls/server_handler.go index f12d8e3..56578ac 100644 --- a/pkg/hls/server_handler.go +++ b/pkg/hls/server_handler.go @@ -27,29 +27,26 @@ type IHlsServerHandlerObserver interface { } type ServerHandler struct { - outPath string - observer IHlsServerHandlerObserver - urlPattern string - sessionMap map[string]*SubSession - mutex sync.Mutex - sessionTimeout time.Duration - sessionHashKey string + outPath string + observer IHlsServerHandlerObserver + urlPattern string + sessionMap map[string]*SubSession + mutex sync.Mutex + subSessionTimeout time.Duration + subSessionHashKey string } -func NewServerHandler(outPath, urlPattern, sessionHashKey string, sessionTimeoutMs int, observer IHlsServerHandlerObserver) *ServerHandler { +func NewServerHandler(outPath, urlPattern, subSessionHashKey string, subSessionTimeoutMs int, observer IHlsServerHandlerObserver) *ServerHandler { if strings.HasPrefix(urlPattern, "/") { urlPattern = urlPattern[1:] } - if sessionTimeoutMs == 0 { - sessionTimeoutMs = 30000 - } sh := &ServerHandler{ - outPath: outPath, - observer: observer, - urlPattern: urlPattern, - sessionMap: make(map[string]*SubSession), - sessionTimeout: time.Duration(sessionTimeoutMs) * time.Millisecond, - sessionHashKey: sessionHashKey, + outPath: outPath, + observer: observer, + urlPattern: urlPattern, + sessionMap: make(map[string]*SubSession), + subSessionTimeout: time.Duration(subSessionTimeoutMs) * time.Millisecond, + subSessionHashKey: subSessionHashKey, } go sh.runLoop() return sh @@ -62,16 +59,15 @@ func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { return } - sessionIdHash, err := s.ServeHTTPWithUrlCtx(resp, req, urlCtx) - if err != nil && sessionIdHash != "" { - s.delSubSession(sessionIdHash, nil) - return - } + s.ServeHTTPWithUrlCtx(resp, req, urlCtx) } -func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.Request, urlCtx base.UrlContext) (sessionIdHash string, err error) { +func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.Request, urlCtx base.UrlContext) { //Log.Debugf("%+v", req) + var sessionIdHash string + var err error + urlObj, _ := url.Parse(urlCtx.Url) // TODO chef: @@ -80,34 +76,41 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http. filename := urlCtx.LastItemOfPath filetype := urlCtx.GetFileType() - // handle session - sessionIdHash = urlObj.Query().Get("session_id") - if filetype == "ts" && sessionIdHash != "" { - err = s.keepSessionAlive(sessionIdHash) - if err != nil { - Log.Warnf("keepSessionAlive failed. session[%s] err=%+v", sessionIdHash, err) - resp.WriteHeader(http.StatusNotFound) - return - } - } else if filetype == "m3u8" { - var redirectUrl string - redirectUrl, err = s.handleSubSession(sessionIdHash, urlObj, req, urlCtx) - if err != nil { - Log.Warnf("handle hlsSubSession[%s]. err=%+v", sessionIdHash, err) - resp.WriteHeader(http.StatusNotFound) - return - } - if redirectUrl != "" { - if redirectUrl != urlCtx.Url { + // 如果开启了hls sub session功能 + if s.isSubSessionModeEnable() { + sessionIdHash = urlObj.Query().Get("session_id") + if filetype == "ts" && sessionIdHash != "" { + // 注意,为了增强容错性,不管是session_id字段无效,还是session_id为空,我们都依然返回ts文件内容给播放端 + if sessionIdHash != "" { + err = s.keepSessionAlive(sessionIdHash) + if err != nil { + Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err) + } + } else { + // noop + } + } else if filetype == "m3u8" { + if sessionIdHash != "" { + err = s.keepSessionAlive(sessionIdHash) + if err != nil { + Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err) + } + } else { + // m3u8请求时,session_id不存在,创建session对象,并让m3u8跳转到携带session_id的url请求 + + session, err := s.createSubSession(req, urlCtx) + if err != nil { + resp.WriteHeader(http.StatusNotFound) + return + } + + query := urlObj.Query() + query.Set("session_id", session.sessionIdHash) + redirectUrl := urlObj.Path + "?" + query.Encode() resp.Header().Add("Cache-Control", "no-cache") resp.Header().Add("Access-Control-Allow-Origin", "*") http.Redirect(resp, req, redirectUrl, http.StatusFound) return - } else { - err = errors.New(fmt.Sprintf("duplicate redirect url[%s]", redirectUrl)) - Log.Error(err.Error()) - resp.WriteHeader(http.StatusBadRequest) - return } } } @@ -134,6 +137,7 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http. case "m3u8": resp.Header().Add("Content-Type", "application/x-mpegurl") resp.Header().Add("Server", base.LalHlsM3u8Server) + // 给ts文件都携带上session_id字段 if sessionIdHash != "" { content = bytes.ReplaceAll(content, []byte(".ts"), []byte(".ts?session_id="+sessionIdHash)) } @@ -155,74 +159,54 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http. return } +// getSubSession 获取 SubSession,如果不存在,返回nil func (s *ServerHandler) getSubSession(sessionIdHash string) *SubSession { - return s.sessionMap[sessionIdHash] -} - -func (s *ServerHandler) keepSessionAlive(sessionIdHash string) error { s.mutex.Lock() defer s.mutex.Unlock() - session := s.getSubSession(sessionIdHash) - if session == nil { - return base.ErrHlsSessionNotFound - } - session.KeepAlive() - return nil + return s.sessionMap[sessionIdHash] } func (s *ServerHandler) createSubSession(req *http.Request, urlCtx base.UrlContext) (*SubSession, error) { s.mutex.Lock() defer s.mutex.Unlock() - session := NewSubSession(req, urlCtx, s.urlPattern, s.sessionHashKey, s.sessionTimeout) + session := NewSubSession(req, urlCtx, s.urlPattern, s.subSessionHashKey, s.subSessionTimeout) s.sessionMap[session.sessionIdHash] = session - err := s.observer.OnNewHlsSubSession(session) - return session, err + if err := s.observer.OnNewHlsSubSession(session); err != nil { + delete(s.sessionMap, session.sessionIdHash) + return nil, err + } + return session, nil } -func (s *ServerHandler) delSubSession(sessionIdHash string, session *SubSession) { +// keepSessionAlive 标记延长session存活时间,如果session不存在,返回 base.ErrHlsSessionNotFound +func (s *ServerHandler) keepSessionAlive(sessionIdHash string) error { s.mutex.Lock() defer s.mutex.Unlock() + session := s.sessionMap[sessionIdHash] if session == nil { - session = s.sessionMap[sessionIdHash] - } - if session != nil { - delete(s.sessionMap, sessionIdHash) - s.observer.OnDelHlsSubSession(session) - } -} - -func (s *ServerHandler) onSubSessionExpired(sessionIdHash string, session *SubSession) { - s.delSubSession(sessionIdHash, session) -} - -func (s *ServerHandler) handleSubSession(sessionIdHash string, urlObj *url.URL, req *http.Request, urlCtx base.UrlContext) (redirectUrl string, err error) { - if sessionIdHash != "" { - err = s.keepSessionAlive(sessionIdHash) - if err != nil { - return "", err - } - } else { - session, err := s.createSubSession(req, urlCtx) - if err != nil { - return "", err - } - query := urlObj.Query() - query.Set("session_id", session.sessionIdHash) - redirectUrl = urlObj.Path + "?" + query.Encode() - return redirectUrl, nil + return base.ErrHlsSessionNotFound } - return "", nil + session.KeepAlive() + return nil } func (s *ServerHandler) clearExpireSession() { + s.mutex.Lock() + defer s.mutex.Unlock() for sessionIdHash, session := range s.sessionMap { if session.IsExpired() { - s.onSubSessionExpired(sessionIdHash, session) + delete(s.sessionMap, sessionIdHash) + s.observer.OnDelHlsSubSession(session) } } } +func (s *ServerHandler) isSubSessionModeEnable() bool { + return s.subSessionHashKey != "" +} + func (s *ServerHandler) runLoop() { + // TODO(chef): [refactor] 也许可以弄到group中管理超时,和其他协议的session管理方式保持一致 202211 ticker := time.NewTicker(1 * time.Second) for range ticker.C { s.clearExpireSession() diff --git a/pkg/hls/server_sub_session.go b/pkg/hls/server_sub_session.go index 7f3db2a..5624b48 100644 --- a/pkg/hls/server_sub_session.go +++ b/pkg/hls/server_sub_session.go @@ -1,3 +1,11 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + package hls import ( @@ -43,6 +51,7 @@ func NewSubSession(req *http.Request, urlCtx base.UrlContext, hlsUrlPattern, ses session.stat.RemoteAddr = req.RemoteAddr session.stat.StartTime = time.Now().String() + // TODO(chef): [refactor] 也许后续可以弄短点,比如前8位或16位 202211 session.sessionIdHash = nazamd5.Md5([]byte(session.stat.SessionId + sessionHashKey)) return session } diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 31f3888..036d1fa 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -90,8 +90,8 @@ type HlsConfig struct { UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"` hls.MuxerConfig - SessionTimeoutMs int `json:"session_timeout_ms"` - SessionHashKey string `json:"session_hash_key"` + SubSessionTimeoutMs int `json:"sub_session_timeout_ms"` + SubSessionHashKey string `json:"session_hash_key"` } type RtspConfig struct { @@ -296,16 +296,21 @@ func LoadConfAndInitLog(rawContent []byte) *Config { config.HlsConfig.FragmentNum) config.HlsConfig.DeleteThreshold = config.HlsConfig.FragmentNum } + if config.HlsConfig.SubSessionHashKey != "" && config.HlsConfig.SubSessionTimeoutMs == 0 { + // 没有设置超时值,或者超时为0时 + Log.Warnf("config hls.sub_session_timeout_ms is 0. set to %d(which is fragment_num * fragment_duration_ms * 2)", + config.HlsConfig.FragmentNum*config.HlsConfig.FragmentDurationMs*2) + } if (config.HttpflvConfig.Enable || config.HttpflvConfig.EnableHttps) && !j.Exist("httpflv.url_pattern") { - Log.Warnf("config httpflv.url_pattern not exist. set to default wchich is %s", defaultHttpflvUrlPattern) + Log.Warnf("config httpflv.url_pattern not exist. set to default which is %s", defaultHttpflvUrlPattern) config.HttpflvConfig.UrlPattern = defaultHttpflvUrlPattern } if (config.HttptsConfig.Enable || config.HttptsConfig.EnableHttps) && !j.Exist("httpts.url_pattern") { - Log.Warnf("config httpts.url_pattern not exist. set to default wchich is %s", defaultHttptsUrlPattern) + Log.Warnf("config httpts.url_pattern not exist. set to default which is %s", defaultHttptsUrlPattern) config.HttptsConfig.UrlPattern = defaultHttptsUrlPattern } if (config.HlsConfig.Enable || config.HlsConfig.EnableHttps) && !j.Exist("hls.url_pattern") { - Log.Warnf("config hls.url_pattern not exist. set to default wchich is %s", defaultHlsUrlPattern) + Log.Warnf("config hls.url_pattern not exist. set to default which is %s", defaultHlsUrlPattern) config.HttpflvConfig.UrlPattern = defaultHlsUrlPattern } diff --git a/pkg/logic/group__.go b/pkg/logic/group__.go index 3f7fe67..afe75c3 100644 --- a/pkg/logic/group__.go +++ b/pkg/logic/group__.go @@ -43,6 +43,8 @@ import ( // | group.updateAllSessionStat()更新信息 | Y | Y | // | group.inSessionUniqueKey() | Y | Y | +// TODO(chef): [refactor] 整理sub类型流接入需要做的事情的文档 202211 + // --------------------------------------------------------------------------------------------------------------------- // 输入流到输出流的转换路径关系(一共6种输入): // diff --git a/pkg/logic/server_manager__.go b/pkg/logic/server_manager__.go index 3b1a483..5069819 100644 --- a/pkg/logic/server_manager__.go +++ b/pkg/logic/server_manager__.go @@ -122,7 +122,7 @@ Doc: %s sm.config.HlsConfig.Enable || sm.config.HlsConfig.EnableHttps { sm.httpServerManager = base.NewHttpServerManager() sm.httpServerHandler = NewHttpServerHandler(sm) - sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath, sm.config.HlsConfig.UrlPattern, sm.config.HlsConfig.SessionHashKey, sm.config.HlsConfig.SessionTimeoutMs, sm) + sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath, sm.config.HlsConfig.UrlPattern, sm.config.HlsConfig.SubSessionHashKey, sm.config.HlsConfig.SubSessionTimeoutMs, sm) } if sm.config.RtmpConfig.Enable {