commit messages:

- `/app/demo/dispatch`处理`on_update`事件回调
- HTTP Notify增加新事件回调`/on_server_start`
- HTTP API `/api/stat/lal_info` 中增加`server_id`字段
- 增加`gen_tag.sh`,用于打tag
pull/39/head
q191201771 4 years ago
parent 08287e7b8b
commit 34bd5cf004

@ -0,0 +1,30 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
// 服务启动前,设置好一些配置
type Config struct {
// 本服务HTTP监听端口用于接收各lal节点的HTTP Notify
ListenAddr string
// 配置向本服务汇报的节点信息
ServerID2Server map[string]Server
// 级联拉流时携带该URL参数使得我们可以区分是级联拉流还是用户拉流
PullSecretParam string
// 检测lal节点update报活的超时时间
ServerTimeoutSec int
}
// lal节点静态配置信息
type Server struct {
RTMPAddr string // 可用于级联拉流的RTMP地址
APIAddr string // HTTP API接口地址
}

@ -10,43 +10,118 @@ package datamanager
import (
"sync"
"time"
"github.com/q191201771/naza/pkg/nazalog"
)
type DataManagerMemory struct {
mutex sync.Mutex
pubStream2ServerID map[string]string
serverTimeoutSec int
mutex sync.Mutex
serverID2pubStreams map[string]map[string]struct{}
serverID2AliveTS map[string]int64
}
func NewDataManagerMemory() *DataManagerMemory {
return &DataManagerMemory{
pubStream2ServerID: make(map[string]string),
func NewDataManagerMemory(serverTimeoutSec int) *DataManagerMemory {
d := &DataManagerMemory{
serverTimeoutSec: serverTimeoutSec,
serverID2pubStreams: make(map[string]map[string]struct{}),
serverID2AliveTS: make(map[string]int64),
}
// TODO chef: release goroutine
go func() {
var count int
for {
time.Sleep(1 * time.Second)
count++
now := time.Now().Unix()
d.mutex.Lock()
// 清除长时间没有update报活的节点
for serverID, ts := range d.serverID2AliveTS {
if now > ts && now-ts > int64(d.serverTimeoutSec)*1000 {
nazalog.Warnf("server timeout. serverID=%s", serverID)
delete(d.serverID2pubStreams, serverID)
}
}
// 定时打印数据日志
if count%60 == 0 {
nazalog.Infof("data info. %+v", d.serverID2pubStreams)
}
d.mutex.Unlock()
}
}()
return d
}
func (d *DataManagerMemory) AddPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
nazalog.Infof("add pub. streamName=%s, serverID=%s", streamName, serverID)
d.pubStream2ServerID[streamName] = serverID
pss, _ := d.serverID2pubStreams[serverID]
if pss == nil {
pss = make(map[string]struct{})
}
pss[streamName] = struct{}{}
}
func (d *DataManagerMemory) DelPub(streamName, serverID string) {
d.mutex.Lock()
defer d.mutex.Unlock()
// 清除用户推流对应的节点信息
cacheServerID, exist := d.pubStream2ServerID[streamName]
if !exist || serverID != cacheServerID {
nazalog.Errorf("del pub but server id dismatch. streamName=%s, serverID=%s, cache id=%s", streamName, serverID, cacheServerID)
actualServerID, _ := d.queryPub(streamName)
if actualServerID != serverID {
return
}
delete(d.pubStream2ServerID, streamName)
delete(d.serverID2pubStreams[serverID], streamName)
}
func (d *DataManagerMemory) QueryPub(streamName string) (serverID string, exist bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
serverID, exist = d.pubStream2ServerID[streamName]
return
return d.queryPub(streamName)
}
func (d *DataManagerMemory) UpdatePub(serverID string, streamNameList []string) {
// 3. server超时去掉所有上面所有的pub
d.mutex.Lock()
defer d.mutex.Unlock()
d.markAlive(serverID)
// 更新serverID对应的stream列表
pss := make(map[string]struct{})
for _, s := range streamNameList {
pss[s] = struct{}{}
}
cpss := d.serverID2pubStreams[serverID]
d.serverID2pubStreams[serverID] = pss
// only for log
for s := range pss {
if _, exist := cpss[s]; !exist {
nazalog.Warnf("update pub, add. serverID=%s, streamName=%s", serverID, s)
}
}
for s := range cpss {
if _, exist := pss[s]; !exist {
nazalog.Warnf("update pub, del. serverID=%s, streamName=%s", serverID, s)
}
}
}
func (d *DataManagerMemory) queryPub(streamName string) (string, bool) {
for serverID, pss := range d.serverID2pubStreams {
if _, exist := pss[streamName]; exist {
return serverID, true
}
}
return "", false
}
func (d *DataManagerMemory) markAlive(serverID string) {
d.serverID2AliveTS[serverID] = time.Now().Unix()
}

@ -8,10 +8,17 @@
package datamanager
// 本demo的数据存储在内存中只实现了DataManagerMemory所以存在单点风险指的是dispatch永久性发生故障短暂故障或重启是ok的
// 生产环境可以把数据存储在redis、mysql等数据库中实现DataManager interface即可
type DataManger interface {
AddPub(streamName, serverID string)
DelPub(streamName, serverID string)
QueryPub(streamName string) (serverID string, exist bool)
// 1. 全量校正。比如自身服务重启了lal节点重启了或其他原因Add、Del消息丢失了
// 2. 心跳保活
UpdatePub(serverID string, streamNameList []string)
}
type DataManagerType int
@ -20,10 +27,11 @@ const (
DMTMemory DataManagerType = iota
)
func NewDataManager(t DataManagerType) DataManger {
// @param serverTimeoutSec 超过该时间间隔没有Update则清空对应节点的所有信息
func NewDataManager(t DataManagerType, serverTimeoutSec int) DataManger {
switch t {
case DMTMemory:
return NewDataManagerMemory()
return NewDataManagerMemory(serverTimeoutSec)
default:
panic("invalid data manager type")
}

@ -22,41 +22,32 @@ import (
"github.com/q191201771/naza/pkg/unique"
)
//
// 结合lalserver的HTTP Notify事件通知以及HTTP API接口
// 简单演示如何试验一个简单的调度服务,
// 简单演示如何实现一个简单的调度服务,
// 使得多个lalserver节点可以组成一个集群
// 集群内的所有节点功能都是相同的,
// 你可以将流推送至任意一个节点,并从任意一个节点拉流,
// 同一路流,推流和拉流可以在不同的节点。
//
// 本demo的数据存储在内存中所以存在单点风险
// 生产环境可以把数据存储在redis、mysql等数据库中
// 多个调度节点从数据库中读写数据。
type Server struct {
rtmpAddr string
apiAddr string
}
// config
var (
listenAddr = ":10101"
serverID2Server = map[string]Server{
var config = Config{
ListenAddr: ":10101",
ServerID2Server: map[string]Server{
"1": {
rtmpAddr: "127.0.0.1:19350",
apiAddr: "127.0.0.1:8083",
RTMPAddr: "127.0.0.1:19350",
APIAddr: "127.0.0.1:8083",
},
"2": {
rtmpAddr: "127.0.0.1:19550",
apiAddr: "127.0.0.1:8283",
RTMPAddr: "127.0.0.1:19550",
APIAddr: "127.0.0.1:8283",
},
}
pullSecretParam = "lal_cluster_inner_pull=1"
)
},
PullSecretParam: "lal_cluster_inner_pull=1",
ServerTimeoutSec: 30,
}
var (
dataManager datamanager.DataManger
)
var dataManager datamanager.DataManger
func OnPubStartHandler(w http.ResponseWriter, r *http.Request) {
id := unique.GenUniqueKey("ReqID")
@ -70,24 +61,31 @@ func OnPubStartHandler(w http.ResponseWriter, r *http.Request) {
// 演示如何踢掉session服务于鉴权失败等场景
//if info.URLParam == "" {
if info.SessionID == "RTMPPUBSUB1" {
reqServer, exist := serverID2Server[info.ServerID]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", id)
return
}
url := fmt.Sprintf("http://%s/api/ctrl/kick_out_session", reqServer.apiAddr)
var b base.APICtrlKickOutSession
b.StreamName = info.StreamName
b.SessionID = info.SessionID
nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.apiAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", id, err)
}
//if info.SessionID == "RTMPPUBSUB1" {
// reqServer, exist := config.ServerID2Server[info.ServerID]
// if !exist {
// nazalog.Errorf("[%s] req server id invalid.", id)
// return
// }
// url := fmt.Sprintf("http://%s/api/ctrl/kick_out_session", reqServer.APIAddr)
// var b base.APICtrlKickOutSession
// b.StreamName = info.StreamName
// b.SessionID = info.SessionID
//
// nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.APIAddr, b)
// if _, err := nazahttp.PostJson(url, b, nil); err != nil {
// nazalog.Errorf("[%s] post json error. err=%+v", id, err)
// }
// return
//}
if _, exist := config.ServerID2Server[info.ServerID]; !exist {
nazalog.Errorf("server id has not config. serverID=%s", info.ServerID)
return
}
dataManager.AddPub(info.StreamName, info.SessionID)
nazalog.Infof("add pub. streamName=%s, serverID=%s", info.StreamName, info.ServerID)
dataManager.AddPub(info.StreamName, info.ServerID)
}
func OnPubStopHandler(w http.ResponseWriter, r *http.Request) {
@ -100,6 +98,12 @@ func OnPubStopHandler(w http.ResponseWriter, r *http.Request) {
}
nazalog.Infof("[%s] on_pub_stop. info=%+v", id, info)
if _, exist := config.ServerID2Server[info.ServerID]; !exist {
nazalog.Errorf("server id has not config. serverID=%s", info.ServerID)
return
}
nazalog.Infof("del pub. streamName=%s, serverID=%s", info.StreamName, info.ServerID)
dataManager.DelPub(info.StreamName, info.ServerID)
}
@ -115,18 +119,18 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
// sub拉流时判断是否需要触发pull级联拉流
// 1. 是内部级联拉流,不需要触发
if strings.Contains(info.URLParam, pullSecretParam) {
if strings.Contains(info.URLParam, config.PullSecretParam) {
nazalog.Infof("[%s] sub is pull by other node, ignore.", id)
return
}
// 2. 已经存在输入流,不需要触发
// 2. 汇报的节点已经存在输入流,不需要触发
if info.HasInSession {
nazalog.Infof("[%s] in not empty, ignore.", id)
return
}
// 3. 非法节点,本服务没有配置这个节点
reqServer, exist := serverID2Server[info.ServerID]
// 3. 非法节点,本服务没有配置汇报的节点
reqServer, exist := config.ServerID2Server[info.ServerID]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", id)
return
@ -139,23 +143,19 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
return
}
// TODO chef: 5. 这里的容错是否会出现?是否可以去掉?
pubServer, exist := serverID2Server[pubServerID]
if !exist {
nazalog.Errorf("[%s] pub server id invalid. serverID=%s", id, pubServerID)
return
}
pubServer, exist := config.ServerID2Server[pubServerID]
nazalog.Assert(true, exist)
// 向pub所在节点发送pull级联拉流的命令
url := fmt.Sprintf("http://%s/api/ctrl/start_pull", reqServer.apiAddr)
// 向汇报节点发送pull级联拉流的命令其中包含pub所在节点信息
url := fmt.Sprintf("http://%s/api/ctrl/start_pull", reqServer.APIAddr)
var b base.APICtrlStartPullReq
b.Protocol = base.ProtocolRTMP
b.Addr = pubServer.rtmpAddr
b.Addr = pubServer.RTMPAddr
b.AppName = info.AppName
b.StreamName = info.StreamName
b.URLParam = pullSecretParam
b.URLParam = config.PullSecretParam
nazalog.Infof("[%s] ctrl pull. send to %s with %+v", id, reqServer.apiAddr, b)
nazalog.Infof("[%s] ctrl pull. send to %s with %+v", id, reqServer.APIAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", id, err)
}
@ -170,9 +170,6 @@ func OnSubStopHandler(w http.ResponseWriter, r *http.Request) {
return
}
nazalog.Infof("[%s] on_sub_stop. info=%+v", id, info)
// 没什么好做的
// 目前lalserver在sub为空时内部会主动关闭pull
}
func OnUpdateHandler(w http.ResponseWriter, r *http.Request) {
@ -185,9 +182,14 @@ func OnUpdateHandler(w http.ResponseWriter, r *http.Request) {
}
nazalog.Infof("[%s] on_update. info=%+v", id, info)
// TODO chef:
// 1. 更新pubStream2ServerID去掉过期的增加不存在的
// 2. 没有pub但是有sub的触发ctrl pull
var streamNameList []string
for _, g := range info.Groups {
// pub exist
if g.StatPub.SessionID != "" {
streamNameList = append(streamNameList, g.StreamName)
}
}
dataManager.UpdatePub(info.ServerID, streamNameList)
}
func logHandler(w http.ResponseWriter, r *http.Request) {
@ -196,9 +198,9 @@ func logHandler(w http.ResponseWriter, r *http.Request) {
}
func main() {
dataManager = datamanager.NewDataManager(datamanager.DMTMemory)
dataManager = datamanager.NewDataManager(datamanager.DMTMemory, config.ServerTimeoutSec)
l, err := net.Listen("tcp", listenAddr)
l, err := net.Listen("tcp", config.ListenAddr)
nazalog.Assert(nil, err)
m := http.NewServeMux()
@ -208,6 +210,7 @@ func main() {
m.HandleFunc("/on_sub_stop", OnSubStopHandler)
m.HandleFunc("/on_update", OnUpdateHandler)
m.HandleFunc("/on_rtmp_connect", logHandler)
m.HandleFunc("/on_server_start", logHandler)
srv := http.Server{
Handler: m,

@ -18,11 +18,6 @@ GitStatus=`git status -s`
BuildTime=`date +'%Y.%m.%d.%H%M%S'`
BuildGoVersion=`go version`
# 如果读取到git信息最新tag是v开头则修改代码 pkg/base/version.go 中的版本信息
if [[ ${GitTag} == v* ]]; then
gsed -i "/^var LALVersion/cvar LALVersion = \"${GitTag}\"" pkg/base/version.go
fi
LDFlags=" \
-X 'github.com/q191201771/naza/pkg/bininfo.GitTag=${GitTag}' \
-X 'github.com/q191201771/naza/pkg/bininfo.GitCommitLog=${GitCommitLog}' \

@ -46,6 +46,7 @@
"http_notify": {
"enable": true,
"update_interval_sec": 5,
"on_server_start": "http://127.0.0.1:10101/on_server_start",
"on_update": "http://127.0.0.1:10101/on_update",
"on_pub_start": "http://127.0.0.1:10101/on_pub_start",
"on_pub_stop": "http://127.0.0.1:10101/on_pub_stop",

@ -45,9 +45,10 @@
},
"server_id": "1", // 当前lalserver唯一ID。多个lalserver HTTP Notify同一个地址时可通过该ID区分
"http_notify": {
"enable": true, // 是否开启HTTP Notify事件回调
"update_interval_sec": 5, // update事件回调间隔单位毫秒
"on_update": "http://127.0.0.1:10101/on_update", // 各事件HTTP Notify事件回调地址
"enable": true, // 是否开启HTTP Notify事件回调
"update_interval_sec": 5, // update事件回调间隔单位毫秒
"on_server_start": "http://127.0.0.1:10101/on_server_start", // 各事件HTTP Notify事件回调地址
"on_update": "http://127.0.0.1:10101/on_update",
"on_pub_start": "http://127.0.0.1:10101/on_pub_start",
"on_pub_stop": "http://127.0.0.1:10101/on_pub_stop",
"on_sub_start": "http://127.0.0.1:10101/on_sub_start",

@ -34,6 +34,7 @@
"http_notify": {
"enable": true,
"update_interval_sec": 5,
"on_server_start": "http://127.0.0.1:10101/on_server_start",
"on_update": "http://127.0.0.1:10101/on_update",
"on_pub_start": "http://127.0.0.1:10101/on_pub_start",
"on_pub_stop": "http://127.0.0.1:10101/on_pub_stop",

@ -20,9 +20,9 @@ echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/VERSION.txt
echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/VERSION.txt
echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/VERSION.txt
cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf
cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf
cp conf/lalserver.conf.json conf/edge.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf
GitTag=`git tag --sort=version:refname | tail -n 1`
GitCommitLog=`git log --pretty=oneline -n 1`

@ -0,0 +1,43 @@
#!/usr/bin/env bash
# 根据CHANGELOG.md中的最新版本号决定是否更新version.go和以及打git tag
#
# 步骤:
# 1. 提交所有代码
# 2. 修改CHANGELOG.md
# 3. 执行gen_tag.sh
#set -x
# CHANGELOG.md中的版本号
NewVersion=`cat CHANGELOG.md| grep '#### v' | head -n 1 | awk '{print $2}'`
echo 'newest version in CHANGELOG.md: ' $NewVersion
# git tag中的版本号
GitTag=`git tag --sort=version:refname | tail -n 1`
echo "newest version in git tag: " $GitTag
# 源码中的版本号
FileVersion=`cat pkg/base/version.go | grep 'const LALVersion' | awk -F\" '{print $2}'`
echo "newest version in version.go: " $FileVersion
# CHANGELOG.md和源码中的不一致更新源码并提交修改
if [ "$NewVersion" == "$FileVersion" ];then
echo 'same tag, noop.'
else
echo 'update version.go'
#gsed -i "/^var LALVersion/cvar LALVersion = \"${NewVersion}\"" pkg/base/version.go
#git add pkg/base/version.go
#git commit -m '${NewVersion} -> version.go'
#git push
fi
# CHANGELOG.md和git tag不一致打新的tag
if [ "$NewVersion" == "$FileVersion" ];then
echo 'same tag, noop.'
else
echo 'add tag.'
#git tag ${NewVersion}
#git push --tags
fi

@ -10,7 +10,7 @@ package base
type AVPacketPT int
var (
const (
AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC
AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC
AVPacketPTHEVC AVPacketPT = 98

@ -10,7 +10,7 @@ package base
// 文档见: https://pengrl.com/p/20100/
const HTTPAPIVersion = "v0.1.1"
const HTTPAPIVersion = "v0.1.2"
const (
ErrorCodeSucc = 0
@ -28,15 +28,18 @@ type HTTPResponseBasic struct {
Desp string `json:"desp"`
}
type LALInfo struct {
ServerID string `json:"server_id"`
BinInfo string `json:"bin_info"`
LalVersion string `json:"lal_version"`
APIVersion string `json:"api_version"`
NotifyVersion string `json:"notify_version"`
StartTime string `json:"start_time"`
}
type APIStatLALInfo struct {
HTTPResponseBasic
Data struct {
BinInfo string `json:"bin_info"`
LalVersion string `json:"lal_version"`
APIVersion string `json:"api_version"`
NotifyVersion string `json:"notify_version"`
StartTime string `json:"start_time"`
} `json:"data"`
Data LALInfo `json:"data"`
}
type APIStatAllGroup struct {

@ -10,7 +10,7 @@ package base
// 文档见: https://pengrl.com/p/20101/
const HTTPNotifyVersion = "v0.0.3"
const HTTPNotifyVersion = "v0.0.4"
type SessionEventCommonInfo struct {
ServerID string `json:"server_id"`

@ -0,0 +1,37 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
//
// | . | rtmp pub | rtsp pub | rtmp sub | flv sub | ts sub | rtsp sub | rtmp push | rtmp pull | flv pull |
// | - | - | - | - | - | - | - | - | - | - |
// | file | server_session.go | server_pub_session.go | server_session.go | server_sub_session.go | server_sub_session.go | server_sub_session.go | client_push_session.go | client_pull_session.go | client_pull_session.go |
// | struct | ServerSession | PubSession | ServerSession | SubSession | SubSession | SubSession | PushSession | PullSession | PullSession |
//
//
//
// | . | all | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | rtmppush | rtmppull | flvpull |
// | - | - | - | - | - | - | - | - | - | - | - |
// | UniqueKey | √ | √ | √ | √ | √ | √ | √ | √ | √ | √ |
// | StreamName | x | √ | √ | √ | √ | √ | √ | √ | √ | x |
// | RunLoop() | x | √ | x | √ | √ | √ | x | x | x | x |
// | Dispose() | x | √ | √ | √ | √ | √ | x | √ | √ | √ |
// | GetStat() | x | √ | √ | √ | √ | √ | x | x | √ | x |
// | UpdateStat() | x | √ | √ | √ | √ | √ | x | x | √ | x |
// | IsAlive() | x | √ | √ | √ | √ | √ | x | x | √ | x |
// | SingleConn | x | √ | x | √ | √ | √ | √ | √ | √ | √ |
// | RemoteAddr() | x | √ | x | √ | √ | x | x | x | x | x |
//
// Dispose由外部调用表示主动关闭正常的session
// 外部调用Dispose后不应继续使用该session
// Dispose后RunLoop结束阻塞
//
// 对端关闭或session内部关闭也会导致RunLoop结束阻塞
//
// RunLoop结束阻塞后可通知上层告知session生命周期结束

@ -15,8 +15,8 @@ import "strings"
// 另外,我们也在本文件提供另外一些信息
// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中
// 版本,该变量由build脚本修改维护
var LALVersion = "v0.16.0"
// 版本,该变量由外部脚本修改维护
const LALVersion = "v0.16.0"
var (
LALLibraryName = "lal"

@ -25,13 +25,13 @@ import (
var tsHTTPResponseHeader []byte
type SubSession struct {
UniqueKey string
StartTick int64
UniqueKey string
StreamName string
AppName string
URI string
Headers map[string]string
StartTick int64
AppName string
URI string
Headers map[string]string
IsFresh bool

@ -19,7 +19,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
//var ErrMissKeyField = errors.New("missing key field in config file")
const ConfigVersion = "0.0.1"
type Config struct {
RTMPConfig RTMPConfig `json:"rtmp"`
@ -82,6 +82,7 @@ type HTTPAPIConfig struct {
type HTTPNotifyConfig struct {
Enable bool `json:"enable"`
UpdateIntervalSec int `json:"update_interval_sec"`
OnServerStart string `json:"on_server_start"`
OnUpdate string `json:"on_update"`
OnPubStart string `json:"on_pub_start"`
OnPubStop string `json:"on_pub_stop"`

@ -38,9 +38,6 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef:
// group的函数比较多考虑调整一下函数排列位置
type Group struct {
UniqueKey string // const after init
appName string // const after init
@ -67,7 +64,7 @@ type Group struct {
url2PushProxy map[string]*pushProxy
//
hlsMuxer *hls.Muxer
//
// rtmp pub/pull使用
gopCache *GOPCache
httpflvGopCache *GOPCache
// rtsp pub使用
@ -477,7 +474,7 @@ func (group *Group) OnRTPPacket(pkt rtprtcp.RTPPacket) {
defer group.mutex.Unlock()
for s := range group.rtspSubSessionSet {
s.WriteRawRTPPacket(pkt.Raw)
s.WriteRTPPacket(pkt)
}
}
@ -809,7 +806,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
for session := range group.rtmpSubSessionSet {
// ## 3.1. 如果是新的 sub session发送已缓存的信息
if session.IsFresh {
// TODO 头信息和full gop也可以在SubSession刚加入时发送
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.gopCache.Metadata != nil {
_ = session.AsyncWrite(group.gopCache.Metadata)
}

@ -107,6 +107,7 @@ func (h *HTTPAPIServer) statLALInfoHandler(w http.ResponseWriter, req *http.Requ
v.Data.APIVersion = base.HTTPAPIVersion
v.Data.NotifyVersion = base.HTTPNotifyVersion
v.Data.StartTime = serverStartTime
v.Data.ServerID = config.ServerID
feedback(v, w)
}

@ -12,6 +12,8 @@ import (
"net/http"
"time"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
@ -34,6 +36,17 @@ type HTTPNotify struct {
var httpNotify *HTTPNotify
func (h *HTTPNotify) OnServerStart() {
var info base.LALInfo
info.BinInfo = bininfo.StringifySingleLine()
info.LalVersion = base.LALVersion
info.APIVersion = base.HTTPAPIVersion
info.NotifyVersion = base.HTTPNotifyVersion
info.StartTime = serverStartTime
info.ServerID = config.ServerID
h.asyncPost(config.HTTPNotifyConfig.OnServerStart, info)
}
func (h *HTTPNotify) OnUpdate(info base.UpdateInfo) {
h.asyncPost(config.HTTPNotifyConfig.OnUpdate, info)
}

@ -67,6 +67,8 @@ func NewServerManager() *ServerManager {
}
func (sm *ServerManager) RunLoop() {
httpNotify.OnServerStart()
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
nazalog.Error(err)
@ -372,6 +374,9 @@ func (sm *ServerManager) OnNewHTTPTSSubSession(session *httpts.SubSession) bool
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.AddHTTPTSSubSession(session)
// TODO chef: 部分session没有Notify
return true
}

@ -73,7 +73,6 @@ func (s *PullSession) GetStat() base.StatSession {
return s.core.GetStat()
}
// TODO chef: 默认每5秒调用一次
func (s *PullSession) UpdateStat(interval uint32) {
s.core.UpdateStat(interval)
}

@ -191,7 +191,6 @@ func (s *ClientSession) GetStat() base.StatSession {
return s.stat
}
// TODO chef: 默认每5秒调用一次
func (s *ClientSession) UpdateStat(interval uint32) {
currStat := s.conn.GetStat()
var diffStat connection.Stat

@ -56,7 +56,6 @@ func init() {
// ffmpeg -re -stream_loop -1 -i /Volumes/Data/tmp/wontcry.flv -acodec copy -vcodec copy -f rtsp rtsp://localhost:5544/live/test110
// read http request. method=OPTIONS, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:1 User-Agent:Lavf57.83.100], body= - server.go:95
//
// read http request. method=ANNOUNCE, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:2 Content-Length:490 Content-Type:application/sdp User-Agent:Lavf57.83.100], body=v=0
// o=- 0 0 IN IP4 127.0.0.1
// s=No Name
@ -73,13 +72,9 @@ func init() {
// a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=121056E500
// a=control:streamid=1
// - server.go:95
//
// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=0, headers=map[CSeq:3 Transport:RTP/AVP/UDP;unicast;client_port=32182-32183;mode=record User-Agent:Lavf57.83.100], body= - server.go:95
//
// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/UDP;unicast;client_port=32184-32185;mode=record User-Agent:Lavf57.83.100], body= - server.go:95
//
// read http request. method=RECORD, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95
//
// read http request. method=TEARDOWN, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:6 Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95
// read udp packet failed. err=read udp [::]:8002: use of closed network connection - server_pub_session.go:199

@ -123,14 +123,14 @@ Loop:
break Loop
}
sdpCtx, err := sdp.ParseSDP(body)
sdpLogicCtx, err := sdp.ParseSDP2LogicContext(body)
if err != nil {
nazalog.Errorf("parse sdp failed. err=%v", err)
break Loop
}
pubSession = NewPubSession(presentation)
pubSession.InitWithSDP(body, sdpCtx)
pubSession.InitWithSDP(body, sdpLogicCtx)
s.m.Lock()
s.presentation2PubSession[presentation] = pubSession
@ -185,8 +185,10 @@ Loop:
break Loop
}
} else if subSession != nil {
subSession.SetRTPConn(rtpConn)
subSession.SetRTCPConn(rtcpConn)
if err = subSession.Setup(uri, rtpConn, rtcpConn); err != nil {
nazalog.Errorf("SETUP failed. err=%+v", err)
break Loop
}
} else {
nazalog.Error("SETUP while session not exist.")
break Loop
@ -233,7 +235,8 @@ Loop:
pubSession, ok := s.presentation2PubSession[presentation]
s.m.Unlock()
if ok {
resp := PackResponseDescribe(headers[HeaderFieldCSeq], string(pubSession.rawSDP))
rawSDP, _ := pubSession.GetSDP()
resp := PackResponseDescribe(headers[HeaderFieldCSeq], string(rawSDP))
_, _ = conn.Write([]byte(resp))
} else {
nazalog.Errorf("rtsp sub but pub not exist. presentation=%s", presentation)

@ -26,7 +26,9 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// PubSession会同时向上层回调rtp packet以及rtp合并后的av packet
// PubSession会向上层回调两种格式的数据
// 1. 原始的rtp packet
// 2. rtp合并后的av packet
type PubSessionObserver interface {
OnRTPPacket(pkt rtprtcp.RTPPacket)
@ -39,21 +41,18 @@ type PubSessionObserver interface {
}
type PubSession struct {
UniqueKey string
StreamName string // presentation
UniqueKey string
StreamName string // presentation
observer PubSessionObserver
avPacketQueue *AVPacketQueue
audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioRRProducer *rtprtcp.RRProducer
videoRRProducer *rtprtcp.RRProducer
audioSsrc uint32
videoSsrc uint32
audioPayloadType base.AVPacketPT
videoPayloadType base.AVPacketPT
audioAControl string
videoAControl string
audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioRRProducer *rtprtcp.RRProducer
videoRRProducer *rtprtcp.RRProducer
audioSsrc uint32
videoSsrc uint32
audioRTPConn *nazanet.UDPConnection
videoRTPConn *nazanet.UDPConnection
@ -65,13 +64,9 @@ type PubSession struct {
staleStat *connection.Stat
stat base.StatPub
vps []byte // 如果是H265的话
sps []byte
pps []byte
asc []byte
m sync.Mutex
rawSDP []byte
m sync.Mutex
rawSDP []byte // const after set
sdpLogicCtx sdp.LogicContext // const after set
}
func NewPubSession(streamName string) *PubSession {
@ -94,79 +89,31 @@ func NewPubSession(streamName string) *PubSession {
func (p *PubSession) SetObserver(observer PubSessionObserver) {
p.observer = observer
p.observer.OnAVConfig(p.asc, p.vps, p.sps, p.pps)
p.observer.OnAVConfig(p.sdpLogicCtx.ASC, p.sdpLogicCtx.VPS, p.sdpLogicCtx.SPS, p.sdpLogicCtx.PPS)
}
func (p *PubSession) InitWithSDP(rawSDP []byte, sdpCtx sdp.SDPContext) {
func (p *PubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) {
p.m.Lock()
p.rawSDP = rawSDP
p.sdpLogicCtx = sdpLogicCtx
p.m.Unlock()
var err error
var audioClockRate int
var videoClockRate int
for i, item := range sdpCtx.ARTPMapList {
switch item.PayloadType {
case base.RTPPacketTypeAVCOrHEVC:
videoClockRate = item.ClockRate
if item.EncodingName == "H265" {
p.videoPayloadType = base.AVPacketPTHEVC
} else {
p.videoPayloadType = base.AVPacketPTAVC
}
if i < len(sdpCtx.AControlList) {
p.videoAControl = sdpCtx.AControlList[i].Value
}
case base.RTPPacketTypeAAC:
audioClockRate = item.ClockRate
p.audioPayloadType = base.AVPacketPTAAC
if i < len(sdpCtx.AControlList) {
p.audioAControl = sdpCtx.AControlList[i].Value
}
default:
nazalog.Errorf("unknown payloadType. type=%d", item.PayloadType)
}
}
for _, item := range sdpCtx.AFmtPBaseList {
switch item.Format {
case base.RTPPacketTypeAVCOrHEVC:
if p.videoPayloadType == base.AVPacketPTHEVC {
p.vps, p.sps, p.pps, err = sdp.ParseVPSSPSPPS(item)
} else {
p.sps, p.pps, err = sdp.ParseSPSPPS(item)
}
if err != nil {
nazalog.Errorf("parse sps pps from sdp failed.")
}
case base.RTPPacketTypeAAC:
p.asc, err = sdp.ParseASC(item)
if err != nil {
nazalog.Errorf("parse asc from sdp failed.")
}
default:
nazalog.Errorf("unknown format. fmt=%d", item.Format)
}
}
p.audioUnpacker = rtprtcp.NewRTPUnpacker(p.audioPayloadType, audioClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked)
p.videoUnpacker = rtprtcp.NewRTPUnpacker(p.videoPayloadType, videoClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked)
p.audioUnpacker = rtprtcp.NewRTPUnpacker(p.sdpLogicCtx.AudioPayloadType, p.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked)
p.videoUnpacker = rtprtcp.NewRTPUnpacker(p.sdpLogicCtx.VideoPayloadType, p.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, p.onAVPacketUnpacked)
p.audioRRProducer = rtprtcp.NewRRProducer(audioClockRate)
p.videoRRProducer = rtprtcp.NewRRProducer(videoClockRate)
p.audioRRProducer = rtprtcp.NewRRProducer(p.sdpLogicCtx.AudioClockRate)
p.videoRRProducer = rtprtcp.NewRRProducer(p.sdpLogicCtx.VideoClockRate)
if p.audioPayloadType != 0 && p.videoPayloadType != 0 {
if p.sdpLogicCtx.AudioPayloadType != 0 && p.sdpLogicCtx.VideoPayloadType != 0 {
p.avPacketQueue = NewAVPacketQueue(p.onAVPacket)
}
}
func (p *PubSession) Setup(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {
if strings.HasSuffix(uri, p.audioAControl) {
if strings.HasSuffix(uri, p.sdpLogicCtx.AudioAControl) {
p.audioRTPConn = rtpConn
p.audioRTCPConn = rtcpConn
} else if strings.HasSuffix(uri, p.videoAControl) {
} else if strings.HasSuffix(uri, p.sdpLogicCtx.VideoAControl) {
p.videoRTPConn = rtpConn
p.videoRTCPConn = rtcpConn
} else {
@ -228,10 +175,10 @@ func (p *PubSession) IsAlive(interval uint32) (ret bool) {
return ret
}
func (p *PubSession) GetSDP() []byte {
func (p *PubSession) GetSDP() ([]byte, sdp.LogicContext) {
p.m.Lock()
defer p.m.Unlock()
return p.rawSDP
return p.rawSDP, p.sdpLogicCtx
}
// callback by UDPConnection

@ -11,24 +11,32 @@ package rtsp
import (
"encoding/hex"
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
)
// to be continued
// 注意音频和视频是不同的UDP连接
// pub和sub挂载转发时需要对应上
// TODO chef: 主动发送SR
type SubSession struct {
UniqueKey string
StreamName string
UniqueKey string // const after ctor
StreamName string // const after ctor
rawSDP []byte // const after set
sdpLogicCtx sdp.LogicContext // const after set
rtpConn *nazanet.UDPConnection
rtcpConn *nazanet.UDPConnection
stat base.StatPub
audioRTPConn *nazanet.UDPConnection
videoRTPConn *nazanet.UDPConnection
audioRTCPConn *nazanet.UDPConnection
videoRTCPConn *nazanet.UDPConnection
stat base.StatPub
}
func NewSubSession(streamName string) *SubSession {
@ -47,14 +55,26 @@ func NewSubSession(streamName string) *SubSession {
return ss
}
func (s *SubSession) SetRTPConn(conn *nazanet.UDPConnection) {
s.rtpConn = conn
go s.rtpConn.RunLoop(s.onReadUDPPacket)
func (s *SubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) {
s.rawSDP = rawSDP
s.sdpLogicCtx = sdpLogicCtx
}
func (s *SubSession) SetRTCPConn(conn *nazanet.UDPConnection) {
s.rtcpConn = conn
go s.rtcpConn.RunLoop(s.onReadUDPPacket)
func (s *SubSession) Setup(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {
if strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
s.audioRTPConn = rtpConn
s.audioRTCPConn = rtcpConn
} else if strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) {
s.videoRTPConn = rtpConn
s.videoRTCPConn = rtcpConn
} else {
return ErrRTSP
}
go rtpConn.RunLoop(s.onReadUDPPacket)
go rtcpConn.RunLoop(s.onReadUDPPacket)
return nil
}
func (s *SubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool {
@ -62,8 +82,13 @@ func (s *SubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo
return true
}
func (s *SubSession) WriteRawRTPPacket(b []byte) {
if err := s.rtpConn.Write(b); err != nil {
nazalog.Errorf("err=%+v", err)
}
//to be continued
//conn可能还不存在这里涉及到pub和sub是否需要等到setup再回调给上层的问题
func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) {
//switch packet.Header.PacketType {
//case base.RTPPacketTypeAVCOrHEVC:
// s.videoRTPConn.Write(packet.Raw)
//case base.RTPPacketTypeAAC:
// s.audioRTPConn.Write(packet.Raw)
//}
}

@ -19,7 +19,24 @@ import (
var ErrSDP = errors.New("lal.sdp: fxxk")
type SDPContext struct {
const (
ARTPMapEncodingName = "H265"
)
type LogicContext struct {
AudioClockRate int
VideoClockRate int
AudioPayloadType base.AVPacketPT
VideoPayloadType base.AVPacketPT
AudioAControl string
VideoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
}
type RawContext struct {
ARTPMapList []ARTPMap
AFmtPBaseList []AFmtPBase
AControlList []AControl
@ -41,9 +58,64 @@ type AControl struct {
Value string
}
func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
var ret LogicContext
c, err := ParseSDP2RawContext(b)
if err != nil {
return ret, err
}
for i, item := range c.ARTPMapList {
switch item.PayloadType {
case base.RTPPacketTypeAVCOrHEVC:
ret.VideoClockRate = item.ClockRate
if item.EncodingName == ARTPMapEncodingName {
ret.VideoPayloadType = base.AVPacketPTHEVC
} else {
ret.VideoPayloadType = base.AVPacketPTAVC
}
if i < len(c.AControlList) {
ret.VideoAControl = c.AControlList[i].Value
}
case base.RTPPacketTypeAAC:
ret.AudioClockRate = item.ClockRate
ret.AudioPayloadType = base.AVPacketPTAAC
if i < len(c.AControlList) {
ret.AudioAControl = c.AControlList[i].Value
}
default:
return ret, ErrSDP
}
}
for _, item := range c.AFmtPBaseList {
switch item.Format {
case base.RTPPacketTypeAVCOrHEVC:
if ret.VideoPayloadType == base.AVPacketPTHEVC {
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(item)
} else {
ret.SPS, ret.PPS, err = ParseSPSPPS(item)
}
if err != nil {
return ret, err
}
case base.RTPPacketTypeAAC:
ret.ASC, err = ParseASC(item)
if err != nil {
return ret, err
}
default:
return ret, ErrSDP
}
}
return ret, nil
}
// 例子见单元测试
func ParseSDP(b []byte) (SDPContext, error) {
var sdpCtx SDPContext
func ParseSDP2RawContext(b []byte) (RawContext, error) {
var sdpCtx RawContext
s := string(b)
lines := strings.Split(s, "\r\n")

@ -44,8 +44,8 @@ var goldenPPS = []byte{
0x68, 0xEB, 0xEC, 0xB2, 0x2C,
}
func TestParseSDP(t *testing.T) {
sdpCtx, err := sdp.ParseSDP([]byte(goldenSDP))
func TestParseSDP2RawContext(t *testing.T) {
sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
nazalog.Debugf("sdp=%+v", sdpCtx)
}

Loading…
Cancel
Save