[feat] demo: customize_lalserver,演示向lalserver中输入音频加视频

pull/154/head
q191201771 3 years ago
parent fcfdb09463
commit 6718316ae9

@ -11,10 +11,12 @@ package main
import (
"flag"
"fmt"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/remux"
"io/ioutil"
"os"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazalog"
@ -25,66 +27,145 @@ import (
"github.com/q191201771/naza/pkg/bininfo"
)
//lal/app/demo/customize_lalserver
// lal/app/demo/customize_lalserver
//
// 演示lalserver通过插件功能扩展输入流
// 提示插件功能是基于代码层面的和通过与lalserver建立连接将流发送到lalserver是两种不同的方式
// [what]
// 演示业务方如何通过lalserver的插件功能将自身的流输入到lalserver中
//
// [why]
// 业务方的流输入到lalserver后就可以使用lalserver的功能了比如录制功能使用lalserver所支持的协议从lalserver拉流等等
//
// 提示插件功能是基于代码层面的和与lalserver建立连接将流发送到lalserver是两种不同的方式但是流进入lalserver后效果是一样的
//
// 提示这个demo可以看成是业务方基于lal实现的一个定制化功能增强版的lalserver应用
// 换句话说它并不强制要求在lal的github repo下
//
// demo的具体功能是读取一个h264 es流文件并将这个流输入到lalserver中
// 注意lalserver其实并不关心业务方的流的来源比如网络or文件or其他
// 也不关心原始流的格式
// 只要业务方将流转换成lalserver所要求的格式调用相应的接口传入数据即可
// [how]
// demo的具体功能是分别读取一个h264 es流文件和一个aac es流文件并将音视频流输入到lalserver中
//
// 业务方的流输入到lalserver后就可以使用lalserver所支持的协议从lalserver拉流了
// 注意其实lalserver并不关心业务方的流的来源比如网络or文件or其他也不关心流的原始格式
// 业务方只要将流转换成lalserver所要求的格式调用相应的接口传入数据即可
//
func showHowToCustomizePub(lals logic.ILalServer) {
const (
customizePubStreamName = "c110"
h264filename = "/tmp/test.h264"
aacfilename = "/tmp/test.aac"
durationInterval = uint32(66)
audioDurationInterval = uint32(23)
videoDurationInterval = uint32(66)
)
time.Sleep(200 * time.Millisecond)
session, err := lals.AddCustomizePubSession(customizePubStreamName)
nazalog.Assert(nil, err)
session.WithOption(func(option *remux.AvPacketStreamOption) {
option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb
})
audioContent, err := ioutil.ReadFile(aacfilename)
nazalog.Assert(nil, err)
videoContent, err := ioutil.ReadFile(h264filename)
nazalog.Assert(nil, err)
asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength])
nazalog.Assert(nil, err)
session.FeedAudioSpecificConfig(asc)
var (
m sync.Mutex
audios []base.AvPacket
videos []base.AvPacket
)
var reorderFeedFilterFn = func(packet base.AvPacket) {
m.Lock()
defer m.Unlock()
if packet.IsAudio() {
audios = append(audios, packet)
} else {
videos = append(videos, packet)
}
for len(audios) > 0 && len(videos) > 0 {
if audios[0].Timestamp <= videos[0].Timestamp {
session.FeedAvPacket(audios[0])
audios = audios[1:]
} else {
session.FeedAvPacket(videos[0])
videos = videos[1:]
}
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(200 * time.Millisecond)
session, err := lals.AddCustomizePubSession(customizePubStreamName)
nazalog.Assert(nil, err)
session.WithOption(func(option *remux.AvPacketStreamOption) {
option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb
})
i := 0
timestamp := uint32(0)
for {
ctx, err := aac.NewAdtsHeaderContext(audioContent[i : i+aac.AdtsHeaderLength])
nazalog.Assert(nil, err)
// demo的输入比较简单一次性将整个es文件读入
content, err := ioutil.ReadFile(h264filename)
nazalog.Assert(nil, err)
packet := base.AvPacket{
PayloadType: base.AvPacketPtAac,
Timestamp: timestamp,
Payload: audioContent[i+aac.AdtsHeaderLength : i+int(ctx.AdtsLength)],
}
reorderFeedFilterFn(packet)
i += int(ctx.AdtsLength)
timestamp += audioDurationInterval
time.Sleep(time.Duration(audioDurationInterval) * time.Millisecond)
if i == len(audioContent) {
break
}
}
wg.Done()
}()
go func() {
timestamp := uint32(0)
for {
// 借助lal中的一个帮助函数将es流切割成一个一个的nal
// 提示这里切割的是整个文件并且函数执行结束后并没有退出for循环换句话说流会无限循环输入到lalserver中
err = avc.IterateNaluAnnexb(content, func(nal []byte) {
err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) {
// 将nal数据转换为lalserver要求的格式输入
packet := base.AvPacket{
PayloadType: base.AvPacketPtAvc,
Timestamp: timestamp,
Payload: append(avc.NaluStartCode4, nal...),
}
session.FeedAvPacket(packet)
reorderFeedFilterFn(packet)
// 发送完后,计算时间戳,并按帧间隔时间延时发送
t := avc.ParseNaluType(nal[0])
if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei {
// noop
} else {
timestamp += durationInterval
time.Sleep(time.Duration(durationInterval) * time.Millisecond)
timestamp += videoDurationInterval
time.Sleep(time.Duration(videoDurationInterval) * time.Millisecond)
}
})
nazalog.Assert(nil, err)
break
}
wg.Done()
}()
wg.Wait()
lals.DelCustomizePubSession(session)
}
func main() {
@ -95,7 +176,7 @@ func main() {
option.ConfFilename = confFilename
})
showHowToCustomizePub(lals)
go showHowToCustomizePub(lals)
err := lals.RunLoop()
nazalog.Infof("server manager done. err=%+v", err)

@ -12,9 +12,9 @@ type AvPacketPt int
const (
AvPacketPtUnknown AvPacketPt = -1
AvPacketPtAvc AvPacketPt = RtpPacketTypeAvcOrHevc
AvPacketPtHevc AvPacketPt = RtpPacketTypeHevc
AvPacketPtAac AvPacketPt = RtpPacketTypeAac
AvPacketPtAvc AvPacketPt = 96 // h264
AvPacketPtHevc AvPacketPt = 98 // h265
AvPacketPtAac AvPacketPt = 97
)
// AvPacket
@ -41,3 +41,11 @@ func (a AvPacketPt) ReadableString() string {
}
return ""
}
func (packet AvPacket) IsAudio() bool {
return packet.PayloadType == AvPacketPtAac
}
func (packet AvPacket) IsVideo() bool {
return packet.PayloadType == AvPacketPtAvc || packet.PayloadType == AvPacketPtHevc
}

@ -11,17 +11,18 @@ package base
import "github.com/q191201771/naza/pkg/unique"
const (
UkPreRtmpServerSession = "RTMPPUBSUB"
UkPreRtmpPushSession = "RTMPPUSH"
UkPreRtmpPullSession = "RTMPPULL"
UkPreRtspServerCommandSession = "RTSPSRVCMD"
UkPreRtspPubSession = "RTSPPUB"
UkPreRtspSubSession = "RTSPSUB"
UkPreRtspPushSession = "RTSPPUSH"
UkPreRtspPullSession = "RTSPPULL"
UkPreFlvSubSession = "FLVSUB"
UkPreTsSubSession = "TSSUB"
UkPreFlvPullSession = "FLVPULL"
UkPreCustomizePubSessionContext = "CUSTOMIZEPUB"
UkPreRtmpServerSession = "RTMPPUBSUB"
UkPreRtmpPushSession = "RTMPPUSH"
UkPreRtmpPullSession = "RTMPPULL"
UkPreRtspServerCommandSession = "RTSPSRVCMD"
UkPreRtspPubSession = "RTSPPUB"
UkPreRtspSubSession = "RTSPSUB"
UkPreRtspPushSession = "RTSPPUSH"
UkPreRtspPullSession = "RTSPPULL"
UkPreFlvSubSession = "FLVSUB"
UkPreTsSubSession = "TSSUB"
UkPreFlvPullSession = "FLVPULL"
UkPreGroup = "GROUP"
UkPreHlsMuxer = "HLSMUXER"
@ -32,6 +33,10 @@ const (
// return unique.GenUniqueKey(prefix)
//}
func GenUkCustomizePubSession() string {
return siUkCustomizePubSession.GenUniqueKey()
}
func GenUkRtmpServerSession() string {
return siUkRtmpServerSession.GenUniqueKey()
}
@ -89,6 +94,7 @@ func GenUkRtmp2MpegtsRemuxer() string {
}
var (
siUkCustomizePubSession *unique.SingleGenerator
siUkRtmpServerSession *unique.SingleGenerator
siUkRtmpPushSession *unique.SingleGenerator
siUkRtmpPullSession *unique.SingleGenerator
@ -107,6 +113,7 @@ var (
)
func init() {
siUkCustomizePubSession = unique.NewSingleGenerator(UkPreCustomizePubSessionContext)
siUkRtmpServerSession = unique.NewSingleGenerator(UkPreRtmpServerSession)
siUkRtmpPushSession = unique.NewSingleGenerator(UkPreRtmpPushSession)
siUkRtmpPullSession = unique.NewSingleGenerator(UkPreRtmpPullSession)

@ -136,6 +136,12 @@ var (
// ---------------------------------------------------------------------------------------------------------------------
var _ logic.ICustomizePubSessionContext = &logic.CustomizePubSessionContext{}
var _ remux.IAvPacketStream = &logic.CustomizePubSessionContext{}
var _ remux.IAvPacketStream = &remux.AvPacket2RtmpRemuxer{}
// ---------------------------------------------------------------------------------------------------------------------
var _ logic.ILalServer = &logic.ServerManager{}
var _ rtmp.IServerObserver = &logic.ServerManager{}
var _ logic.IHttpServerHandlerObserver = &logic.ServerManager{}

@ -14,6 +14,8 @@ import (
)
type CustomizePubSessionContext struct {
uniqueKey string
streamName string
remuxer *remux.AvPacket2RtmpRemuxer
onRtmpMsg func(msg base.RtmpMsg)
@ -21,6 +23,7 @@ type CustomizePubSessionContext struct {
func NewCustomizePubSessionContext(streamName string) *CustomizePubSessionContext {
return &CustomizePubSessionContext{
uniqueKey: base.GenUkCustomizePubSession(),
streamName: streamName,
remuxer: remux.NewAvPacket2RtmpRemuxer(),
}
@ -31,6 +34,10 @@ func (ctx *CustomizePubSessionContext) WithOnRtmpMsg(onRtmpMsg func(msg base.Rtm
return ctx
}
func (ctx *CustomizePubSessionContext) UniqueKey() string {
return ctx.uniqueKey
}
func (ctx *CustomizePubSessionContext) StreamName() string {
return ctx.streamName
}
@ -41,6 +48,10 @@ func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *remux.A
ctx.remuxer.WithOption(modOption)
}
func (ctx *CustomizePubSessionContext) FeedAudioSpecificConfig(asc []byte) {
ctx.remuxer.InitWithAvConfig(asc, nil, nil, nil)
}
func (ctx *CustomizePubSessionContext) FeedAvPacket(packet base.AvPacket) {
ctx.remuxer.FeedAvPacket(packet)
}

@ -123,8 +123,10 @@ func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool {
// ---------------------------------------------------------------------------------------------------------------------
func (sm *Group) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
func (group *Group) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delCustomizePubSession(sessionCtx)
}
func (group *Group) DelRtmpPubSession(session *rtmp.ServerSession) {
@ -147,6 +149,18 @@ func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) {
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, sessionCtx.UniqueKey())
if sessionCtx != group.customizePubSession {
Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p",
group.UniqueKey, sessionCtx.UniqueKey(), group.rtmpPubSession)
return
}
group.delIn()
}
func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey())
@ -212,6 +226,7 @@ func (group *Group) delIn() {
group.rtmpPubSession = nil
group.rtspPubSession = nil
group.customizePubSession = nil
group.rtsp2RtmpRemuxer = nil
group.rtmp2RtspRemuxer = nil
group.dummyAudioFilter = nil

@ -20,17 +20,20 @@ type ILalServer interface {
RunLoop() error
Dispose()
// AddCustomizePubSession DelCustomizePubSession
// AddCustomizePubSession 定制化功能。业务方可以将自己的流输入到 ILalServer 中
//
// 业务方可以将自己的流输入到 ILalServer 中
// @example 示例见 lal/app/demo/customize_lalserver
//
AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error)
// DelCustomizePubSession 将 ICustomizePubSessionContext 从 ILalServer 中删除
//
DelCustomizePubSession(ICustomizePubSessionContext)
// StatLalInfo StatXxx... CtrlXxx...
// StatLalInfo StatAllGroup StatGroup CtrlStartPull CtrlKickOutSession
//
// 一些获取状态、发送控制命令的API
// 目的是方便业务方在不修改logic包内代码的前提下在外层实现一些特定逻辑的定制化开发
// 一些获取状态、发送控制命令的API
// 目的是方便业务方在不修改logic包内代码的前提下在外层实现一些特定逻辑的定制化开发
//
StatLalInfo() base.LalInfo
StatAllGroup() (sgs []base.StatGroup)
@ -41,9 +44,7 @@ type ILalServer interface {
// NewLalServer 创建一个lal server
//
// @param modOption
// - 可变参数,如果不关心,可以不填,具体字段见 Option
// - 目的是方便业务方在不修改logic包内代码的前提下在外层实现一些特定逻辑的定制化开发
// @param modOption: 定制化配置。可变参数,如果不关心,可以不填,具体字段见 Option
//
func NewLalServer(modOption ...ModOption) ILalServer {
return NewServerManager(modOption...)
@ -52,12 +53,11 @@ func NewLalServer(modOption ...ModOption) ILalServer {
// ---------------------------------------------------------------------------------------------------------------------
type ICustomizePubSessionContext interface {
// WithOption FeedAvPacket
//
// 见 remux.IAvPacketStream
// IAvPacketStream 传入音视频数据相关的接口。具体见 remux.IAvPacketStream
//
WithOption(modOption func(option *remux.AvPacketStreamOption))
FeedAvPacket(packet base.AvPacket)
remux.IAvPacketStream
UniqueKey() string
StreamName() string
}
@ -77,18 +77,16 @@ type INotifyHandler interface {
}
type Option struct {
// ConfFilename
//
// 配置文件,注意,如果为空,内部会尝试从 DefaultConfFilenameList 读取默认配置文件
// ConfFilename 配置文件,注意,如果为空,内部会尝试从 DefaultConfFilenameList 读取默认配置文件
//
ConfFilename string
// NotifyHandler
//
// 事件监听
// 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知
// 如果不填写保持默认值nil内部默认走http notify的逻辑当然还需要在配置文件中开启http notify功能
// 注意如果业务方实现了自己的事件监听则lal server内部不再走http notify的逻辑也即二选一
// 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知
// 如果不填写保持默认值nil内部默认走http notify的逻辑当然还需要在配置文件中开启http notify功能
// 注意如果业务方实现了自己的事件监听则lal server内部不再走http notify的逻辑也即二选一
//
NotifyHandler INotifyHandler
}
@ -99,9 +97,7 @@ var defaultOption = Option{
type ModOption func(option *Option)
// DefaultConfFilenameList
//
// 没有指定配置文件时,按顺序作为优先级,找到第一个存在的并使用
// DefaultConfFilenameList 没有指定配置文件时,按顺序作为优先级,找到第一个存在的并使用
//
var DefaultConfFilenameList = []string{
filepath.FromSlash("lalserver.conf.json"),

@ -46,9 +46,8 @@ func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer {
}
}
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *AvPacketStreamOption)) *AvPacket2RtmpRemuxer {
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *AvPacketStreamOption)) {
modOption(&r.option)
return r
}
func (r *AvPacket2RtmpRemuxer) WithOnRtmpMsg(onRtmpMsg rtmp.OnReadRtmpAvMsg) *AvPacket2RtmpRemuxer {

@ -28,5 +28,29 @@ var DefaultApsOption = AvPacketStreamOption{
type IAvPacketStream interface {
WithOption(modOption func(option *AvPacketStreamOption))
// FeedAudioSpecificConfig 传入音频AAC的初始化数据
//
// @param asc: AudioSpecificConfig。含义可参考 aac.AscContext, aac.MakeAscWithAdtsHeader 等内容
//
FeedAudioSpecificConfig(asc []byte)
// FeedAvPacket
//
// @param packet:
//
// PayloadType: 类型支持avc(h264)hevc(h265)aac
//
// Timestamp: 时间戳,单位毫秒
//
// Payload: 音视频数据,格式如下
//
// 如果是音频AAC格式是裸数据不需要adts头。
// 注意,调用 FeedAvPacket 传入音频数据前,需要先调用 FeedAudioSpecificConfig
//
// 如果是视频支持Avcc和Annexb两种格式。
// Avcc也即[<4字节长度 + nal>...]Annexb也即[<4字节start code 00 00 00 01 + nal>...]。
// 注意sps和pps也通过 FeedAvPacket 传入。sps和pps可以单独调用 FeedAvPacket也可以sps+pps+I帧组合在一起调用一次 FeedAvPacket
//
FeedAvPacket(packet base.AvPacket)
}

Loading…
Cancel
Save