[feat] customize: 支持将流输出给业务方做二次开发

pull/286/head
q191201771 2 years ago
parent b128695ae9
commit e66a93f55a

@ -26,10 +26,34 @@ import (
"github.com/q191201771/naza/pkg/bininfo"
)
// 注意使用这个demo时请确保这三个文件存在文件下载地址 https://github.com/q191201771/lalext/tree/master/avfile
const (
h264filename = "/tmp/test.h264"
aacfilename = "/tmp/test.aac"
flvfilename = "/tmp/test.flv"
)
// 文档见 <lalserver二次开发 - pub接入自定义流>
// https://pengrl.com/lal/#/customize_pub
//
// MySession 演示业务方实现 logic.ICustomizeHookSessionContext 接口从而hook所有输入到lalserver中的流以及流中的数据。
type MySession struct {
uniqueKey string
streamName string
}
func (i *MySession) OnMsg(msg base.RtmpMsg) {
// 业务方可以在这里对流做处理
if msg.IsAacSeqHeader() || msg.IsVideoKeySeqHeader() || msg.IsVideoKeyNalu() {
nazalog.Debugf("%s", msg.DebugString())
}
}
func (i *MySession) OnStop() {
nazalog.Debugf("OnStop")
}
func main() {
defer nazalog.Sync()
@ -38,7 +62,16 @@ func main() {
option.ConfFilename = confFilename
})
// 比常规lalserver多加了这一行
// 在常规lalserver基础上增加这行用于演示hook lalserver中的流
lals.WithOnHookSession(func(uniqueKey string, streamName string) logic.ICustomizeHookSessionContext {
// 有新的流了创建业务层的对象用于hook这个流
return &MySession{
uniqueKey: uniqueKey,
streamName: streamName,
}
})
// 在常规lalserver基础上增加这两个例子用于演示向lalserver输入自定义流
go showHowToCustomizePub(lals)
go showHowToFlvCustomizePub(lals)
@ -61,10 +94,7 @@ func parseFlag() string {
}
func showHowToFlvCustomizePub(lals logic.ILalServer) {
const (
flvfilename = "/tmp/test.flv"
customizePubStreamName = "f110"
)
const customizePubStreamName = "f110"
time.Sleep(200 * time.Millisecond)
@ -90,12 +120,7 @@ func showHowToFlvCustomizePub(lals logic.ILalServer) {
}
func showHowToCustomizePub(lals logic.ILalServer) {
const (
h264filename = "/tmp/test.h264"
aacfilename = "/tmp/test.aac"
customizePubStreamName = "c110"
)
const customizePubStreamName = "c110"
time.Sleep(200 * time.Millisecond)

@ -65,6 +65,10 @@ import (
// -> ...
// -> ...
type GroupOption struct {
onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext
}
type IGroupObserver interface {
CleanupHlsIfNeeded(appName string, streamName string, path string)
OnHlsMakeTs(info base.HlsMakeTsInfo)
@ -77,6 +81,10 @@ type Group struct {
appName string // const after init
streamName string // const after init TODO chef: 和stat里的字段重复可以删除掉
config *Config
option GroupOption
customizeHookSessionContext ICustomizeHookSessionContext
observer IGroupObserver
exitChan chan struct{}
@ -133,7 +141,7 @@ type Group struct {
rtspPullDumpFile *base.DumpFile
}
func NewGroup(appName string, streamName string, config *Config, observer IGroupObserver) *Group {
func NewGroup(appName string, streamName string, config *Config, option GroupOption, observer IGroupObserver) *Group {
uk := base.GenUkGroup()
g := &Group{
@ -141,6 +149,7 @@ func NewGroup(appName string, streamName string, config *Config, observer IGroup
appName: appName,
streamName: streamName,
config: config,
option: option,
observer: observer,
stat: base.StatGroup{
StreamName: streamName,

@ -238,6 +238,10 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
}
if group.customizeHookSessionContext != nil {
group.customizeHookSessionContext.OnMsg(msg)
}
// # 广播。遍历所有 rtmp sub session转发数据
// ## 如果是新的 sub session发送已缓存的信息
for session := range group.rtmpSubSessionSet {

@ -361,6 +361,10 @@ func (group *Group) addIn() {
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.InSessionConfig.AddDummyAudioWaitAudioMs, group.broadcastByRtmpMsg)
}
if group.option.onHookSession != nil {
group.customizeHookSessionContext = group.option.onHookSession(group.inSessionUniqueKey(), group.streamName)
}
group.startPushIfNeeded()
group.startHlsIfNeeded()
group.startRecordFlvIfNeeded(now)
@ -375,6 +379,11 @@ func (group *Group) delIn() {
group.rtmp2MpegtsRemuxer = nil
}
if group.customizeHookSessionContext != nil {
group.customizeHookSessionContext.OnStop()
group.customizeHookSessionContext = nil
}
group.stopPushIfNeeded()
group.stopHlsIfNeeded()
group.stopRecordFlvIfNeeded()

@ -19,7 +19,7 @@ type mockGroupCreator struct {
func (m *mockGroupCreator) CreateGroup(appName, streamName string) *Group {
var config Config
return NewGroup(appName, streamName, &config, nil)
return NewGroup(appName, streamName, &config, GroupOption{}, nil)
}
var mgc = &mockGroupCreator{}

@ -34,6 +34,14 @@ type ILalServer interface {
//
DelCustomizePubSession(ICustomizePubSessionContext)
// WithOnHookSession
//
// hook所有输入到lal server的流。
// 二次开发时使用。
// 当有流进入lal server时 onHookSession 被调用,业务方应该在回调中返回一个实现了 ICustomizeHookSessionContext 接口的对象。
// 后续收到流数据以及流停止时, ICustomizeHookSessionContext 对象的 OnMsg 和 OnStop 方法会被调用。
WithOnHookSession(onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext)
// StatLalInfo StatAllGroup StatGroup CtrlStartPull CtrlKickOutSession
//
// 一些获取状态、发送控制命令的API。
@ -67,6 +75,19 @@ type ICustomizePubSessionContext interface {
StreamName() string
}
// ICustomizeHookSessionContext
//
// 业务方实现该接口从而hook所有输入到lalserver中的流以及流中的数据。
type ICustomizeHookSessionContext interface {
// OnMsg
//
// 注意业务方应尽快处理回调消息否则会阻塞lal server内部处理逻辑。
//
// @param msg: 注意,业务方不应该修改或持有该内存块,如果有需要,应对该内存块进行拷贝,比如调用 msg.Clone() 生成新的 base.RtmpMsg 再使用
OnMsg(msg base.RtmpMsg)
OnStop()
}
// ---------------------------------------------------------------------------------------------------------------------
// INotifyHandler 事件通知接口

@ -48,6 +48,8 @@ type ServerManager struct {
mutex sync.Mutex
groupManager IGroupManager
onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext
}
func NewServerManager(modOption ...ModOption) *ServerManager {
@ -395,6 +397,10 @@ func (sm *ServerManager) DelCustomizePubSession(sessionCtx ICustomizePubSessionC
group.DelCustomizePubSession(sessionCtx)
}
func (sm *ServerManager) WithOnHookSession(onHookSession func(uniqueKey string, streamName string) ICustomizeHookSessionContext) {
sm.onHookSession = onHookSession
}
// ----- implement rtmp.IServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) {
@ -709,7 +715,10 @@ func (sm *ServerManager) CreateGroup(appName string, streamName string) *Group {
} else {
config = sm.config
}
return NewGroup(appName, streamName, config, sm)
option := GroupOption{
onHookSession: sm.onHookSession,
}
return NewGroup(appName, streamName, config, option, sm)
}
// ----- implement IGroupObserver interface -----------------------------------------------------------------------------

Loading…
Cancel
Save