1. [feat] 配置文件中支持配置是否清除过期流的HLS文件 2. [feat] 增加HTTP API接口`/api/ctrl/kick_out_session`,用于踢掉指定的session 3. [feat] HTTP Notify事件回调中的session结构体都增加session id字段

pull/39/head
q191201771 4 years ago
parent adcb4935ad
commit a58a2ce60c

@ -47,7 +47,7 @@
- [x] **多种编码格式**。视频支持H264/AVCH265/HEVC音频支持AAC
- [x] **静态pull回源**。用于搭建简易集群
- [x] **静态push转推**。支持转推多个地址。通过静态文件配置
- [x] **HTTP API接口**。用于获取服务信息,见[《lal流媒体服务器的HTTP API接口》](https://pengrl.com/p/20100)
- [x] **HTTP API接口**。用于获取服务信息,向服务发送命令。见[《lal流媒体服务器的HTTP API接口》](https://pengrl.com/p/20100)
- [x] **HTTP Notify事件回调**。见[《lal HTTP Notify(or Callback or Webhook)事件回调》](https://pengrl.com/p/20101)
- [x] **秒开播放**。GOP缓冲

@ -0,0 +1,52 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package datamanager
import (
"sync"
"github.com/q191201771/naza/pkg/nazalog"
)
type DataManagerMemory struct {
mutex sync.Mutex
pubStream2ServerID map[string]string
}
func NewDataManagerMemory() *DataManagerMemory {
return &DataManagerMemory{
pubStream2ServerID: make(map[string]string),
}
}
func (d *DataManagerMemory) AddPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
nazalog.Infof("add pub. streamName=%s, serverID=%s", streamName, serverID)
d.pubStream2ServerID[streamName] = serverID
}
func (d *DataManagerMemory) DelPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
// 清除用户推流对应的节点信息
cacheServerID, exist := d.pubStream2ServerID[streamName]
if !exist || serverID != cacheServerID {
nazalog.Errorf("del pub but server id dismatch. streamName=%s, serverID=%s, cache id=%s", streamName, serverID, cacheServerID)
return
}
delete(d.pubStream2ServerID, streamName)
}
func (d *DataManagerMemory) QueryPub(streamName string) (serverID string, exist bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
serverID, exist = d.pubStream2ServerID[streamName]
return
}

@ -0,0 +1,30 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package datamanager
type DataManger interface {
AddPub(streamName, serverID string)
DelPub(streamName, serverID string)
QueryPub(streamName string) (serverID string, exist bool)
}
type DataManagerType int
const (
DMTMemory DataManagerType = iota
)
func NewDataManager(t DataManagerType) DataManger {
switch t {
case DMTMemory:
return NewDataManagerMemory()
default:
panic("invalid data manager type")
}
}

@ -14,8 +14,8 @@ import (
"net"
"net/http"
"strings"
"sync"
"github.com/q191201771/lal/app/demo/dispatch/datamanager"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
@ -55,8 +55,7 @@ var (
)
var (
mutex sync.Mutex
stream2ServerID map[string]string
dataManager datamanager.DataManger
)
func OnPubStartHandler(w http.ResponseWriter, r *http.Request) {
@ -69,12 +68,26 @@ func OnPubStartHandler(w http.ResponseWriter, r *http.Request) {
}
nazalog.Infof("[%s] on_pub_start. info=%+v", id, info)
mutex.Lock()
defer mutex.Unlock()
// 演示如何踢掉session服务于鉴权失败等场景
//if info.URLParam == "" {
if info.SessionID == "RTMPPUBSUB1" {
reqServer, exist := serverID2Server[info.ServerID]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", id)
return
}
url := fmt.Sprintf("http://%s/api/ctrl/kick_out_session", reqServer.apiAddr)
var b base.APICtrlKickOutSession
b.StreamName = info.StreamName
b.SessionID = info.SessionID
nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.apiAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", id, err)
}
}
// 保存用户推流对应的节点信息
nazalog.Infof("[%s] add to cache.", id)
stream2ServerID[info.StreamName] = info.ServerID
dataManager.AddPub(info.StreamName, info.SessionID)
}
func OnPubStopHandler(w http.ResponseWriter, r *http.Request) {
@ -87,15 +100,7 @@ func OnPubStopHandler(w http.ResponseWriter, r *http.Request) {
}
nazalog.Infof("[%s] on_pub_stop. info=%+v", id, info)
mutex.Lock()
defer mutex.Unlock()
// 清除用户推流对应的节点信息
serverID, exist := stream2ServerID[info.StreamName]
if !exist || serverID != info.ServerID {
nazalog.Errorf("[%s] OnPubStopHandler. req id=%s, cache id=%s", id, info.ServerID, serverID)
return
}
delete(stream2ServerID, serverID)
dataManager.DelPub(info.StreamName, info.ServerID)
}
func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
@ -109,42 +114,39 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
nazalog.Infof("[%s] on_sub_start. info=%+v", id, info)
// sub拉流时判断是否需要触发pull级联拉流
// 是内部级联拉流不需要触发pull级联拉流
// 1. 是内部级联拉流,不需要触发
if strings.Contains(info.URLParam, pullSecretParam) {
nazalog.Infof("[%s] sub is pull by other node, ignore.", id)
return
}
// 已经存在输入流,不需要触发pull级联拉流
// 2. 已经存在输入流,不需要触发
if info.HasInSession {
nazalog.Infof("[%s] in not empty, ignore.", id)
return
}
mutex.Lock()
defer mutex.Unlock()
// 当前节点
// 3. 非法节点,本服务没有配置这个节点
reqServer, exist := serverID2Server[info.ServerID]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", id)
return
}
pubServerID, exist := stream2ServerID[info.StreamName]
// 没有查到流所在节点,不需要触发pull级联拉流
pubServerID, exist := dataManager.QueryPub(info.StreamName)
// 4. 没有查到流所在节点,不需要触发
if !exist {
nazalog.Infof("[%s] pub not exist, ignore.", id)
return
}
// 流所在节点
// TODO chef: 5. 这里的容错是否会出现?是否可以去掉?
pubServer, exist := serverID2Server[pubServerID]
if !exist {
nazalog.Errorf("[%s] pub server id invalid. serverID=%s", id, pubServerID)
return
}
// 向当前节点发送pull级联拉流的命令
// 向pub所在节点发送pull级联拉流的命令
url := fmt.Sprintf("http://%s/api/ctrl/start_pull", reqServer.apiAddr)
var b base.APICtrlStartPullReq
b.Protocol = base.ProtocolRTMP
@ -157,7 +159,6 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", id, err)
}
}
func OnSubStopHandler(w http.ResponseWriter, r *http.Request) {
@ -185,7 +186,7 @@ func OnUpdateHandler(w http.ResponseWriter, r *http.Request) {
nazalog.Infof("[%s] on_update. info=%+v", id, info)
// TODO chef:
// 1. 更新stream2ServerID去掉过期的增加不存在的
// 1. 更新pubStream2ServerID去掉过期的增加不存在的
// 2. 没有pub但是有sub的触发ctrl pull
}
@ -195,7 +196,7 @@ func logHandler(w http.ResponseWriter, r *http.Request) {
}
func main() {
stream2ServerID = make(map[string]string)
dataManager = datamanager.NewDataManager(datamanager.DMTMemory)
l, err := net.Listen("tcp", listenAddr)
nazalog.Assert(nil, err)

@ -18,7 +18,8 @@
"sub_listen_addr": ":8181",
"out_path": "/tmp/lal/edge/",
"fragment_duration_ms": 3000,
"fragment_num": 6
"fragment_num": 6,
"cleanup_flag": true
},
"httpts": {
"enable": true,

@ -18,7 +18,8 @@
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
"fragment_num": 6,
"cleanup_flag": true
},
"httpts": {
"enable": true,

@ -18,7 +18,9 @@
"sub_listen_addr": ":8081", // HLS监听地址
"out_path": "/tmp/lal/hls/", // HLS文件保存根目录
"fragment_duration_ms": 3000, // 单个TS文件切片时长单位毫秒
"fragment_num": 6 // M3U8文件列表中TS文件的数量
"fragment_num": 6, // M3U8文件列表中TS文件的数量
"cleanup_flag": true // 输入流结束后是否清理hls的文件。
// 注意如果为true文件将在输入流结束后的 <fragment_duration_ms> * <fragment_num> * 2 的时间点清理
},
"httpts": {
"enable": true, // 是否开启HTTP-TS服务的监听。注意这并不是HLS中的TS而是在一条HTTP长连接上持续性传输TS流

@ -18,7 +18,8 @@
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
"fragment_num": 6,
"cleanup_flag": true
},
"httpts": {
"enable": true,

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.15.3
require github.com/q191201771/naza v0.15.4

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.15.3 h1:6IAE0J/z6vWxy1kiC4u5RFFvONa4oWgwJ+IDa5QeSPY=
github.com/q191201771/naza v0.15.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.15.4 h1:m9nQ08todkK08gb3sbfXSUxdvoYWRj0SpPILNOXkKV0=
github.com/q191201771/naza v0.15.4/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -8,21 +8,19 @@
package base
import "errors"
// 文档见: https://pengrl.com/p/20100/
var ErrParamMissing = errors.New("lal.logic: param missing")
const HTTPAPIVersion = "v0.1.0"
const HTTPAPIVersion = "v0.1.1"
const (
ErrorCodeSucc = 0
DespSucc = "succ"
ErrorCodeGroupNotFound = 1001
DespGroupNotFound = "group not found"
ErrorCodeParamMissing = 1002
DespParamMissing = "param missing"
ErrorCodeSucc = 0
DespSucc = "succ"
ErrorCodeGroupNotFound = 1001
DespGroupNotFound = "group not found"
ErrorCodeParamMissing = 1002
DespParamMissing = "param missing"
ErrorCodeSessionNotFound = 1003
DespSessionNotFound = "session not found"
)
type HTTPResponseBasic struct {
@ -60,3 +58,8 @@ type APICtrlStartPullReq struct {
StreamName string `json:"stream_name"`
URLParam string `json:"url_param"`
}
type APICtrlKickOutSession struct {
StreamName string `json:"stream_name"`
SessionID string `json:"session_id"`
}

@ -48,6 +48,7 @@ type StatPull struct {
type StatSession struct {
Protocol string `json:"protocol"`
SessionID string `json:"session_id"`
StartTime string `json:"start_time"`
RemoteAddr string `json:"remote_addr"`
ReadBytesSum uint64 `json:"read_bytes_sum"`
@ -57,6 +58,7 @@ type StatSession struct {
func StatSession2Pub(ss StatSession) (ret StatPub) {
ret.Protocol = ss.Protocol
ret.SessionID = ss.SessionID
ret.StartTime = ss.StartTime
ret.RemoteAddr = ss.RemoteAddr
ret.ReadBytesSum = ss.ReadBytesSum
@ -67,6 +69,7 @@ func StatSession2Pub(ss StatSession) (ret StatPub) {
func StatSession2Sub(ss StatSession) (ret StatSub) {
ret.Protocol = ss.Protocol
ret.SessionID = ss.SessionID
ret.StartTime = ss.StartTime
ret.RemoteAddr = ss.RemoteAddr
ret.ReadBytesSum = ss.ReadBytesSum
@ -77,6 +80,7 @@ func StatSession2Sub(ss StatSession) (ret StatSub) {
func StatSession2Pull(ss StatSession) (ret StatPull) {
ret.Protocol = ss.Protocol
ret.SessionID = ss.SessionID
ret.StartTime = ss.StartTime
ret.RemoteAddr = ss.RemoteAddr
ret.ReadBytesSum = ss.ReadBytesSum

@ -0,0 +1,30 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import "github.com/q191201771/naza/pkg/unique"
const (
UKPRTMPServerSession = "RTMPPUBSUB"
UKPRTSPPubSession = "RTSPPUB"
UKPFLVSubSession = "FLVSUB"
UKPTSSubSession = "TSSUB"
UKPRTSPSubSession = "RTSPSUB"
UKPRTMPPushSession = "RTMPPUSH"
UKPRTMPPullSession = "RTMPPULL"
UKPFLVPullSession = "FLVPULL"
UKPGroup = "GROUP"
UKPHLSMuxer = "HLSMUXER"
UKPStreamer = "STREAMER"
)
func GenUniqueKey(prefix string) string {
return unique.GenUniqueKey(prefix)
}

@ -19,8 +19,6 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/unique"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -44,12 +42,12 @@ type MuxerConfig struct {
type Muxer struct {
UniqueKey string
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
recordPlayListFilename string
recordPlayListFilenameBak string
streamName string // const after init
outPath string // const after init
playlistFilename string // const after init
playlistFilenameBak string // const after init
recordPlayListFilename string // const after init
recordPlayListFilenameBak string // const after init
config *MuxerConfig
observer MuxerObserver
@ -78,7 +76,7 @@ type fragmentInfo struct {
// @param observer 可以为nil如果不为nilTS流将回调给上层
func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *Muxer {
uk := unique.GenUniqueKey("HLSMUXER")
uk := base.GenUniqueKey(base.UKPHLSMuxer)
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
@ -177,6 +175,10 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
}
}
func (m *Muxer) OutPath() string {
return m.outPath
}
// 决定是否开启新的TS切片文件注意可能已经有TS切片也可能没有这是第一个切片
//
// @param boundary 调用方认为可能是开启新TS切片的时间点

@ -15,7 +15,6 @@ import (
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
type StreamerObserver interface {
@ -42,7 +41,7 @@ type Streamer struct {
}
func NewStreamer(observer StreamerObserver) *Streamer {
uk := unique.GenUniqueKey("STREAMER")
uk := base.GenUniqueKey(base.UKPStreamer)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
return &Streamer{

@ -21,7 +21,6 @@ import (
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
type PullSessionOption struct {
@ -54,7 +53,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
fn(&option)
}
uk := unique.GenUniqueKey("FLVPULL")
uk := base.GenUniqueKey(base.UKPFLVPullSession)
s := &PullSession{
option: option,
UniqueKey: uk,

@ -21,7 +21,6 @@ import (
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
var flvHTTPResponseHeader []byte
@ -45,7 +44,7 @@ type SubSession struct {
}
func NewSubSession(conn net.Conn) *SubSession {
uk := unique.GenUniqueKey("FLVSUB")
uk := base.GenUniqueKey(base.UKPFLVSubSession)
s := &SubSession{
UniqueKey: uk,
IsFresh: true,
@ -57,6 +56,7 @@ func NewSubSession(conn net.Conn) *SubSession {
stat: base.StatSub{
StatSession: base.StatSession{
Protocol: base.ProtocolHTTPFLV,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},

@ -20,7 +20,6 @@ import (
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
var tsHTTPResponseHeader []byte
@ -43,7 +42,7 @@ type SubSession struct {
}
func NewSubSession(conn net.Conn) *SubSession {
uk := unique.GenUniqueKey("TSSUB")
uk := base.GenUniqueKey(base.UKPTSSubSession)
s := &SubSession{
UniqueKey: uk,
IsFresh: true,
@ -55,6 +54,7 @@ func NewSubSession(conn net.Conn) *SubSession {
stat: base.StatSub{
StatSession: base.StatSession{
Protocol: base.ProtocolHTTPTS,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},

@ -56,6 +56,7 @@ type HTTPTSConfig struct {
type HLSConfig struct {
SubListenAddr string `json:"sub_listen_addr"`
hls.MuxerConfig
CleanupFlag bool `json:"cleanup_flag"`
}
type RTSPConfig struct {

@ -11,8 +11,12 @@ package logic
import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"github.com/q191201771/naza/pkg/defertaskthread"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/hevc"
@ -32,14 +36,10 @@ import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
// TODO chef:
// - group可以考虑搞个协程
// - 多长没有sub订阅拉流关闭pull回源
// - pull重试次数
// - sub无数据超时时间
// group的函数比较多考虑调整一下函数排列位置
type Group struct {
UniqueKey string // const after init
@ -90,7 +90,7 @@ type pushProxy struct {
}
func NewGroup(appName string, streamName string, pullEnable bool, pullURL string) *Group {
uk := unique.GenUniqueKey("GROUP")
uk := base.GenUniqueKey(base.UKPGroup)
nazalog.Infof("[%s] lifecycle new group. appName=%s, streamName=%s", uk, appName, streamName)
url2PushProxy := make(map[string]*pushProxy)
@ -151,14 +151,12 @@ func (group *Group) Tick() {
if !group.rtmpPubSession.IsAlive(checkSessionAliveIntervalSec) {
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey)
group.rtmpPubSession.Dispose()
group.delRTMPPubSession(group.rtmpPubSession)
}
}
if group.rtspPubSession != nil {
if !group.rtspPubSession.IsAlive(checkSessionAliveIntervalSec) {
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey)
group.rtspPubSession.Dispose()
group.delRTSPPubSession(group.rtspPubSession)
}
}
if group.pullProxy.pullSession != nil {
@ -233,6 +231,10 @@ func (group *Group) Dispose() {
group.rtmpPubSession.Dispose()
group.rtmpPubSession = nil
}
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
group.rtspPubSession = nil
}
for session := range group.rtmpSubSessionSet {
session.Dispose()
@ -249,10 +251,7 @@ func (group *Group) Dispose() {
}
group.httptsSubSessionSet = nil
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.hlsMuxer = nil
}
group.disposeHLSMuxer()
if config.RelayPushConfig.Enable {
for _, v := range group.url2PushProxy {
@ -368,6 +367,9 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
group.delHTTPFLVSubSession(session)
}
// TODO chef:
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) {
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey)
session.WriteHTTPResponseHeader()
@ -604,6 +606,52 @@ func (group *Group) StartPull(url string) {
group.pullIfNeeded()
}
func (group *Group) IsHLSMuxerAlive() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hlsMuxer != nil
}
func (group *Group) KickOutSession(sessionID string) bool {
group.mutex.Lock()
defer group.mutex.Unlock()
nazalog.Infof("[%s] kick out session. session id=%s", group.UniqueKey, sessionID)
if strings.HasPrefix(sessionID, base.UKPRTMPServerSession) {
if group.rtmpPubSession != nil {
group.rtmpPubSession.Dispose()
return true
}
} else if strings.HasPrefix(sessionID, base.UKPRTSPPubSession) {
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
return true
}
} else if strings.HasPrefix(sessionID, base.UKPFLVSubSession) {
// TODO chef: 考虑数据结构改成sessionIDzuokey的map
for s := range group.httpflvSubSessionSet {
if s.UniqueKey == sessionID {
s.Dispose()
return true
}
}
} else if strings.HasPrefix(sessionID, base.UKPTSSubSession) {
for s := range group.httptsSubSessionSet {
if s.UniqueKey == sessionID {
s.Dispose()
return true
}
}
} else if strings.HasPrefix(sessionID, base.UKPRTSPSubSession) {
// TODO chef: impl me
} else {
nazalog.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionID)
}
return false
}
func (group *Group) delRTMPPubSession(session *rtmp.ServerSession) {
nazalog.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey)
@ -954,11 +1002,11 @@ func (group *Group) pushIfNeeded() {
}
v.isPushing = true
urlwithParam := url
urlWithParam := url
if urlParam != "" {
urlwithParam += "?" + urlParam
urlWithParam += "?" + urlParam
}
nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlwithParam)
nazalog.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
@ -976,7 +1024,7 @@ func (group *Group) pushIfNeeded() {
err = <-pushSession.Done()
nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRTMPPushSession(u, pushSession)
}(url, urlwithParam)
}(url, urlWithParam)
}
}
@ -1030,8 +1078,7 @@ func (group *Group) addIn() {
func (group *Group) delIn() {
if config.HLSConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.hlsMuxer = nil
group.disposeHLSMuxer()
}
if config.RelayPushConfig.Enable {
@ -1047,6 +1094,41 @@ func (group *Group) delIn() {
group.httpflvGopCache.Clear()
}
func (group *Group) disposeHLSMuxer() {
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
// 添加延时任务删除HLS文件
if config.HLSConfig.Enable && config.HLSConfig.CleanupFlag {
defertaskthread.Go(
config.HLSConfig.FragmentDurationMS*config.HLSConfig.FragmentNum*2,
func(param ...interface{}) {
appName := param[0].(string)
streamName := param[1].(string)
outPath := param[2].(string)
if g := sm.getGroup(appName, streamName); g != nil {
if g.IsHLSMuxerAlive() {
nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName)
return
}
}
nazalog.Infof("cleanup hls file path. streamName=%s, path=%s", streamName, outPath)
if err := os.RemoveAll(outPath); err != nil {
nazalog.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err)
}
},
group.appName,
group.streamName,
group.hlsMuxer.OutPath(),
)
}
group.hlsMuxer = nil
}
}
// TODO chef: 后续看是否有更合适的方法判断
func (group *Group) isHEVC() bool {
return group.vps != nil

@ -27,6 +27,7 @@ type HTTPAPIServerObserver interface {
OnStatAllGroup() []base.StatGroup
OnStatGroup(streamName string) *base.StatGroup
OnCtrlStartPull(info base.APICtrlStartPullReq)
OnCtrlKickOutSession(info base.APICtrlKickOutSession) base.HTTPResponseBasic
}
type HTTPAPIServer struct {
@ -58,6 +59,7 @@ func (h *HTTPAPIServer) Runloop() error {
mux.HandleFunc("/api/stat/group", h.statGroupHandler)
mux.HandleFunc("/api/stat/all_group", h.statAllGroupHandler)
mux.HandleFunc("/api/ctrl/start_pull", h.ctrlStartPullHandler)
mux.HandleFunc("/api/ctrl/kick_out_session", h.ctrlKickOutSessionHandler)
var srv http.Server
srv.Handler = mux
@ -110,7 +112,6 @@ func (h *HTTPAPIServer) statLALInfoHandler(w http.ResponseWriter, req *http.Requ
func (h *HTTPAPIServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) {
gs := h.observer.OnStatAllGroup()
var v base.APIStatAllGroup
v.ErrorCode = base.ErrorCodeSucc
v.Desp = base.DespSucc
@ -146,25 +147,44 @@ func (h *HTTPAPIServer) statGroupHandler(w http.ResponseWriter, req *http.Reques
func (h *HTTPAPIServer) ctrlStartPullHandler(w http.ResponseWriter, req *http.Request) {
var v base.HTTPResponseBasic
var info base.APICtrlStartPullReq
err := nazahttp.UnmarshalRequestJsonBody(req, &info, "protocol", "addr", "app_name", "stream_name")
if err != nil {
nazalog.Warnf("http api start pull error. err=%+v", err)
v.ErrorCode = base.ErrorCodeParamMissing
v.Desp = base.DespParamMissing
feedback(v, w)
return
}
nazalog.Infof("http api start pull. req info=%+v", info)
h.observer.OnCtrlStartPull(info)
v.ErrorCode = base.ErrorCodeSucc
v.Desp = base.DespSucc
feedback(v, w)
return
}
func (h *HTTPAPIServer) ctrlKickOutSessionHandler(w http.ResponseWriter, req *http.Request) {
var v base.HTTPResponseBasic
var info base.APICtrlKickOutSession
err := nazahttp.UnmarshalRequestJsonBody(req, &info, "stream_name", "session_id")
if err != nil {
nazalog.Warnf("http api kick out session error. err=%+v", err)
v.ErrorCode = base.ErrorCodeParamMissing
v.Desp = base.DespParamMissing
feedback(v, w)
return
}
nazalog.Infof("http api kick out session. req info=%+v", info)
resp := h.observer.OnCtrlKickOutSession(info)
feedback(resp, w)
return
}
func feedback(v interface{}, w http.ResponseWriter) {
resp, _ := json.Marshal(v)
w.Header().Add("Server", base.LALHTTPAPIServer)

@ -157,7 +157,7 @@ func (sm *ServerManager) RunLoop() {
sm.iterateGroup()
if (count % 10) == 0 {
if (count % 30) == 0 {
sm.mutex.Lock()
nazalog.Debugf("group size=%d", len(sm.groupMap))
//for _, g := range sm.groupMap {
@ -199,6 +199,14 @@ func (sm *ServerManager) Dispose() {
sm.exitChan <- struct{}{}
}
func (sm *ServerManager) GetGroup(appName string, streamName string) *Group {
sm.mutex.Lock()
defer sm.mutex.Unlock()
return sm.getGroup(appName, streamName)
}
// ServerObserver of rtmp.Server
func (sm *ServerManager) OnRTMPConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
@ -451,13 +459,36 @@ func (sm *ServerManager) OnCtrlStartPull(info base.APICtrlStartPullReq) {
g.StartPull(url)
}
// HTTPAPIServerObserver
func (sm *ServerManager) OnCtrlKickOutSession(info base.APICtrlKickOutSession) base.HTTPResponseBasic {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("fake", info.StreamName)
if g == nil {
return base.HTTPResponseBasic{
ErrorCode: base.ErrorCodeGroupNotFound,
Desp: base.DespGroupNotFound,
}
}
if !g.KickOutSession(info.SessionID) {
return base.HTTPResponseBasic{
ErrorCode: base.ErrorCodeSessionNotFound,
Desp: base.DespSessionNotFound,
}
}
return base.HTTPResponseBasic{
ErrorCode: base.ErrorCodeSucc,
Desp: base.DespSucc,
}
}
func (sm *ServerManager) iterateGroup() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
for k, group := range sm.groupMap {
// 关闭空闲的group
if group.IsTotalEmpty() {
nazalog.Infof("erase empty group manager. [%s]", group.UniqueKey)
nazalog.Infof("erase empty group. [%s]", group.UniqueKey)
group.Dispose()
delete(sm.groupMap, k)
continue

@ -20,7 +20,6 @@ import (
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/connection"
log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
@ -82,9 +81,9 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
var uk string
switch t {
case CSTPullSession:
uk = unique.GenUniqueKey("RTMPPULL")
uk = base.GenUniqueKey(base.UKPRTMPPullSession)
case CSTPushSession:
uk = unique.GenUniqueKey("RTMPPUSH")
uk = base.GenUniqueKey(base.UKPRTMPPushSession)
}
option := defaultClientSessOption
@ -101,6 +100,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
chunkComposer: NewChunkComposer(),
stat: base.StatSession{
Protocol: base.ProtocolRTMP,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
}

@ -18,7 +18,6 @@ import (
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
// TODO chef: 没有进化成Pub Sub时的超时释放
@ -74,13 +73,14 @@ type ServerSession struct {
}
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession {
uk := unique.GenUniqueKey("RTMPPUBSUB")
uk := base.GenUniqueKey(base.UKPRTMPServerSession)
s := &ServerSession{
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
}),
stat: base.StatSession{
Protocol: base.ProtocolRTMP,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},

@ -23,8 +23,6 @@ import (
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/unique"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -77,13 +75,14 @@ type PubSession struct {
}
func NewPubSession(streamName string) *PubSession {
uk := unique.GenUniqueKey("RTSPPUB")
uk := base.GenUniqueKey(base.UKPRTSPPubSession)
ps := &PubSession{
UniqueKey: uk,
StreamName: streamName,
stat: base.StatPub{
StatSession: base.StatSession{
Protocol: base.ProtocolRTSP,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
},
@ -180,6 +179,7 @@ func (p *PubSession) Setup(uri string, rtpConn, rtcpConn *nazanet.UDPConnection)
return nil
}
// TODO chef: dispose后回调上层
func (p *PubSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose rtsp PubSession.", p.UniqueKey)

@ -16,7 +16,6 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
"github.com/q191201771/naza/pkg/unique"
)
// to be continued
@ -33,7 +32,7 @@ type SubSession struct {
}
func NewSubSession(streamName string) *SubSession {
uk := unique.GenUniqueKey("RTSPSUB")
uk := base.GenUniqueKey(base.UKPRTSPSubSession)
ss := &SubSession{
UniqueKey: uk,
StreamName: streamName,

Loading…
Cancel
Save