// Copyright 2022, Chef. All rights reserved. // https://github.com/q191201771/lal // // Use of this source code is governed by a MIT-style license // that can be found in the License file. // // Author: Chef (191201771@qq.com) package main import ( "flag" "fmt" "github.com/q191201771/lal/pkg/rtsp" "os" "time" "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/logic" "github.com/q191201771/naza/pkg/bininfo" ) // 注意,使用这个demo时,请确保这三个文件存在,文件下载地址 https://github.com/q191201771/lalext/tree/master/avfile const ( h264filename = "/tmp/test.h264" aacfilename = "/tmp/test.aac" flvfilename = "/tmp/test.flv" ) // 文档见 // https://pengrl.com/lal/#/customize_pub // // MySession 演示业务方实现 logic.ICustomizeHookSessionContext 接口,从而hook所有输入到lalserver中的流以及流中的数据。 type MySession struct { uniqueKey string streamName string } func (i *MySession) OnMsg(msg base.RtmpMsg) { // 业务方可以在这里对流做处理 if msg.IsAacSeqHeader() || msg.IsVideoKeySeqHeader() || msg.IsVideoKeyNalu() { nazalog.Debugf("%s", msg.DebugString()) } } func (i *MySession) OnStop() { nazalog.Debugf("OnStop") } func main() { defer nazalog.Sync() rtsp.BaseInSessionTimestampFilterFlag = false confFilename := parseFlag() lals := logic.NewLalServer(func(option *logic.Option) { option.ConfFilename = confFilename }) // 在常规lalserver基础上增加这行,用于演示hook lalserver中的流 lals.WithOnHookSession(func(uniqueKey string, streamName string) logic.ICustomizeHookSessionContext { // 有新的流了,创建业务层的对象,用于hook这个流 return &MySession{ uniqueKey: uniqueKey, streamName: streamName, } }) // 在常规lalserver基础上增加这两个例子,用于演示向lalserver输入自定义流 go showHowToCustomizePub(lals) go showHowToFlvCustomizePub(lals) err := lals.RunLoop() nazalog.Infof("lal server loop done. err=%+v", err) } func parseFlag() string { binInfoFlag := flag.Bool("v", false, "show bin info") cf := flag.String("c", "", "specify conf file") flag.Parse() if *binInfoFlag { _, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine()) _, _ = fmt.Fprintln(os.Stderr, base.LalFullInfo) os.Exit(0) } return *cf } func showHowToFlvCustomizePub(lals logic.ILalServer) { const customizePubStreamName = "f110" time.Sleep(200 * time.Millisecond) tags, err := httpflv.ReadAllTagsFromFlvFile(flvfilename) nazalog.Assert(nil, err) session, err := lals.AddCustomizePubSession(customizePubStreamName) nazalog.Assert(nil, err) startRealTime := time.Now() startTs := int64(0) for _, tag := range tags { msg := remux.FlvTag2RtmpMsg(tag) diffTs := int64(msg.Header.TimestampAbs) - startTs diffReal := time.Since(startRealTime).Milliseconds() if diffReal < diffTs { time.Sleep(time.Duration(diffTs-diffReal) * time.Millisecond) } session.FeedRtmpMsg(msg) } lals.DelCustomizePubSession(session) } func showHowToCustomizePub(lals logic.ILalServer) { const customizePubStreamName = "c110" time.Sleep(200 * time.Millisecond) // 从音频和视频各自的ES流文件中读取出所有数据 // 然后将它们按时间戳排序,合并到一个AvPacket数组中 audioContent, audioPackets := readAudioPacketsFromFile(aacfilename) _, videoPackets := readVideoPacketsFromFile(h264filename) packets := mergePackets(audioPackets, videoPackets) // 1. 向lalserver中加入自定义的pub session session, err := lals.AddCustomizePubSession(customizePubStreamName) nazalog.Assert(nil, err) // 2. 配置session session.WithOption(func(option *base.AvPacketStreamOption) { option.VideoFormat = base.AvPacketStreamVideoFormatAnnexb }) asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength]) nazalog.Assert(nil, err) // 3. 填入aac的audio specific config信息 session.FeedAudioSpecificConfig(asc) // 4. 按时间戳间隔匀速发送音频和视频 startRealTime := time.Now() startTs := int64(0) for i := range packets { diffTs := packets[i].Timestamp - startTs diffReal := time.Now().Sub(startRealTime).Milliseconds() //nazalog.Debugf("%d: %s, %d, %d", i, packets[i].DebugString(), diffTs, diffReal) if diffReal < diffTs { time.Sleep(time.Duration(diffTs-diffReal) * time.Millisecond) } session.FeedAvPacket(packets[i]) } // 5. 所有数据发送关闭后,将pub session从lal server移除 lals.DelCustomizePubSession(session) } // readAudioPacketsFromFile 从aac es流文件读取所有音频包 func readAudioPacketsFromFile(filename string) (audioContent []byte, audioPackets []base.AvPacket) { var err error audioContent, err = os.ReadFile(filename) nazalog.Assert(nil, err) pos := 0 timestamp := float32(0) for { ctx, err := aac.NewAdtsHeaderContext(audioContent[pos : pos+aac.AdtsHeaderLength]) nazalog.Assert(nil, err) packet := base.AvPacket{ PayloadType: base.AvPacketPtAac, Timestamp: int64(timestamp), Payload: audioContent[pos+aac.AdtsHeaderLength : pos+int(ctx.AdtsLength)], } audioPackets = append(audioPackets, packet) timestamp += float32(48000*4*2) / float32(8192*2) // (frequence * bytePerSample * channel) / (packetSize * channel) pos += int(ctx.AdtsLength) if pos == len(audioContent) { break } } return } // readVideoPacketsFromFile 从h264 es流文件读取所有视频包 func readVideoPacketsFromFile(filename string) (videoContent []byte, videoPackets []base.AvPacket) { var err error videoContent, err = os.ReadFile(filename) nazalog.Assert(nil, err) timestamp := float32(0) err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) { // 将nal数据转换为lalserver要求的格式输入 packet := base.AvPacket{ PayloadType: base.AvPacketPtAvc, Timestamp: int64(timestamp), Payload: append(avc.NaluStartCode4, nal...), } videoPackets = append(videoPackets, packet) t := avc.ParseNaluType(nal[0]) if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei { // noop } else { timestamp += float32(1000) / float32(15) // 1秒 / fps } }) nazalog.Assert(nil, err) return } // mergePackets 将音频队列和视频队列按时间戳有序合并为一个队列 func mergePackets(audioPackets, videoPackets []base.AvPacket) (packets []base.AvPacket) { var i, j int for { // audio数组为空,将video的剩余数据取出,然后merge结束 if i == len(audioPackets) { packets = append(packets, videoPackets[j:]...) break } // if j == len(videoPackets) { packets = append(packets, audioPackets[i:]...) break } // 音频和视频都有数据,取时间戳小的 if audioPackets[i].Timestamp < videoPackets[j].Timestamp { packets = append(packets, audioPackets[i]) i++ } else { packets = append(packets, videoPackets[j]) j++ } } return }