From e3578157f43c4aa39790e5e5b72a737eab87c1ec Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 24 Oct 2021 11:03:01 +0800 Subject: [PATCH] mod messages: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [perf] rtmp合并发送功能使用writev实现 - [refactor] 可定制性: logic: 抽象出ILalServer接口;业务方可在自身逻辑中创建server,选择是否获取notify通知,以及使用api控制server - [refactor] 兼容性: 再增加一个配置文件默认搜索地址 - [refactor] 可读性: logic: ServerManager和Config不再作为全局变量使用;去除entry.go中间层;iface_impl.go移入innertest中;signal_xxx.go移入base中 --- app/lalserver/main.go | 10 +- pkg/base/bufwriter.go | 116 -------- pkg/base/bufwriter_test.go | 63 ----- pkg/base/merge_writer.go | 80 ++++++ pkg/base/merge_writer_test.go | 55 ++++ pkg/{logic => base}/signal_unix.go | 8 +- pkg/{logic => base}/signal_windows.go | 2 +- pkg/{logic => innertest}/iface_impl.go | 33 ++- pkg/innertest/innertest.go | 8 +- pkg/logic/config.go | 2 + pkg/logic/entry.go | 89 ------ pkg/logic/group.go | 104 +++---- pkg/logic/group_manager.go | 71 ++--- pkg/logic/group_manager_test.go | 21 +- pkg/logic/http_api.go | 111 ++++---- pkg/logic/http_api_test.go | 2 +- pkg/logic/http_notify.go | 70 +++-- pkg/logic/logic.go | 80 +++++- pkg/logic/notify.go | 62 ----- pkg/logic/server_manager.go | 364 ++++++++++++++++--------- pkg/logic/var.go | 26 -- 21 files changed, 684 insertions(+), 693 deletions(-) delete mode 100644 pkg/base/bufwriter.go delete mode 100644 pkg/base/bufwriter_test.go create mode 100644 pkg/base/merge_writer.go create mode 100644 pkg/base/merge_writer_test.go rename pkg/{logic => base}/signal_unix.go (75%) rename pkg/{logic => base}/signal_windows.go (88%) rename pkg/{logic => innertest}/iface_impl.go (88%) delete mode 100644 pkg/logic/entry.go delete mode 100644 pkg/logic/notify.go delete mode 100644 pkg/logic/var.go diff --git a/app/lalserver/main.go b/app/lalserver/main.go index fe7d20f..c05f038 100644 --- a/app/lalserver/main.go +++ b/app/lalserver/main.go @@ -22,14 +22,13 @@ import ( "github.com/q191201771/naza/pkg/bininfo" ) -var sm *logic.ServerManager - func main() { defer nazalog.Sync() confFile := parseFlag() - logic.Init(confFile) - logic.RunLoop() + sm := logic.NewLalServer(confFile) + err := sm.RunLoop() + nazalog.Infof("server manager done. err=%+v", err) } func parseFlag() string { @@ -57,6 +56,7 @@ func parseFlag() string { filepath.FromSlash("../lalserver.conf.json"), filepath.FromSlash("../../lalserver.conf.json"), filepath.FromSlash("../../conf/lalserver.conf.json"), + filepath.FromSlash("lal/conf/lalserver.conf.json"), } for _, dcf := range defaultConfigFileList { fi, err := os.Stat(dcf) @@ -68,7 +68,7 @@ func parseFlag() string { } } - // 默认位置都没有,退出程序 + // 所有默认位置都找不到配置文件,退出程序 flag.Usage() _, _ = fmt.Fprintf(os.Stderr, ` Example: diff --git a/pkg/base/bufwriter.go b/pkg/base/bufwriter.go deleted file mode 100644 index 5c8470d..0000000 --- a/pkg/base/bufwriter.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2021, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package base - -// 调用方场景/约定描述: -// - 内部实际写总是全量写成功 -// - 外部写调用的内存块,永久有效,调用结束后,外部并不使用 -// 内部实际写的内存块,调用结束后,实际写的实现可能还继续持有该内存块 -// -// 与bufio.Writer表现不同的地方: -// 数据超过缓存容量时,bufio.Writer一般会切分成多个缓存容量大小的块实际写,BufWriter则可能实际写大数据 - -type IBufWriter interface { - Write(p []byte) - Flush() -} - -type WriterFunc func(p []byte) - -func NewWriterFuncSize(wr WriterFunc, size int) IBufWriter { - if size <= 0 { - return &directWriter{ - wr: wr, - } - } - - b := &bufWriter{ - wr: wr, - defaultSize: size, - } - b.mallocOnePiece(size) - return b -} - -type bufWriter struct { - wr WriterFunc - defaultSize int // 缓存最大容量 - buf []byte - n int // 当前已缓存大小 -} - -type directWriter struct { - wr WriterFunc -} - -func (b *bufWriter) Write(p []byte) { - avail := b.available() - if len(p) > avail { - // 缓存剩余空间不够 - - if b.n == 0 { - // 缓存完全没有使用,依然空间不够 - // 直接发送 - b.wr(p) - return - } - - // 填满当前缓存块,并发送 - b.append(p[:avail]) - b.Flush() - - // 剩余数据 - remain := len(p) - avail - if remain < b.defaultSize { - // 剩余数据较小 - // 只追加进入缓存 - b.append(p[avail:]) - } else { - // 剩余数据较大 - // 直接发送 - b.wr(p[avail:]) - } - return - } - - // 缓存剩余空间足够 - // 追加进入缓存 - b.append(p) -} - -func (b *bufWriter) Flush() { - if b.n == 0 { - return - } - b.wr(b.buf[:b.n]) - b.mallocOnePiece(b.defaultSize) -} - -// 缓存剩余空间 -func (b *bufWriter) available() int { - return len(b.buf) - b.n -} - -func (b *bufWriter) mallocOnePiece(size int) { - b.buf = make([]byte, size) - b.n = 0 -} - -func (b *bufWriter) append(p []byte) { - copy(b.buf[b.n:], p) - b.n += len(p) -} - -func (dw *directWriter) Write(p []byte) { - dw.wr(p) -} - -func (dw *directWriter) Flush() { - // noop -} diff --git a/pkg/base/bufwriter_test.go b/pkg/base/bufwriter_test.go deleted file mode 100644 index b95328e..0000000 --- a/pkg/base/bufwriter_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2021, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package base - -import ( - "bytes" - "testing" - - "github.com/q191201771/naza/pkg/assert" -) - -func TestBufWriter(t *testing.T) { - var buf bytes.Buffer - w := NewWriterFuncSize(func(p []byte) { - _, _ = buf.Write(p) - }, 4096) - wb, _ := w.(*bufWriter) - - w.Write(bytes.Repeat([]byte{0x1}, 5000)) - assert.Equal(t, 4096-0, wb.available()) - assert.Equal(t, bytes.Repeat([]byte{0x1}, 5000), buf.Bytes()) - buf.Reset() - - w.Write(bytes.Repeat([]byte{0x2}, 1024)) - assert.Equal(t, 4096-1024, wb.available()) - assert.Equal(t, 0, buf.Len()) - w.Write(bytes.Repeat([]byte{0x3}, 1024)) - assert.Equal(t, 4096-2048, wb.available()) - assert.Equal(t, 0, buf.Len()) - - w.Write(bytes.Repeat([]byte{0x4}, 4096)) - assert.Equal(t, 4096-2048, wb.available()) - assert.Equal(t, 4096, buf.Len()) - assert.Equal(t, bytes.Repeat([]byte{0x2}, 1024), buf.Bytes()[:1024]) - assert.Equal(t, bytes.Repeat([]byte{0x3}, 1024), buf.Bytes()[1024:2048]) - assert.Equal(t, bytes.Repeat([]byte{0x4}, 2048), buf.Bytes()[2048:]) - buf.Reset() - - w.Write(bytes.Repeat([]byte{0x5}, 8192)) - assert.Equal(t, 4096-0, wb.available()) - assert.Equal(t, 2048+8192, buf.Len()) - assert.Equal(t, bytes.Repeat([]byte{0x4}, 2048), buf.Bytes()[:2048]) - assert.Equal(t, bytes.Repeat([]byte{0x5}, 8192), buf.Bytes()[2048:]) - buf.Reset() - - w.Flush() - assert.Equal(t, 4096-0, wb.available()) - assert.Equal(t, 0, buf.Len()) - - w.Write(bytes.Repeat([]byte{0x6}, 1024)) - assert.Equal(t, 4096-1024, wb.available()) - assert.Equal(t, 0, buf.Len()) - w.Flush() - assert.Equal(t, 4096-0, wb.available()) - assert.Equal(t, 1024, buf.Len()) - assert.Equal(t, bytes.Repeat([]byte{0x6}, 1024), buf.Bytes()[:1024]) -} diff --git a/pkg/base/merge_writer.go b/pkg/base/merge_writer.go new file mode 100644 index 0000000..5b9c13c --- /dev/null +++ b/pkg/base/merge_writer.go @@ -0,0 +1,80 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "github.com/q191201771/naza/pkg/nazalog" + "net" +) + +// TODO(chef): feat 通过时间戳(目前是数据大小)来设定合并阈值 + +// MergeWriter 合并多个内存块,达到阈值后一次性将内存块数组返回给上层 +// +// 注意,输入时的单个内存块,回调时不会出现拆分切割的情况 +// +type MergeWriter struct { + onWritev OnWritev + size int + + currSize int + bs net.Buffers // TODO(chef): perf 复用外层切片 +} + +type OnWritev func(bs net.Buffers) + +// NewMergeWriter +// +// @param onWritev 回调缓存的1~n个内存块 +// @param size 回调阈值 +// +func NewMergeWriter(onWritev OnWritev, size int) *MergeWriter { + return &MergeWriter{ + onWritev: onWritev, + size: size, + } +} + +// Write +// +// 注意,函数调用结束后,`b`内存块会被内部持有 +// +func (w *MergeWriter) Write(b []byte) { + nazalog.Debugf("[%p] MergeWriter::Write. len=%d", w, len(b)) + w.bs = append(w.bs, b) + w.currSize += len(b) + if w.currSize >= w.size { + w.flush() + } +} + +// Flush 强制将内部缓冲的数据全部回调排空 +// +func (w *MergeWriter) Flush() { + nazalog.Debugf("[%p] MergeWriter::Flush.", w) + if w.currSize > 0 { + w.flush() + } +} + +// flush 将内部缓冲的数据全部回调排空 +// +func (w *MergeWriter) flush() { + // only for debug log + var n int + var ns []int + for _, v := range w.bs { + n += len(v) + ns = append(ns, len(v)) + } + nazalog.Debugf("[%p] MergeWriter::flush. len=%d(%v)", w, n, ns) + w.onWritev(w.bs) + w.currSize = 0 + w.bs = nil +} diff --git a/pkg/base/merge_writer_test.go b/pkg/base/merge_writer_test.go new file mode 100644 index 0000000..ad57ea3 --- /dev/null +++ b/pkg/base/merge_writer_test.go @@ -0,0 +1,55 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "bytes" + "github.com/q191201771/naza/pkg/assert" + "net" + "testing" +) + +func TestMergeWriter(t *testing.T) { + goldenBuf1 := bytes.Repeat([]byte{'a'}, 8192) + goldenBuf2 := bytes.Repeat([]byte{'b'}, 8192) + + var cbBuf net.Buffers + w := NewMergeWriter(func(bs net.Buffers) { + cbBuf = bs + }, 4096) + + // 直接超过 + w.Write(goldenBuf1) + assert.Equal(t, 1, len(cbBuf)) + assert.Equal(t, goldenBuf1, cbBuf[0]) + cbBuf = nil + + // 不超过 + w.Write(goldenBuf1[:1024]) + assert.Equal(t, nil, cbBuf) + + // 多次不超过 + w.Write(goldenBuf2[:2048]) + assert.Equal(t, nil, cbBuf) + + // 多次超过 + w.Write(goldenBuf1[:2048]) + assert.Equal(t, 3, len(cbBuf)) + assert.Equal(t, goldenBuf1[:1024], cbBuf[0]) + assert.Equal(t, goldenBuf2[:2048], cbBuf[1]) + assert.Equal(t, goldenBuf1[:2048], cbBuf[2]) + cbBuf = nil + + // 不超过,强制刷新 + w.Write(goldenBuf1[:1024]) + assert.Equal(t, nil, cbBuf) + w.Flush() + assert.Equal(t, 1, len(cbBuf)) + assert.Equal(t, goldenBuf1[:1024], cbBuf[0]) +} diff --git a/pkg/logic/signal_unix.go b/pkg/base/signal_unix.go similarity index 75% rename from pkg/logic/signal_unix.go rename to pkg/base/signal_unix.go index 5c44df4..712fe6c 100644 --- a/pkg/logic/signal_unix.go +++ b/pkg/base/signal_unix.go @@ -8,7 +8,7 @@ // +build linux darwin netbsd freebsd openbsd dragonfly -package logic +package base import ( "os" @@ -18,7 +18,11 @@ import ( log "github.com/q191201771/naza/pkg/nazalog" ) -func runSignalHandler(cb func()) { +// RunSignalHandler 监听SIGUSR1和SIGUSR2信号并回调 +// +// TODO(chef): refactor 函数名应与SIGUSR1挂钩 +// +func RunSignalHandler(cb func()) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2) s := <-c diff --git a/pkg/logic/signal_windows.go b/pkg/base/signal_windows.go similarity index 88% rename from pkg/logic/signal_windows.go rename to pkg/base/signal_windows.go index 4fad27b..97dc83a 100644 --- a/pkg/logic/signal_windows.go +++ b/pkg/base/signal_windows.go @@ -10,6 +10,6 @@ package logic -func runSignalHandler(cb func()) { +func RunSignalHandler(cb func()) { } diff --git a/pkg/logic/iface_impl.go b/pkg/innertest/iface_impl.go similarity index 88% rename from pkg/logic/iface_impl.go rename to pkg/innertest/iface_impl.go index 2c24762..e68cc3c 100644 --- a/pkg/logic/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -6,18 +6,21 @@ // // Author: Chef (191201771@qq.com) -package logic +package innertest import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/lal/pkg/logic" "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtsp" ) +// TODO(chef): refactor 有的interface以I开头,有的不是 + // TODO(chef): 整理所有Server类型Session的生命周期管理 // - // - rtmp没有独立的Pub、Sub Session结构体类型,而是直接使用ServerSession @@ -150,19 +153,24 @@ var ( // --------------------------------------------------------------------------------------------------------------------- -var _ rtmp.ServerObserver = &ServerManager{} -var _ rtsp.ServerObserver = &ServerManager{} -var _ HttpServerHandlerObserver = &ServerManager{} +var _ logic.ILalServer = &logic.ServerManager{} +var _ rtmp.ServerObserver = &logic.ServerManager{} +var _ logic.HttpServerHandlerObserver = &logic.ServerManager{} +var _ rtsp.ServerObserver = &logic.ServerManager{} +var _ logic.IGroupCreator = &logic.ServerManager{} +var _ logic.GroupObserver = &logic.ServerManager{} -var _ HttpApiServerObserver = &ServerManager{} +var _ logic.INotifyHandler = &logic.HttpNotify{} +var _ logic.IGroupManager = &logic.SimpleGroupManager{} +var _ logic.IGroupManager = &logic.ComplexGroupManager{} -var _ rtmp.PubSessionObserver = &Group{} // -var _ rtsp.PullSessionObserver = &Group{} +var _ rtmp.PubSessionObserver = &logic.Group{} // +var _ rtsp.PullSessionObserver = &logic.Group{} var _ rtsp.PullSessionObserver = &remux.AvPacket2RtmpRemuxer{} -var _ rtsp.PubSessionObserver = &Group{} +var _ rtsp.PubSessionObserver = &logic.Group{} var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{} -var _ hls.MuxerObserver = &Group{} -var _ rtsp.BaseInSessionObserver = &Group{} // +var _ hls.MuxerObserver = &logic.Group{} +var _ rtsp.BaseInSessionObserver = &logic.Group{} // var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ rtmp.ServerSessionObserver = &rtmp.Server{} @@ -180,8 +188,3 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{} var _ hls.StreamerObserver = &hls.Muxer{} - -var _ IGroupManager = &SimpleGroupManager{} -var _ IGroupManager = &ComplexGroupManager{} - -var _ INotifyHandler = &DefaultNotifyHandler{} diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index e000a76..87ef054 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -74,11 +74,11 @@ func InnerTestEntry(t *testing.T) { var err error - logic.Init(confFile) - go logic.RunLoop() + sm := logic.NewServerManager(confFile) + go sm.RunLoop() time.Sleep(200 * time.Millisecond) - config := logic.GetConfig() + config := sm.Config() _ = os.RemoveAll(config.HlsConfig.OutPath) @@ -160,7 +160,7 @@ func InnerTestEntry(t *testing.T) { rtmpWriter.Dispose() // 由于windows没有信号,会导致编译错误,所以直接调用Dispose //_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1) - logic.Dispose() + sm.Dispose() nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load()) compareFile() diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 133f20e..1bea572 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -139,6 +139,8 @@ type CommonHttpAddrConfig struct { } func LoadConfAndInitLog(confFile string) *Config { + var config *Config + // 读取配置文件并解析原始内容 rawContent, err := ioutil.ReadFile(confFile) if err != nil { diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go deleted file mode 100644 index d22fdcb..0000000 --- a/pkg/logic/entry.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package logic - -import ( - "net/http" - _ "net/http/pprof" - "os" - "strings" - - "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/lal/pkg/hls" - - "github.com/q191201771/naza/pkg/bininfo" - "github.com/q191201771/naza/pkg/nazalog" - //"github.com/felixge/fgprof" -) - -var ( - config *Config - sm *ServerManager -) - -// TODO(chef) 临时供innertest使用,后面应该重构 -func GetConfig() *Config { - return config -} - -func Init(confFile string) { - LoadConfAndInitLog(confFile) - - dir, _ := os.Getwd() - nazalog.Infof("wd: %s", dir) - nazalog.Infof("args: %s", strings.Join(os.Args, " ")) - nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) - nazalog.Infof("version: %s", base.LalFullInfo) - nazalog.Infof("github: %s", base.LalGithubSite) - nazalog.Infof("doc: %s", base.LalDocSite) - - if config.HlsConfig.Enable && config.HlsConfig.UseMemoryAsDiskFlag { - nazalog.Infof("hls use memory as disk.") - hls.SetUseMemoryAsDiskFlag(true) - } - - if config.RecordConfig.EnableFlv { - if err := os.MkdirAll(config.RecordConfig.FlvOutPath, 0777); err != nil { - nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", config.RecordConfig.FlvOutPath, err) - } - if err := os.MkdirAll(config.RecordConfig.MpegtsOutPath, 0777); err != nil { - nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MpegtsOutPath, err) - } - } -} - -func RunLoop() { - sm = NewServerManager() - - if config.PprofConfig.Enable { - go runWebPprof(config.PprofConfig.Addr) - } - go runSignalHandler(func() { - sm.Dispose() - }) - - err := sm.RunLoop() - nazalog.Errorf("server manager loop break. err=%+v", err) -} - -func Dispose() { - sm.Dispose() -} - -func runWebPprof(addr string) { - nazalog.Infof("start web pprof listen. addr=%s", addr) - - //nazalog.Warn("start fgprof.") - //http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) - - if err := http.ListenAndServe(addr, nil); err != nil { - nazalog.Error(err) - return - } -} diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 5d94ddd..2acf683 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -11,6 +11,7 @@ package logic import ( "encoding/json" "fmt" + "net" "path/filepath" "strings" "sync" @@ -22,8 +23,6 @@ import ( "github.com/q191201771/lal/pkg/remux" - "github.com/q191201771/naza/pkg/defertaskthread" - "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/hevc" @@ -42,10 +41,16 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) +type GroupObserver interface { + CleanupHlsIfNeeded(appName string, streamName string, path string) +} + type Group struct { UniqueKey string // const after init appName string // const after init streamName string // const after init TODO chef: 和stat里的字段重复,可以删除掉 + config *Config + observer GroupObserver exitChan chan struct{} @@ -80,7 +85,7 @@ type Group struct { // rtmp pub使用 dummyAudioFilter *DummyAudioFilter // rtmp sub使用 - rtmpBufWriter base.IBufWriter // TODO(chef): 后面可以在业务层加一个定时Flush + rtmpMergeWriter *base.MergeWriter // TODO(chef): 后面可以在业务层加一个定时Flush // mpegts使用 patpmt []byte // rtsp使用 @@ -99,7 +104,7 @@ type pushProxy struct { pushSession *rtmp.PushSession } -func NewGroup(appName string, streamName string) *Group { +func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group { uk := base.GenUkGroup() url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试 @@ -117,6 +122,8 @@ func NewGroup(appName string, streamName string) *Group { UniqueKey: uk, appName: appName, streamName: streamName, + config: config, + observer: observer, stat: base.StatGroup{ StreamName: streamName, }, @@ -139,7 +146,9 @@ func NewGroup(appName string, streamName string) *Group { } g.setPullUrl(config.RelayPullConfig.Enable, pullUrl) - g.rtmpBufWriter = base.NewWriterFuncSize(g.write2RtmpSubSessions, config.RtmpConfig.MergeWriteSize) + if config.RtmpConfig.MergeWriteSize > 0 { + g.rtmpMergeWriter = base.NewMergeWriter(g.writev2RtmpSubSessions, config.RtmpConfig.MergeWriteSize) + } nazalog.Infof("[%s] lifecycle new group. group=%p, appName=%s, streamName=%s", uk, g, appName, streamName) return g @@ -310,7 +319,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool { group.rtmpPubSession = session group.addIn() - if config.RtspConfig.Enable { + if group.config.RtspConfig.Enable { group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer( func(sdpCtx sdp.LogicContext) { group.sdpCtx = &sdpCtx @@ -320,9 +329,9 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool { } // TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能 - if config.RtmpConfig.AddDummyAudioEnable { + if group.config.RtmpConfig.AddDummyAudioEnable { // TODO(chef): 从整体控制和锁关系来说,应该让pub的数据回调到group中进锁后再让数据流入filter - group.dummyAudioFilter = NewDummyAudioFilter(group.UniqueKey, config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg) + group.dummyAudioFilter = NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg) session.SetPubSessionObserver(group.dummyAudioFilter) } else { session.SetPubSessionObserver(group) @@ -782,12 +791,12 @@ func (group *Group) isTotalEmpty() bool { // func (group *Group) addIn() { // 是否启动hls - if config.HlsConfig.Enable { + if group.config.HlsConfig.Enable { if group.hlsMuxer != nil { nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) } - enable := config.HlsConfig.Enable || config.HlsConfig.EnableHttps - group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &config.HlsConfig.MuxerConfig, group) + enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps + group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group) group.hlsMuxer.Start() } @@ -799,9 +808,9 @@ func (group *Group) addIn() { now := time.Now().Unix() // 是否录制成flv文件 - if config.RecordConfig.EnableFlv { + if group.config.RecordConfig.EnableFlv { filename := fmt.Sprintf("%s-%d.flv", group.streamName, now) - filenameWithPath := filepath.Join(config.RecordConfig.FlvOutPath, filename) + filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename) if group.recordFlv != nil { nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", group.UniqueKey, filenameWithPath, group.recordFlv.Name()) @@ -823,9 +832,9 @@ func (group *Group) addIn() { } // 是否录制成ts文件 - if config.RecordConfig.EnableMpegts { + if group.config.RecordConfig.EnableMpegts { filename := fmt.Sprintf("%s-%d.ts", group.streamName, now) - filenameWithPath := filepath.Join(config.RecordConfig.MpegtsOutPath, filename) + filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename) if group.recordMpegts != nil { nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) @@ -846,7 +855,7 @@ func (group *Group) addIn() { // func (group *Group) delIn() { // 停止hls - if config.HlsConfig.Enable && group.hlsMuxer != nil { + if group.config.HlsConfig.Enable && group.hlsMuxer != nil { group.disposeHlsMuxer() } @@ -861,7 +870,7 @@ func (group *Group) delIn() { } // 停止flv录制 - if config.RecordConfig.EnableFlv { + if group.config.RecordConfig.EnableFlv { if group.recordFlv != nil { if err := group.recordFlv.Dispose(); err != nil { nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) @@ -871,7 +880,7 @@ func (group *Group) delIn() { } // 停止ts录制 - if config.RecordConfig.EnableMpegts { + if group.config.RecordConfig.EnableMpegts { if group.recordMpegts != nil { if err := group.recordMpegts.Dispose(); err != nil { nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) @@ -892,33 +901,7 @@ func (group *Group) disposeHlsMuxer() { if group.hlsMuxer != nil { group.hlsMuxer.Dispose() - // 添加延时任务,删除HLS文件 - if config.HlsConfig.Enable && - (config.HlsConfig.CleanupMode == hls.CleanupModeInTheEnd || config.HlsConfig.CleanupMode == hls.CleanupModeAsap) { - defertaskthread.Go( - config.HlsConfig.FragmentDurationMs*config.HlsConfig.FragmentNum*2, - func(param ...interface{}) { - appName := param[0].(string) - streamName := param[1].(string) - outPath := param[2].(string) - - if g := sm.GetGroup(appName, streamName); g != nil { - if g.IsHlsMuxerAlive() { - nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName) - return - } - } - - nazalog.Infof("cleanup hls file path. streamName=%s, path=%s", streamName, outPath) - if err := hls.RemoveAll(outPath); err != nil { - nazalog.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err) - } - }, - group.appName, - group.streamName, - group.hlsMuxer.OutPath(), - ) - } + group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath()) group.hlsMuxer = nil } @@ -1010,12 +993,12 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { //nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7))) // # hls - if config.HlsConfig.Enable && group.hlsMuxer != nil { + if group.config.HlsConfig.Enable && group.hlsMuxer != nil { group.hlsMuxer.FeedRtmpMessage(msg) } // # rtsp - if config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { + if group.config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) } @@ -1062,7 +1045,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session // 从而确保新加入的sub session不会发送这部分脏的数据 // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 - group.rtmpBufWriter.Flush() + if group.rtmpMergeWriter != nil { + group.rtmpMergeWriter.Flush() + } session.IsFresh = false } @@ -1072,13 +1057,19 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session // 并且修改这个sub session的标志 // 让rtmp buf writer来发送这个关键帧 - group.rtmpBufWriter.Flush() + if group.rtmpMergeWriter != nil { + group.rtmpMergeWriter.Flush() + } session.ShouldWaitVideoKeyFrame = false } } // ## 转发本次数据 if len(group.rtmpSubSessionSet) > 0 { - group.rtmpBufWriter.Write(lcd.Get()) + if group.rtmpMergeWriter == nil { + group.write2RtmpSubSessions(lcd.Get()) + } else { + group.rtmpMergeWriter.Write(lcd.Get()) + } } // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 @@ -1156,10 +1147,10 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { } // # 缓存关键信息,以及gop - if config.RtmpConfig.Enable { + if group.config.RtmpConfig.Enable { group.rtmpGopCache.Feed(msg, lcd.Get) } - if config.HttpflvConfig.Enable { + if group.config.HttpflvConfig.Enable { group.httpflvGopCache.Feed(msg, lrm2ft.Get) } @@ -1254,6 +1245,15 @@ func (group *Group) write2RtmpSubSessions(b []byte) { } } +func (group *Group) writev2RtmpSubSessions(bs net.Buffers) { + for session := range group.rtmpSubSessionSet { + if session.IsFresh || session.ShouldWaitVideoKeyFrame { + continue + } + _ = session.Writev(bs) + } +} + // --------------------------------------------------------------------------------------------------------------------- func (group *Group) isPullEnable() bool { diff --git a/pkg/logic/group_manager.go b/pkg/logic/group_manager.go index 82877e6..4739e43 100644 --- a/pkg/logic/group_manager.go +++ b/pkg/logic/group_manager.go @@ -8,26 +8,13 @@ package logic -// 注意,这个模块暂时不对外开放 -// -// TODO -// - 现有逻辑重构至当前模块中【DONE】 -// - server_manger接入当前模块,替换掉原来的map【DONE】 -// - 重构整理使用server_manager的地方【DONE】 -// - 实现appName逻辑的IGroupManager【DONE】 -// - 增加单元测试【DONE】 -// - 配置文件或var.go中增加选取具体IGroupManager实现的开关 -// - 去除配置文件中一部分的url_pattern -// - 更新相应的文档:本文件注释,server_manager等中原有关于appName的注释,配置文件文档,流地址列表文档 -// - 创建group时没有appname,后面又有了,可以考虑更新一下 +// 注意,这个模块的功能不完全,目前只使用SimpleGroupManager // --------------------------------------------------------------------------------------------------------------------- -// OnIterateGroup -// -// @return 如果返回false,则删除这个元素 -// -type OnIterateGroup func(group *Group) bool +type IGroupCreator interface { + CreateGroup(appName, streamName string) *Group +} // IGroupManager // @@ -37,8 +24,7 @@ type OnIterateGroup func(group *Group) bool type IGroupManager interface { // GetOrCreateGroup // - // @param appName 如果没有,可以为"" - // + // @param appName 注意,如果没有,可以为"" // @return createFlag 如果为false表示group之前就存在,如果为true表示为当前新创建 // GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool) @@ -49,11 +35,15 @@ type IGroupManager interface { // GetGroup(appName string, streamName string) *Group - Iterate(onIterateGroup OnIterateGroup) + // Iterate 遍历所有 Group + // + // @param onIterateGroup 如果返回false,则删除这个 Group + // + Iterate(onIterateGroup func(group *Group) bool) Len() int - // TODO(chef): 没有提供删除操作,因为目前使用时,是在遍历时做删除的 + // TODO(chef): feat 没有提供删除操作,因为目前使用时,是在遍历时做删除的 } // --------------------------------------------------------------------------------------------------------------------- @@ -61,19 +51,21 @@ type IGroupManager interface { // SimpleGroupManager 忽略appName,只使用streamName // type SimpleGroupManager struct { - groups map[string]*Group // streamName -> Group + groupCreator IGroupCreator + groups map[string]*Group // streamName -> Group } -func NewSimpleGroupManager() *SimpleGroupManager { +func NewSimpleGroupManager(groupCreator IGroupCreator) *SimpleGroupManager { return &SimpleGroupManager{ - groups: make(map[string]*Group), + groupCreator: groupCreator, + groups: make(map[string]*Group), } } func (s *SimpleGroupManager) GetOrCreateGroup(appName string, streamName string) (group *Group, createFlag bool) { g := s.GetGroup(appName, streamName) if g == nil { - g = NewGroup(appName, streamName) + g = s.groupCreator.CreateGroup(appName, streamName) s.groups[streamName] = g return g, true } @@ -88,9 +80,7 @@ func (s *SimpleGroupManager) GetGroup(appName string, streamName string) *Group return g } -// Iterate 遍历group -// -func (s *SimpleGroupManager) Iterate(onIterateGroup OnIterateGroup) { +func (s *SimpleGroupManager) Iterate(onIterateGroup func(group *Group) bool) { for streamName, group := range s.groups { if !onIterateGroup(group) { delete(s.groups, streamName) @@ -104,6 +94,21 @@ func (s *SimpleGroupManager) Len() int { // --------------------------------------------------------------------------------------------------------------------- +// TODO(chef): +// +// - 现有逻辑重构至当前模块中【DONE】 +// - server_manger接入当前模块,替换掉原来的map【DONE】 +// - 重构整理使用server_manager的地方【DONE】 +// - 实现appName逻辑的IGroupManager【DONE】 +// - 增加单元测试【DONE】 +// - 配置文件或var.go中增加选取具体IGroupManager实现的开关 +// - 去除配置文件中一部分的url_pattern +// - 更新相应的文档:本文件注释,server_manager等中原有关于appName的注释,配置文件文档,流地址列表文档 +// - 创建group时没有appname,后面又有了,可以考虑更新一下 +// - ComplexGroupManager使用IGroupCreator + +// --------------------------------------------------------------------------------------------------------------------- + // ComplexGroupManager // // 背景: @@ -118,13 +123,15 @@ func (s *SimpleGroupManager) Len() int { // - group可能由第一种协议创建,也可能由第二种协议创建 // type ComplexGroupManager struct { + groupCreator IGroupCreator // 注意,一个group只可能在一个容器中,两个容器中的group加起来才是全量 onlyStreamNameGroups map[string]*Group // streamName -> Group appNameStreamNameGroups map[string]map[string]*Group // appName -> streamName -> Group } -func NewComplexGroupManager() *ComplexGroupManager { +func NewComplexGroupManager(groupCreator IGroupCreator) *ComplexGroupManager { return &ComplexGroupManager{ + groupCreator: groupCreator, onlyStreamNameGroups: make(map[string]*Group), appNameStreamNameGroups: make(map[string]map[string]*Group), } @@ -159,7 +166,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul // 两个容器都没找到 if shouldCreate { - group = NewGroup(appName, streamName) + group = gm.groupCreator.CreateGroup(appName, streamName) gm.onlyStreamNameGroups[streamName] = group return group, true } else { @@ -183,7 +190,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul // 都没有找到 if shouldCreate { - group = NewGroup(appName, streamName) + group = gm.groupCreator.CreateGroup(appName, streamName) if !mok { m = make(map[string]*Group) gm.appNameStreamNameGroups[appName] = m @@ -196,7 +203,7 @@ func (gm *ComplexGroupManager) getGroup(appName string, streamName string, shoul } } -func (gm *ComplexGroupManager) Iterate(onIterateGroup OnIterateGroup) { +func (gm *ComplexGroupManager) Iterate(onIterateGroup func(group *Group) bool) { for streamName, group := range gm.onlyStreamNameGroups { if !onIterateGroup(group) { delete(gm.onlyStreamNameGroups, streamName) diff --git a/pkg/logic/group_manager_test.go b/pkg/logic/group_manager_test.go index 63c45f5..c384186 100644 --- a/pkg/logic/group_manager_test.go +++ b/pkg/logic/group_manager_test.go @@ -13,10 +13,17 @@ import ( "testing" ) -func TestGroupManager(t *testing.T) { - // TODO(chef): refactor 由于Group中耦合了全局变量config(不依赖取值,但是不能为nil),所以该单元测试进行前对config做初始化 - config = &Config{} +type mockGroupCreator struct { +} + +func (m *mockGroupCreator) CreateGroup(appName, streamName string) *Group { + var config Config + return NewGroup(appName, streamName, &config, nil) +} + +var mgc = &mockGroupCreator{} +func TestGroupManager(t *testing.T) { var ( sgm IGroupManager sg0 *Group @@ -35,8 +42,8 @@ func TestGroupManager(t *testing.T) { createFlag bool ) - sgm = NewSimpleGroupManager() - cgm = NewComplexGroupManager() + sgm = NewSimpleGroupManager(mgc) + cgm = NewComplexGroupManager(mgc) // (为空时)获取 // 获取到nil @@ -266,8 +273,8 @@ func TestGroupManager(t *testing.T) { //---------------------------- - sgm = NewSimpleGroupManager() - cgm = NewComplexGroupManager() + sgm = NewSimpleGroupManager(mgc) + cgm = NewComplexGroupManager(mgc) sg0 = cgm.GetGroup("", "stream1") cg0 = cgm.GetGroup("", "stream1") diff --git a/pkg/logic/http_api.go b/pkg/logic/http_api.go index 97e9bf6..2b8858f 100644 --- a/pkg/logic/http_api.go +++ b/pkg/logic/http_api.go @@ -10,36 +10,25 @@ package logic import ( "encoding/json" + "github.com/q191201771/naza/pkg/nazahttp" "net" "net/http" - "time" - - "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/naza/pkg/bininfo" "github.com/q191201771/naza/pkg/nazalog" ) -var serverStartTime string - -type HttpApiServerObserver interface { - OnStatAllGroup() []base.StatGroup - OnStatGroup(streamName string) *base.StatGroup - OnCtrlStartPull(info base.ApiCtrlStartPullReq) - OnCtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic -} - type HttpApiServer struct { - addr string - observer HttpApiServerObserver - ln net.Listener + addr string + sm *ServerManager + + ln net.Listener } -func NewHttpApiServer(addr string, observer HttpApiServerObserver) *HttpApiServer { +func NewHttpApiServer(addr string, sm *ServerManager) *HttpApiServer { return &HttpApiServer{ - addr: addr, - observer: observer, + addr: addr, + sm: sm, } } @@ -51,7 +40,7 @@ func (h *HttpApiServer) Listen() (err error) { return } -func (h *HttpApiServer) Runloop() error { +func (h *HttpApiServer) RunLoop() error { mux := http.NewServeMux() mux.HandleFunc("/api/list", h.apiListHandler) @@ -68,55 +57,19 @@ func (h *HttpApiServer) Runloop() error { // TODO chef: dispose -func (h *HttpApiServer) apiListHandler(w http.ResponseWriter, req *http.Request) { - // TODO chef: 写完api list页面 - b := []byte(` - -lal http api list - -
-
-

api接口列表:

- -
-

其他链接:

- - - -`) - - w.Header().Add("Server", base.LalHttpApiServer) - _, _ = w.Write(b) -} - func (h *HttpApiServer) statLalInfoHandler(w http.ResponseWriter, req *http.Request) { var v base.ApiStatLalInfo v.ErrorCode = base.ErrorCodeSucc v.Desp = base.DespSucc - v.Data.BinInfo = bininfo.StringifySingleLine() - v.Data.LalVersion = base.LalVersion - v.Data.ApiVersion = base.HttpApiVersion - v.Data.NotifyVersion = base.HttpNotifyVersion - v.Data.StartTime = serverStartTime - v.Data.ServerId = config.ServerId + v.Data = h.sm.StatLalInfo() feedback(v, w) } func (h *HttpApiServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) { - gs := h.observer.OnStatAllGroup() var v base.ApiStatAllGroup v.ErrorCode = base.ErrorCodeSucc v.Desp = base.DespSucc - v.Data.Groups = gs + v.Data.Groups = h.sm.StatAllGroup() feedback(v, w) } @@ -132,7 +85,7 @@ func (h *HttpApiServer) statGroupHandler(w http.ResponseWriter, req *http.Reques return } - v.Data = h.observer.OnStatGroup(streamName) + v.Data = h.sm.StatGroup(streamName) if v.Data == nil { v.ErrorCode = base.ErrorCodeGroupNotFound v.Desp = base.DespGroupNotFound @@ -160,7 +113,7 @@ func (h *HttpApiServer) ctrlStartPullHandler(w http.ResponseWriter, req *http.Re } nazalog.Infof("http api start pull. req info=%+v", info) - h.observer.OnCtrlStartPull(info) + h.sm.CtrlStartPull(info) v.ErrorCode = base.ErrorCodeSucc v.Desp = base.DespSucc feedback(v, w) @@ -181,17 +134,45 @@ func (h *HttpApiServer) ctrlKickOutSessionHandler(w http.ResponseWriter, req *ht } nazalog.Infof("http api kick out session. req info=%+v", info) - resp := h.observer.OnCtrlKickOutSession(info) + resp := h.sm.CtrlKickOutSession(info) feedback(resp, w) return } +func (h *HttpApiServer) apiListHandler(w http.ResponseWriter, req *http.Request) { + // TODO chef: 写完api list页面 + b := []byte(` + +lal http api list + +
+
+

api接口列表:

+ +
+

其他链接:

+ + + +`) + + w.Header().Add("Server", base.LalHttpApiServer) + _, _ = w.Write(b) +} + +// --------------------------------------------------------------------------------------------------------------------- + func feedback(v interface{}, w http.ResponseWriter) { resp, _ := json.Marshal(v) w.Header().Add("Server", base.LalHttpApiServer) _, _ = w.Write(resp) } - -func init() { - serverStartTime = time.Now().Format("2006-01-02 15:04:05.999") -} diff --git a/pkg/logic/http_api_test.go b/pkg/logic/http_api_test.go index 732127b..c4159e8 100644 --- a/pkg/logic/http_api_test.go +++ b/pkg/logic/http_api_test.go @@ -18,6 +18,6 @@ func TestHttpApiServer(t *testing.T) { // nazalog.Error(err) // return //} - //err := s.Runloop() + //err := s.RunLoop() //nazalog.Error(err) } diff --git a/pkg/logic/http_notify.go b/pkg/logic/http_notify.go index 0f06adc..be27da7 100644 --- a/pkg/logic/http_notify.go +++ b/pkg/logic/http_notify.go @@ -12,13 +12,13 @@ import ( "net/http" "time" - "github.com/q191201771/naza/pkg/bininfo" - "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/nazalog" ) +// TODO(chef): refactor 配置参数供外部传入 +// TODO(chef): refactor maxTaskLen修改为能表示是阻塞任务的意思 var ( maxTaskLen = 1024 notifyTimeoutSec = 3 @@ -30,12 +30,14 @@ type PostTask struct { } type HttpNotify struct { + cfg HttpNotifyConfig taskQueue chan PostTask client *http.Client } -func NewHttpNotify() *HttpNotify { +func NewHttpNotify(cfg HttpNotifyConfig) *HttpNotify { httpNotify := &HttpNotify{ + cfg: cfg, taskQueue: make(chan PostTask, maxTaskLen), client: &http.Client{ Timeout: time.Duration(notifyTimeoutSec) * time.Second, @@ -48,42 +50,68 @@ func NewHttpNotify() *HttpNotify { // TODO(chef): Dispose -// 注意,这里的函数命名以On开头并不是因为是回调函数,而是notify给业务方的接口叫做on_server_start -func (h *HttpNotify) OnServerStart() { - var info base.LalInfo - info.BinInfo = bininfo.StringifySingleLine() - info.LalVersion = base.LalVersion - info.ApiVersion = base.HttpApiVersion - info.NotifyVersion = base.HttpNotifyVersion - info.StartTime = serverStartTime - info.ServerId = config.ServerId - h.asyncPost(config.HttpNotifyConfig.OnServerStart, info) +// --------------------------------------------------------------------------------------------------------------------- + +func (h *HttpNotify) NotifyServerStart(info base.LalInfo) { + h.asyncPost(h.cfg.OnServerStart, info) +} + +func (h *HttpNotify) NotifyUpdate(info base.UpdateInfo) { + h.asyncPost(h.cfg.OnUpdate, info) +} + +func (h *HttpNotify) NotifyPubStart(info base.PubStartInfo) { + h.asyncPost(h.cfg.OnPubStart, info) +} + +func (h *HttpNotify) NotifyPubStop(info base.PubStopInfo) { + h.asyncPost(h.cfg.OnPubStop, info) +} + +func (h *HttpNotify) NotifySubStart(info base.SubStartInfo) { + h.asyncPost(h.cfg.OnSubStart, info) +} + +func (h *HttpNotify) NotifySubStop(info base.SubStopInfo) { + h.asyncPost(h.cfg.OnSubStop, info) +} + +func (h *HttpNotify) NotifyRtmpConnect(info base.RtmpConnectInfo) { + h.asyncPost(h.cfg.OnRtmpConnect, info) +} + +// ----- implement INotifyHandler interface ---------------------------------------------------------------------------- + +func (h *HttpNotify) OnServerStart(info base.LalInfo) { + h.NotifyServerStart(info) } func (h *HttpNotify) OnUpdate(info base.UpdateInfo) { - h.asyncPost(config.HttpNotifyConfig.OnUpdate, info) + h.NotifyUpdate(info) } func (h *HttpNotify) OnPubStart(info base.PubStartInfo) { - h.asyncPost(config.HttpNotifyConfig.OnPubStart, info) + h.NotifyPubStart(info) } func (h *HttpNotify) OnPubStop(info base.PubStopInfo) { - h.asyncPost(config.HttpNotifyConfig.OnPubStop, info) + h.NotifyPubStop(info) } func (h *HttpNotify) OnSubStart(info base.SubStartInfo) { - h.asyncPost(config.HttpNotifyConfig.OnSubStart, info) + h.NotifySubStart(info) } func (h *HttpNotify) OnSubStop(info base.SubStopInfo) { - h.asyncPost(config.HttpNotifyConfig.OnSubStop, info) + h.NotifySubStop(info) } func (h *HttpNotify) OnRtmpConnect(info base.RtmpConnectInfo) { - h.asyncPost(config.HttpNotifyConfig.OnRtmpConnect, info) + h.NotifyRtmpConnect(info) } +// --------------------------------------------------------------------------------------------------------------------- + func (h *HttpNotify) RunLoop() { for { select { @@ -93,8 +121,10 @@ func (h *HttpNotify) RunLoop() { } } +// --------------------------------------------------------------------------------------------------------------------- + func (h *HttpNotify) asyncPost(url string, info interface{}) { - if !config.HttpNotifyConfig.Enable || url == "" { + if !h.cfg.Enable || url == "" { return } diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go index 7b64f0e..072ee71 100644 --- a/pkg/logic/logic.go +++ b/pkg/logic/logic.go @@ -8,8 +8,80 @@ package logic -import ( - "errors" -) +import "github.com/q191201771/lal/pkg/base" + +type ILalServer interface { + RunLoop() error + Dispose() + + // StatLalInfo StatXxx... CtrlXxx... + // 一些获取状态、发送控制命令的API + // 目的是方便业务方在不修改logic包内代码的前提下,在外层实现一些特定逻辑的定制化开发 + // + StatLalInfo() base.LalInfo + StatAllGroup() (sgs []base.StatGroup) + StatGroup(streamName string) *base.StatGroup + CtrlStartPull(info base.ApiCtrlStartPullReq) + CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic +} -var ErrLogic = errors.New("lal.logic: fxxk") +// NewLalServer 创建一个lal server +// +// @param confFile 配置文件地址 +// +// @param modOption +// 可变参数,如果不关心,可以不填 +// 目的是方便业务方在不修改logic包内代码的前提下,在外层实现一些特定逻辑的定制化开发 +// Option struct中可修改的参数说明: +// - notifyHandler 事件监听 +// 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知 +// 注意,如果业务方实现了自己的事件监听,则lal server内部不再走http notify的逻辑 +// 如果不填写,内部默认走http notify的逻辑(当然,还需要在配置文件中开启http notify功能) +// +func NewLalServer(confFile string, modOption ...ModOption) ILalServer { + return NewServerManager(confFile, modOption...) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// INotifyHandler 事件通知接口 +// +type INotifyHandler interface { + OnServerStart(info base.LalInfo) + OnUpdate(info base.UpdateInfo) + OnPubStart(info base.PubStartInfo) + OnPubStop(info base.PubStopInfo) + OnSubStart(info base.SubStartInfo) + OnSubStop(info base.SubStopInfo) + OnRtmpConnect(info base.RtmpConnectInfo) +} + +type Option struct { + notifyHandler INotifyHandler +} + +var defaultOption = Option{ + notifyHandler: nil, // 注意,为nil时,内部会赋值为 HttpNotify +} + +type ModOption func(option *Option) + +// --------------------------------------------------------------------------------------------------------------------- + +// 一些没有放入配置文件中,包级别的配置,暂时没有对外暴露 +// +var ( + relayPushTimeoutMs = 5000 + relayPushWriteAvTimeoutMs = 5000 + relayPullTimeoutMs = 5000 + relayPullReadAvTimeoutMs = 5000 + calcSessionStatIntervalSec uint32 = 5 + + // checkSessionAliveIntervalSec + // + // - 对于输入型session,检查一定时间内,是否没有收到数据 + // - 对于输出型session,检查一定时间内,是否没有发送数据 + // 注意,这里既检查socket发送阻塞,又检查上层没有给session喂数据 + // + checkSessionAliveIntervalSec uint32 = 10 +) diff --git a/pkg/logic/notify.go b/pkg/logic/notify.go deleted file mode 100644 index 71f267c..0000000 --- a/pkg/logic/notify.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2021, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package logic - -import ( - "github.com/q191201771/lal/pkg/base" -) - -type INotifyHandler interface { - OnServerStart() - OnUpdate(info base.UpdateInfo) - OnPubStart(info base.PubStartInfo) - OnPubStop(info base.PubStopInfo) - OnSubStart(info base.SubStartInfo) - OnSubStop(info base.SubStopInfo) - OnRtmpConnect(info base.RtmpConnectInfo) -} - -type DefaultNotifyHandler struct { - httpNotify *HttpNotify -} - -func NewDefaultNotifyHandler() *DefaultNotifyHandler { - NewHttpNotify() - return &DefaultNotifyHandler{ - httpNotify: NewHttpNotify(), - } -} - -func (d *DefaultNotifyHandler) OnServerStart() { - d.httpNotify.OnServerStart() -} - -func (d *DefaultNotifyHandler) OnUpdate(info base.UpdateInfo) { - d.httpNotify.OnUpdate(info) -} - -func (d *DefaultNotifyHandler) OnPubStart(info base.PubStartInfo) { - d.httpNotify.OnPubStart(info) -} - -func (d *DefaultNotifyHandler) OnPubStop(info base.PubStopInfo) { - d.httpNotify.OnPubStop(info) -} - -func (d *DefaultNotifyHandler) OnSubStart(info base.SubStartInfo) { - d.httpNotify.OnSubStart(info) -} - -func (d *DefaultNotifyHandler) OnSubStop(info base.SubStopInfo) { - d.httpNotify.OnSubStop(info) -} - -func (d *DefaultNotifyHandler) OnRtmpConnect(info base.RtmpConnectInfo) { - d.httpNotify.OnRtmpConnect(info) -} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 8606f0f..4a98fba 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -10,6 +10,11 @@ package logic import ( "fmt" + "github.com/q191201771/naza/pkg/bininfo" + "github.com/q191201771/naza/pkg/defertaskthread" + "net/http" + "os" + "strings" "sync" "time" @@ -24,9 +29,15 @@ import ( "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/naza/pkg/nazalog" + _ "net/http/pprof" + //"github.com/felixge/fgprof" ) type ServerManager struct { + option Option + serverStartTime string + config *Config + httpServerManager *base.HttpServerManager httpServerHandler *HttpServerHandler hlsServerHandler *hls.ServerHandler @@ -40,34 +51,81 @@ type ServerManager struct { groupManager IGroupManager } -func NewServerManager() *ServerManager { - m := &ServerManager{ - exitChan: make(chan struct{}), - groupManager: NewSimpleGroupManager(), +func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { + sm := &ServerManager{ + serverStartTime: time.Now().Format("2006-01-02 15:04:05.999"), + exitChan: make(chan struct{}), + } + sm.groupManager = NewSimpleGroupManager(sm) + + sm.config = LoadConfAndInitLog(confFile) + + // TODO(chef): refactor 启动信息可以考虑放入package base中,所有的app都打印 + dir, _ := os.Getwd() + nazalog.Infof("wd: %s", dir) + nazalog.Infof("args: %s", strings.Join(os.Args, " ")) + nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) + nazalog.Infof("version: %s", base.LalFullInfo) + nazalog.Infof("github: %s", base.LalGithubSite) + nazalog.Infof("doc: %s", base.LalDocSite) + nazalog.Infof("serverStartTime: %s", sm.serverStartTime) + + if sm.config.HlsConfig.Enable && sm.config.HlsConfig.UseMemoryAsDiskFlag { + nazalog.Infof("hls use memory as disk.") + hls.SetUseMemoryAsDiskFlag(true) + } + + if sm.config.RecordConfig.EnableFlv { + if err := os.MkdirAll(sm.config.RecordConfig.FlvOutPath, 0777); err != nil { + nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", sm.config.RecordConfig.FlvOutPath, err) + } + if err := os.MkdirAll(sm.config.RecordConfig.MpegtsOutPath, 0777); err != nil { + nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", sm.config.RecordConfig.MpegtsOutPath, err) + } + } + + sm.option = defaultOption + for _, fn := range modOption { + fn(&sm.option) + } + if sm.option.notifyHandler == nil { + sm.option.notifyHandler = NewHttpNotify(sm.config.HttpNotifyConfig) } - if config.HttpflvConfig.Enable || config.HttpflvConfig.EnableHttps || - config.HttptsConfig.Enable || config.HttptsConfig.EnableHttps || - config.HlsConfig.Enable || config.HlsConfig.EnableHttps { - m.httpServerManager = base.NewHttpServerManager() - m.httpServerHandler = NewHttpServerHandler(m) - m.hlsServerHandler = hls.NewServerHandler(config.HlsConfig.OutPath) + if sm.config.HttpflvConfig.Enable || sm.config.HttpflvConfig.EnableHttps || + sm.config.HttptsConfig.Enable || sm.config.HttptsConfig.EnableHttps || + sm.config.HlsConfig.Enable || sm.config.HlsConfig.EnableHttps { + sm.httpServerManager = base.NewHttpServerManager() + sm.httpServerHandler = NewHttpServerHandler(sm) + sm.hlsServerHandler = hls.NewServerHandler(sm.config.HlsConfig.OutPath) } - if config.RtmpConfig.Enable { - m.rtmpServer = rtmp.NewServer(m, config.RtmpConfig.Addr) + if sm.config.RtmpConfig.Enable { + // TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer + sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr) } - if config.RtspConfig.Enable { - m.rtspServer = rtsp.NewServer(config.RtspConfig.Addr, m) + if sm.config.RtspConfig.Enable { + sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm) } - if config.HttpApiConfig.Enable { - m.httpApiServer = NewHttpApiServer(config.HttpApiConfig.Addr, m) + if sm.config.HttpApiConfig.Enable { + sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm) } - return m + + return sm } +// ----- implement ILalServer interface -------------------------------------------------------------------------------- + func (sm *ServerManager) RunLoop() error { - notifyHandler.OnServerStart() + sm.option.notifyHandler.OnServerStart(sm.StatLalInfo()) + + if sm.config.PprofConfig.Enable { + go runWebPprof(sm.config.PprofConfig.Addr) + } + + go base.RunSignalHandler(func() { + sm.Dispose() + }) var addMux = func(config CommonHttpServerConfig, handler base.Handler, name string) error { if config.Enable { @@ -97,13 +155,13 @@ func (sm *ServerManager) RunLoop() error { return nil } - if err := addMux(config.HttpflvConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil { + if err := addMux(sm.config.HttpflvConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil { return err } - if err := addMux(config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil { + if err := addMux(sm.config.HttptsConfig.CommonHttpServerConfig, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil { return err } - if err := addMux(config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil { + if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil { return err } @@ -142,17 +200,17 @@ func (sm *ServerManager) RunLoop() error { return err } go func() { - if err := sm.httpApiServer.Runloop(); err != nil { + if err := sm.httpApiServer.RunLoop(); err != nil { nazalog.Error(err) } }() } - uis := uint32(config.HttpNotifyConfig.UpdateIntervalSec) + uis := uint32(sm.config.HttpNotifyConfig.UpdateIntervalSec) var updateInfo base.UpdateInfo - updateInfo.ServerId = config.ServerId - updateInfo.Groups = sm.statAllGroup() - notifyHandler.OnUpdate(updateInfo) + updateInfo.ServerId = sm.config.ServerId + updateInfo.Groups = sm.StatAllGroup() + sm.option.notifyHandler.OnUpdate(updateInfo) t := time.NewTicker(1 * time.Second) defer t.Stop() @@ -194,9 +252,9 @@ func (sm *ServerManager) RunLoop() error { // 定时通过http notify发送group相关的信息 if uis != 0 && (count%uis) == 0 { - updateInfo.ServerId = config.ServerId - updateInfo.Groups = sm.statAllGroup() - notifyHandler.OnUpdate(updateInfo) + updateInfo.ServerId = sm.config.ServerId + updateInfo.Groups = sm.StatAllGroup() + sm.option.notifyHandler.OnUpdate(updateInfo) } } } @@ -226,22 +284,85 @@ func (sm *ServerManager) Dispose() { sm.exitChan <- struct{}{} } -func (sm *ServerManager) GetGroup(appName string, streamName string) *Group { +func (sm *ServerManager) StatLalInfo() base.LalInfo { + var lalInfo base.LalInfo + lalInfo.BinInfo = bininfo.StringifySingleLine() + lalInfo.LalVersion = base.LalVersion + lalInfo.ApiVersion = base.HttpApiVersion + lalInfo.NotifyVersion = base.HttpNotifyVersion + lalInfo.StartTime = sm.serverStartTime + lalInfo.ServerId = sm.config.ServerId + return lalInfo +} + +func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) { sm.mutex.Lock() defer sm.mutex.Unlock() - return sm.getGroup(appName, streamName) + sm.groupManager.Iterate(func(group *Group) bool { + sgs = append(sgs, group.GetStat()) + return true + }) + return } -// --------------------------------------------------------------------------------------------------------------------- -// rtmp.ServerObserver interface -// --------------------------------------------------------------------------------------------------------------------- +func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup("", streamName) + if g == nil { + return nil + } + // copy + var ret base.StatGroup + ret = g.GetStat() + return &ret +} +func (sm *ServerManager) CtrlStartPull(info base.ApiCtrlStartPullReq) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup(info.AppName, info.StreamName) + if g == nil { + nazalog.Warnf("group not exist, ignore start pull. streamName=%s", info.StreamName) + return + } + var url string + if info.UrlParam != "" { + url = fmt.Sprintf("rtmp://%s/%s/%s?%s", info.Addr, info.AppName, info.StreamName, info.UrlParam) + } else { + url = fmt.Sprintf("rtmp://%s/%s/%s", info.Addr, info.AppName, info.StreamName) + } + g.StartPull(url) +} +func (sm *ServerManager) CtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup("", info.StreamName) + if g == nil { + return base.HttpResponseBasic{ + ErrorCode: base.ErrorCodeGroupNotFound, + Desp: base.DespGroupNotFound, + } + } + if !g.KickOutSession(info.SessionId) { + return base.HttpResponseBasic{ + ErrorCode: base.ErrorCodeSessionNotFound, + Desp: base.DespSessionNotFound, + } + } + return base.HttpResponseBasic{ + ErrorCode: base.ErrorCodeSucc, + Desp: base.DespSucc, + } +} + +// ----- implement rtmp.ServerObserver interface ----------------------------------------------------------------------- func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) { sm.mutex.Lock() defer sm.mutex.Unlock() var info base.RtmpConnectInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.SessionId = session.UniqueKey() info.RemoteAddr = session.GetStat().RemoteAddr if app, err := opa.FindString("app"); err == nil { @@ -253,7 +374,7 @@ func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.Obj if tcUrl, err := opa.FindString("tcUrl"); err == nil { info.TcUrl = tcUrl } - notifyHandler.OnRtmpConnect(info) + sm.option.notifyHandler.OnRtmpConnect(info) } func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool { @@ -265,7 +386,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool { // TODO chef: res值为false时,可以考虑不回调 // TODO chef: 每次赋值都逐个拼,代码冗余,考虑直接用ISession抽离一下代码 var info base.PubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtmp info.Url = session.Url() info.AppName = session.AppName() @@ -275,7 +396,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnPubStart(info) + sm.option.notifyHandler.OnPubStart(info) return res } @@ -290,7 +411,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) { group.DelRtmpPubSession(session) var info base.PubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtmp info.Url = session.Url() info.AppName = session.AppName() @@ -300,7 +421,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnPubStop(info) + sm.option.notifyHandler.OnPubStop(info) } func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool { @@ -310,7 +431,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool { group.AddRtmpSubSession(session) var info base.SubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtmp info.Protocol = session.Url() info.AppName = session.AppName() @@ -320,7 +441,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStart(info) + sm.option.notifyHandler.OnSubStart(info) return true } @@ -336,7 +457,7 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) { group.DelRtmpSubSession(session) var info base.SubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtmp info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -345,12 +466,10 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStop(info) + sm.option.notifyHandler.OnSubStop(info) } -// --------------------------------------------------------------------------------------------------------------------- -// httpflv.ServerObserver interface -// --------------------------------------------------------------------------------------------------------------------- +// ----- implement HttpServerHandlerObserver interface ----------------------------------------------------------------- func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) bool { sm.mutex.Lock() @@ -359,7 +478,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo group.AddHttpflvSubSession(session) var info base.SubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolHttpflv info.Url = session.Url() info.AppName = session.AppName() @@ -369,7 +488,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStart(info) + sm.option.notifyHandler.OnSubStart(info) return true } @@ -384,7 +503,7 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) { group.DelHttpflvSubSession(session) var info base.SubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolHttpflv info.Url = session.Url() info.AppName = session.AppName() @@ -394,13 +513,9 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStop(info) + sm.option.notifyHandler.OnSubStop(info) } -// --------------------------------------------------------------------------------------------------------------------- -// httpts.ServerObserver interface -// --------------------------------------------------------------------------------------------------------------------- - func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool { sm.mutex.Lock() defer sm.mutex.Unlock() @@ -408,7 +523,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool group.AddHttptsSubSession(session) var info base.SubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolHttpts info.Url = session.Url() info.AppName = session.AppName() @@ -418,7 +533,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStart(info) + sm.option.notifyHandler.OnSubStart(info) return true } @@ -433,7 +548,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) { group.DelHttptsSubSession(session) var info base.SubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolHttpts info.Url = session.Url() info.AppName = session.AppName() @@ -443,12 +558,10 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStop(info) + sm.option.notifyHandler.OnSubStop(info) } -// --------------------------------------------------------------------------------------------------------------------- -// rtsp.ServerObserver interface -// --------------------------------------------------------------------------------------------------------------------- +// ----- implement rtsp.ServerObserver interface ----------------------------------------------------------------------- func (sm *ServerManager) OnNewRtspSessionConnect(session *rtsp.ServerCommandSession) { // TODO chef: impl me @@ -465,7 +578,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool { res := group.AddRtspPubSession(session) var info base.PubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtsp info.Url = session.Url() info.AppName = session.AppName() @@ -475,7 +588,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnPubStart(info) + sm.option.notifyHandler.OnPubStart(info) return res } @@ -492,7 +605,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { group.DelRtspPubSession(session) var info base.PubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtsp info.Url = session.Url() info.AppName = session.AppName() @@ -502,7 +615,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnPubStop(info) + sm.option.notifyHandler.OnPubStop(info) } func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { @@ -520,7 +633,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool res := group.HandleNewRtspSubSessionPlay(session) var info base.SubStartInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtsp info.Url = session.Url() info.AppName = session.AppName() @@ -530,7 +643,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStart(info) + sm.option.notifyHandler.OnSubStart(info) return res } @@ -547,7 +660,7 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) { group.DelRtspSubSession(session) var info base.SubStopInfo - info.ServerId = config.ServerId + info.ServerId = sm.config.ServerId info.Protocol = base.ProtocolRtsp info.Url = session.Url() info.AppName = session.AppName() @@ -557,81 +670,60 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) { info.RemoteAddr = session.GetStat().RemoteAddr info.HasInSession = group.HasInSession() info.HasOutSession = group.HasOutSession() - notifyHandler.OnSubStop(info) + sm.option.notifyHandler.OnSubStop(info) } -// --------------------------------------------------------------------------------------------------------------------- -// HttpApiServerObserver interface -// --------------------------------------------------------------------------------------------------------------------- +// ----- implement IGroupCreator interface ----------------------------------------------------------------------------- -func (sm *ServerManager) OnStatAllGroup() (sgs []base.StatGroup) { - return sm.statAllGroup() +func (sm *ServerManager) CreateGroup(appName string, streamName string) *Group { + return NewGroup(appName, streamName, sm.config, sm) } -func (sm *ServerManager) OnStatGroup(streamName string) *base.StatGroup { - sm.mutex.Lock() - defer sm.mutex.Unlock() - g := sm.getGroup("", streamName) - if g == nil { - return nil - } - // copy - var ret base.StatGroup - ret = g.GetStat() - return &ret -} - -func (sm *ServerManager) OnCtrlStartPull(info base.ApiCtrlStartPullReq) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - g := sm.getGroup(info.AppName, info.StreamName) - if g == nil { - nazalog.Warnf("group not exist, ignore start pull. streamName=%s", info.StreamName) - return - } - var url string - if info.UrlParam != "" { - url = fmt.Sprintf("rtmp://%s/%s/%s?%s", info.Addr, info.AppName, info.StreamName, info.UrlParam) - } else { - url = fmt.Sprintf("rtmp://%s/%s/%s", info.Addr, info.AppName, info.StreamName) - } - g.StartPull(url) -} +// ----- implement GroupObserver interface ----------------------------------------------------------------------------- + +func (sm *ServerManager) CleanupHlsIfNeeded(appName string, streamName string, path string) { + if sm.config.HlsConfig.Enable && + (sm.config.HlsConfig.CleanupMode == hls.CleanupModeInTheEnd || sm.config.HlsConfig.CleanupMode == hls.CleanupModeAsap) { + defertaskthread.Go( + sm.config.HlsConfig.FragmentDurationMs*sm.config.HlsConfig.FragmentNum*2, + func(param ...interface{}) { + appName := param[0].(string) + streamName := param[1].(string) + outPath := param[2].(string) + + if g := sm.GetGroup(appName, streamName); g != nil { + if g.IsHlsMuxerAlive() { + nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName) + return + } + } -func (sm *ServerManager) OnCtrlKickOutSession(info base.ApiCtrlKickOutSession) base.HttpResponseBasic { - sm.mutex.Lock() - defer sm.mutex.Unlock() - g := sm.getGroup("", info.StreamName) - if g == nil { - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeGroupNotFound, - Desp: base.DespGroupNotFound, - } - } - if !g.KickOutSession(info.SessionId) { - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeSessionNotFound, - Desp: base.DespSessionNotFound, - } - } - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeSucc, - Desp: base.DespSucc, + nazalog.Infof("cleanup hls file path. streamName=%s, path=%s", streamName, outPath) + if err := hls.RemoveAll(outPath); err != nil { + nazalog.Warnf("cleanup hls file path error. path=%s, err=%+v", outPath, err) + } + }, + appName, + streamName, + path, + ) } } // --------------------------------------------------------------------------------------------------------------------- -func (sm *ServerManager) statAllGroup() (sgs []base.StatGroup) { +func (sm *ServerManager) Config() *Config { + return sm.config +} + +func (sm *ServerManager) GetGroup(appName string, streamName string) *Group { sm.mutex.Lock() defer sm.mutex.Unlock() - sm.groupManager.Iterate(func(group *Group) bool { - sgs = append(sgs, group.GetStat()) - return true - }) - return + return sm.getGroup(appName, streamName) } +// ----- private method ------------------------------------------------------------------------------------------------ + // 注意,函数内部不加锁,由调用方保证加锁进入 func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { g, createFlag := sm.groupManager.GetOrCreateGroup(appName, streamName) @@ -644,3 +736,17 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr func (sm *ServerManager) getGroup(appName string, streamName string) *Group { return sm.groupManager.GetGroup(appName, streamName) } + +// --------------------------------------------------------------------------------------------------------------------- + +func runWebPprof(addr string) { + nazalog.Infof("start web pprof listen. addr=%s", addr) + + //nazalog.Warn("start fgprof.") + //http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) + + if err := http.ListenAndServe(addr, nil); err != nil { + nazalog.Error(err) + return + } +} diff --git a/pkg/logic/var.go b/pkg/logic/var.go deleted file mode 100644 index a6cf254..0000000 --- a/pkg/logic/var.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package logic - -//var relayPushCheckIntervalMS = 1000 -var relayPushTimeoutMs = 5000 -var relayPushWriteAvTimeoutMs = 5000 - -var relayPullTimeoutMs = 5000 -var relayPullReadAvTimeoutMs = 5000 - -var calcSessionStatIntervalSec uint32 = 5 - -// 对于输入型session,检查一定时间内,是否没有收到数据 -// -// 对于输出型session,检查一定时间内,是否没有发送数据 -// 注意,这里既检查socket发送阻塞,又检查上层没有给session喂数据 -var checkSessionAliveIntervalSec uint32 = 10 - -var notifyHandler INotifyHandler = NewDefaultNotifyHandler()