From f16b1f8d4fc9457d58c68d21dbded89eef3a731f Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 15 Jan 2022 11:08:03 +0800 Subject: [PATCH] =?UTF-8?q?-=20[feat]=20simple=20auth=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=94=AF=E6=8C=81rtsp,httpts,hls=E5=8D=8F=E8=AE=AE=20-=20[feat?= =?UTF-8?q?]=20simple=20auth=E9=89=B4=E6=9D=83=E5=8F=AF=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E5=90=8E=E9=97=A8=E9=89=B4=E6=9D=83=E5=8F=82=E6=95=B0=20-=20[o?= =?UTF-8?q?pt]=20simple=20auth=E9=89=B4=E6=9D=83=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E7=9A=84md5=E5=80=BC=E5=85=BC=E5=AE=B9=E5=A4=A7=E5=B0=8F?= =?UTF-8?q?=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/ConfigBrief.md | 24 +++++++--- conf/lalserver.conf.json | 9 +++- conf/lalserver.conf.json.tmpl | 9 +++- pkg/base/url.go | 59 ++++++++++++++++++++++--- pkg/hls/path_strategy.go | 49 ++++++++++++--------- pkg/hls/path_strategy_test.go | 28 +++++------- pkg/hls/server_handler.go | 43 +++++++----------- pkg/innertest/iface_impl.go | 2 +- pkg/logic/config.go | 14 +++--- pkg/logic/group.go | 15 +++---- pkg/logic/http_server_handler.go | 22 +++------ pkg/logic/server_manager.go | 71 +++++++++++++++++++++++------- pkg/logic/simple_auth.go | 15 +++++-- pkg/logic/simple_auth_test.go | 26 ++++++++++- pkg/rtsp/server.go | 29 +++++++----- pkg/rtsp/server_command_session.go | 19 ++++---- 16 files changed, 283 insertions(+), 151 deletions(-) diff --git a/conf/ConfigBrief.md b/conf/ConfigBrief.md index 78bb814..74a76ba 100644 --- a/conf/ConfigBrief.md +++ b/conf/ConfigBrief.md @@ -1,14 +1,17 @@ +# lalserver 配置文件说明 + ``` { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", //. 配置文件对应的文档说明链接,在程序中没实际用途 - "conf_version": "0.2.3", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本 + "conf_version": "0.2.6", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本 // 号是否与代码中声明的一致 "rtmp": { "enable": true, //. 是否开启rtmp服务的监听 // 注意,配置文件中控制各协议类型的enable开关都应该按需打开,避免造成不必要的协议转换的开销 - "addr": ":19350", //. RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 - "gop_num": 2, //. RTMP拉流的GOP缓存数量,加速流打开时间,但是可能增加延时 - "merge_write_size": 8192, //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿 + "addr": ":1935", //. RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 + "gop_num": 0, //. RTMP拉流的GOP缓存数量,加速流打开时间,但是可能增加延时 + //. 如果为0,则不使用缓存发送 + "merge_write_size": 0, //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿 // 如果为0,则不合并发送 "add_dummy_audio_enable": false, //. 是否开启动态检测添加静音AAC数据的功能 // 如果开启,rtmp pub推流时,如果超过`add_dummy_audio_wait_audio_ms`时间依然没有 @@ -28,7 +31,7 @@ "enable_https": true, //. 是否开启HTTPS-FLV监听 "url_pattern": "/", //. 拉流url路由路径地址。默认值为`/`,表示不受限制,路由地址可以为任意路径地址。 // 如果设置为`/live/`,则只能从`/live/`路径下拉流,比如`/live/test110.flv` - "gop_num": 2 //. + "gop_num": 0 //. 见rtmp.gop_num }, "hls": { "enable": true, //. 是否开启HLS服务的监听 @@ -94,6 +97,17 @@ "on_sub_stop": "http://127.0.0.1:10101/on_sub_stop", "on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect" }, + "simple_auth": { // 鉴权文档见: https://pengrl.com/lal/#/auth + "key": "q191201771", // 私有key,计算md5鉴权参数时使用 + "dangerous_lal_secret": "pengrl", // 后门鉴权参数,所有的流可通过该参数值鉴权 + "pub_rtmp_enable": false, // rtmp推流是否开启鉴权,true为开启鉴权,false为不开启鉴权 + "sub_rtmp_enable": false, // rtmp拉流是否开启鉴权 + "sub_httpflv_enable": false, // httpflv拉流是否开启鉴权 + "sub_httpts_enable": false, // httpts拉流是否开启鉴权 + "pub_rtsp_enable": false, // rtsp推流是否开启鉴权 + "sub_rtsp_enable": false, // rtsp拉流是否开启鉴权 + "hls_m3u8_enable": true // m3u8拉流是否开启鉴权 + }, "pprof": { "enable": true, //. 是否开启Go pprof web服务的监听 "addr": ":8084" //. Go pprof web地址 diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 7485aa5..5ba0471 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.2.5", + "conf_version": "v0.2.6", "rtmp": { "enable": true, "addr": ":1935", @@ -73,9 +73,14 @@ }, "simple_auth": { "key": "q191201771", + "dangerous_lal_secret": "pengrl", "pub_rtmp_enable": false, "sub_rtmp_enable": false, - "sub_httpflv_enable": false + "sub_httpflv_enable": false, + "sub_httpts_enable": false, + "pub_rtsp_enable": false, + "sub_rtsp_enable": false, + "hls_m3u8_enable": false }, "pprof": { "enable": true, diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index 7485aa5..5ba0471 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.2.5", + "conf_version": "v0.2.6", "rtmp": { "enable": true, "addr": ":1935", @@ -73,9 +73,14 @@ }, "simple_auth": { "key": "q191201771", + "dangerous_lal_secret": "pengrl", "pub_rtmp_enable": false, "sub_rtmp_enable": false, - "sub_httpflv_enable": false + "sub_httpflv_enable": false, + "sub_httpts_enable": false, + "pub_rtsp_enable": false, + "sub_rtsp_enable": false, + "hls_m3u8_enable": false }, "pprof": { "enable": true, diff --git a/pkg/base/url.go b/pkg/base/url.go index 3829144..2c57daf 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -11,6 +11,7 @@ package base import ( "fmt" "net" + "net/http" "net/url" "strconv" "strings" @@ -54,8 +55,33 @@ type UrlContext struct { RawQuery string RawUrlWithoutUserInfo string + + filenameWithoutType string + fileType string +} + +func (u *UrlContext) GetFilenameWithoutType() string { + u.calcFilenameAndTypeIfNeeded() + return u.filenameWithoutType } +func (u *UrlContext) GetFileType() string { + u.calcFilenameAndTypeIfNeeded() + return u.fileType +} + +func (u *UrlContext) calcFilenameAndTypeIfNeeded() { + if len(u.filenameWithoutType) == 0 || len(u.fileType) == 0 { + ss := strings.Split(u.LastItemOfPath, ".") + u.filenameWithoutType = ss[0] + if len(ss) > 1 { + u.fileType = ss[1] + } + } +} + +// --------------------------------------------------------------------------------------------------------------------- + // ParseUrl // // @param defaultPort: 注意,如果rawUrl中显示指定了端口,则该参数不生效 @@ -127,6 +153,8 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) { return ctx, nil } +// --------------------------------------------------------------------------------------------------------------------- + func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) { ctx, err = ParseUrl(rawUrl, -1) if err != nil { @@ -147,10 +175,6 @@ func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) { return } -func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) { - return ParseHttpUrl(rawUrl, ".flv") -} - func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) { ctx, err = ParseUrl(rawUrl, -1) if err != nil { @@ -163,6 +187,29 @@ func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) { return } +func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) { + return parseHttpUrl(rawUrl, ".flv") +} + +// --------------------------------------------------------------------------------------------------------------------- + +// ParseHttpRequest +// +// @return 完整url +// +func ParseHttpRequest(req *http.Request) string { + // TODO(chef): [refactor] scheme是否能从从req.URL.Scheme获取 + var scheme string + if req.TLS == nil { + scheme = "http" + } else { + scheme = "https" + } + return fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI) +} + +// ----- private ------------------------------------------------------------------------------------------------------- + func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) { ctx.Path = stdUrl.Path @@ -194,12 +241,12 @@ func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) { return ctx, nil } -func ParseHttpUrl(rawUrl string, suffix string) (ctx UrlContext, err error) { +func parseHttpUrl(rawUrl string, filetype string) (ctx UrlContext, err error) { ctx, err = ParseUrl(rawUrl, -1) if err != nil { return } - if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, suffix) { + if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, filetype) { return ctx, fmt.Errorf("%w. url=%s", ErrInvalidUrl, rawUrl) } diff --git a/pkg/hls/path_strategy.go b/pkg/hls/path_strategy.go index 60d6baf..69643fd 100644 --- a/pkg/hls/path_strategy.go +++ b/pkg/hls/path_strategy.go @@ -12,6 +12,8 @@ import ( "fmt" "path/filepath" "strings" + + "github.com/q191201771/lal/pkg/base" ) // 聚合以下功能: @@ -19,11 +21,8 @@ import ( // - 路由策略: HTTP请求HLS时,request URI和文件路径的映射规则 type RequestInfo struct { - FileName string - FileType string - - StreamName string - FileNameWithPath string + StreamName string // uri结合策略 + FileNameWithPath string // uri结合策略, 从磁盘打开文件时使用 } type IPathStrategy interface { @@ -31,11 +30,17 @@ type IPathStrategy interface { IPathWriteStrategy } +// IPathRequestStrategy +// // 路由策略 // 接到HTTP请求时,对应文件路径的映射逻辑 +// type IPathRequestStrategy interface { - // 解析HTTP请求,得到文件名、文件类型、流名称、文件所在路径 - GetRequestInfo(uri string, rootOutPath string) RequestInfo + // GetRequestInfo + // + // 解析HTTP请求,得到流名称、文件所在路径 + // + GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) RequestInfo } // 落盘策略 @@ -99,6 +104,8 @@ const ( type DefaultPathStrategy struct { } +// GetRequestInfo +// // RequestURI example: // uri -> FileName StreamName FileType FileNameWithPath // /hls/test110.m3u8 -> test110.m3u8 test110 m3u8 {rootOutPath}/test110/playlist.m3u8 @@ -106,24 +113,24 @@ type DefaultPathStrategy struct { // /hls/test110/record.m3u8 -> record.m3u8 test110 m3u8 {rootOutPath}/test110/record.m3u8 // /hls/test110/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts // /hls/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts -func (dps *DefaultPathStrategy) GetRequestInfo(uri string, rootOutPath string) (ri RequestInfo) { - uriItems := strings.Split(uri, "/") - ri.FileName = uriItems[len(uriItems)-1] - fileNameItems := strings.Split(ri.FileName, ".") - fileNameWithOutType := fileNameItems[0] - ri.FileType = fileNameItems[len(fileNameItems)-1] - - if ri.FileType == "m3u8" { - if ri.FileName == playlistM3u8FileName || ri.FileName == recordM3u8FileName { +// +func (dps *DefaultPathStrategy) GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) (ri RequestInfo) { + filename := urlCtx.LastItemOfPath + filetype := urlCtx.GetFileType() + fileNameWithoutType := urlCtx.GetFilenameWithoutType() + + if filetype == "m3u8" { + if filename == playlistM3u8FileName || filename == recordM3u8FileName { + uriItems := strings.Split(urlCtx.Path, "/") ri.StreamName = uriItems[len(uriItems)-2] - ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, ri.FileName) + ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, filename) } else { - ri.StreamName = fileNameWithOutType + ri.StreamName = fileNameWithoutType ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, playlistM3u8FileName) } - } else if ri.FileType == "ts" { - ri.StreamName = dps.getStreamNameFromTsFileName(ri.FileName) - ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, ri.FileName) + } else if filetype == "ts" { + ri.StreamName = dps.getStreamNameFromTsFileName(filename) + ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, filename) } return diff --git a/pkg/hls/path_strategy_test.go b/pkg/hls/path_strategy_test.go index 3ae05d7..e8d7525 100644 --- a/pkg/hls/path_strategy_test.go +++ b/pkg/hls/path_strategy_test.go @@ -11,6 +11,10 @@ package hls_test import ( "testing" + "github.com/q191201771/naza/pkg/nazalog" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/naza/pkg/assert" ) @@ -20,40 +24,32 @@ func TestDefaultPathStrategy_GetRequestInfo(t *testing.T) { rootOutPath := "/tmp/lal/hls/" golden := map[string]hls.RequestInfo{ - "/hls/test110.m3u8": { - FileName: "test110.m3u8", - FileType: "m3u8", + "http://127.0.0.1:8080/hls/test110.m3u8": { StreamName: "test110", FileNameWithPath: "/tmp/lal/hls/test110/playlist.m3u8", }, - "/hls/test110/playlist.m3u8": { - FileName: "playlist.m3u8", - FileType: "m3u8", + "http://127.0.0.1:8080/hls/test110/playlist.m3u8": { StreamName: "test110", FileNameWithPath: "/tmp/lal/hls/test110/playlist.m3u8", }, - "/hls/test110/record.m3u8": { - FileName: "record.m3u8", - FileType: "m3u8", + "http://127.0.0.1:8080/hls/test110/record.m3u8": { StreamName: "test110", FileNameWithPath: "/tmp/lal/hls/test110/record.m3u8", }, - "/hls/test110/test110-1620540712084-0.ts": { - FileName: "test110-1620540712084-0.ts", - FileType: "ts", + "http://127.0.0.1:8080/hls/test110/test110-1620540712084-0.ts": { StreamName: "test110", FileNameWithPath: "/tmp/lal/hls/test110/test110-1620540712084-0.ts", }, - "/hls/test110-1620540712084-0.ts": { - FileName: "test110-1620540712084-0.ts", - FileType: "ts", + "http://127.0.0.1:8080/hls/test110-1620540712084-0.ts": { StreamName: "test110", FileNameWithPath: "/tmp/lal/hls/test110/test110-1620540712084-0.ts", }, } for k, v := range golden { - out := dps.GetRequestInfo(k, rootOutPath) + ctx, err := base.ParseUrl(k, -1) + nazalog.Assert(nil, err) + out := dps.GetRequestInfo(ctx, rootOutPath) assert.Equal(t, v, out) } } diff --git a/pkg/hls/server_handler.go b/pkg/hls/server_handler.go index 8051808..1409c30 100644 --- a/pkg/hls/server_handler.go +++ b/pkg/hls/server_handler.go @@ -18,9 +18,6 @@ import ( type ServerHandler struct { outPath string - //addr string - //ln net.Listener - //httpSrv *http.Server } func NewServerHandler(outPath string) *ServerHandler { @@ -29,37 +26,29 @@ func NewServerHandler(outPath string) *ServerHandler { } } -// -//func (s *Server) Listen() (err error) { -// if s.ln, err = net.Listen("tcp", s.addr); err != nil { -// return -// } -// s.httpSrv = &http.Server{Addr: s.addr, Handler: s} -// nazalog.Infof("start hls server listen. addr=%s", s.addr) -// return -//} -// -//func (s *Server) RunLoop() error { -// return s.httpSrv.Serve(s.ln) -//} -// -//func (s *Server) Dispose() { -// if err := s.httpSrv.Close(); err != nil { -// nazalog.Error(err) -// } -//} - func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80) + if err != nil { + nazalog.Errorf("parse url. err=%+v", err) + return + } + s.ServeHTTPWithUrlCtx(resp, urlCtx) +} + +func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, urlCtx base.UrlContext) { //nazalog.Debugf("%+v", req) // TODO chef: // - check appname in URI path - ri := PathStrategy.GetRequestInfo(req.RequestURI, s.outPath) + filename := urlCtx.LastItemOfPath + filetype := urlCtx.GetFileType() + + ri := PathStrategy.GetRequestInfo(urlCtx, s.outPath) //nazalog.Debugf("%+v", ri) - if ri.FileName == "" || ri.StreamName == "" || ri.FileNameWithPath == "" || (ri.FileType != "m3u8" && ri.FileType != "ts") { - nazalog.Warnf("invalid hls request. uri=%s, request=%+v", req.RequestURI, ri) + if filename == "" || (filetype != "m3u8" && filetype != "ts") || ri.StreamName == "" || ri.FileNameWithPath == "" { + nazalog.Warnf("invalid hls request. url=%+v, request=%+v", urlCtx, ri) resp.WriteHeader(404) return } @@ -71,7 +60,7 @@ func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { return } - switch ri.FileType { + switch filetype { case "m3u8": resp.Header().Add("Content-Type", "application/x-mpegurl") resp.Header().Add("Server", base.LalHlsM3u8Server) diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index 3982be3..288e4cc 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -28,7 +28,7 @@ import ( // - rtsp是否也应该上层使用Command作为代理,避免生命周期管理混乱 // // server.pub: rtmp(), rtsp -// server.sub: rtmp(), rtsp, flv, ts +// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls // // client.push: rtmp, rtsp // client.pull: rtmp, rtsp, flv diff --git a/pkg/logic/config.go b/pkg/logic/config.go index aedda18..a3e5200 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -21,7 +21,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -const ConfVersion = "v0.2.5" +const ConfVersion = "v0.2.6" const ( defaultHlsCleanupMode = hls.CleanupModeInTheEnd @@ -120,10 +120,14 @@ type HttpNotifyConfig struct { } type SimpleAuthConfig struct { - Key string `json:"key"` - PubRtmpEnable bool `json:"pub_rtmp_enable"` - SubRtmpEnable bool `json:"sub_rtmp_enable"` - SubHttpflvEnable bool `json:"sub_httpflv_enable"` + Key string `json:"key"` + DangerousLalSecret string `json:"dangerous_lal_secret"` + PubRtmpEnable bool `json:"pub_rtmp_enable"` + SubRtmpEnable bool `json:"sub_rtmp_enable"` + SubHttpflvEnable bool `json:"sub_httpflv_enable"` + SubHttptsEnable bool `json:"sub_httpts_enable"` + PubRtspEnable bool `json:"pub_rtsp_enable"` + SubRtspEnable bool `json:"sub_rtsp_enable"` } type PprofConfig struct { diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 5bd11d9..1a53eec 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -312,7 +312,8 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error { defer group.mutex.Unlock() if group.hasInSession() { - nazalog.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey()) + // TODO(chef): [refactor] 打印in session + nazalog.Errorf("[%s] in stream already exist at group. add=%s", group.UniqueKey, session.UniqueKey()) return base.ErrDupInStream } @@ -341,15 +342,15 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error { } // TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉 -func (group *Group) AddRtspPubSession(session *rtsp.PubSession) bool { +func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error { nazalog.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey()) group.mutex.Lock() defer group.mutex.Unlock() if group.hasInSession() { - nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey()) - return false + nazalog.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey()) + return base.ErrDupInStream } group.rtspPubSession = session @@ -360,7 +361,7 @@ func (group *Group) AddRtspPubSession(session *rtsp.PubSession) bool { }) session.SetObserver(group) - return true + return nil } func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool { @@ -462,7 +463,7 @@ func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (o return true, group.sdpCtx.RawSdp } -func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool { +func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) { nazalog.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey()) group.mutex.Lock() @@ -473,8 +474,6 @@ func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool { } // TODO(chef): rtsp sub也应该判断是否需要静态pull回源 - - return true } func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) { diff --git a/pkg/logic/http_server_handler.go b/pkg/logic/http_server_handler.go index d889121..4c5992b 100644 --- a/pkg/logic/http_server_handler.go +++ b/pkg/logic/http_server_handler.go @@ -9,7 +9,6 @@ package logic import ( - "fmt" "net/http" "strings" @@ -24,13 +23,12 @@ type HttpServerHandlerObserver interface { // // 通知上层有新的拉流者 // - // @return true则允许拉流,false则关闭连接 + // @return nil则允许拉流,不为nil则关闭连接 // OnNewHttpflvSubSession(session *httpflv.SubSession) error - OnDelHttpflvSubSession(session *httpflv.SubSession) - OnNewHttptsSubSession(session *httpts.SubSession) bool + OnNewHttptsSubSession(session *httpts.SubSession) error OnDelHttptsSubSession(session *httpts.SubSession) } @@ -45,15 +43,7 @@ func NewHttpServerHandler(observer HttpServerHandlerObserver) *HttpServerHandler } func (h *HttpServerHandler) ServeSubSession(writer http.ResponseWriter, req *http.Request) { - var scheme string - // TODO(chef) 这里scheme直接使用http和https,没有考虑ws和wss,注意,后续的逻辑可能会依赖此处 - if req.TLS == nil { - scheme = "http" - } else { - scheme = "https" - } - rawUrl := fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI) - urlCtx, err := base.ParseUrl(rawUrl, 80) + urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80) if err != nil { nazalog.Errorf("parse url. err=%+v", err) return @@ -94,8 +84,10 @@ func (h *HttpServerHandler) ServeSubSession(writer http.ResponseWriter, req *htt if strings.HasSuffix(urlCtx.LastItemOfPath, ".ts") { session := httpts.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey) nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.Url()) - if !h.observer.OnNewHttptsSubSession(session) { - session.Dispose() + if err = h.observer.OnNewHttptsSubSession(session); err != nil { + nazalog.Infof("[%s] dispose by observer. err=%+v", session.UniqueKey(), err) + _ = session.Dispose() + return } err = session.RunLoop() nazalog.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey(), err) diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index ec1eb2e..f717693 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -167,7 +167,7 @@ func (sm *ServerManager) RunLoop() error { if err := addMux(sm.config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil { return err } - if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil { + if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.serveHls, "hls"); err != nil { return err } @@ -544,11 +544,9 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) { sm.option.NotifyHandler.OnSubStop(info) } -func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool { +func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) error { sm.mutex.Lock() defer sm.mutex.Unlock() - group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) - group.AddHttptsSubSession(session) var info base.SubStartInfo info.ServerId = sm.config.ServerId @@ -559,10 +557,21 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool info.UrlParam = session.RawQuery() info.SessionId = session.UniqueKey() info.RemoteAddr = session.GetStat().RemoteAddr + sm.option.NotifyHandler.OnSubStart(info) + + if err := sm.simpleAuthCtx.OnSubStart(info); err != nil { + return err + } + + group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) + group.AddHttptsSubSession(session) + info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() + sm.option.NotifyHandler.OnSubStart(info) - return true + + return nil } func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) { @@ -599,11 +608,9 @@ func (sm *ServerManager) OnDelRtspSession(session *rtsp.ServerCommandSession) { // TODO chef: impl me } -func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool { +func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error { sm.mutex.Lock() defer sm.mutex.Unlock() - group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) - res := group.AddRtspPubSession(session) var info base.PubStartInfo info.ServerId = sm.config.ServerId @@ -614,11 +621,21 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool { info.UrlParam = session.RawQuery() info.SessionId = session.UniqueKey() info.RemoteAddr = session.GetStat().RemoteAddr + + if err := sm.simpleAuthCtx.OnPubStart(info); err != nil { + return err + } + + group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) + if err := group.AddRtspPubSession(session); err != nil { + return err + } + info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - sm.option.NotifyHandler.OnPubStart(info) - return res + sm.option.NotifyHandler.OnPubStart(info) + return nil } func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { @@ -653,12 +670,9 @@ func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) ( return group.HandleNewRtspSubSessionDescribe(session) } -func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool { +func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error { sm.mutex.Lock() defer sm.mutex.Unlock() - group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) - - res := group.HandleNewRtspSubSessionPlay(session) var info base.SubStartInfo info.ServerId = sm.config.ServerId @@ -669,15 +683,22 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool info.UrlParam = session.RawQuery() info.SessionId = session.UniqueKey() info.RemoteAddr = session.GetStat().RemoteAddr + + if err := sm.simpleAuthCtx.OnSubStart(info); err != nil { + return err + } + + group := sm.getOrCreateGroup(session.AppName(), session.StreamName()) + group.HandleNewRtspSubSessionPlay(session) + info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - sm.option.NotifyHandler.OnSubStart(info) - return res + sm.option.NotifyHandler.OnSubStart(info) + return nil } func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) { - // TODO chef: impl me sm.mutex.Lock() defer sm.mutex.Unlock() group := sm.getGroup(session.AppName(), session.StreamName()) @@ -765,6 +786,22 @@ func (sm *ServerManager) getGroup(appName string, streamName string) *Group { return sm.groupManager.GetGroup(appName, streamName) } +func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) { + urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80) + if err != nil { + nazalog.Errorf("parse url. err=%+v", err) + return + } + if urlCtx.GetFileType() == "m3u8" { + if err = sm.simpleAuthCtx.OnHls(urlCtx.GetFilenameWithoutType(), urlCtx.RawQuery); err != nil { + nazalog.Errorf("simple auth failed. err=%+v", err) + return + } + } + + sm.hlsServerHandler.ServeHTTP(writer, req) +} + // --------------------------------------------------------------------------------------------------------------------- func runWebPprof(addr string) { diff --git a/pkg/logic/simple_auth.go b/pkg/logic/simple_auth.go index 19cd7cd..84d8ba6 100644 --- a/pkg/logic/simple_auth.go +++ b/pkg/logic/simple_auth.go @@ -10,6 +10,7 @@ package logic import ( "net/url" + "strings" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazamd5" @@ -41,7 +42,8 @@ func NewSimpleAuthCtx(config SimpleAuthConfig) *SimpleAuthCtx { } func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error { - if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp { + if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp || + s.config.PubRtspEnable && info.Protocol == base.ProtocolRtsp { return s.check(info.StreamName, info.UrlParam) } @@ -50,13 +52,19 @@ func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error { func (s *SimpleAuthCtx) OnSubStart(info base.SubStartInfo) error { if (s.config.SubRtmpEnable && info.Protocol == base.ProtocolRtmp) || - (s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) { + (s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) || + (s.config.SubHttptsEnable && info.Protocol == base.ProtocolHttpts) || + (s.config.SubRtspEnable && info.Protocol == base.ProtocolRtsp) { return s.check(info.StreamName, info.UrlParam) } return nil } +func (s *SimpleAuthCtx) OnHls(streamName string, urlParam string) error { + return s.check(streamName, urlParam) +} + func (s *SimpleAuthCtx) check(streamName string, urlParam string) error { q, err := url.ParseQuery(urlParam) if err != nil { @@ -66,8 +74,9 @@ func (s *SimpleAuthCtx) check(streamName string, urlParam string) error { if v == "" { return base.ErrSimpleAuthParamNotFound } + v = strings.ToLower(v) se := SimpleAuthCalcSecret(s.config.Key, streamName) - if se != v { + if v != s.config.DangerousLalSecret && v != se { return base.ErrSimpleAuthFailed } return nil diff --git a/pkg/logic/simple_auth_test.go b/pkg/logic/simple_auth_test.go index 5ab09c5..1093963 100644 --- a/pkg/logic/simple_auth_test.go +++ b/pkg/logic/simple_auth_test.go @@ -22,13 +22,35 @@ func TestSimpleAuthCalcSecret(t *testing.T) { func TestSimpleAuthCtx(t *testing.T) { ctx := NewSimpleAuthCtx(SimpleAuthConfig{ - Key: "q191201771", - PubRtmpEnable: true, + Key: "q191201771", + DangerousLalSecret: "pengrl", + PubRtmpEnable: true, }) var info base.PubStartInfo info.Protocol = base.ProtocolRtmp info.StreamName = "test110" + info.UrlParam = "lal_secret=700997e1595a06c9ffa60ebef79105b0" res := ctx.OnPubStart(info) assert.Equal(t, nil, res) + + // 测试大写MD5 + info.UrlParam = "lal_secret=700997E1595A06C9FFA60EBEF79105B0" + res = ctx.OnPubStart(info) + assert.Equal(t, nil, res) + + // 测试DangerousLalSecret + info.UrlParam = "lal_secret=pengrl" + res = ctx.OnPubStart(info) + assert.Equal(t, nil, res) + + // 测试失败1 缺少lal_secret + info.UrlParam = "" + res = ctx.OnPubStart(info) + assert.Equal(t, base.ErrSimpleAuthParamNotFound, res) + + // 测试失败2 lal_secret值无效 + info.UrlParam = "lal_secret=invalid" + res = ctx.OnPubStart(info) + assert.Equal(t, base.ErrSimpleAuthFailed, res) } diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 31e6a56..d88428a 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -23,9 +23,12 @@ type ServerObserver interface { /////////////////////////////////////////////////////////////////////////// + // OnNewRtspPubSession + // // @brief Announce阶段回调 - // @return 如果返回false,则表示上层要强制关闭这个推流请求 - OnNewRtspPubSession(session *PubSession) bool + // @return 如果返回非nil,则表示上层要强制关闭这个推流请求 + // + OnNewRtspPubSession(session *PubSession) error OnDelRtspPubSession(session *PubSession) @@ -38,9 +41,12 @@ type ServerObserver interface { // OnNewRtspSubSessionDescribe(session *SubSession) (ok bool, sdp []byte) - // @brief Describe阶段回调 - // @return ok 如果返回false,则表示上层要强制关闭这个拉流请求 - OnNewRtspSubSessionPlay(session *SubSession) bool + // OnNewRtspSubSessionPlay + // + // @brief Play阶段回调 + // @return ok 如果返回非nil,则表示上层要强制关闭这个拉流请求 + // + OnNewRtspSubSessionPlay(session *SubSession) error OnDelRtspSubSession(session *SubSession) } @@ -87,31 +93,30 @@ func (s *Server) Dispose() { } } -// ServerCommandSessionObserver -func (s *Server) OnNewRtspPubSession(session *PubSession) bool { +// ----- ServerCommandSessionObserver ---------------------------------------------------------------------------------- + +func (s *Server) OnNewRtspPubSession(session *PubSession) error { return s.observer.OnNewRtspPubSession(session) } -// ServerCommandSessionObserver func (s *Server) OnNewRtspSubSessionDescribe(session *SubSession) (ok bool, sdp []byte) { return s.observer.OnNewRtspSubSessionDescribe(session) } -// ServerCommandSessionObserver -func (s *Server) OnNewRtspSubSessionPlay(session *SubSession) bool { +func (s *Server) OnNewRtspSubSessionPlay(session *SubSession) error { return s.observer.OnNewRtspSubSessionPlay(session) } -// ServerCommandSessionObserver func (s *Server) OnDelRtspPubSession(session *PubSession) { s.observer.OnDelRtspPubSession(session) } -// ServerCommandSessionObserver func (s *Server) OnDelRtspSubSession(session *SubSession) { s.observer.OnDelRtspSubSession(session) } +// --------------------------------------------------------------------------------------------------------------------- + func (s *Server) handleTcpConnect(conn net.Conn) { session := NewServerCommandSession(s, conn) s.observer.OnNewRtspSessionConnect(session) diff --git a/pkg/rtsp/server_command_session.go b/pkg/rtsp/server_command_session.go index 7c17d00..f66458e 100644 --- a/pkg/rtsp/server_command_session.go +++ b/pkg/rtsp/server_command_session.go @@ -28,9 +28,9 @@ type ServerCommandSessionObserver interface { // OnNewRtspPubSession // // @brief Announce阶段回调 - // @return 如果返回false,则表示上层要强制关闭这个推流请求 + // @return 如果返回非nil,则表示上层要强制关闭这个推流请求 // - OnNewRtspPubSession(session *PubSession) bool + OnNewRtspPubSession(session *PubSession) error // OnNewRtspSubSessionDescribe // @@ -42,10 +42,10 @@ type ServerCommandSessionObserver interface { // OnNewRtspSubSessionPlay // - // @brief Describe阶段回调 - // @return ok 如果返回false,则表示上层要强制关闭这个拉流请求 + // @brief Play阶段回调 + // @return ok 如果返回非nil,则表示上层要强制关闭这个拉流请求 // - OnNewRtspSubSessionPlay(session *SubSession) bool + OnNewRtspSubSessionPlay(session *SubSession) error } type ServerCommandSession struct { @@ -237,8 +237,8 @@ func (session *ServerCommandSession) handleAnnounce(requestCtx nazahttp.HttpReqM nazalog.Infof("[%s] link new PubSession. [%s]", session.uniqueKey, session.pubSession.uniqueKey) session.pubSession.InitWithSdp(sdpCtx) - if ok := session.observer.OnNewRtspPubSession(session.pubSession); !ok { - return base.ErrRtspClosedByObserver + if err = session.observer.OnNewRtspPubSession(session.pubSession); err != nil { + return err } resp := PackResponseAnnounce(requestCtx.Headers.Get(HeaderCSeq)) @@ -350,8 +350,9 @@ func (session *ServerCommandSession) handleRecord(requestCtx nazahttp.HttpReqMsg func (session *ServerCommandSession) handlePlay(requestCtx nazahttp.HttpReqMsgCtx) error { nazalog.Infof("[%s] < R PLAY", session.uniqueKey) - if ok := session.observer.OnNewRtspSubSessionPlay(session.subSession); !ok { - return base.ErrRtspClosedByObserver + // TODO(chef): [opt] 上层关闭,可以考虑回复非200状态码再关闭 + if err := session.observer.OnNewRtspSubSessionPlay(session.subSession); err != nil { + return err } resp := PackResponsePlay(requestCtx.Headers.Get(HeaderCSeq)) _, err := session.conn.Write([]byte(resp))