[feat] 新增simple auth鉴权功能,rtmp,httpflv支持md5鉴权

pull/130/head
q191201771 3 years ago
parent 7f79c71356
commit 57ad766d34

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.4",
"conf_version": "v0.2.5",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -71,6 +71,12 @@
"on_sub_stop": "http://127.0.0.1:10101/on_sub_stop",
"on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect"
},
"simple_auth": {
"key": "q191201771",
"pub_rtmp_enable": false,
"sub_rtmp_enable": false,
"sub_httpflv_enable": false
},
"pprof": {
"enable": true,
"addr": ":8084"

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.4",
"conf_version": "v0.2.5",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -17,13 +17,13 @@
},
"httpflv": {
"enable": true,
"enable_https": false,
"enable_https": true,
"url_pattern": "/",
"gop_num": 0
},
"hls": {
"enable": true,
"enable_https": false,
"enable_https": true,
"url_pattern": "/hls/",
"out_path": "./lal_record/hls/",
"fragment_duration_ms": 3000,
@ -33,7 +33,7 @@
},
"httpts": {
"enable": true,
"enable_https":false,
"enable_https": true,
"url_pattern": "/"
},
"rtsp": {
@ -71,6 +71,12 @@
"on_sub_stop": "http://127.0.0.1:10101/on_sub_stop",
"on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect"
},
"simple_auth": {
"key": "q191201771",
"pub_rtmp_enable": false,
"sub_rtmp_enable": false,
"sub_httpflv_enable": false
},
"pprof": {
"enable": true,
"addr": ":8084"

@ -9,9 +9,10 @@
package avc
import (
"io"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"io"
"github.com/q191201771/lal/pkg/base"

@ -73,6 +73,15 @@ var (
var ErrSdp = errors.New("lal.sdp: fxxk")
// ----- pkg/logic -------------------------------------------------------------------------------------------------------
var (
ErrDupInStream = errors.New("lal.logic: in stream already exist at group")
ErrSimpleAuthParamNotFound = errors.New("lal.logic: simple auth failed since url param lal_secret not found")
ErrSimpleAuthFailed = errors.New("lal.logic: simple auth failed since url param lal_secret invalid")
)
// ---------------------------------------------------------------------------------------------------------------------
func NewErrAmfInvalidType(b byte) error {

@ -21,7 +21,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
const ConfVersion = "v0.2.4"
const ConfVersion = "v0.2.5"
const (
defaultHlsCleanupMode = hls.CleanupModeInTheEnd
@ -45,6 +45,7 @@ type Config struct {
HttpApiConfig HttpApiConfig `json:"http_api"`
ServerId string `json:"server_id"`
HttpNotifyConfig HttpNotifyConfig `json:"http_notify"`
SimpleAuthConfig SimpleAuthConfig `json:"simple_auth"`
PprofConfig PprofConfig `json:"pprof"`
LogConfig nazalog.Option `json:"log"`
}
@ -118,6 +119,13 @@ type HttpNotifyConfig struct {
OnRtmpConnect string `json:"on_rtmp_connect"`
}
type SimpleAuthConfig struct {
Key string `json:"key"`
PubRtmpEnable bool `json:"pub_rtmp_enable"`
SubRtmpEnable bool `json:"sub_rtmp_enable"`
SubHttpflvEnable bool `json:"sub_httpflv_enable"`
}
type PprofConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`

@ -305,15 +305,15 @@ func (group *Group) Dispose() {
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
nazalog.Debugf("[%s] [%s] add PubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.hasInSession() {
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey())
return false
nazalog.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey())
return base.ErrDupInStream
}
group.rtmpPubSession = session
@ -337,7 +337,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
session.SetPubSessionObserver(group)
}
return true
return nil
}
// TODO chef: rtsp package中增加回调返回值判断如果是false将连接关掉

@ -20,9 +20,13 @@ import (
)
type HttpServerHandlerObserver interface {
// OnNewHttpflvSubSession
//
// 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接
OnNewHttpflvSubSession(session *httpflv.SubSession) bool
//
// @return true则允许拉流false则关闭连接
//
OnNewHttpflvSubSession(session *httpflv.SubSession) error
OnDelHttpflvSubSession(session *httpflv.SubSession)
@ -76,8 +80,10 @@ func (h *HttpServerHandler) ServeSubSession(writer http.ResponseWriter, req *htt
if strings.HasSuffix(urlCtx.LastItemOfPath, ".flv") {
session := httpflv.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey)
nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.Url())
if !h.observer.OnNewHttpflvSubSession(session) {
session.Dispose()
if err = h.observer.OnNewHttpflvSubSession(session); err != nil {
nazalog.Infof("[%s] dispose by observer. err=%+v", session.UniqueKey(), err)
_ = session.Dispose()
return
}
err = session.RunLoop()
nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.UniqueKey(), err)

@ -51,6 +51,8 @@ type ServerManager struct {
mutex sync.Mutex
groupManager IGroupManager
simpleAuthCtx *SimpleAuthCtx
}
func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
@ -113,6 +115,8 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
}
sm.simpleAuthCtx = NewSimpleAuthCtx(sm.config.SimpleAuthConfig)
return sm
}
@ -379,13 +383,10 @@ func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.Obj
sm.option.NotifyHandler.OnRtmpConnect(info)
}
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
res := group.AddRtmpPubSession(session)
// TODO chef: res值为false时可以考虑不回调
// TODO chef: 每次赋值都逐个拼代码冗余考虑直接用ISession抽离一下代码
var info base.PubStartInfo
info.ServerId = sm.config.ServerId
@ -396,10 +397,22 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
// 先做simple auth鉴权
if err := sm.simpleAuthCtx.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtmpPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnPubStart(info)
return res
return nil
}
func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
@ -426,11 +439,9 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
sm.option.NotifyHandler.OnPubStop(info)
}
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddRtmpSubSession(session)
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
@ -441,11 +452,19 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
if err := sm.simpleAuthCtx.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddRtmpSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnSubStart(info)
return true
sm.option.NotifyHandler.OnSubStart(info)
return nil
}
func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
@ -473,11 +492,9 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
// ----- implement HttpServerHandlerObserver interface -----------------------------------------------------------------
func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) bool {
func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttpflvSubSession(session)
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
@ -488,10 +505,19 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo
info.UrlParam = session.RawQuery()
info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr
if err := sm.simpleAuthCtx.OnSubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.AddHttpflvSubSession(session)
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnSubStart(info)
return true
return nil
}
func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {

@ -0,0 +1,74 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"net/url"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazamd5"
)
func SimpleAuthCalcSecret(key string, streamName string) string {
return nazamd5.Md5([]byte(key + streamName))
}
// ---------------------------------------------------------------------------------------------------------------------
// TODO(chef): [refactor] 结合 NotifyHandler 整理
const secretName = "lal_secret"
type SimpleAuthCtx struct {
key string
config SimpleAuthConfig
}
// NewSimpleAuthCtx
//
// @param key: 如果为空,则所有鉴权接口都返回成功
//
func NewSimpleAuthCtx(config SimpleAuthConfig) *SimpleAuthCtx {
return &SimpleAuthCtx{
config: config,
}
}
func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error {
if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp {
return s.check(info.StreamName, info.UrlParam)
}
return nil
}
func (s *SimpleAuthCtx) OnSubStart(info base.SubStartInfo) error {
if (s.config.SubRtmpEnable && info.Protocol == base.ProtocolRtmp) ||
(s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) {
return s.check(info.StreamName, info.UrlParam)
}
return nil
}
func (s *SimpleAuthCtx) check(streamName string, urlParam string) error {
q, err := url.ParseQuery(urlParam)
if err != nil {
return err
}
v := q.Get(secretName)
if v == "" {
return base.ErrSimpleAuthParamNotFound
}
se := SimpleAuthCalcSecret(s.config.Key, streamName)
if se != v {
return base.ErrSimpleAuthFailed
}
return nil
}

@ -0,0 +1,34 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"testing"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/assert"
)
func TestSimpleAuthCalcSecret(t *testing.T) {
res := SimpleAuthCalcSecret("q191201771", "test110")
assert.Equal(t, "700997e1595a06c9ffa60ebef79105b0", res)
}
func TestSimpleAuthCtx(t *testing.T) {
ctx := NewSimpleAuthCtx(SimpleAuthConfig{
Key: "q191201771",
PubRtmpEnable: true,
})
var info base.PubStartInfo
info.Protocol = base.ProtocolRtmp
info.StreamName = "test110"
info.UrlParam = "lal_secret=700997e1595a06c9ffa60ebef79105b0"
res := ctx.OnPubStart(info)
assert.Equal(t, nil, res)
}

@ -16,9 +16,22 @@ import (
type ServerObserver interface {
OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
OnNewRtmpPubSession(session *ServerSession) bool // 返回true则允许推流返回false则强制关闭这个连接
// OnNewRtmpPubSession
//
// 上层代码应该在这个事件回调中注册音视频数据的监听
//
// @return 上层如果想关闭这个session则回调中返回不为nil的error值
//
OnNewRtmpPubSession(session *ServerSession) error
// OnDelRtmpPubSession
//
// 注意如果session是上层通过 OnNewRtmpPubSession 回调的返回值关闭的则该session不再触发这个逻辑
//
OnDelRtmpPubSession(session *ServerSession)
OnNewRtmpSubSession(session *ServerSession) bool // 返回true则允许拉流返回false则强制关闭这个连接
OnNewRtmpSubSession(session *ServerSession) error
OnDelRtmpSubSession(session *ServerSession)
}
@ -65,8 +78,11 @@ func (server *Server) Dispose() {
func (server *Server) handleTcpConnect(conn net.Conn) {
log.Infof("accept a rtmp connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewServerSession(server, conn)
err := session.RunLoop()
log.Infof("[%s] rtmp loop done. err=%v", session.uniqueKey, err)
_ = session.RunLoop()
if session.DisposeByObserverFlag {
return
}
switch session.t {
case ServerSessionTypeUnknown:
// noop
@ -77,24 +93,16 @@ func (server *Server) handleTcpConnect(conn net.Conn) {
}
}
// ServerSessionObserver
// ----- ServerSessionObserver ------------------------------------------------------------------------------------
func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray) {
server.observer.OnRtmpConnect(session, opa)
}
// ServerSessionObserver
func (server *Server) OnNewRtmpPubSession(session *ServerSession) {
if !server.observer.OnNewRtmpPubSession(session) {
log.Warnf("dispose PubSession since pub exist.")
session.Dispose()
return
}
func (server *Server) OnNewRtmpPubSession(session *ServerSession) error {
return server.observer.OnNewRtmpPubSession(session)
}
// ServerSessionObserver
func (server *Server) OnNewRtmpSubSession(session *ServerSession) {
if !server.observer.OnNewRtmpSubSession(session) {
session.Dispose()
return
}
func (server *Server) OnNewRtmpSubSession(session *ServerSession) error {
return server.observer.OnNewRtmpSubSession(session)
}

@ -12,6 +12,7 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazaerrors"
@ -27,8 +28,16 @@ import (
type ServerSessionObserver interface {
OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
OnNewRtmpPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
OnNewRtmpSubSession(session *ServerSession)
// OnNewRtmpPubSession
//
// 上层代码应该在这个事件回调中注册音视频数据的监听
//
// @return 上层如果想关闭这个session则回调中返回不为nil的error值
//
OnNewRtmpPubSession(session *ServerSession) error
OnNewRtmpSubSession(session *ServerSession) error
}
type PubSessionObserver interface {
@ -74,6 +83,10 @@ type ServerSession struct {
// only for SubSession
IsFresh bool
ShouldWaitVideoKeyFrame bool
disposeOnce sync.Once
DisposeByObserverFlag bool
}
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession {
@ -102,10 +115,14 @@ func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSess
func (s *ServerSession) RunLoop() (err error) {
if err = s.handshake(); err != nil {
_ = s.dispose(err)
return err
}
return s.runReadLoop()
err = s.runReadLoop()
_ = s.dispose(err)
return err
}
func (s *ServerSession) Write(msg []byte) error {
@ -123,8 +140,7 @@ func (s *ServerSession) Flush() error {
}
func (s *ServerSession) Dispose() error {
nazalog.Infof("[%s] lifecycle dispose rtmp ServerSession.", s.uniqueKey)
return s.conn.Close()
return s.dispose(nil)
}
func (s *ServerSession) Url() string {
@ -419,7 +435,7 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
nazalog.Infof("[%s] < R publish('%s')", s.uniqueKey, s.streamNameWithRawQuery)
nazalog.Infof("[%s] > W onStatus('NetStream.Publish.Start').", s.uniqueKey)
if err := s.packer.writeOnStatusPublish(s.conn, Msid1); err != nil {
if err = s.packer.writeOnStatusPublish(s.conn, Msid1); err != nil {
return err
}
@ -427,9 +443,11 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
s.modConnProps()
s.t = ServerSessionTypePub
s.observer.OnNewRtmpPubSession(s)
return nil
err = s.observer.OnNewRtmpPubSession(s)
if err != nil {
s.DisposeByObserverFlag = true
}
return err
}
func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
@ -467,9 +485,11 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
s.modConnProps()
s.t = ServerSessionTypeSub
s.observer.OnNewRtmpSubSession(s)
return nil
err = s.observer.OnNewRtmpSubSession(s)
if err != nil {
s.DisposeByObserverFlag = true
}
return err
}
func (s *ServerSession) modConnProps() {
@ -482,3 +502,16 @@ func (s *ServerSession) modConnProps() {
s.conn.ModWriteTimeoutMs(serverSessionWriteAvTimeoutMs)
}
}
func (s *ServerSession) dispose(err error) error {
var retErr error
s.disposeOnce.Do(func() {
nazalog.Infof("[%s] lifecycle dispose rtmp ServerSession. err=%+v", s.uniqueKey, err)
if s.conn == nil {
retErr = base.ErrSessionNotStarted
return
}
retErr = s.conn.Close()
})
return retErr
}

Loading…
Cancel
Save