mod messages:

- [perf] rtmp合并发送功能使用writev实现
- [refactor] 可定制性: logic: 抽象出ILalServer接口;业务方可在自身逻辑中创建server,选择是否获取notify通知,以及使用api控制server
- [refactor] 兼容性: 再增加一个配置文件默认搜索地址
- [refactor] 可读性: logic: ServerManager和Config不再作为全局变量使用;去除entry.go中间层;iface_impl.go移入innertest中;signal_xxx.go移入base中
pull/114/head
q191201771 3 years ago
parent 7d6c38a66b
commit e3578157f4

@ -22,14 +22,13 @@ import (
"github.com/q191201771/naza/pkg/bininfo" "github.com/q191201771/naza/pkg/bininfo"
) )
var sm *logic.ServerManager
func main() { func main() {
defer nazalog.Sync() defer nazalog.Sync()
confFile := parseFlag() confFile := parseFlag()
logic.Init(confFile) sm := logic.NewLalServer(confFile)
logic.RunLoop() err := sm.RunLoop()
nazalog.Infof("server manager done. err=%+v", err)
} }
func parseFlag() string { func parseFlag() string {
@ -57,6 +56,7 @@ func parseFlag() string {
filepath.FromSlash("../lalserver.conf.json"), filepath.FromSlash("../lalserver.conf.json"),
filepath.FromSlash("../../lalserver.conf.json"), filepath.FromSlash("../../lalserver.conf.json"),
filepath.FromSlash("../../conf/lalserver.conf.json"), filepath.FromSlash("../../conf/lalserver.conf.json"),
filepath.FromSlash("lal/conf/lalserver.conf.json"),
} }
for _, dcf := range defaultConfigFileList { for _, dcf := range defaultConfigFileList {
fi, err := os.Stat(dcf) fi, err := os.Stat(dcf)
@ -68,7 +68,7 @@ func parseFlag() string {
} }
} }
// 默认位置都没有,退出程序 // 所有默认位置都找不到配置文件,退出程序
flag.Usage() flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, ` _, _ = fmt.Fprintf(os.Stderr, `
Example: Example:

@ -1,116 +0,0 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
// 调用方场景/约定描述:
// - 内部实际写总是全量写成功
// - 外部写调用的内存块,永久有效,调用结束后,外部并不使用
// 内部实际写的内存块,调用结束后,实际写的实现可能还继续持有该内存块
//
// 与bufio.Writer表现不同的地方
// 数据超过缓存容量时bufio.Writer一般会切分成多个缓存容量大小的块实际写BufWriter则可能实际写大数据
type IBufWriter interface {
Write(p []byte)
Flush()
}
type WriterFunc func(p []byte)
func NewWriterFuncSize(wr WriterFunc, size int) IBufWriter {
if size <= 0 {
return &directWriter{
wr: wr,
}
}
b := &bufWriter{
wr: wr,
defaultSize: size,
}
b.mallocOnePiece(size)
return b
}
type bufWriter struct {
wr WriterFunc
defaultSize int // 缓存最大容量
buf []byte
n int // 当前已缓存大小
}
type directWriter struct {
wr WriterFunc
}
func (b *bufWriter) Write(p []byte) {
avail := b.available()
if len(p) > avail {
// 缓存剩余空间不够
if b.n == 0 {
// 缓存完全没有使用,依然空间不够
// 直接发送
b.wr(p)
return
}
// 填满当前缓存块,并发送
b.append(p[:avail])
b.Flush()
// 剩余数据
remain := len(p) - avail
if remain < b.defaultSize {
// 剩余数据较小
// 只追加进入缓存
b.append(p[avail:])
} else {
// 剩余数据较大
// 直接发送
b.wr(p[avail:])
}
return
}
// 缓存剩余空间足够
// 追加进入缓存
b.append(p)
}
func (b *bufWriter) Flush() {
if b.n == 0 {
return
}
b.wr(b.buf[:b.n])
b.mallocOnePiece(b.defaultSize)
}
// 缓存剩余空间
func (b *bufWriter) available() int {
return len(b.buf) - b.n
}
func (b *bufWriter) mallocOnePiece(size int) {
b.buf = make([]byte, size)
b.n = 0
}
func (b *bufWriter) append(p []byte) {
copy(b.buf[b.n:], p)
b.n += len(p)
}
func (dw *directWriter) Write(p []byte) {
dw.wr(p)
}
func (dw *directWriter) Flush() {
// noop
}

@ -1,63 +0,0 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"bytes"
"testing"
"github.com/q191201771/naza/pkg/assert"
)
func TestBufWriter(t *testing.T) {
var buf bytes.Buffer
w := NewWriterFuncSize(func(p []byte) {
_, _ = buf.Write(p)
}, 4096)
wb, _ := w.(*bufWriter)
w.Write(bytes.Repeat([]byte{0x1}, 5000))
assert.Equal(t, 4096-0, wb.available())
assert.Equal(t, bytes.Repeat([]byte{0x1}, 5000), buf.Bytes())
buf.Reset()
w.Write(bytes.Repeat([]byte{0x2}, 1024))
assert.Equal(t, 4096-1024, wb.available())
assert.Equal(t, 0, buf.Len())
w.Write(bytes.Repeat([]byte{0x3}, 1024))
assert.Equal(t, 4096-2048, wb.available())
assert.Equal(t, 0, buf.Len())
w.Write(bytes.Repeat([]byte{0x4}, 4096))
assert.Equal(t, 4096-2048, wb.available())
assert.Equal(t, 4096, buf.Len())
assert.Equal(t, bytes.Repeat([]byte{0x2}, 1024), buf.Bytes()[:1024])
assert.Equal(t, bytes.Repeat([]byte{0x3}, 1024), buf.Bytes()[1024:2048])
assert.Equal(t, bytes.Repeat([]byte{0x4}, 2048), buf.Bytes()[2048:])
buf.Reset()
w.Write(bytes.Repeat([]byte{0x5}, 8192))
assert.Equal(t, 4096-0, wb.available())
assert.Equal(t, 2048+8192, buf.Len())
assert.Equal(t, bytes.Repeat([]byte{0x4}, 2048), buf.Bytes()[:2048])
assert.Equal(t, bytes.Repeat([]byte{0x5}, 8192), buf.Bytes()[2048:])
buf.Reset()
w.Flush()
assert.Equal(t, 4096-0, wb.available())
assert.Equal(t, 0, buf.Len())
w.Write(bytes.Repeat([]byte{0x6}, 1024))
assert.Equal(t, 4096-1024, wb.available())
assert.Equal(t, 0, buf.Len())
w.Flush()
assert.Equal(t, 4096-0, wb.available())
assert.Equal(t, 1024, buf.Len())
assert.Equal(t, bytes.Repeat([]byte{0x6}, 1024), buf.Bytes()[:1024])
}

@ -0,0 +1,80 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"github.com/q191201771/naza/pkg/nazalog"
"net"
)
// TODO(chef): feat 通过时间戳(目前是数据大小)来设定合并阈值
// MergeWriter 合并多个内存块,达到阈值后一次性将内存块数组返回给上层
//
// 注意,输入时的单个内存块,回调时不会出现拆分切割的情况
//
type MergeWriter struct {
onWritev OnWritev
size int
currSize int
bs net.Buffers // TODO(chef): perf 复用外层切片
}
type OnWritev func(bs net.Buffers)
// NewMergeWriter
//
// @param onWritev 回调缓存的1~n个内存块
// @param size 回调阈值
//
func NewMergeWriter(onWritev OnWritev, size int) *MergeWriter {
return &MergeWriter{
onWritev: onWritev,
size: size,
}
}
// Write
//
// 注意,函数调用结束后,`b`内存块会被内部持有
//
func (w *MergeWriter) Write(b []byte) {
nazalog.Debugf("[%p] MergeWriter::Write. len=%d", w, len(b))
w.bs = append(w.bs, b)
w.currSize += len(b)
if w.currSize >= w.size {
w.flush()
}
}
// Flush 强制将内部缓冲的数据全部回调排空
//
func (w *MergeWriter) Flush() {
nazalog.Debugf("[%p] MergeWriter::Flush.", w)
if w.currSize > 0 {
w.flush()
}
}
// flush 将内部缓冲的数据全部回调排空
//
func (w *MergeWriter) flush() {
// only for debug log
var n int
var ns []int
for _, v := range w.bs {
n += len(v)
ns = append(ns, len(v))
}
nazalog.Debugf("[%p] MergeWriter::flush. len=%d(%v)", w, n, ns)
w.onWritev(w.bs)
w.currSize = 0
w.bs = nil
}

@ -0,0 +1,55 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"bytes"
"github.com/q191201771/naza/pkg/assert"
"net"
"testing"
)
func TestMergeWriter(t *testing.T) {
goldenBuf1 := bytes.Repeat([]byte{'a'}, 8192)
goldenBuf2 := bytes.Repeat([]byte{'b'}, 8192)
var cbBuf net.Buffers
w := NewMergeWriter(func(bs net.Buffers) {
cbBuf = bs
}, 4096)
// 直接超过
w.Write(goldenBuf1)
assert.Equal(t, 1, len(cbBuf))
assert.Equal(t, goldenBuf1, cbBuf[0])
cbBuf = nil
// 不超过
w.Write(goldenBuf1[:1024])
assert.Equal(t, nil, cbBuf)
// 多次不超过
w.Write(goldenBuf2[:2048])
assert.Equal(t, nil, cbBuf)
// 多次超过
w.Write(goldenBuf1[:2048])
assert.Equal(t, 3, len(cbBuf))
assert.Equal(t, goldenBuf1[:1024], cbBuf[0])
assert.Equal(t, goldenBuf2[:2048], cbBuf[1])
assert.Equal(t, goldenBuf1[:2048], cbBuf[2])
cbBuf = nil
// 不超过,强制刷新
w.Write(goldenBuf1[:1024])
assert.Equal(t, nil, cbBuf)
w.Flush()
assert.Equal(t, 1, len(cbBuf))
assert.Equal(t, goldenBuf1[:1024], cbBuf[0])
}

@ -8,7 +8,7 @@
// +build linux darwin netbsd freebsd openbsd dragonfly // +build linux darwin netbsd freebsd openbsd dragonfly
package logic package base
import ( import (
"os" "os"
@ -18,7 +18,11 @@ import (
log "github.com/q191201771/naza/pkg/nazalog" log "github.com/q191201771/naza/pkg/nazalog"
) )
func runSignalHandler(cb func()) { // RunSignalHandler 监听SIGUSR1和SIGUSR2信号并回调
//
// TODO(chef): refactor 函数名应与SIGUSR1挂钩
//
func RunSignalHandler(cb func()) {
c := make(chan os.Signal) c := make(chan os.Signal)
signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2) signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2)
s := <-c s := <-c

@ -10,6 +10,6 @@
package logic package logic
func runSignalHandler(cb func()) { func RunSignalHandler(cb func()) {
} }

@ -6,18 +6,21 @@
// //
// Author: Chef (191201771@qq.com) // Author: Chef (191201771@qq.com)
package logic package innertest
import ( import (
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp" "github.com/q191201771/lal/pkg/rtsp"
) )
// TODO(chef): refactor 有的interface以I开头有的不是
// TODO(chef): 整理所有Server类型Session的生命周期管理 // TODO(chef): 整理所有Server类型Session的生命周期管理
// - // -
// - rtmp没有独立的Pub、Sub Session结构体类型而是直接使用ServerSession // - rtmp没有独立的Pub、Sub Session结构体类型而是直接使用ServerSession
@ -150,19 +153,24 @@ var (
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
var _ rtmp.ServerObserver = &ServerManager{} var _ logic.ILalServer = &logic.ServerManager{}
var _ rtsp.ServerObserver = &ServerManager{} var _ rtmp.ServerObserver = &logic.ServerManager{}
var _ HttpServerHandlerObserver = &ServerManager{} var _ logic.HttpServerHandlerObserver = &logic.ServerManager{}
var _ rtsp.ServerObserver = &logic.ServerManager{}
var _ logic.IGroupCreator = &logic.ServerManager{}
var _ logic.GroupObserver = &logic.ServerManager{}
var _ HttpApiServerObserver = &ServerManager{} var _ logic.INotifyHandler = &logic.HttpNotify{}
var _ logic.IGroupManager = &logic.SimpleGroupManager{}
var _ logic.IGroupManager = &logic.ComplexGroupManager{}
var _ rtmp.PubSessionObserver = &Group{} // var _ rtmp.PubSessionObserver = &logic.Group{} //
var _ rtsp.PullSessionObserver = &Group{} var _ rtsp.PullSessionObserver = &logic.Group{}
var _ rtsp.PullSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ rtsp.PullSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtsp.PubSessionObserver = &Group{} var _ rtsp.PubSessionObserver = &logic.Group{}
var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.MuxerObserver = &Group{} var _ hls.MuxerObserver = &logic.Group{}
var _ rtsp.BaseInSessionObserver = &Group{} // var _ rtsp.BaseInSessionObserver = &logic.Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtmp.ServerSessionObserver = &rtmp.Server{} var _ rtmp.ServerSessionObserver = &rtmp.Server{}
@ -180,8 +188,3 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{}
var _ hls.StreamerObserver = &hls.Muxer{} var _ hls.StreamerObserver = &hls.Muxer{}
var _ IGroupManager = &SimpleGroupManager{}
var _ IGroupManager = &ComplexGroupManager{}
var _ INotifyHandler = &DefaultNotifyHandler{}

@ -74,11 +74,11 @@ func InnerTestEntry(t *testing.T) {
var err error var err error
logic.Init(confFile) sm := logic.NewServerManager(confFile)
go logic.RunLoop() go sm.RunLoop()
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
config := logic.GetConfig() config := sm.Config()
_ = os.RemoveAll(config.HlsConfig.OutPath) _ = os.RemoveAll(config.HlsConfig.OutPath)
@ -160,7 +160,7 @@ func InnerTestEntry(t *testing.T) {
rtmpWriter.Dispose() rtmpWriter.Dispose()
// 由于windows没有信号会导致编译错误所以直接调用Dispose // 由于windows没有信号会导致编译错误所以直接调用Dispose
//_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1) //_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1)
logic.Dispose() sm.Dispose()
nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load()) nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load())
compareFile() compareFile()

@ -139,6 +139,8 @@ type CommonHttpAddrConfig struct {
} }
func LoadConfAndInitLog(confFile string) *Config { func LoadConfAndInitLog(confFile string) *Config {
var config *Config
// 读取配置文件并解析原始内容 // 读取配置文件并解析原始内容
rawContent, err := ioutil.ReadFile(confFile) rawContent, err := ioutil.ReadFile(confFile)
if err != nil { if err != nil {

@ -1,89 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"net/http"
_ "net/http/pprof"
"os"
"strings"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/naza/pkg/nazalog"
//"github.com/felixge/fgprof"
)
var (
config *Config
sm *ServerManager
)
// TODO(chef) 临时供innertest使用后面应该重构
func GetConfig() *Config {
return config
}
func Init(confFile string) {
LoadConfAndInitLog(confFile)
dir, _ := os.Getwd()
nazalog.Infof("wd: %s", dir)
nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LalFullInfo)
nazalog.Infof("github: %s", base.LalGithubSite)
nazalog.Infof("doc: %s", base.LalDocSite)
if config.HlsConfig.Enable && config.HlsConfig.UseMemoryAsDiskFlag {
nazalog.Infof("hls use memory as disk.")
hls.SetUseMemoryAsDiskFlag(true)
}
if config.RecordConfig.EnableFlv {
if err := os.MkdirAll(config.RecordConfig.FlvOutPath, 0777); err != nil {
nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", config.RecordConfig.FlvOutPath, err)
}
if err := os.MkdirAll(config.RecordConfig.MpegtsOutPath, 0777); err != nil {
nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MpegtsOutPath, err)
}
}
}
func RunLoop() {
sm = NewServerManager()
if config.PprofConfig.Enable {
go runWebPprof(config.PprofConfig.Addr)
}
go runSignalHandler(func() {
sm.Dispose()
})
err := sm.RunLoop()
nazalog.Errorf("server manager loop break. err=%+v", err)
}
func Dispose() {
sm.Dispose()
}
func runWebPprof(addr string) {
nazalog.Infof("start web pprof listen. addr=%s", addr)
//nazalog.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
if err := http.ListenAndServe(addr, nil); err != nil {
nazalog.Error(err)
return
}
}

@ -11,6 +11,7 @@ package logic
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
@ -22,8 +23,6 @@ import (
"github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/defertaskthread"
"github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/hevc" "github.com/q191201771/lal/pkg/hevc"
@ -42,10 +41,16 @@ import (
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
) )
type GroupObserver interface {
CleanupHlsIfNeeded(appName string, streamName string, path string)
}
type Group struct { type Group struct {
UniqueKey string // const after init UniqueKey string // const after init
appName string // const after init appName string // const after init
streamName string // const after init TODO chef: 和stat里的字段重复可以删除掉 streamName string // const after init TODO chef: 和stat里的字段重复可以删除掉
config *Config
observer GroupObserver
exitChan chan struct{} exitChan chan struct{}
@ -80,7 +85,7 @@ type Group struct {
// rtmp pub使用 // rtmp pub使用
dummyAudioFilter *DummyAudioFilter dummyAudioFilter *DummyAudioFilter
// rtmp sub使用 // rtmp sub使用
rtmpBufWriter base.IBufWriter // TODO(chef): 后面可以在业务层加一个定时Flush rtmpMergeWriter *base.MergeWriter // TODO(chef): 后面可以在业务层加一个定时Flush
// mpegts使用 // mpegts使用
patpmt []byte patpmt []byte
// rtsp使用 // rtsp使用
@ -99,7 +104,7 @@ type pushProxy struct {
pushSession *rtmp.PushSession pushSession *rtmp.PushSession
} }
func NewGroup(appName string, streamName string) *Group { func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group {
uk := base.GenUkGroup() uk := base.GenUkGroup()
url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试 url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试
@ -117,6 +122,8 @@ func NewGroup(appName string, streamName string) *Group {
UniqueKey: uk, UniqueKey: uk,
appName: appName, appName: appName,
streamName: streamName, streamName: streamName,
config: config,
observer: observer,
stat: base.StatGroup{ stat: base.StatGroup{
StreamName: streamName, StreamName: streamName,
}, },
@ -139,7 +146,9 @@ func NewGroup(appName string, streamName string) *Group {
} }
g.setPullUrl(config.RelayPullConfig.Enable, pullUrl) g.setPullUrl(config.RelayPullConfig.Enable, pullUrl)
g.rtmpBufWriter = base.NewWriterFuncSize(g.write2RtmpSubSessions, config.RtmpConfig.MergeWriteSize) if config.RtmpConfig.MergeWriteSize > 0 {
g.rtmpMergeWriter = base.NewMergeWriter(g.writev2RtmpSubSessions, config.RtmpConfig.MergeWriteSize)
}
nazalog.Infof("[%s] lifecycle new group. group=%p, appName=%s, streamName=%s", uk, g, appName, streamName) nazalog.Infof("[%s] lifecycle new group. group=%p, appName=%s, streamName=%s", uk, g, appName, streamName)
return g return g
@ -310,7 +319,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
group.rtmpPubSession = session group.rtmpPubSession = session
group.addIn() group.addIn()
if config.RtspConfig.Enable { if group.config.RtspConfig.Enable {
group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer( group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer(
func(sdpCtx sdp.LogicContext) { func(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx group.sdpCtx = &sdpCtx
@ -320,9 +329,9 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
} }
// TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能 // TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能
if config.RtmpConfig.AddDummyAudioEnable { if group.config.RtmpConfig.AddDummyAudioEnable {
// TODO(chef): 从整体控制和锁关系来说应该让pub的数据回调到group中进锁后再让数据流入filter // TODO(chef): 从整体控制和锁关系来说应该让pub的数据回调到group中进锁后再让数据流入filter
group.dummyAudioFilter = NewDummyAudioFilter(group.UniqueKey, config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg) group.dummyAudioFilter = NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg)
session.SetPubSessionObserver(group.dummyAudioFilter) session.SetPubSessionObserver(group.dummyAudioFilter)
} else { } else {
session.SetPubSessionObserver(group) session.SetPubSessionObserver(group)
@ -782,12 +791,12 @@ func (group *Group) isTotalEmpty() bool {
// //
func (group *Group) addIn() { func (group *Group) addIn() {
// 是否启动hls // 是否启动hls
if config.HlsConfig.Enable { if group.config.HlsConfig.Enable {
if group.hlsMuxer != nil { if group.hlsMuxer != nil {
nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
} }
enable := config.HlsConfig.Enable || config.HlsConfig.EnableHttps enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &config.HlsConfig.MuxerConfig, group) group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start() group.hlsMuxer.Start()
} }
@ -799,9 +808,9 @@ func (group *Group) addIn() {
now := time.Now().Unix() now := time.Now().Unix()
// 是否录制成flv文件 // 是否录制成flv文件
if config.RecordConfig.EnableFlv { if group.config.RecordConfig.EnableFlv {
filename := fmt.Sprintf("%s-%d.flv", group.streamName, now) filename := fmt.Sprintf("%s-%d.flv", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.FlvOutPath, filename) filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename)
if group.recordFlv != nil { if group.recordFlv != nil {
nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFlv.Name()) group.UniqueKey, filenameWithPath, group.recordFlv.Name())
@ -823,9 +832,9 @@ func (group *Group) addIn() {
} }
// 是否录制成ts文件 // 是否录制成ts文件
if config.RecordConfig.EnableMpegts { if group.config.RecordConfig.EnableMpegts {
filename := fmt.Sprintf("%s-%d.ts", group.streamName, now) filename := fmt.Sprintf("%s-%d.ts", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.MpegtsOutPath, filename) filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename)
if group.recordMpegts != nil { if group.recordMpegts != nil {
nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) group.UniqueKey, filenameWithPath, group.recordMpegts.Name())
@ -846,7 +855,7 @@ func (group *Group) addIn() {
// //
func (group *Group) delIn() { func (group *Group) delIn() {
// 停止hls // 停止hls
if config.HlsConfig.Enable && group.hlsMuxer != nil { if group.config.HlsConfig.Enable && group.hlsMuxer != nil {
group.disposeHlsMuxer() group.disposeHlsMuxer()
} }
@ -861,7 +870,7 @@ func (group *Group) delIn() {
} }
// 停止flv录制 // 停止flv录制
if config.RecordConfig.EnableFlv { if group.config.RecordConfig.EnableFlv {
if group.recordFlv != nil { if group.recordFlv != nil {
if err := group.recordFlv.Dispose(); err != nil { if err := group.recordFlv.Dispose(); err != nil {
nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
@ -871,7 +880,7 @@ func (group *Group) delIn() {
} }
// 停止ts录制 // 停止ts录制
if config.RecordConfig.EnableMpegts { if group.config.RecordConfig.EnableMpegts {
if group.recordMpegts != nil { if group.recordMpegts != nil {
if err := group.recordMpegts.Dispose(); err != nil { if err := group.recordMpegts.Dispose(); err != nil {
nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
@ -892,33 +901,7 @@ func (group *Group) disposeHlsMuxer() {
if group.hlsMuxer != nil { if group.hlsMuxer != nil {
group.hlsMuxer.Dispose() group.hlsMuxer.Dispose()
// 添加延时任务删除HLS文件 group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())
if config.HlsConfig.Enable &&
(config.HlsConfig.CleanupMode == hls.CleanupModeInTheEnd || config.HlsConfig.CleanupMode == hls.CleanupModeAsap) {
defertaskthread.Go(
config.HlsConfig.FragmentDurationMs*config.HlsConfig.FragmentNum*2,
func(param ...interface{}) {
appName := param[0].(string)
streamName := param[1].(string)
outPath := param[2].(string)
if g := sm.GetGroup(appName, streamName); g != nil {
if g.IsHlsMuxerAlive() {
nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName)
return
}
}
nazalog.Infof("cleanup hls file path. streamName=%s, path=%s", streamName, outPath)
if err := hls.RemoveAll(outPath); err != nil {
nazalog.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err)
}
},
group.appName,
group.streamName,
group.hlsMuxer.OutPath(),
)
}
group.hlsMuxer = nil group.hlsMuxer = nil
} }
@ -1010,12 +993,12 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
//nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7))) //nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7)))
// # hls // # hls
if config.HlsConfig.Enable && group.hlsMuxer != nil { if group.config.HlsConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRtmpMessage(msg) group.hlsMuxer.FeedRtmpMessage(msg)
} }
// # rtsp // # rtsp
if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { if group.config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
} }
@ -1062,7 +1045,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session // 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 从而确保新加入的sub session不会发送这部分脏的数据 // 从而确保新加入的sub session不会发送这部分脏的数据
// 注意此处可能被调用多次但是只有第一次会实际flush缓存数据 // 注意此处可能被调用多次但是只有第一次会实际flush缓存数据
group.rtmpBufWriter.Flush() if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.IsFresh = false session.IsFresh = false
} }
@ -1072,13 +1057,19 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 并且修改这个sub session的标志 // 并且修改这个sub session的标志
// 让rtmp buf writer来发送这个关键帧 // 让rtmp buf writer来发送这个关键帧
group.rtmpBufWriter.Flush() if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.ShouldWaitVideoKeyFrame = false session.ShouldWaitVideoKeyFrame = false
} }
} }
// ## 转发本次数据 // ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 { if len(group.rtmpSubSessionSet) > 0 {
group.rtmpBufWriter.Write(lcd.Get()) if group.rtmpMergeWriter == nil {
group.write2RtmpSubSessions(lcd.Get())
} else {
group.rtmpMergeWriter.Write(lcd.Get())
}
} }
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
@ -1156,10 +1147,10 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
} }
// # 缓存关键信息以及gop // # 缓存关键信息以及gop
if config.RtmpConfig.Enable { if group.config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, lcd.Get) group.rtmpGopCache.Feed(msg, lcd.Get)
} }
if config.HttpflvConfig.Enable { if group.config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get) group.httpflvGopCache.Feed(msg, lrm2ft.Get)
} }
@ -1254,6 +1245,15 @@ func (group *Group) write2RtmpSubSessions(b []byte) {
} }
} }
func (group *Group) writev2RtmpSubSessions(bs net.Buffers) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Writev(bs)
}
}
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
func (group *Group) isPullEnable() bool { func (group *Group) isPullEnable() bool {

@ -8,26 +8,13 @@
package logic package logic
// 注意,这个模块暂时不对外开放 // 注意这个模块的功能不完全目前只使用SimpleGroupManager
//
// TODO
// - 现有逻辑重构至当前模块中【DONE】
// - server_manger接入当前模块替换掉原来的map【DONE】
// - 重构整理使用server_manager的地方【DONE】
// - 实现appName逻辑的IGroupManager【DONE】
// - 增加单元测试【DONE】
// - 配置文件或var.go中增加选取具体IGroupManager实现的开关
// - 去除配置文件中一部分的url_pattern
// - 更新相应的文档本文件注释server_manager等中原有关于appName的注释配置文件文档流地址列表文档
// - 创建group时没有appname后面又有了可以考虑更新一下
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// OnIterateGroup type IGroupCreator interface {
// CreateGroup(appName, streamName string) *Group
// @return 如果返回false则删除这个元素 }
//
type OnIterateGroup func(group *Group) bool
// IGroupManager // IGroupManager
// //
@ -37,8 +24,7 @@ type OnIterateGroup func(group *Group) bool
type IGroupManager interface { type IGroupManager interface {
// GetOrCreateGroup // GetOrCreateGroup
// //
// @param appName 如果没有,可以为"" // @param appName 注意,如果没有,可以为""
//
// @return createFlag 如果为false表示group之前就存在如果为true表示为当前新创建 // @return createFlag 如果为false表示group之前就存在如果为true表示为当前新创建
// //
GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool) GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool)
@ -49,11 +35,15 @@ type IGroupManager interface {
// //
GetGroup(appName string, streamName string) *Group GetGroup(appName string, streamName string) *Group
Iterate(onIterateGroup OnIterateGroup) // Iterate 遍历所有 Group
//
// @param onIterateGroup 如果返回false则删除这个 Group
//
Iterate(onIterateGroup func(group *Group) bool)
Len() int Len() int
// TODO(chef): 没有提供删除操作,因为目前使用时,是在遍历时做删除的 // TODO(chef): feat 没有提供删除操作,因为目前使用时,是在遍历时做删除的
} }
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
@ -61,11 +51,13 @@ type IGroupManager interface {
// SimpleGroupManager 忽略appName只使用streamName // SimpleGroupManager 忽略appName只使用streamName
// //
type SimpleGroupManager struct { type SimpleGroupManager struct {
groupCreator IGroupCreator
groups map[string]*Group // streamName -> Group groups map[string]*Group // streamName -> Group
} }
func NewSimpleGroupManager() *SimpleGroupManager { func NewSimpleGroupManager(groupCreator IGroupCreator) *SimpleGroupManager {
return &SimpleGroupManager{ return &SimpleGroupManager{
groupCreator: groupCreator,
groups: make(map[string]*Group), groups: make(map[string]*Group),
} }
} }
@ -73,7 +65,7 @@ func NewSimpleGroupManager() *SimpleGroupManager {
func (s *SimpleGroupManager) GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool) { func (s *SimpleGroupManager) GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool) {
g := s.GetGroup(appName, streamName) g := s.GetGroup(appName, streamName)
if g == nil { if g == nil {
g = NewGroup(appName, streamName) g = s.groupCreator.CreateGroup(appName, streamName)
s.groups[streamName] = g s.groups[streamName] = g
return g, true return g, true
} }
@ -88,9 +80,7 @@ func (s *SimpleGroupManager) GetGroup(appName string, streamName string) *Group
return g return g
} }
// Iterate 遍历group func (s *SimpleGroupManager) Iterate(onIterateGroup func(group *Group) bool) {
//
func (s *SimpleGroupManager) Iterate(onIterateGroup OnIterateGroup) {
for streamName, group := range s.groups { for streamName, group := range s.groups {
if !onIterateGroup(group) { if !onIterateGroup(group) {
delete(s.groups, streamName) delete(s.groups, streamName)
@ -104,6 +94,21 @@ func (s *SimpleGroupManager) Len() int {
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// TODO(chef):
//
// - 现有逻辑重构至当前模块中【DONE】
// - server_manger接入当前模块替换掉原来的map【DONE】
// - 重构整理使用server_manager的地方【DONE】
// - 实现appName逻辑的IGroupManager【DONE】
// - 增加单元测试【DONE】
// - 配置文件或var.go中增加选取具体IGroupManager实现的开关
// - 去除配置文件中一部分的url_pattern
// - 更新相应的文档本文件注释server_manager等中原有关于appName的注释配置文件文档流地址列表文档
// - 创建group时没有appname后面又有了可以考虑更新一下
// - ComplexGroupManager使用IGroupCreator
// ---------------------------------------------------------------------------------------------------------------------
// ComplexGroupManager // ComplexGroupManager
// //
// 背景: // 背景:
@ -118,13 +123,15 @@ func (s *SimpleGroupManager) Len() int {
// - group可能由第一种协议创建也可能由第二种协议创建 // - group可能由第一种协议创建也可能由第二种协议创建
// //
type ComplexGroupManager struct { type ComplexGroupManager struct {
groupCreator IGroupCreator
// 注意一个group只可能在一个容器中两个容器中的group加起来才是全量 // 注意一个group只可能在一个容器中两个容器中的group加起来才是全量
onlyStreamNameGroups map[string]*Group // streamName -> Group onlyStreamNameGroups map[string]*Group // streamName -> Group
appNameStreamNameGroups map[string]map[string]*Group // appName -> streamName -> Group appNameStreamNameGroups map[string]map[string]*Group // appName -> streamName -> Group
} }
func NewComplexGroupManager() *ComplexGroupManager { func NewComplexGroupManager(groupCreator IGroupCreator) *ComplexGroupManager {
return &ComplexGroupManager{ return &ComplexGroupManager{
groupCreator: groupCreator,
onlyStreamNameGroups: make(map[string]*Group), onlyStreamNameGroups: make(map[string]*Group),
appNameStreamNameGroups: make(map[string]map[string]*Group), appNameStreamNameGroups: make(map[string]map[string]*Group),
} }
@ -159,7 +166,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul
// 两个容器都没找到 // 两个容器都没找到
if shouldCreate { if shouldCreate {
group = NewGroup(appName, streamName) group = gm.groupCreator.CreateGroup(appName, streamName)
gm.onlyStreamNameGroups[streamName] = group gm.onlyStreamNameGroups[streamName] = group
return group, true return group, true
} else { } else {
@ -183,7 +190,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul
// 都没有找到 // 都没有找到
if shouldCreate { if shouldCreate {
group = NewGroup(appName, streamName) group = gm.groupCreator.CreateGroup(appName, streamName)
if !mok { if !mok {
m = make(map[string]*Group) m = make(map[string]*Group)
gm.appNameStreamNameGroups[appName] = m gm.appNameStreamNameGroups[appName] = m
@ -196,7 +203,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul
} }
} }
func (gm *ComplexGroupManager) Iterate(onIterateGroup OnIterateGroup) { func (gm *ComplexGroupManager) Iterate(onIterateGroup func(group *Group) bool) {
for streamName, group := range gm.onlyStreamNameGroups { for streamName, group := range gm.onlyStreamNameGroups {
if !onIterateGroup(group) { if !onIterateGroup(group) {
delete(gm.onlyStreamNameGroups, streamName) delete(gm.onlyStreamNameGroups, streamName)

@ -13,10 +13,17 @@ import (
"testing" "testing"
) )
func TestGroupManager(t *testing.T) { type mockGroupCreator struct {
// TODO(chef): refactor 由于Group中耦合了全局变量config不依赖取值但是不能为nil所以该单元测试进行前对config做初始化 }
config = &Config{}
func (m *mockGroupCreator) CreateGroup(appName, streamName string) *Group {
var config Config
return NewGroup(appName, streamName, &config, nil)
}
var mgc = &mockGroupCreator{}
func TestGroupManager(t *testing.T) {
var ( var (
sgm IGroupManager sgm IGroupManager
sg0 *Group sg0 *Group
@ -35,8 +42,8 @@ func TestGroupManager(t *testing.T) {
createFlag bool createFlag bool
) )
sgm = NewSimpleGroupManager() sgm = NewSimpleGroupManager(mgc)
cgm = NewComplexGroupManager() cgm = NewComplexGroupManager(mgc)
// (为空时)获取 // (为空时)获取
// 获取到nil // 获取到nil
@ -266,8 +273,8 @@ func TestGroupManager(t *testing.T) {
//---------------------------- //----------------------------
sgm = NewSimpleGroupManager() sgm = NewSimpleGroupManager(mgc)
cgm = NewComplexGroupManager() cgm = NewComplexGroupManager(mgc)
sg0 = cgm.GetGroup("", "stream1") sg0 = cgm.GetGroup("", "stream1")
cg0 = cgm.GetGroup("", "stream1") cg0 = cgm.GetGroup("", "stream1")

@ -10,36 +10,25 @@ package logic
import ( import (
"encoding/json" "encoding/json"
"github.com/q191201771/naza/pkg/nazahttp"
"net" "net"
"net/http" "net/http"
"time"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
) )
var serverStartTime string
type HttpApiServerObserver interface {
OnStatAllGroup() []base.StatGroup
OnStatGroup(streamName string) *base.StatGroup
OnCtrlStartPull(info base.ApiCtrlStartPullReq)
OnCtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic
}
type HttpApiServer struct { type HttpApiServer struct {
addr string addr string
observer HttpApiServerObserver sm *ServerManager
ln net.Listener ln net.Listener
} }
func NewHttpApiServer(addr string, observer HttpApiServerObserver) *HttpApiServer { func NewHttpApiServer(addr string, sm *ServerManager) *HttpApiServer {
return &HttpApiServer{ return &HttpApiServer{
addr: addr, addr: addr,
observer: observer, sm: sm,
} }
} }
@ -51,7 +40,7 @@ func (h *HttpApiServer) Listen() (err error) {
return return
} }
func (h *HttpApiServer) Runloop() error { func (h *HttpApiServer) RunLoop() error {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/api/list", h.apiListHandler) mux.HandleFunc("/api/list", h.apiListHandler)
@ -68,55 +57,19 @@ func (h *HttpApiServer) Runloop() error {
// TODO chef: dispose // TODO chef: dispose
func (h *HttpApiServer) apiListHandler(w http.ResponseWriter, req *http.Request) {
// TODO chef: 写完api list页面
b := []byte(`
<html>
<head><title>lal http api list</title></head>
<body>
<br>
<br>
<p>api</p>
<ul>
<li><a href="/api/list">/api/list</a></li>
<li><a href="/api/stat/group?stream_name=test110">/api/stat/group?stream_name=test110</a></li>
<li><a href="/api/stat/all_group">/api/stat/all_group</a></li>
<li><a href="/api/stat/lal_info">/api/stat/lal_info</a></li>
<li><a href="/api/ctrl/start_pull?protocol=rtmp&addr=127.0.0.1:1935&app_name=live&stream_name=test110&url_param=token=aaa">/api/ctrl/start_pull?protocol=rtmp&addr=127.0.0.1:1935&app_name=live&stream_name=test110&url_param=token=aaa</a></li>
</ul>
<br>
<p></p>
<ul>
<li><a href="https://pengrl.com/p/20100/">lal http api</a></li>
<li><a href="https://github.com/q191201771/lal">lal github</a></li>
</ul>
</body>
</html>
`)
w.Header().Add("Server", base.LalHttpApiServer)
_, _ = w.Write(b)
}
func (h *HttpApiServer) statLalInfoHandler(w http.ResponseWriter, req *http.Request) { func (h *HttpApiServer) statLalInfoHandler(w http.ResponseWriter, req *http.Request) {
var v base.ApiStatLalInfo var v base.ApiStatLalInfo
v.ErrorCode = base.ErrorCodeSucc v.ErrorCode = base.ErrorCodeSucc
v.Desp = base.DespSucc v.Desp = base.DespSucc
v.Data.BinInfo = bininfo.StringifySingleLine() v.Data = h.sm.StatLalInfo()
v.Data.LalVersion = base.LalVersion
v.Data.ApiVersion = base.HttpApiVersion
v.Data.NotifyVersion = base.HttpNotifyVersion
v.Data.StartTime = serverStartTime
v.Data.ServerId = config.ServerId
feedback(v, w) feedback(v, w)
} }
func (h *HttpApiServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) { func (h *HttpApiServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) {
gs := h.observer.OnStatAllGroup()
var v base.ApiStatAllGroup var v base.ApiStatAllGroup
v.ErrorCode = base.ErrorCodeSucc v.ErrorCode = base.ErrorCodeSucc
v.Desp = base.DespSucc v.Desp = base.DespSucc
v.Data.Groups = gs v.Data.Groups = h.sm.StatAllGroup()
feedback(v, w) feedback(v, w)
} }
@ -132,7 +85,7 @@ func (h *HttpApiServer) statGroupHandler(w http.ResponseWriter, req *http.Reques
return return
} }
v.Data = h.observer.OnStatGroup(streamName) v.Data = h.sm.StatGroup(streamName)
if v.Data == nil { if v.Data == nil {
v.ErrorCode = base.ErrorCodeGroupNotFound v.ErrorCode = base.ErrorCodeGroupNotFound
v.Desp = base.DespGroupNotFound v.Desp = base.DespGroupNotFound
@ -160,7 +113,7 @@ func (h *HttpApiServer) ctrlStartPullHandler(w http.ResponseWriter, req *http.Re
} }
nazalog.Infof("http api start pull. req info=%+v", info) nazalog.Infof("http api start pull. req info=%+v", info)
h.observer.OnCtrlStartPull(info) h.sm.CtrlStartPull(info)
v.ErrorCode = base.ErrorCodeSucc v.ErrorCode = base.ErrorCodeSucc
v.Desp = base.DespSucc v.Desp = base.DespSucc
feedback(v, w) feedback(v, w)
@ -181,17 +134,45 @@ func (h *HttpApiServer) ctrlKickOutSessionHandler(w http.ResponseWriter, req *ht
} }
nazalog.Infof("http api kick out session. req info=%+v", info) nazalog.Infof("http api kick out session. req info=%+v", info)
resp := h.observer.OnCtrlKickOutSession(info) resp := h.sm.CtrlKickOutSession(info)
feedback(resp, w) feedback(resp, w)
return return
} }
func (h *HttpApiServer) apiListHandler(w http.ResponseWriter, req *http.Request) {
// TODO chef: 写完api list页面
b := []byte(`
<html>
<head><title>lal http api list</title></head>
<body>
<br>
<br>
<p>api</p>
<ul>
<li><a href="/api/list">/api/list</a></li>
<li><a href="/api/stat/group?stream_name=test110">/api/stat/group?stream_name=test110</a></li>
<li><a href="/api/stat/all_group">/api/stat/all_group</a></li>
<li><a href="/api/stat/lal_info">/api/stat/lal_info</a></li>
<li><a href="/api/ctrl/start_pull?protocol=rtmp&addr=127.0.0.1:1935&app_name=live&stream_name=test110&url_param=token=aaa">/api/ctrl/start_pull?protocol=rtmp&addr=127.0.0.1:1935&app_name=live&stream_name=test110&url_param=token=aaa</a></li>
</ul>
<br>
<p></p>
<ul>
<li><a href="https://pengrl.com/p/20100/">lal http api</a></li>
<li><a href="https://github.com/q191201771/lal">lal github</a></li>
</ul>
</body>
</html>
`)
w.Header().Add("Server", base.LalHttpApiServer)
_, _ = w.Write(b)
}
// ---------------------------------------------------------------------------------------------------------------------
func feedback(v interface{}, w http.ResponseWriter) { func feedback(v interface{}, w http.ResponseWriter) {
resp, _ := json.Marshal(v) resp, _ := json.Marshal(v)
w.Header().Add("Server", base.LalHttpApiServer) w.Header().Add("Server", base.LalHttpApiServer)
_, _ = w.Write(resp) _, _ = w.Write(resp)
} }
func init() {
serverStartTime = time.Now().Format("2006-01-02 15:04:05.999")
}

@ -18,6 +18,6 @@ func TestHttpApiServer(t *testing.T) {
// nazalog.Error(err) // nazalog.Error(err)
// return // return
//} //}
//err := s.Runloop() //err := s.RunLoop()
//nazalog.Error(err) //nazalog.Error(err)
} }

@ -12,13 +12,13 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
) )
// TODO(chef): refactor 配置参数供外部传入
// TODO(chef): refactor maxTaskLen修改为能表示是阻塞任务的意思
var ( var (
maxTaskLen = 1024 maxTaskLen = 1024
notifyTimeoutSec = 3 notifyTimeoutSec = 3
@ -30,12 +30,14 @@ type PostTask struct {
} }
type HttpNotify struct { type HttpNotify struct {
cfg HttpNotifyConfig
taskQueue chan PostTask taskQueue chan PostTask
client *http.Client client *http.Client
} }
func NewHttpNotify() *HttpNotify { func NewHttpNotify(cfg HttpNotifyConfig) *HttpNotify {
httpNotify := &HttpNotify{ httpNotify := &HttpNotify{
cfg: cfg,
taskQueue: make(chan PostTask, maxTaskLen), taskQueue: make(chan PostTask, maxTaskLen),
client: &http.Client{ client: &http.Client{
Timeout: time.Duration(notifyTimeoutSec) * time.Second, Timeout: time.Duration(notifyTimeoutSec) * time.Second,
@ -48,42 +50,68 @@ func NewHttpNotify() *HttpNotify {
// TODO(chef): Dispose // TODO(chef): Dispose
// 注意这里的函数命名以On开头并不是因为是回调函数而是notify给业务方的接口叫做on_server_start // ---------------------------------------------------------------------------------------------------------------------
func (h *HttpNotify) OnServerStart() {
var info base.LalInfo func (h *HttpNotify) NotifyServerStart(info base.LalInfo) {
info.BinInfo = bininfo.StringifySingleLine() h.asyncPost(h.cfg.OnServerStart, info)
info.LalVersion = base.LalVersion }
info.ApiVersion = base.HttpApiVersion
info.NotifyVersion = base.HttpNotifyVersion func (h *HttpNotify) NotifyUpdate(info base.UpdateInfo) {
info.StartTime = serverStartTime h.asyncPost(h.cfg.OnUpdate, info)
info.ServerId = config.ServerId }
h.asyncPost(config.HttpNotifyConfig.OnServerStart, info)
func (h *HttpNotify) NotifyPubStart(info base.PubStartInfo) {
h.asyncPost(h.cfg.OnPubStart, info)
}
func (h *HttpNotify) NotifyPubStop(info base.PubStopInfo) {
h.asyncPost(h.cfg.OnPubStop, info)
}
func (h *HttpNotify) NotifySubStart(info base.SubStartInfo) {
h.asyncPost(h.cfg.OnSubStart, info)
}
func (h *HttpNotify) NotifySubStop(info base.SubStopInfo) {
h.asyncPost(h.cfg.OnSubStop, info)
}
func (h *HttpNotify) NotifyRtmpConnect(info base.RtmpConnectInfo) {
h.asyncPost(h.cfg.OnRtmpConnect, info)
}
// ----- implement INotifyHandler interface ----------------------------------------------------------------------------
func (h *HttpNotify) OnServerStart(info base.LalInfo) {
h.NotifyServerStart(info)
} }
func (h *HttpNotify) OnUpdate(info base.UpdateInfo) { func (h *HttpNotify) OnUpdate(info base.UpdateInfo) {
h.asyncPost(config.HttpNotifyConfig.OnUpdate, info) h.NotifyUpdate(info)
} }
func (h *HttpNotify) OnPubStart(info base.PubStartInfo) { func (h *HttpNotify) OnPubStart(info base.PubStartInfo) {
h.asyncPost(config.HttpNotifyConfig.OnPubStart, info) h.NotifyPubStart(info)
} }
func (h *HttpNotify) OnPubStop(info base.PubStopInfo) { func (h *HttpNotify) OnPubStop(info base.PubStopInfo) {
h.asyncPost(config.HttpNotifyConfig.OnPubStop, info) h.NotifyPubStop(info)
} }
func (h *HttpNotify) OnSubStart(info base.SubStartInfo) { func (h *HttpNotify) OnSubStart(info base.SubStartInfo) {
h.asyncPost(config.HttpNotifyConfig.OnSubStart, info) h.NotifySubStart(info)
} }
func (h *HttpNotify) OnSubStop(info base.SubStopInfo) { func (h *HttpNotify) OnSubStop(info base.SubStopInfo) {
h.asyncPost(config.HttpNotifyConfig.OnSubStop, info) h.NotifySubStop(info)
} }
func (h *HttpNotify) OnRtmpConnect(info base.RtmpConnectInfo) { func (h *HttpNotify) OnRtmpConnect(info base.RtmpConnectInfo) {
h.asyncPost(config.HttpNotifyConfig.OnRtmpConnect, info) h.NotifyRtmpConnect(info)
} }
// ---------------------------------------------------------------------------------------------------------------------
func (h *HttpNotify) RunLoop() { func (h *HttpNotify) RunLoop() {
for { for {
select { select {
@ -93,8 +121,10 @@ func (h *HttpNotify) RunLoop() {
} }
} }
// ---------------------------------------------------------------------------------------------------------------------
func (h *HttpNotify) asyncPost(url string, info interface{}) { func (h *HttpNotify) asyncPost(url string, info interface{}) {
if !config.HttpNotifyConfig.Enable || url == "" { if !h.cfg.Enable || url == "" {
return return
} }

@ -8,8 +8,80 @@
package logic package logic
import ( import "github.com/q191201771/lal/pkg/base"
"errors"
) type ILalServer interface {
RunLoop() error
Dispose()
// StatLalInfo StatXxx... CtrlXxx...
// 一些获取状态、发送控制命令的API
// 目的是方便业务方在不修改logic包内代码的前提下在外层实现一些特定逻辑的定制化开发
//
StatLalInfo() base.LalInfo
StatAllGroup() (sgs []base.StatGroup)
StatGroup(streamName string) *base.StatGroup
CtrlStartPull(info base.ApiCtrlStartPullReq)
CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic
}
// NewLalServer 创建一个lal server
//
// @param confFile 配置文件地址
//
// @param modOption
// 可变参数,如果不关心,可以不填
// 目的是方便业务方在不修改logic包内代码的前提下在外层实现一些特定逻辑的定制化开发
// Option struct中可修改的参数说明
// - notifyHandler 事件监听
// 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知
// 注意如果业务方实现了自己的事件监听则lal server内部不再走http notify的逻辑
// 如果不填写内部默认走http notify的逻辑当然还需要在配置文件中开启http notify功能
//
func NewLalServer(confFile string, modOption ...ModOption) ILalServer {
return NewServerManager(confFile, modOption...)
}
// ---------------------------------------------------------------------------------------------------------------------
// INotifyHandler 事件通知接口
//
type INotifyHandler interface {
OnServerStart(info base.LalInfo)
OnUpdate(info base.UpdateInfo)
OnPubStart(info base.PubStartInfo)
OnPubStop(info base.PubStopInfo)
OnSubStart(info base.SubStartInfo)
OnSubStop(info base.SubStopInfo)
OnRtmpConnect(info base.RtmpConnectInfo)
}
type Option struct {
notifyHandler INotifyHandler
}
var defaultOption = Option{
notifyHandler: nil, // 注意为nil时内部会赋值为 HttpNotify
}
type ModOption func(option *Option)
// ---------------------------------------------------------------------------------------------------------------------
var ErrLogic = errors.New("lal.logic: fxxk") // 一些没有放入配置文件中,包级别的配置,暂时没有对外暴露
//
var (
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000
calcSessionStatIntervalSec uint32 = 5
// checkSessionAliveIntervalSec
//
// - 对于输入型session检查一定时间内是否没有收到数据
// - 对于输出型session检查一定时间内是否没有发送数据
// 注意这里既检查socket发送阻塞又检查上层没有给session喂数据
//
checkSessionAliveIntervalSec uint32 = 10
)

@ -1,62 +0,0 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"github.com/q191201771/lal/pkg/base"
)
type INotifyHandler interface {
OnServerStart()
OnUpdate(info base.UpdateInfo)
OnPubStart(info base.PubStartInfo)
OnPubStop(info base.PubStopInfo)
OnSubStart(info base.SubStartInfo)
OnSubStop(info base.SubStopInfo)
OnRtmpConnect(info base.RtmpConnectInfo)
}
type DefaultNotifyHandler struct {
httpNotify *HttpNotify
}
func NewDefaultNotifyHandler() *DefaultNotifyHandler {
NewHttpNotify()
return &DefaultNotifyHandler{
httpNotify: NewHttpNotify(),
}
}
func (d *DefaultNotifyHandler) OnServerStart() {
d.httpNotify.OnServerStart()
}
func (d *DefaultNotifyHandler) OnUpdate(info base.UpdateInfo) {
d.httpNotify.OnUpdate(info)
}
func (d *DefaultNotifyHandler) OnPubStart(info base.PubStartInfo) {
d.httpNotify.OnPubStart(info)
}
func (d *DefaultNotifyHandler) OnPubStop(info base.PubStopInfo) {
d.httpNotify.OnPubStop(info)
}
func (d *DefaultNotifyHandler) OnSubStart(info base.SubStartInfo) {
d.httpNotify.OnSubStart(info)
}
func (d *DefaultNotifyHandler) OnSubStop(info base.SubStopInfo) {
d.httpNotify.OnSubStop(info)
}
func (d *DefaultNotifyHandler) OnRtmpConnect(info base.RtmpConnectInfo) {
d.httpNotify.OnRtmpConnect(info)
}

@ -10,6 +10,11 @@ package logic
import ( import (
"fmt" "fmt"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/naza/pkg/defertaskthread"
"net/http"
"os"
"strings"
"sync" "sync"
"time" "time"
@ -24,9 +29,15 @@ import (
"github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
_ "net/http/pprof"
//"github.com/felixge/fgprof"
) )
type ServerManager struct { type ServerManager struct {
option Option
serverStartTime string
config *Config
httpServerManager *base.HttpServerManager httpServerManager *base.HttpServerManager
httpServerHandler *HttpServerHandler httpServerHandler *HttpServerHandler
hlsServerHandler *hls.ServerHandler hlsServerHandler *hls.ServerHandler
@ -40,34 +51,81 @@ type ServerManager struct {
groupManager IGroupManager groupManager IGroupManager
} }
func NewServerManager() *ServerManager { func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
m := &ServerManager{ sm := &ServerManager{
serverStartTime: time.Now().Format("2006-01-02 15:04:05.999"),
exitChan: make(chan struct{}), exitChan: make(chan struct{}),
groupManager: NewSimpleGroupManager(),
} }
sm.groupManager = NewSimpleGroupManager(sm)
sm.config = LoadConfAndInitLog(confFile)
// TODO(chef): refactor 启动信息可以考虑放入package base中所有的app都打印
dir, _ := os.Getwd()
nazalog.Infof("wd: %s", dir)
nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LalFullInfo)
nazalog.Infof("github: %s", base.LalGithubSite)
nazalog.Infof("doc: %s", base.LalDocSite)
nazalog.Infof("serverStartTime: %s", sm.serverStartTime)
if config.HttpflvConfig.Enable || config.HttpflvConfig.EnableHttps || if sm.config.HlsConfig.Enable && sm.config.HlsConfig.UseMemoryAsDiskFlag {
config.HttptsConfig.Enable || config.HttptsConfig.EnableHttps || nazalog.Infof("hls use memory as disk.")
config.HlsConfig.Enable || config.HlsConfig.EnableHttps { hls.SetUseMemoryAsDiskFlag(true)
m.httpServerManager = base.NewHttpServerManager()
m.httpServerHandler = NewHttpServerHandler(m)
m.hlsServerHandler = hls.NewServerHandler(config.HlsConfig.OutPath)
} }
if config.RtmpConfig.Enable { if sm.config.RecordConfig.EnableFlv {
m.rtmpServer = rtmp.NewServer(m, config.RtmpConfig.Addr) if err := os.MkdirAll(sm.config.RecordConfig.FlvOutPath, 0777); err != nil {
nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", sm.config.RecordConfig.FlvOutPath, err)
}
if err := os.MkdirAll(sm.config.RecordConfig.MpegtsOutPath, 0777); err != nil {
nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", sm.config.RecordConfig.MpegtsOutPath, err)
}
} }
if config.RtspConfig.Enable {
m.rtspServer = rtsp.NewServer(config.RtspConfig.Addr, m) sm.option = defaultOption
for _, fn := range modOption {
fn(&sm.option)
} }
if config.HttpApiConfig.Enable { if sm.option.notifyHandler == nil {
m.httpApiServer = NewHttpApiServer(config.HttpApiConfig.Addr, m) sm.option.notifyHandler = NewHttpNotify(sm.config.HttpNotifyConfig)
} }
return m
if sm.config.HttpflvConfig.Enable || sm.config.HttpflvConfig.EnableHttps ||
sm.config.HttptsConfig.Enable || sm.config.HttptsConfig.EnableHttps ||
sm.config.HlsConfig.Enable || sm.config.HlsConfig.EnableHttps {
sm.httpServerManager = base.NewHttpServerManager()
sm.httpServerHandler = NewHttpServerHandler(sm)
sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath)
} }
if sm.config.RtmpConfig.Enable {
// TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer
sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr)
}
if sm.config.RtspConfig.Enable {
sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm)
}
if sm.config.HttpApiConfig.Enable {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
}
return sm
}
// ----- implement ILalServer interface --------------------------------------------------------------------------------
func (sm *ServerManager) RunLoop() error { func (sm *ServerManager) RunLoop() error {
notifyHandler.OnServerStart() sm.option.notifyHandler.OnServerStart(sm.StatLalInfo())
if sm.config.PprofConfig.Enable {
go runWebPprof(sm.config.PprofConfig.Addr)
}
go base.RunSignalHandler(func() {
sm.Dispose()
})
var addMux = func(config CommonHttpServerConfig, handler base.Handler, name string) error { var addMux = func(config CommonHttpServerConfig, handler base.Handler, name string) error {
if config.Enable { if config.Enable {
@ -97,13 +155,13 @@ func (sm *ServerManager) RunLoop() error {
return nil return nil
} }
if err := addMux(config.HttpflvConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil { if err := addMux(sm.config.HttpflvConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil {
return err return err
} }
if err := addMux(config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil { if err := addMux(sm.config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil {
return err return err
} }
if err := addMux(config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil { if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil {
return err return err
} }
@ -142,17 +200,17 @@ func (sm *ServerManager) RunLoop() error {
return err return err
} }
go func() { go func() {
if err := sm.httpApiServer.Runloop(); err != nil { if err := sm.httpApiServer.RunLoop(); err != nil {
nazalog.Error(err) nazalog.Error(err)
} }
}() }()
} }
uis := uint32(config.HttpNotifyConfig.UpdateIntervalSec) uis := uint32(sm.config.HttpNotifyConfig.UpdateIntervalSec)
var updateInfo base.UpdateInfo var updateInfo base.UpdateInfo
updateInfo.ServerId = config.ServerId updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.statAllGroup() updateInfo.Groups = sm.StatAllGroup()
notifyHandler.OnUpdate(updateInfo) sm.option.notifyHandler.OnUpdate(updateInfo)
t := time.NewTicker(1 * time.Second) t := time.NewTicker(1 * time.Second)
defer t.Stop() defer t.Stop()
@ -194,9 +252,9 @@ func (sm *ServerManager) RunLoop() error {
// 定时通过http notify发送group相关的信息 // 定时通过http notify发送group相关的信息
if uis != 0 && (count%uis) == 0 { if uis != 0 && (count%uis) == 0 {
updateInfo.ServerId = config.ServerId updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.statAllGroup() updateInfo.Groups = sm.StatAllGroup()
notifyHandler.OnUpdate(updateInfo) sm.option.notifyHandler.OnUpdate(updateInfo)
} }
} }
} }
@ -226,22 +284,85 @@ func (sm *ServerManager) Dispose() {
sm.exitChan <- struct{}{} sm.exitChan <- struct{}{}
} }
func (sm *ServerManager) GetGroup(appName string, streamName string) *Group { func (sm *ServerManager) StatLalInfo() base.LalInfo {
var lalInfo base.LalInfo
lalInfo.BinInfo = bininfo.StringifySingleLine()
lalInfo.LalVersion = base.LalVersion
lalInfo.ApiVersion = base.HttpApiVersion
lalInfo.NotifyVersion = base.HttpNotifyVersion
lalInfo.StartTime = sm.serverStartTime
lalInfo.ServerId = sm.config.ServerId
return lalInfo
}
func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) {
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
return sm.getGroup(appName, streamName) sm.groupManager.Iterate(func(group *Group) bool {
sgs = append(sgs, group.GetStat())
return true
})
return
} }
// --------------------------------------------------------------------------------------------------------------------- func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup {
// rtmp.ServerObserver interface sm.mutex.Lock()
// --------------------------------------------------------------------------------------------------------------------- defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
return nil
}
// copy
var ret base.StatGroup
ret = g.GetStat()
return &ret
}
func (sm *ServerManager) CtrlStartPull(info base.ApiCtrlStartPullReq) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup(info.AppName, info.StreamName)
if g == nil {
nazalog.Warnf("group not exist, ignore start pull. streamName=%s", info.StreamName)
return
}
var url string
if info.UrlParam != "" {
url = fmt.Sprintf("rtmp://%s/%s/%s?%s", info.Addr, info.AppName, info.StreamName, info.UrlParam)
} else {
url = fmt.Sprintf("rtmp://%s/%s/%s", info.Addr, info.AppName, info.StreamName)
}
g.StartPull(url)
}
func (sm *ServerManager) CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", info.StreamName)
if g == nil {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeGroupNotFound,
Desp: base.DespGroupNotFound,
}
}
if !g.KickOutSession(info.SessionId) {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeSessionNotFound,
Desp: base.DespSessionNotFound,
}
}
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeSucc,
Desp: base.DespSucc,
}
}
// ----- implement rtmp.ServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) { func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) {
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
var info base.RtmpConnectInfo var info base.RtmpConnectInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.SessionId = session.UniqueKey() info.SessionId = session.UniqueKey()
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
if app, err := opa.FindString("app"); err == nil { if app, err := opa.FindString("app"); err == nil {
@ -253,7 +374,7 @@ func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.Obj
if tcUrl, err := opa.FindString("tcUrl"); err == nil { if tcUrl, err := opa.FindString("tcUrl"); err == nil {
info.TcUrl = tcUrl info.TcUrl = tcUrl
} }
notifyHandler.OnRtmpConnect(info) sm.option.notifyHandler.OnRtmpConnect(info)
} }
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool { func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
@ -265,7 +386,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
// TODO chef: res值为false时可以考虑不回调 // TODO chef: res值为false时可以考虑不回调
// TODO chef: 每次赋值都逐个拼代码冗余考虑直接用ISession抽离一下代码 // TODO chef: 每次赋值都逐个拼代码冗余考虑直接用ISession抽离一下代码
var info base.PubStartInfo var info base.PubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp info.Protocol = base.ProtocolRtmp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -275,7 +396,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnPubStart(info) sm.option.notifyHandler.OnPubStart(info)
return res return res
} }
@ -290,7 +411,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
group.DelRtmpPubSession(session) group.DelRtmpPubSession(session)
var info base.PubStopInfo var info base.PubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp info.Protocol = base.ProtocolRtmp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -300,7 +421,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnPubStop(info) sm.option.notifyHandler.OnPubStop(info)
} }
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool { func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
@ -310,7 +431,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
group.AddRtmpSubSession(session) group.AddRtmpSubSession(session)
var info base.SubStartInfo var info base.SubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp info.Protocol = base.ProtocolRtmp
info.Protocol = session.Url() info.Protocol = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -320,7 +441,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStart(info) sm.option.notifyHandler.OnSubStart(info)
return true return true
} }
@ -336,7 +457,7 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
group.DelRtmpSubSession(session) group.DelRtmpSubSession(session)
var info base.SubStopInfo var info base.SubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp info.Protocol = base.ProtocolRtmp
info.AppName = session.AppName() info.AppName = session.AppName()
info.StreamName = session.StreamName() info.StreamName = session.StreamName()
@ -345,12 +466,10 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStop(info) sm.option.notifyHandler.OnSubStop(info)
} }
// --------------------------------------------------------------------------------------------------------------------- // ----- implement HttpServerHandlerObserver interface -----------------------------------------------------------------
// httpflv.ServerObserver interface
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) bool { func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) bool {
sm.mutex.Lock() sm.mutex.Lock()
@ -359,7 +478,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo
group.AddHttpflvSubSession(session) group.AddHttpflvSubSession(session)
var info base.SubStartInfo var info base.SubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpflv info.Protocol = base.ProtocolHttpflv
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -369,7 +488,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStart(info) sm.option.notifyHandler.OnSubStart(info)
return true return true
} }
@ -384,7 +503,7 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
group.DelHttpflvSubSession(session) group.DelHttpflvSubSession(session)
var info base.SubStopInfo var info base.SubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpflv info.Protocol = base.ProtocolHttpflv
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -394,13 +513,9 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStop(info) sm.option.notifyHandler.OnSubStop(info)
} }
// ---------------------------------------------------------------------------------------------------------------------
// httpts.ServerObserver interface
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool { func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool {
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
@ -408,7 +523,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool
group.AddHttptsSubSession(session) group.AddHttptsSubSession(session)
var info base.SubStartInfo var info base.SubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpts info.Protocol = base.ProtocolHttpts
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -418,7 +533,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStart(info) sm.option.notifyHandler.OnSubStart(info)
return true return true
} }
@ -433,7 +548,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
group.DelHttptsSubSession(session) group.DelHttptsSubSession(session)
var info base.SubStopInfo var info base.SubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpts info.Protocol = base.ProtocolHttpts
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -443,12 +558,10 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStop(info) sm.option.notifyHandler.OnSubStop(info)
} }
// --------------------------------------------------------------------------------------------------------------------- // ----- implement rtsp.ServerObserver interface -----------------------------------------------------------------------
// rtsp.ServerObserver interface
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) OnNewRtspSessionConnect(session *rtsp.ServerCommandSession) { func (sm *ServerManager) OnNewRtspSessionConnect(session *rtsp.ServerCommandSession) {
// TODO chef: impl me // TODO chef: impl me
@ -465,7 +578,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
res := group.AddRtspPubSession(session) res := group.AddRtspPubSession(session)
var info base.PubStartInfo var info base.PubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp info.Protocol = base.ProtocolRtsp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -475,7 +588,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnPubStart(info) sm.option.notifyHandler.OnPubStart(info)
return res return res
} }
@ -492,7 +605,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
group.DelRtspPubSession(session) group.DelRtspPubSession(session)
var info base.PubStopInfo var info base.PubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp info.Protocol = base.ProtocolRtsp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -502,7 +615,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnPubStop(info) sm.option.notifyHandler.OnPubStop(info)
} }
func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
@ -520,7 +633,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool
res := group.HandleNewRtspSubSessionPlay(session) res := group.HandleNewRtspSubSessionPlay(session)
var info base.SubStartInfo var info base.SubStartInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp info.Protocol = base.ProtocolRtsp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -530,7 +643,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStart(info) sm.option.notifyHandler.OnSubStart(info)
return res return res
} }
@ -547,7 +660,7 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
group.DelRtspSubSession(session) group.DelRtspSubSession(session)
var info base.SubStopInfo var info base.SubStopInfo
info.ServerId = config.ServerId info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp info.Protocol = base.ProtocolRtsp
info.Url = session.Url() info.Url = session.Url()
info.AppName = session.AppName() info.AppName = session.AppName()
@ -557,81 +670,60 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession() info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession() info.HasOutSession = group.HasOutSession()
notifyHandler.OnSubStop(info) sm.option.notifyHandler.OnSubStop(info)
} }
// --------------------------------------------------------------------------------------------------------------------- // ----- implement IGroupCreator interface -----------------------------------------------------------------------------
// HttpApiServerObserver interface
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) OnStatAllGroup() (sgs []base.StatGroup) { func (sm *ServerManager) CreateGroup(appName string, streamName string) *Group {
return sm.statAllGroup() return NewGroup(appName, streamName, sm.config, sm)
} }
func (sm *ServerManager) OnStatGroup(streamName string) *base.StatGroup { // ----- implement GroupObserver interface -----------------------------------------------------------------------------
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
return nil
}
// copy
var ret base.StatGroup
ret = g.GetStat()
return &ret
}
func (sm *ServerManager) OnCtrlStartPull(info base.ApiCtrlStartPullReq) { func (sm *ServerManager) CleanupHlsIfNeeded(appName string, streamName string, path string) {
sm.mutex.Lock() if sm.config.HlsConfig.Enable &&
defer sm.mutex.Unlock() (sm.config.HlsConfig.CleanupMode == hls.CleanupModeInTheEnd || sm.config.HlsConfig.CleanupMode == hls.CleanupModeAsap) {
g := sm.getGroup(info.AppName, info.StreamName) defertaskthread.Go(
if g == nil { sm.config.HlsConfig.FragmentDurationMs*sm.config.HlsConfig.FragmentNum*2,
nazalog.Warnf("group not exist, ignore start pull. streamName=%s", info.StreamName) func(param ...interface{}) {
appName := param[0].(string)
streamName := param[1].(string)
outPath := param[2].(string)
if g := sm.GetGroup(appName, streamName); g != nil {
if g.IsHlsMuxerAlive() {
nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName)
return return
} }
var url string
if info.UrlParam != "" {
url = fmt.Sprintf("rtmp://%s/%s/%s?%s", info.Addr, info.AppName, info.StreamName, info.UrlParam)
} else {
url = fmt.Sprintf("rtmp://%s/%s/%s", info.Addr, info.AppName, info.StreamName)
}
g.StartPull(url)
} }
func (sm *ServerManager) OnCtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic { nazalog.Infof("cleanup hls file path. streamName=%s, path=%s", streamName, outPath)
sm.mutex.Lock() if err := hls.RemoveAll(outPath); err != nil {
defer sm.mutex.Unlock() nazalog.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err)
g := sm.getGroup("", info.StreamName)
if g == nil {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeGroupNotFound,
Desp: base.DespGroupNotFound,
}
}
if !g.KickOutSession(info.SessionId) {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeSessionNotFound,
Desp: base.DespSessionNotFound,
} }
} },
return base.HttpResponseBasic{ appName,
ErrorCode: base.ErrorCodeSucc, streamName,
Desp: base.DespSucc, path,
)
} }
} }
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) statAllGroup() (sgs []base.StatGroup) { func (sm *ServerManager) Config() *Config {
return sm.config
}
func (sm *ServerManager) GetGroup(appName string, streamName string) *Group {
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
sm.groupManager.Iterate(func(group *Group) bool { return sm.getGroup(appName, streamName)
sgs = append(sgs, group.GetStat())
return true
})
return
} }
// ----- private method ------------------------------------------------------------------------------------------------
// 注意,函数内部不加锁,由调用方保证加锁进入 // 注意,函数内部不加锁,由调用方保证加锁进入
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
g, createFlag := sm.groupManager.GetOrCreateGroup(appName, streamName) g, createFlag := sm.groupManager.GetOrCreateGroup(appName, streamName)
@ -644,3 +736,17 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr
func (sm *ServerManager) getGroup(appName string, streamName string) *Group { func (sm *ServerManager) getGroup(appName string, streamName string) *Group {
return sm.groupManager.GetGroup(appName, streamName) return sm.groupManager.GetGroup(appName, streamName)
} }
// ---------------------------------------------------------------------------------------------------------------------
func runWebPprof(addr string) {
nazalog.Infof("start web pprof listen. addr=%s", addr)
//nazalog.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
if err := http.ListenAndServe(addr, nil); err != nil {
nazalog.Error(err)
return
}
}

@ -1,26 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
//var relayPushCheckIntervalMS = 1000
var relayPushTimeoutMs = 5000
var relayPushWriteAvTimeoutMs = 5000
var relayPullTimeoutMs = 5000
var relayPullReadAvTimeoutMs = 5000
var calcSessionStatIntervalSec uint32 = 5
// 对于输入型session检查一定时间内是否没有收到数据
//
// 对于输出型session检查一定时间内是否没有发送数据
// 注意这里既检查socket发送阻塞又检查上层没有给session喂数据
var checkSessionAliveIntervalSec uint32 = 10
var notifyHandler INotifyHandler = NewDefaultNotifyHandler()
Loading…
Cancel
Save