pull/248/head
q191201771 2 years ago
parent 08b1e51bdf
commit a6cdd08861

@ -39,8 +39,8 @@
"delete_threshold": 6,
"cleanup_mode": 1,
"use_memory_as_disk_flag": false,
"session_timeout_ms": 30000,
"session_hash_key": "something_secret"
"sub_session_timeout_ms": 30000,
"sub_session_hash_key": "q191201771"
},
"httpts": {
"enable": true,

@ -38,7 +38,9 @@
"fragment_num": 6,
"delete_threshold": 6,
"cleanup_mode": 1,
"use_memory_as_disk_flag": false
"use_memory_as_disk_flag": false,
"sub_session_timeout_ms": 30000,
"sub_session_hash_key": "q191201771"
},
"httpts": {
"enable": true,

@ -37,8 +37,6 @@ var (
ErrSessionNotStarted = errors.New("lal.base: session has not been started yet")
ErrInvalidUrl = errors.New("lal.base: invalid url")
ErrGroupNotFound = errors.New("lal.base: group not found")
)
// ----- pkg/hevc ------------------------------------------------------------------------------------------------------

@ -23,7 +23,7 @@ const (
UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL"
UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB"
UkPrePsPubSession = SessionProtocolPsStr + SessionBaseTypePubStr // "PSPUB"
UkPreHlsSubSession = SessionProtocolHlsStr + SessionBaseTypeSubStr // "HLSSUB"
UkPreHlsSubSession = SessionProtocolHlsStr + SessionBaseTypeSubStr // "HLSSUB"
UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层

@ -47,7 +47,7 @@ type UrlContext struct {
StdHost string // host or host:port
HostWithPort string // 当原始url中不包含port时填充scheme对应的默认port
Host string // 不包含port
Port int // 当原始url中不包含port时填充scheme对应的默认port
Port int // 当原始url中不包含port时填充scheme对应的默认port
//UrlPathContext
PathWithRawQuery string // 注意,有前面的'/'

@ -27,29 +27,26 @@ type IHlsServerHandlerObserver interface {
}
type ServerHandler struct {
outPath string
observer IHlsServerHandlerObserver
urlPattern string
sessionMap map[string]*SubSession
mutex sync.Mutex
sessionTimeout time.Duration
sessionHashKey string
outPath string
observer IHlsServerHandlerObserver
urlPattern string
sessionMap map[string]*SubSession
mutex sync.Mutex
subSessionTimeout time.Duration
subSessionHashKey string
}
func NewServerHandler(outPath, urlPattern, sessionHashKey string, sessionTimeoutMs int, observer IHlsServerHandlerObserver) *ServerHandler {
func NewServerHandler(outPath, urlPattern, subSessionHashKey string, subSessionTimeoutMs int, observer IHlsServerHandlerObserver) *ServerHandler {
if strings.HasPrefix(urlPattern, "/") {
urlPattern = urlPattern[1:]
}
if sessionTimeoutMs == 0 {
sessionTimeoutMs = 30000
}
sh := &ServerHandler{
outPath: outPath,
observer: observer,
urlPattern: urlPattern,
sessionMap: make(map[string]*SubSession),
sessionTimeout: time.Duration(sessionTimeoutMs) * time.Millisecond,
sessionHashKey: sessionHashKey,
outPath: outPath,
observer: observer,
urlPattern: urlPattern,
sessionMap: make(map[string]*SubSession),
subSessionTimeout: time.Duration(subSessionTimeoutMs) * time.Millisecond,
subSessionHashKey: subSessionHashKey,
}
go sh.runLoop()
return sh
@ -62,16 +59,15 @@ func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
return
}
sessionIdHash, err := s.ServeHTTPWithUrlCtx(resp, req, urlCtx)
if err != nil && sessionIdHash != "" {
s.delSubSession(sessionIdHash, nil)
return
}
s.ServeHTTPWithUrlCtx(resp, req, urlCtx)
}
func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.Request, urlCtx base.UrlContext) (sessionIdHash string, err error) {
func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.Request, urlCtx base.UrlContext) {
//Log.Debugf("%+v", req)
var sessionIdHash string
var err error
urlObj, _ := url.Parse(urlCtx.Url)
// TODO chef:
@ -80,34 +76,41 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.
filename := urlCtx.LastItemOfPath
filetype := urlCtx.GetFileType()
// handle session
sessionIdHash = urlObj.Query().Get("session_id")
if filetype == "ts" && sessionIdHash != "" {
err = s.keepSessionAlive(sessionIdHash)
if err != nil {
Log.Warnf("keepSessionAlive failed. session[%s] err=%+v", sessionIdHash, err)
resp.WriteHeader(http.StatusNotFound)
return
}
} else if filetype == "m3u8" {
var redirectUrl string
redirectUrl, err = s.handleSubSession(sessionIdHash, urlObj, req, urlCtx)
if err != nil {
Log.Warnf("handle hlsSubSession[%s]. err=%+v", sessionIdHash, err)
resp.WriteHeader(http.StatusNotFound)
return
}
if redirectUrl != "" {
if redirectUrl != urlCtx.Url {
// 如果开启了hls sub session功能
if s.isSubSessionModeEnable() {
sessionIdHash = urlObj.Query().Get("session_id")
if filetype == "ts" && sessionIdHash != "" {
// 注意为了增强容错性不管是session_id字段无效还是session_id为空我们都依然返回ts文件内容给播放端
if sessionIdHash != "" {
err = s.keepSessionAlive(sessionIdHash)
if err != nil {
Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err)
}
} else {
// noop
}
} else if filetype == "m3u8" {
if sessionIdHash != "" {
err = s.keepSessionAlive(sessionIdHash)
if err != nil {
Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err)
}
} else {
// m3u8请求时session_id不存在创建session对象并让m3u8跳转到携带session_id的url请求
session, err := s.createSubSession(req, urlCtx)
if err != nil {
resp.WriteHeader(http.StatusNotFound)
return
}
query := urlObj.Query()
query.Set("session_id", session.sessionIdHash)
redirectUrl := urlObj.Path + "?" + query.Encode()
resp.Header().Add("Cache-Control", "no-cache")
resp.Header().Add("Access-Control-Allow-Origin", "*")
http.Redirect(resp, req, redirectUrl, http.StatusFound)
return
} else {
err = errors.New(fmt.Sprintf("duplicate redirect url[%s]", redirectUrl))
Log.Error(err.Error())
resp.WriteHeader(http.StatusBadRequest)
return
}
}
}
@ -134,6 +137,7 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
resp.Header().Add("Server", base.LalHlsM3u8Server)
// 给ts文件都携带上session_id字段
if sessionIdHash != "" {
content = bytes.ReplaceAll(content, []byte(".ts"), []byte(".ts?session_id="+sessionIdHash))
}
@ -155,74 +159,54 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http.
return
}
// getSubSession 获取 SubSession如果不存在返回nil
func (s *ServerHandler) getSubSession(sessionIdHash string) *SubSession {
return s.sessionMap[sessionIdHash]
}
func (s *ServerHandler) keepSessionAlive(sessionIdHash string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
session := s.getSubSession(sessionIdHash)
if session == nil {
return base.ErrHlsSessionNotFound
}
session.KeepAlive()
return nil
return s.sessionMap[sessionIdHash]
}
func (s *ServerHandler) createSubSession(req *http.Request, urlCtx base.UrlContext) (*SubSession, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
session := NewSubSession(req, urlCtx, s.urlPattern, s.sessionHashKey, s.sessionTimeout)
session := NewSubSession(req, urlCtx, s.urlPattern, s.subSessionHashKey, s.subSessionTimeout)
s.sessionMap[session.sessionIdHash] = session
err := s.observer.OnNewHlsSubSession(session)
return session, err
if err := s.observer.OnNewHlsSubSession(session); err != nil {
delete(s.sessionMap, session.sessionIdHash)
return nil, err
}
return session, nil
}
func (s *ServerHandler) delSubSession(sessionIdHash string, session *SubSession) {
// keepSessionAlive 标记延长session存活时间如果session不存在返回 base.ErrHlsSessionNotFound
func (s *ServerHandler) keepSessionAlive(sessionIdHash string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
session := s.sessionMap[sessionIdHash]
if session == nil {
session = s.sessionMap[sessionIdHash]
}
if session != nil {
delete(s.sessionMap, sessionIdHash)
s.observer.OnDelHlsSubSession(session)
}
}
func (s *ServerHandler) onSubSessionExpired(sessionIdHash string, session *SubSession) {
s.delSubSession(sessionIdHash, session)
}
func (s *ServerHandler) handleSubSession(sessionIdHash string, urlObj *url.URL, req *http.Request, urlCtx base.UrlContext) (redirectUrl string, err error) {
if sessionIdHash != "" {
err = s.keepSessionAlive(sessionIdHash)
if err != nil {
return "", err
}
} else {
session, err := s.createSubSession(req, urlCtx)
if err != nil {
return "", err
}
query := urlObj.Query()
query.Set("session_id", session.sessionIdHash)
redirectUrl = urlObj.Path + "?" + query.Encode()
return redirectUrl, nil
return base.ErrHlsSessionNotFound
}
return "", nil
session.KeepAlive()
return nil
}
func (s *ServerHandler) clearExpireSession() {
s.mutex.Lock()
defer s.mutex.Unlock()
for sessionIdHash, session := range s.sessionMap {
if session.IsExpired() {
s.onSubSessionExpired(sessionIdHash, session)
delete(s.sessionMap, sessionIdHash)
s.observer.OnDelHlsSubSession(session)
}
}
}
func (s *ServerHandler) isSubSessionModeEnable() bool {
return s.subSessionHashKey != ""
}
func (s *ServerHandler) runLoop() {
// TODO(chef): [refactor] 也许可以弄到group中管理超时和其他协议的session管理方式保持一致 202211
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
s.clearExpireSession()

@ -1,3 +1,11 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
@ -43,6 +51,7 @@ func NewSubSession(req *http.Request, urlCtx base.UrlContext, hlsUrlPattern, ses
session.stat.RemoteAddr = req.RemoteAddr
session.stat.StartTime = time.Now().String()
// TODO(chef): [refactor] 也许后续可以弄短点比如前8位或16位 202211
session.sessionIdHash = nazamd5.Md5([]byte(session.stat.SessionId + sessionHashKey))
return session
}

@ -90,8 +90,8 @@ type HlsConfig struct {
UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"`
hls.MuxerConfig
SessionTimeoutMs int `json:"session_timeout_ms"`
SessionHashKey string `json:"session_hash_key"`
SubSessionTimeoutMs int `json:"sub_session_timeout_ms"`
SubSessionHashKey string `json:"session_hash_key"`
}
type RtspConfig struct {
@ -296,16 +296,21 @@ func LoadConfAndInitLog(rawContent []byte) *Config {
config.HlsConfig.FragmentNum)
config.HlsConfig.DeleteThreshold = config.HlsConfig.FragmentNum
}
if config.HlsConfig.SubSessionHashKey != "" && config.HlsConfig.SubSessionTimeoutMs == 0 {
// 没有设置超时值或者超时为0时
Log.Warnf("config hls.sub_session_timeout_ms is 0. set to %d(which is fragment_num * fragment_duration_ms * 2)",
config.HlsConfig.FragmentNum*config.HlsConfig.FragmentDurationMs*2)
}
if (config.HttpflvConfig.Enable || config.HttpflvConfig.EnableHttps) && !j.Exist("httpflv.url_pattern") {
Log.Warnf("config httpflv.url_pattern not exist. set to default wchich is %s", defaultHttpflvUrlPattern)
Log.Warnf("config httpflv.url_pattern not exist. set to default which is %s", defaultHttpflvUrlPattern)
config.HttpflvConfig.UrlPattern = defaultHttpflvUrlPattern
}
if (config.HttptsConfig.Enable || config.HttptsConfig.EnableHttps) && !j.Exist("httpts.url_pattern") {
Log.Warnf("config httpts.url_pattern not exist. set to default wchich is %s", defaultHttptsUrlPattern)
Log.Warnf("config httpts.url_pattern not exist. set to default which is %s", defaultHttptsUrlPattern)
config.HttptsConfig.UrlPattern = defaultHttptsUrlPattern
}
if (config.HlsConfig.Enable || config.HlsConfig.EnableHttps) && !j.Exist("hls.url_pattern") {
Log.Warnf("config hls.url_pattern not exist. set to default wchich is %s", defaultHlsUrlPattern)
Log.Warnf("config hls.url_pattern not exist. set to default which is %s", defaultHlsUrlPattern)
config.HttpflvConfig.UrlPattern = defaultHlsUrlPattern
}

@ -43,6 +43,8 @@ import (
// | group.updateAllSessionStat()更新信息 | Y | Y |
// | group.inSessionUniqueKey() | Y | Y |
// TODO(chef): [refactor] 整理sub类型流接入需要做的事情的文档 202211
// ---------------------------------------------------------------------------------------------------------------------
// 输入流到输出流的转换路径关系一共6种输入
//

@ -122,7 +122,7 @@ Doc: %s
sm.config.HlsConfig.Enable || sm.config.HlsConfig.EnableHttps {
sm.httpServerManager = base.NewHttpServerManager()
sm.httpServerHandler = NewHttpServerHandler(sm)
sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath, sm.config.HlsConfig.UrlPattern, sm.config.HlsConfig.SessionHashKey, sm.config.HlsConfig.SessionTimeoutMs, sm)
sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath, sm.config.HlsConfig.UrlPattern, sm.config.HlsConfig.SubSessionHashKey, sm.config.HlsConfig.SubSessionTimeoutMs, sm)
}
if sm.config.RtmpConfig.Enable {

Loading…
Cancel
Save