- [feat] simple auth增加支持rtsp,httpts,hls协议 - [feat] simple auth鉴权可配置后门鉴权参数 - [opt] simple auth鉴权参数的md5值兼容大小写

pull/130/head
q191201771 3 years ago
parent 20d5d56690
commit f16b1f8d4f

@ -1,14 +1,17 @@
# lalserver 配置文件说明
```
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", //. 配置文件对应的文档说明链接,在程序中没实际用途
"conf_version": "0.2.3", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本
"conf_version": "0.2.6", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本
// 号是否与代码中声明的一致
"rtmp": {
"enable": true, //. 是否开启rtmp服务的监听
// 注意配置文件中控制各协议类型的enable开关都应该按需打开避免造成不必要的协议转换的开销
"addr": ":19350", //. RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2, //. RTMP拉流的GOP缓存数量加速流打开时间但是可能增加延时
"merge_write_size": 8192, //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿
"addr": ":1935", //. RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 0, //. RTMP拉流的GOP缓存数量加速流打开时间但是可能增加延时
//. 如果为0则不使用缓存发送
"merge_write_size": 0, //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿
// 如果为0则不合并发送
"add_dummy_audio_enable": false, //. 是否开启动态检测添加静音AAC数据的功能
// 如果开启rtmp pub推流时如果超过`add_dummy_audio_wait_audio_ms`时间依然没有
@ -28,7 +31,7 @@
"enable_https": true, //. 是否开启HTTPS-FLV监听
"url_pattern": "/", //. 拉流url路由路径地址。默认值为`/`,表示不受限制,路由地址可以为任意路径地址。
// 如果设置为`/live/`,则只能从`/live/`路径下拉流,比如`/live/test110.flv`
"gop_num": 2 //.
"gop_num": 0 //. 见rtmp.gop_num
},
"hls": {
"enable": true, //. 是否开启HLS服务的监听
@ -94,6 +97,17 @@
"on_sub_stop": "http://127.0.0.1:10101/on_sub_stop",
"on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect"
},
"simple_auth": { // 鉴权文档见: https://pengrl.com/lal/#/auth
"key": "q191201771", // 私有key计算md5鉴权参数时使用
"dangerous_lal_secret": "pengrl", // 后门鉴权参数,所有的流可通过该参数值鉴权
"pub_rtmp_enable": false, // rtmp推流是否开启鉴权true为开启鉴权false为不开启鉴权
"sub_rtmp_enable": false, // rtmp拉流是否开启鉴权
"sub_httpflv_enable": false, // httpflv拉流是否开启鉴权
"sub_httpts_enable": false, // httpts拉流是否开启鉴权
"pub_rtsp_enable": false, // rtsp推流是否开启鉴权
"sub_rtsp_enable": false, // rtsp拉流是否开启鉴权
"hls_m3u8_enable": true // m3u8拉流是否开启鉴权
},
"pprof": {
"enable": true, //. 是否开启Go pprof web服务的监听
"addr": ":8084" //. Go pprof web地址

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.5",
"conf_version": "v0.2.6",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -73,9 +73,14 @@
},
"simple_auth": {
"key": "q191201771",
"dangerous_lal_secret": "pengrl",
"pub_rtmp_enable": false,
"sub_rtmp_enable": false,
"sub_httpflv_enable": false
"sub_httpflv_enable": false,
"sub_httpts_enable": false,
"pub_rtsp_enable": false,
"sub_rtsp_enable": false,
"hls_m3u8_enable": false
},
"pprof": {
"enable": true,

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.5",
"conf_version": "v0.2.6",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -73,9 +73,14 @@
},
"simple_auth": {
"key": "q191201771",
"dangerous_lal_secret": "pengrl",
"pub_rtmp_enable": false,
"sub_rtmp_enable": false,
"sub_httpflv_enable": false
"sub_httpflv_enable": false,
"sub_httpts_enable": false,
"pub_rtsp_enable": false,
"sub_rtsp_enable": false,
"hls_m3u8_enable": false
},
"pprof": {
"enable": true,

@ -11,6 +11,7 @@ package base
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
@ -54,8 +55,33 @@ type UrlContext struct {
RawQuery string
RawUrlWithoutUserInfo string
filenameWithoutType string
fileType string
}
func (u *UrlContext) GetFilenameWithoutType() string {
u.calcFilenameAndTypeIfNeeded()
return u.filenameWithoutType
}
func (u *UrlContext) GetFileType() string {
u.calcFilenameAndTypeIfNeeded()
return u.fileType
}
func (u *UrlContext) calcFilenameAndTypeIfNeeded() {
if len(u.filenameWithoutType) == 0 || len(u.fileType) == 0 {
ss := strings.Split(u.LastItemOfPath, ".")
u.filenameWithoutType = ss[0]
if len(ss) > 1 {
u.fileType = ss[1]
}
}
}
// ---------------------------------------------------------------------------------------------------------------------
// ParseUrl
//
// @param defaultPort: 注意如果rawUrl中显示指定了端口则该参数不生效
@ -127,6 +153,8 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) {
return ctx, nil
}
// ---------------------------------------------------------------------------------------------------------------------
func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) {
ctx, err = ParseUrl(rawUrl, -1)
if err != nil {
@ -147,10 +175,6 @@ func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) {
return
}
func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) {
return ParseHttpUrl(rawUrl, ".flv")
}
func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) {
ctx, err = ParseUrl(rawUrl, -1)
if err != nil {
@ -163,6 +187,29 @@ func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) {
return
}
func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) {
return parseHttpUrl(rawUrl, ".flv")
}
// ---------------------------------------------------------------------------------------------------------------------
// ParseHttpRequest
//
// @return 完整url
//
func ParseHttpRequest(req *http.Request) string {
// TODO(chef): [refactor] scheme是否能从从req.URL.Scheme获取
var scheme string
if req.TLS == nil {
scheme = "http"
} else {
scheme = "https"
}
return fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI)
}
// ----- private -------------------------------------------------------------------------------------------------------
func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) {
ctx.Path = stdUrl.Path
@ -194,12 +241,12 @@ func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) {
return ctx, nil
}
func ParseHttpUrl(rawUrl string, suffix string) (ctx UrlContext, err error) {
func parseHttpUrl(rawUrl string, filetype string) (ctx UrlContext, err error) {
ctx, err = ParseUrl(rawUrl, -1)
if err != nil {
return
}
if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, suffix) {
if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, filetype) {
return ctx, fmt.Errorf("%w. url=%s", ErrInvalidUrl, rawUrl)
}

@ -12,6 +12,8 @@ import (
"fmt"
"path/filepath"
"strings"
"github.com/q191201771/lal/pkg/base"
)
// 聚合以下功能:
@ -19,11 +21,8 @@ import (
// - 路由策略: HTTP请求HLS时request URI和文件路径的映射规则
type RequestInfo struct {
FileName string
FileType string
StreamName string
FileNameWithPath string
StreamName string // uri结合策略
FileNameWithPath string // uri结合策略, 从磁盘打开文件时使用
}
type IPathStrategy interface {
@ -31,11 +30,17 @@ type IPathStrategy interface {
IPathWriteStrategy
}
// IPathRequestStrategy
//
// 路由策略
// 接到HTTP请求时对应文件路径的映射逻辑
//
type IPathRequestStrategy interface {
// 解析HTTP请求得到文件名、文件类型、流名称、文件所在路径
GetRequestInfo(uri string, rootOutPath string) RequestInfo
// GetRequestInfo
//
// 解析HTTP请求得到流名称、文件所在路径
//
GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) RequestInfo
}
// 落盘策略
@ -99,6 +104,8 @@ const (
type DefaultPathStrategy struct {
}
// GetRequestInfo
//
// RequestURI example:
// uri -> FileName StreamName FileType FileNameWithPath
// /hls/test110.m3u8 -> test110.m3u8 test110 m3u8 {rootOutPath}/test110/playlist.m3u8
@ -106,24 +113,24 @@ type DefaultPathStrategy struct {
// /hls/test110/record.m3u8 -> record.m3u8 test110 m3u8 {rootOutPath}/test110/record.m3u8
// /hls/test110/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts
// /hls/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts
func (dps *DefaultPathStrategy) GetRequestInfo(uri string, rootOutPath string) (ri RequestInfo) {
uriItems := strings.Split(uri, "/")
ri.FileName = uriItems[len(uriItems)-1]
fileNameItems := strings.Split(ri.FileName, ".")
fileNameWithOutType := fileNameItems[0]
ri.FileType = fileNameItems[len(fileNameItems)-1]
if ri.FileType == "m3u8" {
if ri.FileName == playlistM3u8FileName || ri.FileName == recordM3u8FileName {
//
func (dps *DefaultPathStrategy) GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) (ri RequestInfo) {
filename := urlCtx.LastItemOfPath
filetype := urlCtx.GetFileType()
fileNameWithoutType := urlCtx.GetFilenameWithoutType()
if filetype == "m3u8" {
if filename == playlistM3u8FileName || filename == recordM3u8FileName {
uriItems := strings.Split(urlCtx.Path, "/")
ri.StreamName = uriItems[len(uriItems)-2]
ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, ri.FileName)
ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, filename)
} else {
ri.StreamName = fileNameWithOutType
ri.StreamName = fileNameWithoutType
ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, playlistM3u8FileName)
}
} else if ri.FileType == "ts" {
ri.StreamName = dps.getStreamNameFromTsFileName(ri.FileName)
ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, ri.FileName)
} else if filetype == "ts" {
ri.StreamName = dps.getStreamNameFromTsFileName(filename)
ri.FileNameWithPath = filepath.Join(rootOutPath, ri.StreamName, filename)
}
return

@ -11,6 +11,10 @@ package hls_test
import (
"testing"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/assert"
)
@ -20,40 +24,32 @@ func TestDefaultPathStrategy_GetRequestInfo(t *testing.T) {
rootOutPath := "/tmp/lal/hls/"
golden := map[string]hls.RequestInfo{
"/hls/test110.m3u8": {
FileName: "test110.m3u8",
FileType: "m3u8",
"http://127.0.0.1:8080/hls/test110.m3u8": {
StreamName: "test110",
FileNameWithPath: "/tmp/lal/hls/test110/playlist.m3u8",
},
"/hls/test110/playlist.m3u8": {
FileName: "playlist.m3u8",
FileType: "m3u8",
"http://127.0.0.1:8080/hls/test110/playlist.m3u8": {
StreamName: "test110",
FileNameWithPath: "/tmp/lal/hls/test110/playlist.m3u8",
},
"/hls/test110/record.m3u8": {
FileName: "record.m3u8",
FileType: "m3u8",
"http://127.0.0.1:8080/hls/test110/record.m3u8": {
StreamName: "test110",
FileNameWithPath: "/tmp/lal/hls/test110/record.m3u8",
},
"/hls/test110/test110-1620540712084-0.ts": {
FileName: "test110-1620540712084-0.ts",
FileType: "ts",
"http://127.0.0.1:8080/hls/test110/test110-1620540712084-0.ts": {
StreamName: "test110",
FileNameWithPath: "/tmp/lal/hls/test110/test110-1620540712084-0.ts",
},
"/hls/test110-1620540712084-0.ts": {
FileName: "test110-1620540712084-0.ts",
FileType: "ts",
"http://127.0.0.1:8080/hls/test110-1620540712084-0.ts": {
StreamName: "test110",
FileNameWithPath: "/tmp/lal/hls/test110/test110-1620540712084-0.ts",
},
}
for k, v := range golden {
out := dps.GetRequestInfo(k, rootOutPath)
ctx, err := base.ParseUrl(k, -1)
nazalog.Assert(nil, err)
out := dps.GetRequestInfo(ctx, rootOutPath)
assert.Equal(t, v, out)
}
}

@ -18,9 +18,6 @@ import (
type ServerHandler struct {
outPath string
//addr string
//ln net.Listener
//httpSrv *http.Server
}
func NewServerHandler(outPath string) *ServerHandler {
@ -29,37 +26,29 @@ func NewServerHandler(outPath string) *ServerHandler {
}
}
//
//func (s *Server) Listen() (err error) {
// if s.ln, err = net.Listen("tcp", s.addr); err != nil {
// return
// }
// s.httpSrv = &http.Server{Addr: s.addr, Handler: s}
// nazalog.Infof("start hls server listen. addr=%s", s.addr)
// return
//}
//
//func (s *Server) RunLoop() error {
// return s.httpSrv.Serve(s.ln)
//}
//
//func (s *Server) Dispose() {
// if err := s.httpSrv.Close(); err != nil {
// nazalog.Error(err)
// }
//}
func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)
if err != nil {
nazalog.Errorf("parse url. err=%+v", err)
return
}
s.ServeHTTPWithUrlCtx(resp, urlCtx)
}
func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, urlCtx base.UrlContext) {
//nazalog.Debugf("%+v", req)
// TODO chef:
// - check appname in URI path
ri := PathStrategy.GetRequestInfo(req.RequestURI, s.outPath)
filename := urlCtx.LastItemOfPath
filetype := urlCtx.GetFileType()
ri := PathStrategy.GetRequestInfo(urlCtx, s.outPath)
//nazalog.Debugf("%+v", ri)
if ri.FileName == "" || ri.StreamName == "" || ri.FileNameWithPath == "" || (ri.FileType != "m3u8" && ri.FileType != "ts") {
nazalog.Warnf("invalid hls request. uri=%s, request=%+v", req.RequestURI, ri)
if filename == "" || (filetype != "m3u8" && filetype != "ts") || ri.StreamName == "" || ri.FileNameWithPath == "" {
nazalog.Warnf("invalid hls request. url=%+v, request=%+v", urlCtx, ri)
resp.WriteHeader(404)
return
}
@ -71,7 +60,7 @@ func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
return
}
switch ri.FileType {
switch filetype {
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
resp.Header().Add("Server", base.LalHlsM3u8Server)

@ -28,7 +28,7 @@ import (
// - rtsp是否也应该上层使用Command作为代理避免生命周期管理混乱
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts
// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv

@ -21,7 +21,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
const ConfVersion = "v0.2.5"
const ConfVersion = "v0.2.6"
const (
defaultHlsCleanupMode = hls.CleanupModeInTheEnd
@ -120,10 +120,14 @@ type HttpNotifyConfig struct {
}
type SimpleAuthConfig struct {
Key string `json:"key"`
PubRtmpEnable bool `json:"pub_rtmp_enable"`
SubRtmpEnable bool `json:"sub_rtmp_enable"`
SubHttpflvEnable bool `json:"sub_httpflv_enable"`
Key string `json:"key"`
DangerousLalSecret string `json:"dangerous_lal_secret"`
PubRtmpEnable bool `json:"pub_rtmp_enable"`
SubRtmpEnable bool `json:"sub_rtmp_enable"`
SubHttpflvEnable bool `json:"sub_httpflv_enable"`
SubHttptsEnable bool `json:"sub_httpts_enable"`
PubRtspEnable bool `json:"pub_rtsp_enable"`
SubRtspEnable bool `json:"sub_rtsp_enable"`
}
type PprofConfig struct {

@ -312,7 +312,8 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
defer group.mutex.Unlock()
if group.hasInSession() {
nazalog.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey())
// TODO(chef): [refactor] 打印in session
nazalog.Errorf("[%s] in stream already exist at group. add=%s", group.UniqueKey, session.UniqueKey())
return base.ErrDupInStream
}
@ -341,15 +342,15 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
}
// TODO chef: rtsp package中增加回调返回值判断如果是false将连接关掉
func (group *Group) AddRtspPubSession(session *rtsp.PubSession) bool {
func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error {
nazalog.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.hasInSession() {
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey())
return false
nazalog.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey())
return base.ErrDupInStream
}
group.rtspPubSession = session
@ -360,7 +361,7 @@ func (group *Group) AddRtspPubSession(session *rtsp.PubSession) bool {
})
session.SetObserver(group)
return true
return nil
}
func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool {
@ -462,7 +463,7 @@ func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (o
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool {
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) {
nazalog.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
@ -473,8 +474,6 @@ func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) bool {
}
// TODO(chef): rtsp sub也应该判断是否需要静态pull回源
return true
}
func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) {

@ -9,7 +9,6 @@
package logic
import (
"fmt"
"net/http"
"strings"
@ -24,13 +23,12 @@ type HttpServerHandlerObserver interface {
//
// 通知上层有新的拉流者
//
// @return true则允许拉流false则关闭连接
// @return nil则允许拉流不为nil则关闭连接
//
OnNewHttpflvSubSession(session *httpflv.SubSession) error
OnDelHttpflvSubSession(session *httpflv.SubSession)
OnNewHttptsSubSession(session *httpts.SubSession) bool
OnNewHttptsSubSession(session *httpts.SubSession) error
OnDelHttptsSubSession(session *httpts.SubSession)
}
@ -45,15 +43,7 @@ func NewHttpServerHandler(observer HttpServerHandlerObserver) *HttpServerHandler
}
func (h *HttpServerHandler) ServeSubSession(writer http.ResponseWriter, req *http.Request) {
var scheme string
// TODO(chef) 这里scheme直接使用http和https没有考虑ws和wss注意后续的逻辑可能会依赖此处
if req.TLS == nil {
scheme = "http"
} else {
scheme = "https"
}
rawUrl := fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI)
urlCtx, err := base.ParseUrl(rawUrl, 80)
urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)
if err != nil {
nazalog.Errorf("parse url. err=%+v", err)
return
@ -94,8 +84,10 @@ func (h *HttpServerHandler) ServeSubSession(writer http.ResponseWriter, req *htt
if strings.HasSuffix(urlCtx.LastItemOfPath, ".ts") {
session := httpts.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey)
nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.Url())
if !h.observer.OnNewHttptsSubSession(session) {
session.Dispose()
if err = h.observer.OnNewHttptsSubSession(session); err != nil {
nazalog.Infof("[%s] dispose by observer. err=%+v", session.UniqueKey(), err)
_ = session.Dispose()
return
}
err = session.RunLoop()
nazalog.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey(), err)

@ -167,7 +167,7 @@ func (sm *ServerManager) RunLoop() error {
if err := addMux(sm.config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil {
return err
}
if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil {
if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.serveHls, "hls"); err != nil {
return err
}
@ -544,11 +544,9 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
sm.option.NotifyHandler.OnSubStop(info)
}
func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool {
func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttptsSubSession(session)
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
@ -559,10 +557,21 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
sm.option.NotifyHandler.OnSubStart(info)
if err := sm.simpleAuthCtx.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttptsSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnSubStart(info)
return true
return nil
}
func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
@ -599,11 +608,9 @@ func (sm *ServerManager) OnDelRtspSession(session *rtsp.ServerCommandSession) {
// TODO chef: impl me
}
func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
res := group.AddRtspPubSession(session)
var info base.PubStartInfo
info.ServerId = sm.config.ServerId
@ -614,11 +621,21 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
if err := sm.simpleAuthCtx.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtspPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnPubStart(info)
return res
sm.option.NotifyHandler.OnPubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
@ -653,12 +670,9 @@ func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (
return group.HandleNewRtspSubSessionDescribe(session)
}
func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool {
func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
res := group.HandleNewRtspSubSessionPlay(session)
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
@ -669,15 +683,22 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
if err := sm.simpleAuthCtx.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.HandleNewRtspSubSessionPlay(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnSubStart(info)
return res
sm.option.NotifyHandler.OnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
// TODO chef: impl me
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
@ -765,6 +786,22 @@ func (sm *ServerManager) getGroup(appName string, streamName string) *Group {
return sm.groupManager.GetGroup(appName, streamName)
}
func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) {
urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)
if err != nil {
nazalog.Errorf("parse url. err=%+v", err)
return
}
if urlCtx.GetFileType() == "m3u8" {
if err = sm.simpleAuthCtx.OnHls(urlCtx.GetFilenameWithoutType(), urlCtx.RawQuery); err != nil {
nazalog.Errorf("simple auth failed. err=%+v", err)
return
}
}
sm.hlsServerHandler.ServeHTTP(writer, req)
}
// ---------------------------------------------------------------------------------------------------------------------
func runWebPprof(addr string) {

@ -10,6 +10,7 @@ package logic
import (
"net/url"
"strings"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazamd5"
@ -41,7 +42,8 @@ func NewSimpleAuthCtx(config SimpleAuthConfig) *SimpleAuthCtx {
}
func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error {
if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp {
if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp ||
s.config.PubRtspEnable && info.Protocol == base.ProtocolRtsp {
return s.check(info.StreamName, info.UrlParam)
}
@ -50,13 +52,19 @@ func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error {
func (s *SimpleAuthCtx) OnSubStart(info base.SubStartInfo) error {
if (s.config.SubRtmpEnable && info.Protocol == base.ProtocolRtmp) ||
(s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) {
(s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) ||
(s.config.SubHttptsEnable && info.Protocol == base.ProtocolHttpts) ||
(s.config.SubRtspEnable && info.Protocol == base.ProtocolRtsp) {
return s.check(info.StreamName, info.UrlParam)
}
return nil
}
func (s *SimpleAuthCtx) OnHls(streamName string, urlParam string) error {
return s.check(streamName, urlParam)
}
func (s *SimpleAuthCtx) check(streamName string, urlParam string) error {
q, err := url.ParseQuery(urlParam)
if err != nil {
@ -66,8 +74,9 @@ func (s *SimpleAuthCtx) check(streamName string, urlParam string) error {
if v == "" {
return base.ErrSimpleAuthParamNotFound
}
v = strings.ToLower(v)
se := SimpleAuthCalcSecret(s.config.Key, streamName)
if se != v {
if v != s.config.DangerousLalSecret && v != se {
return base.ErrSimpleAuthFailed
}
return nil

@ -22,13 +22,35 @@ func TestSimpleAuthCalcSecret(t *testing.T) {
func TestSimpleAuthCtx(t *testing.T) {
ctx := NewSimpleAuthCtx(SimpleAuthConfig{
Key: "q191201771",
PubRtmpEnable: true,
Key: "q191201771",
DangerousLalSecret: "pengrl",
PubRtmpEnable: true,
})
var info base.PubStartInfo
info.Protocol = base.ProtocolRtmp
info.StreamName = "test110"
info.UrlParam = "lal_secret=700997e1595a06c9ffa60ebef79105b0"
res := ctx.OnPubStart(info)
assert.Equal(t, nil, res)
// 测试大写MD5
info.UrlParam = "lal_secret=700997E1595A06C9FFA60EBEF79105B0"
res = ctx.OnPubStart(info)
assert.Equal(t, nil, res)
// 测试DangerousLalSecret
info.UrlParam = "lal_secret=pengrl"
res = ctx.OnPubStart(info)
assert.Equal(t, nil, res)
// 测试失败1 缺少lal_secret
info.UrlParam = ""
res = ctx.OnPubStart(info)
assert.Equal(t, base.ErrSimpleAuthParamNotFound, res)
// 测试失败2 lal_secret值无效
info.UrlParam = "lal_secret=invalid"
res = ctx.OnPubStart(info)
assert.Equal(t, base.ErrSimpleAuthFailed, res)
}

@ -23,9 +23,12 @@ type ServerObserver interface {
///////////////////////////////////////////////////////////////////////////
// OnNewRtspPubSession
//
// @brief Announce阶段回调
// @return 如果返回false则表示上层要强制关闭这个推流请求
OnNewRtspPubSession(session *PubSession) bool
// @return 如果返回非nil则表示上层要强制关闭这个推流请求
//
OnNewRtspPubSession(session *PubSession) error
OnDelRtspPubSession(session *PubSession)
@ -38,9 +41,12 @@ type ServerObserver interface {
//
OnNewRtspSubSessionDescribe(session *SubSession) (ok bool, sdp []byte)
// @brief Describe阶段回调
// @return ok 如果返回false则表示上层要强制关闭这个拉流请求
OnNewRtspSubSessionPlay(session *SubSession) bool
// OnNewRtspSubSessionPlay
//
// @brief Play阶段回调
// @return ok 如果返回非nil则表示上层要强制关闭这个拉流请求
//
OnNewRtspSubSessionPlay(session *SubSession) error
OnDelRtspSubSession(session *SubSession)
}
@ -87,31 +93,30 @@ func (s *Server) Dispose() {
}
}
// ServerCommandSessionObserver
func (s *Server) OnNewRtspPubSession(session *PubSession) bool {
// ----- ServerCommandSessionObserver ----------------------------------------------------------------------------------
func (s *Server) OnNewRtspPubSession(session *PubSession) error {
return s.observer.OnNewRtspPubSession(session)
}
// ServerCommandSessionObserver
func (s *Server) OnNewRtspSubSessionDescribe(session *SubSession) (ok bool, sdp []byte) {
return s.observer.OnNewRtspSubSessionDescribe(session)
}
// ServerCommandSessionObserver
func (s *Server) OnNewRtspSubSessionPlay(session *SubSession) bool {
func (s *Server) OnNewRtspSubSessionPlay(session *SubSession) error {
return s.observer.OnNewRtspSubSessionPlay(session)
}
// ServerCommandSessionObserver
func (s *Server) OnDelRtspPubSession(session *PubSession) {
s.observer.OnDelRtspPubSession(session)
}
// ServerCommandSessionObserver
func (s *Server) OnDelRtspSubSession(session *SubSession) {
s.observer.OnDelRtspSubSession(session)
}
// ---------------------------------------------------------------------------------------------------------------------
func (s *Server) handleTcpConnect(conn net.Conn) {
session := NewServerCommandSession(s, conn)
s.observer.OnNewRtspSessionConnect(session)

@ -28,9 +28,9 @@ type ServerCommandSessionObserver interface {
// OnNewRtspPubSession
//
// @brief Announce阶段回调
// @return 如果返回false,则表示上层要强制关闭这个推流请求
// @return 如果返回非nil,则表示上层要强制关闭这个推流请求
//
OnNewRtspPubSession(session *PubSession) bool
OnNewRtspPubSession(session *PubSession) error
// OnNewRtspSubSessionDescribe
//
@ -42,10 +42,10 @@ type ServerCommandSessionObserver interface {
// OnNewRtspSubSessionPlay
//
// @brief Describe阶段回调
// @return ok 如果返回false,则表示上层要强制关闭这个拉流请求
// @brief Play阶段回调
// @return ok 如果返回非nil,则表示上层要强制关闭这个拉流请求
//
OnNewRtspSubSessionPlay(session *SubSession) bool
OnNewRtspSubSessionPlay(session *SubSession) error
}
type ServerCommandSession struct {
@ -237,8 +237,8 @@ func (session *ServerCommandSession) handleAnnounce(requestCtx nazahttp.HttpReqM
nazalog.Infof("[%s] link new PubSession. [%s]", session.uniqueKey, session.pubSession.uniqueKey)
session.pubSession.InitWithSdp(sdpCtx)
if ok := session.observer.OnNewRtspPubSession(session.pubSession); !ok {
return base.ErrRtspClosedByObserver
if err = session.observer.OnNewRtspPubSession(session.pubSession); err != nil {
return err
}
resp := PackResponseAnnounce(requestCtx.Headers.Get(HeaderCSeq))
@ -350,8 +350,9 @@ func (session *ServerCommandSession) handleRecord(requestCtx nazahttp.HttpReqMsg
func (session *ServerCommandSession) handlePlay(requestCtx nazahttp.HttpReqMsgCtx) error {
nazalog.Infof("[%s] < R PLAY", session.uniqueKey)
if ok := session.observer.OnNewRtspSubSessionPlay(session.subSession); !ok {
return base.ErrRtspClosedByObserver
// TODO(chef): [opt] 上层关闭可以考虑回复非200状态码再关闭
if err := session.observer.OnNewRtspSubSessionPlay(session.subSession); err != nil {
return err
}
resp := PackResponsePlay(requestCtx.Headers.Get(HeaderCSeq))
_, err := session.conn.Write([]byte(resp))

Loading…
Cancel
Save