diff --git a/app/demo/customize_lalserver/customize_lalserver.go b/app/demo/customize_lalserver/customize_lalserver.go index 5f8227d..8f2c03a 100644 --- a/app/demo/customize_lalserver/customize_lalserver.go +++ b/app/demo/customize_lalserver/customize_lalserver.go @@ -11,10 +11,12 @@ package main import ( "flag" "fmt" + "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/remux" "io/ioutil" "os" + "sync" "time" "github.com/q191201771/naza/pkg/nazalog" @@ -25,66 +27,145 @@ import ( "github.com/q191201771/naza/pkg/bininfo" ) -//lal/app/demo/customize_lalserver +// lal/app/demo/customize_lalserver // -// 演示lalserver通过插件功能扩展输入流 -// 提示,插件功能是基于代码层面的,和通过与lalserver建立连接将流发送到lalserver是两种不同的方式 +// [what] +// 演示业务方如何通过lalserver的插件功能,将自身的流输入到lalserver中 +// +// [why] +// 业务方的流输入到lalserver后,就可以使用lalserver的功能了,比如录制功能,(使用lalserver所支持的协议)从lalserver拉流等等 +// +// 提示,插件功能是基于代码层面的,和与lalserver建立连接将流发送到lalserver是两种不同的方式,但是流进入lalserver后,效果是一样的 // // 提示,这个demo,可以看成是业务方基于lal实现的一个定制化(功能增强版)的lalserver应用 // 换句话说,它并不强制要求在lal的github repo下 // -// demo的具体功能是,读取一个h264 es流文件,并将这个流输入到lalserver中 -// 注意,lalserver其实并不关心业务方的流的来源(比如网络or文件or其他) -// 也不关心原始流的格式 -// 只要业务方将流转换成lalserver所要求的格式,调用相应的接口传入数据即可 +// [how] +// demo的具体功能是,分别读取一个h264 es流文件和一个aac es流文件,并将音视频流输入到lalserver中 // -// 业务方的流输入到lalserver后,就可以使用lalserver所支持的协议,从lalserver拉流了 +// 注意,其实lalserver并不关心业务方的流的来源(比如网络or文件or其他),也不关心流的原始格式 +// 业务方只要将流转换成lalserver所要求的格式,调用相应的接口传入数据即可 // func showHowToCustomizePub(lals logic.ILalServer) { const ( customizePubStreamName = "c110" h264filename = "/tmp/test.h264" + aacfilename = "/tmp/test.aac" - durationInterval = uint32(66) + audioDurationInterval = uint32(23) + videoDurationInterval = uint32(66) ) + time.Sleep(200 * time.Millisecond) + session, err := lals.AddCustomizePubSession(customizePubStreamName) + nazalog.Assert(nil, err) + session.WithOption(func(option *remux.AvPacketStreamOption) { + option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb + }) + + audioContent, err := ioutil.ReadFile(aacfilename) + nazalog.Assert(nil, err) + + videoContent, err := ioutil.ReadFile(h264filename) + nazalog.Assert(nil, err) + + asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength]) + nazalog.Assert(nil, err) + + session.FeedAudioSpecificConfig(asc) + + var ( + m sync.Mutex + audios []base.AvPacket + videos []base.AvPacket + ) + + var reorderFeedFilterFn = func(packet base.AvPacket) { + m.Lock() + defer m.Unlock() + + if packet.IsAudio() { + audios = append(audios, packet) + } else { + videos = append(videos, packet) + } + + for len(audios) > 0 && len(videos) > 0 { + if audios[0].Timestamp <= videos[0].Timestamp { + session.FeedAvPacket(audios[0]) + audios = audios[1:] + } else { + session.FeedAvPacket(videos[0]) + videos = videos[1:] + } + } + } + + var wg sync.WaitGroup + wg.Add(2) + go func() { - time.Sleep(200 * time.Millisecond) - session, err := lals.AddCustomizePubSession(customizePubStreamName) - nazalog.Assert(nil, err) - session.WithOption(func(option *remux.AvPacketStreamOption) { - option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb - }) + i := 0 + timestamp := uint32(0) + for { + ctx, err := aac.NewAdtsHeaderContext(audioContent[i : i+aac.AdtsHeaderLength]) + nazalog.Assert(nil, err) - // demo的输入比较简单,一次性将整个es文件读入 - content, err := ioutil.ReadFile(h264filename) - nazalog.Assert(nil, err) + packet := base.AvPacket{ + PayloadType: base.AvPacketPtAac, + Timestamp: timestamp, + Payload: audioContent[i+aac.AdtsHeaderLength : i+int(ctx.AdtsLength)], + } + reorderFeedFilterFn(packet) + + i += int(ctx.AdtsLength) + + timestamp += audioDurationInterval + time.Sleep(time.Duration(audioDurationInterval) * time.Millisecond) + + if i == len(audioContent) { + break + } + } + wg.Done() + }() + + go func() { timestamp := uint32(0) for { // 借助lal中的一个帮助函数,将es流切割成一个一个的nal // 提示,这里切割的是整个文件,并且,函数执行结束后,并没有退出for循环,换句话说,流会无限循环输入到lalserver中 - err = avc.IterateNaluAnnexb(content, func(nal []byte) { + err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) { // 将nal数据转换为lalserver要求的格式输入 packet := base.AvPacket{ PayloadType: base.AvPacketPtAvc, Timestamp: timestamp, Payload: append(avc.NaluStartCode4, nal...), } - session.FeedAvPacket(packet) + reorderFeedFilterFn(packet) // 发送完后,计算时间戳,并按帧间隔时间延时发送 t := avc.ParseNaluType(nal[0]) if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei { // noop } else { - timestamp += durationInterval - time.Sleep(time.Duration(durationInterval) * time.Millisecond) + timestamp += videoDurationInterval + time.Sleep(time.Duration(videoDurationInterval) * time.Millisecond) } }) + nazalog.Assert(nil, err) + + break } + + wg.Done() }() + + wg.Wait() + + lals.DelCustomizePubSession(session) } func main() { @@ -95,7 +176,7 @@ func main() { option.ConfFilename = confFilename }) - showHowToCustomizePub(lals) + go showHowToCustomizePub(lals) err := lals.RunLoop() nazalog.Infof("server manager done. err=%+v", err) diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 6f9c717..9ad16b4 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -12,9 +12,9 @@ type AvPacketPt int const ( AvPacketPtUnknown AvPacketPt = -1 - AvPacketPtAvc AvPacketPt = RtpPacketTypeAvcOrHevc - AvPacketPtHevc AvPacketPt = RtpPacketTypeHevc - AvPacketPtAac AvPacketPt = RtpPacketTypeAac + AvPacketPtAvc AvPacketPt = 96 // h264 + AvPacketPtHevc AvPacketPt = 98 // h265 + AvPacketPtAac AvPacketPt = 97 ) // AvPacket @@ -41,3 +41,11 @@ func (a AvPacketPt) ReadableString() string { } return "" } + +func (packet AvPacket) IsAudio() bool { + return packet.PayloadType == AvPacketPtAac +} + +func (packet AvPacket) IsVideo() bool { + return packet.PayloadType == AvPacketPtAvc || packet.PayloadType == AvPacketPtHevc +} diff --git a/pkg/base/unique.go b/pkg/base/unique.go index 35422da..de9db4d 100644 --- a/pkg/base/unique.go +++ b/pkg/base/unique.go @@ -11,17 +11,18 @@ package base import "github.com/q191201771/naza/pkg/unique" const ( - UkPreRtmpServerSession = "RTMPPUBSUB" - UkPreRtmpPushSession = "RTMPPUSH" - UkPreRtmpPullSession = "RTMPPULL" - UkPreRtspServerCommandSession = "RTSPSRVCMD" - UkPreRtspPubSession = "RTSPPUB" - UkPreRtspSubSession = "RTSPSUB" - UkPreRtspPushSession = "RTSPPUSH" - UkPreRtspPullSession = "RTSPPULL" - UkPreFlvSubSession = "FLVSUB" - UkPreTsSubSession = "TSSUB" - UkPreFlvPullSession = "FLVPULL" + UkPreCustomizePubSessionContext = "CUSTOMIZEPUB" + UkPreRtmpServerSession = "RTMPPUBSUB" + UkPreRtmpPushSession = "RTMPPUSH" + UkPreRtmpPullSession = "RTMPPULL" + UkPreRtspServerCommandSession = "RTSPSRVCMD" + UkPreRtspPubSession = "RTSPPUB" + UkPreRtspSubSession = "RTSPSUB" + UkPreRtspPushSession = "RTSPPUSH" + UkPreRtspPullSession = "RTSPPULL" + UkPreFlvSubSession = "FLVSUB" + UkPreTsSubSession = "TSSUB" + UkPreFlvPullSession = "FLVPULL" UkPreGroup = "GROUP" UkPreHlsMuxer = "HLSMUXER" @@ -32,6 +33,10 @@ const ( // return unique.GenUniqueKey(prefix) //} +func GenUkCustomizePubSession() string { + return siUkCustomizePubSession.GenUniqueKey() +} + func GenUkRtmpServerSession() string { return siUkRtmpServerSession.GenUniqueKey() } @@ -89,6 +94,7 @@ func GenUkRtmp2MpegtsRemuxer() string { } var ( + siUkCustomizePubSession *unique.SingleGenerator siUkRtmpServerSession *unique.SingleGenerator siUkRtmpPushSession *unique.SingleGenerator siUkRtmpPullSession *unique.SingleGenerator @@ -107,6 +113,7 @@ var ( ) func init() { + siUkCustomizePubSession = unique.NewSingleGenerator(UkPreCustomizePubSessionContext) siUkRtmpServerSession = unique.NewSingleGenerator(UkPreRtmpServerSession) siUkRtmpPushSession = unique.NewSingleGenerator(UkPreRtmpPushSession) siUkRtmpPullSession = unique.NewSingleGenerator(UkPreRtmpPullSession) diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index 1f2bda8..fe2966e 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -136,6 +136,12 @@ var ( // --------------------------------------------------------------------------------------------------------------------- +var _ logic.ICustomizePubSessionContext = &logic.CustomizePubSessionContext{} +var _ remux.IAvPacketStream = &logic.CustomizePubSessionContext{} +var _ remux.IAvPacketStream = &remux.AvPacket2RtmpRemuxer{} + +// --------------------------------------------------------------------------------------------------------------------- + var _ logic.ILalServer = &logic.ServerManager{} var _ rtmp.IServerObserver = &logic.ServerManager{} var _ logic.IHttpServerHandlerObserver = &logic.ServerManager{} diff --git a/pkg/logic/customize_pubsession.go b/pkg/logic/customize_pubsession.go index b500d32..c9089ce 100644 --- a/pkg/logic/customize_pubsession.go +++ b/pkg/logic/customize_pubsession.go @@ -14,6 +14,8 @@ import ( ) type CustomizePubSessionContext struct { + uniqueKey string + streamName string remuxer *remux.AvPacket2RtmpRemuxer onRtmpMsg func(msg base.RtmpMsg) @@ -21,6 +23,7 @@ type CustomizePubSessionContext struct { func NewCustomizePubSessionContext(streamName string) *CustomizePubSessionContext { return &CustomizePubSessionContext{ + uniqueKey: base.GenUkCustomizePubSession(), streamName: streamName, remuxer: remux.NewAvPacket2RtmpRemuxer(), } @@ -31,6 +34,10 @@ func (ctx *CustomizePubSessionContext) WithOnRtmpMsg(onRtmpMsg func(msg base.Rtm return ctx } +func (ctx *CustomizePubSessionContext) UniqueKey() string { + return ctx.uniqueKey +} + func (ctx *CustomizePubSessionContext) StreamName() string { return ctx.streamName } @@ -41,6 +48,10 @@ func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *remux.A ctx.remuxer.WithOption(modOption) } +func (ctx *CustomizePubSessionContext) FeedAudioSpecificConfig(asc []byte) { + ctx.remuxer.InitWithAvConfig(asc, nil, nil, nil) +} + func (ctx *CustomizePubSessionContext) FeedAvPacket(packet base.AvPacket) { ctx.remuxer.FeedAvPacket(packet) } diff --git a/pkg/logic/group__in.go b/pkg/logic/group__in.go index 0bf8f65..0936a29 100644 --- a/pkg/logic/group__in.go +++ b/pkg/logic/group__in.go @@ -123,8 +123,10 @@ func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool { // --------------------------------------------------------------------------------------------------------------------- -func (sm *Group) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) { - +func (group *Group) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delCustomizePubSession(sessionCtx) } func (group *Group) DelRtmpPubSession(session *rtmp.ServerSession) { @@ -147,6 +149,18 @@ func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) { // --------------------------------------------------------------------------------------------------------------------- +func (group *Group) delCustomizePubSession(sessionCtx ICustomizePubSessionContext) { + Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, sessionCtx.UniqueKey()) + + if sessionCtx != group.customizePubSession { + Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", + group.UniqueKey, sessionCtx.UniqueKey(), group.rtmpPubSession) + return + } + + group.delIn() +} + func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) { Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey()) @@ -212,6 +226,7 @@ func (group *Group) delIn() { group.rtmpPubSession = nil group.rtspPubSession = nil + group.customizePubSession = nil group.rtsp2RtmpRemuxer = nil group.rtmp2RtspRemuxer = nil group.dummyAudioFilter = nil diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go index 3fc6146..a17a128 100644 --- a/pkg/logic/logic.go +++ b/pkg/logic/logic.go @@ -20,17 +20,20 @@ type ILalServer interface { RunLoop() error Dispose() - // AddCustomizePubSession DelCustomizePubSession + // AddCustomizePubSession 定制化功能。业务方可以将自己的流输入到 ILalServer 中 // - // 业务方可以将自己的流输入到 ILalServer 中 + // @example 示例见 lal/app/demo/customize_lalserver // AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error) + + // DelCustomizePubSession 将 ICustomizePubSessionContext 从 ILalServer 中删除 + // DelCustomizePubSession(ICustomizePubSessionContext) - // StatLalInfo StatXxx... CtrlXxx... + // StatLalInfo StatAllGroup StatGroup CtrlStartPull CtrlKickOutSession // - // 一些获取状态、发送控制命令的API - // 目的是方便业务方在不修改logic包内代码的前提下,在外层实现一些特定逻辑的定制化开发 + // 一些获取状态、发送控制命令的API。 + // 目的是方便业务方在不修改logic包内代码的前提下,在外层实现一些特定逻辑的定制化开发。 // StatLalInfo() base.LalInfo StatAllGroup() (sgs []base.StatGroup) @@ -41,9 +44,7 @@ type ILalServer interface { // NewLalServer 创建一个lal server // -// @param modOption -// - 可变参数,如果不关心,可以不填,具体字段见 Option -// - 目的是方便业务方在不修改logic包内代码的前提下,在外层实现一些特定逻辑的定制化开发 +// @param modOption: 定制化配置。可变参数,如果不关心,可以不填,具体字段见 Option // func NewLalServer(modOption ...ModOption) ILalServer { return NewServerManager(modOption...) @@ -52,12 +53,11 @@ func NewLalServer(modOption ...ModOption) ILalServer { // --------------------------------------------------------------------------------------------------------------------- type ICustomizePubSessionContext interface { - // WithOption FeedAvPacket - // - // 见 remux.IAvPacketStream + // IAvPacketStream 传入音视频数据相关的接口。具体见 remux.IAvPacketStream // - WithOption(modOption func(option *remux.AvPacketStreamOption)) - FeedAvPacket(packet base.AvPacket) + remux.IAvPacketStream + + UniqueKey() string StreamName() string } @@ -77,18 +77,16 @@ type INotifyHandler interface { } type Option struct { - // ConfFilename - // - // 配置文件,注意,如果为空,内部会尝试从 DefaultConfFilenameList 读取默认配置文件 + // ConfFilename 配置文件,注意,如果为空,内部会尝试从 DefaultConfFilenameList 读取默认配置文件 // ConfFilename string // NotifyHandler // // 事件监听 - // 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知 - // 如果不填写保持默认值nil,内部默认走http notify的逻辑(当然,还需要在配置文件中开启http notify功能) - // 注意,如果业务方实现了自己的事件监听,则lal server内部不再走http notify的逻辑(也即二选一) + // 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知。 + // 如果不填写保持默认值nil,内部默认走http notify的逻辑(当然,还需要在配置文件中开启http notify功能)。 + // 注意,如果业务方实现了自己的事件监听,则lal server内部不再走http notify的逻辑(也即二选一)。 // NotifyHandler INotifyHandler } @@ -99,9 +97,7 @@ var defaultOption = Option{ type ModOption func(option *Option) -// DefaultConfFilenameList -// -// 没有指定配置文件时,按顺序作为优先级,找到第一个存在的并使用 +// DefaultConfFilenameList 没有指定配置文件时,按顺序作为优先级,找到第一个存在的并使用 // var DefaultConfFilenameList = []string{ filepath.FromSlash("lalserver.conf.json"), diff --git a/pkg/remux/avpacket2rtmp.go b/pkg/remux/avpacket2rtmp.go index d900cc0..85620f4 100644 --- a/pkg/remux/avpacket2rtmp.go +++ b/pkg/remux/avpacket2rtmp.go @@ -46,9 +46,8 @@ func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer { } } -func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *AvPacketStreamOption)) *AvPacket2RtmpRemuxer { +func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *AvPacketStreamOption)) { modOption(&r.option) - return r } func (r *AvPacket2RtmpRemuxer) WithOnRtmpMsg(onRtmpMsg rtmp.OnReadRtmpAvMsg) *AvPacket2RtmpRemuxer { diff --git a/pkg/remux/avpacket_stream.go b/pkg/remux/avpacket_stream.go index 8ab8361..8ecc726 100644 --- a/pkg/remux/avpacket_stream.go +++ b/pkg/remux/avpacket_stream.go @@ -28,5 +28,29 @@ var DefaultApsOption = AvPacketStreamOption{ type IAvPacketStream interface { WithOption(modOption func(option *AvPacketStreamOption)) + + // FeedAudioSpecificConfig 传入音频AAC的初始化数据 + // + // @param asc: AudioSpecificConfig。含义可参考 aac.AscContext, aac.MakeAscWithAdtsHeader 等内容 + // + FeedAudioSpecificConfig(asc []byte) + + // FeedAvPacket + // + // @param packet: + // + // PayloadType: 类型,支持avc(h264),hevc(h265),aac + // + // Timestamp: 时间戳,单位毫秒 + // + // Payload: 音视频数据,格式如下 + // + // 如果是音频AAC,格式是裸数据,不需要adts头。 + // 注意,调用 FeedAvPacket 传入音频数据前,需要先调用 FeedAudioSpecificConfig + // + // 如果是视频,支持Avcc和Annexb两种格式。 + // Avcc也即[<4字节长度 + nal>...],Annexb也即[<4字节start code 00 00 00 01 + nal>...]。 + // 注意,sps和pps也通过 FeedAvPacket 传入。sps和pps可以单独调用 FeedAvPacket,也可以sps+pps+I帧组合在一起调用一次 FeedAvPacket + // FeedAvPacket(packet base.AvPacket) }