You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/logic/server_manager__.go

825 lines
22 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"
"github.com/q191201771/naza/pkg/taskpool"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/defertaskthread"
//"github.com/felixge/fgprof"
)
type ServerManager struct {
option Option
serverStartTime string
config *Config
httpServerManager *base.HttpServerManager
httpServerHandler *HttpServerHandler
hlsServerHandler *hls.ServerHandler
rtmpServer *rtmp.Server
rtmpsServer *rtmp.Server
rtspServer *rtsp.Server
rtspsServer *rtsp.Server
httpApiServer *HttpApiServer
pprofServer *http.Server
wsrtspServer *rtsp.WebsocketServer
exitChan chan struct{}
mutex sync.Mutex
groupManager IGroupManager
onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext
notifyHandlerThread taskpool.Pool
ipBlacklist IpBlacklist
}
func NewServerManager(modOption ...ModOption) *ServerManager {
sm := &ServerManager{
serverStartTime: base.ReadableNowTime(),
exitChan: make(chan struct{}, 1),
}
sm.groupManager = NewSimpleGroupManager(sm)
sm.option = defaultOption
for _, fn := range modOption {
fn(&sm.option)
}
rawContent := sm.option.ConfRawContent
if len(rawContent) == 0 {
rawContent = base.WrapReadConfigFile(sm.option.ConfFilename, base.LalDefaultConfFilenameList, func() {
_, _ = fmt.Fprintf(os.Stderr, `
Example:
%s -c %s
Github: %s
Doc: %s
`, os.Args[0], filepath.FromSlash("./conf/"+base.LalDefaultConfigFilename), base.LalGithubSite, base.LalDocSite)
})
}
sm.config = LoadConfAndInitLog(rawContent)
base.LogoutStartInfo()
if sm.config.HlsConfig.Enable && sm.config.HlsConfig.UseMemoryAsDiskFlag {
Log.Infof("hls use memory as disk.")
hls.SetUseMemoryAsDiskFlag(true)
}
if sm.config.RecordConfig.EnableFlv {
if err := os.MkdirAll(sm.config.RecordConfig.FlvOutPath, 0777); err != nil {
Log.Errorf("record flv mkdir error. path=%s, err=%+v", sm.config.RecordConfig.FlvOutPath, err)
}
}
if sm.config.RecordConfig.EnableMpegts {
if err := os.MkdirAll(sm.config.RecordConfig.MpegtsOutPath, 0777); err != nil {
Log.Errorf("record mpegts mkdir error. path=%s, err=%+v", sm.config.RecordConfig.MpegtsOutPath, err)
}
}
sm.nhInitNotifyHandler()
if sm.config.HttpflvConfig.Enable || sm.config.HttpflvConfig.EnableHttps ||
sm.config.HttptsConfig.Enable || sm.config.HttptsConfig.EnableHttps ||
sm.config.HlsConfig.Enable || sm.config.HlsConfig.EnableHttps {
sm.httpServerManager = base.NewHttpServerManager()
sm.httpServerHandler = NewHttpServerHandler(sm)
sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath, sm.config.HlsConfig.UrlPattern, sm.config.HlsConfig.SubSessionHashKey, sm.config.HlsConfig.SubSessionTimeoutMs, sm)
}
if sm.config.RtmpConfig.Enable {
sm.rtmpServer = rtmp.NewServer(sm.config.RtmpConfig.Addr, sm)
}
if sm.config.RtmpConfig.RtmpsEnable {
sm.rtmpsServer = rtmp.NewServer(sm.config.RtmpConfig.RtmpsAddr, sm)
}
if sm.config.RtspConfig.Enable {
sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.RtspConfig.RtspsEnable {
sm.rtspsServer = rtsp.NewServer(sm.config.RtspConfig.RtspsAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.RtspConfig.WsRtspEnable {
sm.wsrtspServer = rtsp.NewWebsocketServer(sm.config.RtspConfig.WsRtspAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.HttpApiConfig.Enable {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
}
if sm.config.PprofConfig.Enable {
sm.pprofServer = &http.Server{Addr: sm.config.PprofConfig.Addr, Handler: nil}
}
if sm.option.Authentication == nil {
sm.option.Authentication = NewSimpleAuthCtx(sm.config.SimpleAuthConfig)
}
return sm
}
// ----- implement ILalServer interface --------------------------------------------------------------------------------
func (sm *ServerManager) RunLoop() error {
// TODO(chef): 作为阻塞函数,外部只能获取失败或结束的信息,没法获取到启动成功的信息
sm.nhOnServerStart(sm.StatLalInfo())
if sm.pprofServer != nil {
go func() {
//Log.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
Log.Infof("start web pprof listen. addr=%s", sm.config.PprofConfig.Addr)
if err := sm.pprofServer.ListenAndServe(); err != nil {
Log.Error(err)
}
}()
}
go base.RunSignalHandler(func() {
sm.Dispose()
})
var addMux = func(config CommonHttpServerConfig, handler base.Handler, name string) error {
if config.Enable {
err := sm.httpServerManager.AddListen(
base.LocalAddrCtx{Addr: config.HttpListenAddr},
config.UrlPattern,
handler,
)
if err != nil {
Log.Errorf("add http listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HttpListenAddr, config.UrlPattern, err)
return err
}
Log.Infof("add http listen for %s. addr=%s, pattern=%s", name, config.HttpListenAddr, config.UrlPattern)
}
if config.EnableHttps {
err := sm.httpServerManager.AddListen(
base.LocalAddrCtx{IsHttps: true, Addr: config.HttpsListenAddr, CertFile: config.HttpsCertFile, KeyFile: config.HttpsKeyFile},
config.UrlPattern,
handler,
)
if err != nil {
Log.Errorf("add https listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HttpsListenAddr, config.UrlPattern, err)
} else {
Log.Infof("add https listen for %s. addr=%s, pattern=%s", name, config.HttpsListenAddr, config.UrlPattern)
}
}
return nil
}
if err := addMux(sm.config.HttpflvConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil {
return err
}
if err := addMux(sm.config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil {
return err
}
if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.serveHls, "hls"); err != nil {
return err
}
if sm.httpServerManager != nil {
go func() {
if err := sm.httpServerManager.RunLoop(); err != nil {
Log.Error(err)
}
}()
}
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
return err
}
go func() {
if err := sm.rtmpServer.RunLoop(); err != nil {
Log.Error(err)
}
}()
}
if sm.rtmpsServer != nil {
err := sm.rtmpsServer.ListenWithTLS(sm.config.RtmpConfig.RtmpsCertFile, sm.config.RtmpConfig.RtmpsKeyFile)
// rtmps启动失败影响降级当rtmps启动时我们并不返回错误保证不因为rtmps影响其他服务
if err == nil {
go func() {
if errRun := sm.rtmpsServer.RunLoop(); errRun != nil {
Log.Error(errRun)
}
}()
}
}
if sm.rtspServer != nil {
if err := sm.rtspServer.Listen(); err != nil {
return err
}
go func() {
if err := sm.rtspServer.RunLoop(); err != nil {
Log.Error(err)
}
}()
}
if sm.rtspsServer != nil {
err := sm.rtspsServer.ListenWithTLS(sm.config.RtspConfig.RtspsCertFile, sm.config.RtspConfig.RtspsKeyFile)
// rtsps启动失败影响降级当rtsps启动时我们并不返回错误保证不因为rtsps影响其他服务
if err == nil {
go func() {
if errRun := sm.rtspsServer.RunLoop(); errRun != nil {
Log.Error(errRun)
}
}()
}
}
if sm.wsrtspServer != nil {
go func() {
err := sm.wsrtspServer.Listen()
if err != nil {
Log.Error(err)
}
}()
}
if sm.httpApiServer != nil {
if err := sm.httpApiServer.Listen(); err != nil {
return err
}
go func() {
if err := sm.httpApiServer.RunLoop(); err != nil {
Log.Error(err)
}
}()
}
uis := uint32(sm.config.HttpNotifyConfig.UpdateIntervalSec)
var updateInfo base.UpdateInfo
updateInfo.Groups = sm.StatAllGroup()
sm.nhOnUpdate(updateInfo)
t := time.NewTicker(1 * time.Second)
defer t.Stop()
var tickCount uint32
for {
select {
case <-sm.exitChan:
return nil
case <-t.C:
tickCount++
sm.mutex.Lock()
// 关闭空闲的group
sm.groupManager.Iterate(func(group *Group) bool {
if group.IsInactive() {
Log.Infof("erase inactive group. [%s]", group.UniqueKey)
group.Dispose()
return false
}
group.Tick(tickCount)
return true
})
// 定时打印一些group相关的debug日志
if sm.config.DebugConfig.LogGroupIntervalSec > 0 &&
tickCount%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 {
groupNum := sm.groupManager.Len()
Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum)
if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 {
var loggedGroupCount int
sm.groupManager.Iterate(func(group *Group) bool {
loggedGroupCount++
if loggedGroupCount <= sm.config.DebugConfig.LogGroupMaxGroupNum {
Log.Debugf("DEBUG_GROUP_LOG: %d %s", loggedGroupCount, group.StringifyDebugStats(sm.config.DebugConfig.LogGroupMaxSubNumPerGroup))
}
return true
})
}
}
sm.mutex.Unlock()
// 定时通过http notify发送group相关的信息
if uis != 0 && (tickCount%uis) == 0 {
updateInfo.Groups = sm.StatAllGroup()
sm.nhOnUpdate(updateInfo)
}
}
}
// never reach here
}
func (sm *ServerManager) Dispose() {
Log.Debug("dispose server manager.")
if sm.rtmpServer != nil {
sm.rtmpServer.Dispose()
}
if sm.rtmpsServer != nil {
sm.rtmpsServer.Dispose()
}
if sm.rtspServer != nil {
sm.rtspServer.Dispose()
}
if sm.rtspsServer != nil {
sm.rtspsServer.Dispose()
}
if sm.httpServerManager != nil {
sm.httpServerManager.Dispose()
}
if sm.pprofServer != nil {
sm.pprofServer.Close()
}
//if sm.hlsServer != nil {
// sm.hlsServer.Dispose()
//}
sm.mutex.Lock()
sm.groupManager.Iterate(func(group *Group) bool {
group.Dispose()
return true
})
sm.mutex.Unlock()
sm.exitChan <- struct{}{}
}
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup("", streamName)
return group.AddCustomizePubSession(streamName)
}
func (sm *ServerManager) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup("", sessionCtx.StreamName())
if group == nil {
return
}
group.DelCustomizePubSession(sessionCtx)
}
func (sm *ServerManager) WithOnHookSession(onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext) {
sm.onHookSession = onHookSession
}
// ----- implement rtmp.IServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
var info base.RtmpConnectInfo
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
info.App, _ = opa.FindString("app")
info.FlashVer, _ = opa.FindString("flashVer")
info.TcUrl, _ = opa.FindString("tcUrl")
sm.nhOnRtmpConnect(info)
}
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2PubStartInfo(session)
// 先做simple auth鉴权
if err := sm.option.Authentication.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtmpPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnPubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelRtmpPubSession(session)
info := base.Session2PubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnPubStop(info)
}
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.option.Authentication.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddRtmpSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelRtmpSubSession(session)
info := base.Session2SubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStop(info)
}
// ----- implement IHttpServerHandlerObserver interface -----------------------------------------------------------------
func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.option.Authentication.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttpflvSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelHttpflvSubSession(session)
info := base.Session2SubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStop(info)
}
func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.option.Authentication.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttptsSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelHttptsSubSession(session)
info := base.Session2SubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStop(info)
}
// ----- implement rtsp.IServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnNewRtspSessionConnect(session *rtsp.ServerCommandSession) {
// TODO chef: impl me
}
func (sm *ServerManager) OnDelRtspSession(session *rtsp.ServerCommandSession) {
// TODO chef: impl me
}
func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2PubStartInfo(session)
if err := sm.option.Authentication.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtspPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnPubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelRtspPubSession(session)
info := base.Session2PubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnPubStop(info)
}
func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.option.Authentication.OnSubStart(info); err != nil {
return false, nil
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
ok, sdp = group.HandleNewRtspSubSessionDescribe(session)
if !ok {
return
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStart(info)
return
}
func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.HandleNewRtspSubSessionPlay(session)
return nil
}
func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelRtspSubSession(session)
info := base.Session2SubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStop(info)
}
func (sm *ServerManager) OnNewHlsSubSession(session *hls.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.option.Authentication.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHlsSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelHlsSubSession(session *hls.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
if group == nil {
return
}
group.DelHlsSubSession(session)
info := base.Session2SubStopInfo(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.nhOnSubStop(info)
}
// ----- implement IGroupCreator interface -----------------------------------------------------------------------------
func (sm *ServerManager) CreateGroup(appName string, streamName string) *Group {
var config *Config
if sm.option.ModConfigGroupCreator != nil {
cloneConfig := *sm.config
sm.option.ModConfigGroupCreator(appName, streamName, &cloneConfig)
config = &cloneConfig
} else {
config = sm.config
}
option := GroupOption{
onHookSession: sm.onHookSession,
}
return NewGroup(appName, streamName, config, option, sm)
}
// ----- implement IGroupObserver interface -----------------------------------------------------------------------------
func (sm *ServerManager) CleanupHlsIfNeeded(appName string, streamName string, path string) {
if sm.config.HlsConfig.Enable &&
(sm.config.HlsConfig.CleanupMode == hls.CleanupModeInTheEnd || sm.config.HlsConfig.CleanupMode == hls.CleanupModeAsap) {
defertaskthread.Go(
sm.config.HlsConfig.FragmentDurationMs*(sm.config.HlsConfig.FragmentNum+sm.config.HlsConfig.DeleteThreshold),
func(param ...interface{}) {
an := param[0].(string)
sn := param[1].(string)
outPath := param[2].(string)
if g := sm.GetGroup(an, sn); g != nil {
if g.IsHlsMuxerAlive() {
Log.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", sn)
return
}
}
Log.Infof("cleanup hls file path. streamName=%s, path=%s", sn, outPath)
if err := hls.RemoveAll(outPath); err != nil {
Log.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err)
}
},
appName,
streamName,
path,
)
}
}
func (sm *ServerManager) OnRelayPullStart(info base.PullStartInfo) {
sm.nhOnRelayPullStart(info)
}
func (sm *ServerManager) OnRelayPullStop(info base.PullStopInfo) {
sm.nhOnRelayPullStop(info)
}
func (sm *ServerManager) OnHlsMakeTs(info base.HlsMakeTsInfo) {
sm.nhOnHlsMakeTs(info)
}
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) Config() *Config {
return sm.config
}
func (sm *ServerManager) GetGroup(appName string, streamName string) *Group {
sm.mutex.Lock()
defer sm.mutex.Unlock()
return sm.getGroup(appName, streamName)
}
// ----- private method ------------------------------------------------------------------------------------------------
// 注意,函数内部不加锁,由调用方保证加锁进入
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
g, createFlag := sm.groupManager.GetOrCreateGroup(appName, streamName)
if createFlag {
go g.RunLoop()
}
return g
}
func (sm *ServerManager) getGroup(appName string, streamName string) *Group {
return sm.groupManager.GetGroup(appName, streamName)
}
func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) {
urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)
if err != nil {
Log.Errorf("parse url. err=%+v", err)
return
}
if urlCtx.GetFileType() == "m3u8" {
// TODO(chef): [refactor] 需要整理,这里使用 hls.PathStrategy 不太好 202207
streamName := hls.PathStrategy.GetRequestInfo(urlCtx, sm.config.HlsConfig.OutPath).StreamName
if err = sm.option.Authentication.OnHls(streamName, urlCtx.RawQuery); err != nil {
Log.Errorf("simple auth failed. err=%+v", err)
return
}
}
remoteIp, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
Log.Warnf("SplitHostPort failed. addr=%s, err=%+v", req.RemoteAddr, err)
return
}
if sm.ipBlacklist.Has(remoteIp) {
//Log.Warnf("found %s in ip blacklist, so do not serve this request.", remoteIp)
sm.hlsServerHandler.CloseSubSessionIfExist(req)
writer.WriteHeader(http.StatusNotFound)
return
}
sm.hlsServerHandler.ServeHTTP(writer, req)
}