diff --git a/app/demo/customize_lalserver/customize_lalserver.go b/app/demo/customize_lalserver/customize_lalserver.go index 8f2c03a..7805250 100644 --- a/app/demo/customize_lalserver/customize_lalserver.go +++ b/app/demo/customize_lalserver/customize_lalserver.go @@ -14,13 +14,11 @@ import ( "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/naza/pkg/nazalog" "io/ioutil" "os" - "sync" "time" - "github.com/q191201771/naza/pkg/nazalog" - "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/logic" @@ -47,151 +45,169 @@ import ( // 业务方只要将流转换成lalserver所要求的格式,调用相应的接口传入数据即可 // +func main() { + defer nazalog.Sync() + + confFilename := parseFlag() + lals := logic.NewLalServer(func(option *logic.Option) { + option.ConfFilename = confFilename + }) + + // 比常规lalserver多加了这一行 + go showHowToCustomizePub(lals) + + err := lals.RunLoop() + nazalog.Infof("server manager done. err=%+v", err) +} + +func parseFlag() string { + binInfoFlag := flag.Bool("v", false, "show bin info") + cf := flag.String("c", "", "specify conf file") + flag.Parse() + + if *binInfoFlag { + _, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine()) + _, _ = fmt.Fprintln(os.Stderr, base.LalFullInfo) + os.Exit(0) + } + + return *cf +} + func showHowToCustomizePub(lals logic.ILalServer) { const ( - customizePubStreamName = "c110" - h264filename = "/tmp/test.h264" - aacfilename = "/tmp/test.aac" + h264filename = "/tmp/test.h264" + aacfilename = "/tmp/test.aac" - audioDurationInterval = uint32(23) - videoDurationInterval = uint32(66) + customizePubStreamName = "c110" ) time.Sleep(200 * time.Millisecond) + + // 从音频和视频各自的ES流文件中读取出所有数据 + // 然后将它们按时间戳排序,合并到一个AvPacket数组中 + audioContent, audioPackets := readAudioPacketsFromFile(aacfilename) + _, videoPackets := readVideoPacketsFromFile(h264filename) + packets := mergePackets(audioPackets, videoPackets) + + // 1. 向lalserver中加入自定义的pub session session, err := lals.AddCustomizePubSession(customizePubStreamName) nazalog.Assert(nil, err) + // 2. 配置session session.WithOption(func(option *remux.AvPacketStreamOption) { option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb }) - audioContent, err := ioutil.ReadFile(aacfilename) - nazalog.Assert(nil, err) - - videoContent, err := ioutil.ReadFile(h264filename) - nazalog.Assert(nil, err) - asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength]) nazalog.Assert(nil, err) - + // 3. 填入aac的audio specific config信息 session.FeedAudioSpecificConfig(asc) - var ( - m sync.Mutex - audios []base.AvPacket - videos []base.AvPacket - ) - - var reorderFeedFilterFn = func(packet base.AvPacket) { - m.Lock() - defer m.Unlock() - - if packet.IsAudio() { - audios = append(audios, packet) - } else { - videos = append(videos, packet) - } - - for len(audios) > 0 && len(videos) > 0 { - if audios[0].Timestamp <= videos[0].Timestamp { - session.FeedAvPacket(audios[0]) - audios = audios[1:] - } else { - session.FeedAvPacket(videos[0]) - videos = videos[1:] - } + // 4. 按时间戳间隔匀速发送音频和视频 + startRealTime := time.Now() + startTs := uint32(0) + for i := range packets { + diffTs := time.Duration(packets[i].Timestamp - startTs) + diffReal := time.Duration(time.Now().Sub(startRealTime).Milliseconds()) + //nazalog.Debugf("%d: %s, %d, %d", i, packets[i].DebugString(), diffTs, diffReal) + if diffReal < diffTs { + time.Sleep((diffTs - diffReal) * time.Millisecond) } + session.FeedAvPacket(packets[i]) } - var wg sync.WaitGroup - wg.Add(2) - - go func() { - i := 0 - timestamp := uint32(0) - for { - ctx, err := aac.NewAdtsHeaderContext(audioContent[i : i+aac.AdtsHeaderLength]) - nazalog.Assert(nil, err) - - packet := base.AvPacket{ - PayloadType: base.AvPacketPtAac, - Timestamp: timestamp, - Payload: audioContent[i+aac.AdtsHeaderLength : i+int(ctx.AdtsLength)], - } - reorderFeedFilterFn(packet) + // 5. 所有数据发送关闭后,将pub session从lal server移除 + lals.DelCustomizePubSession(session) +} - i += int(ctx.AdtsLength) +// readAudioPacketsFromFile 从aac es流文件读取所有音频包 +// +func readAudioPacketsFromFile(filename string) (audioContent []byte, audioPackets []base.AvPacket) { + var err error + audioContent, err = ioutil.ReadFile(filename) + nazalog.Assert(nil, err) - timestamp += audioDurationInterval - time.Sleep(time.Duration(audioDurationInterval) * time.Millisecond) + pos := 0 + timestamp := float32(0) + for { + ctx, err := aac.NewAdtsHeaderContext(audioContent[pos : pos+aac.AdtsHeaderLength]) + nazalog.Assert(nil, err) - if i == len(audioContent) { - break - } + packet := base.AvPacket{ + PayloadType: base.AvPacketPtAac, + Timestamp: uint32(timestamp), + Payload: audioContent[pos+aac.AdtsHeaderLength : pos+int(ctx.AdtsLength)], } - wg.Done() - }() - - go func() { - timestamp := uint32(0) - for { - // 借助lal中的一个帮助函数,将es流切割成一个一个的nal - // 提示,这里切割的是整个文件,并且,函数执行结束后,并没有退出for循环,换句话说,流会无限循环输入到lalserver中 - err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) { - // 将nal数据转换为lalserver要求的格式输入 - packet := base.AvPacket{ - PayloadType: base.AvPacketPtAvc, - Timestamp: timestamp, - Payload: append(avc.NaluStartCode4, nal...), - } - reorderFeedFilterFn(packet) - - // 发送完后,计算时间戳,并按帧间隔时间延时发送 - t := avc.ParseNaluType(nal[0]) - if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei { - // noop - } else { - timestamp += videoDurationInterval - time.Sleep(time.Duration(videoDurationInterval) * time.Millisecond) - } - }) - nazalog.Assert(nil, err) + audioPackets = append(audioPackets, packet) + + timestamp += float32(48000*4*2) / float32(8192*2) // (frequence * bytePerSample * channel) / (packetSize * channel) + pos += int(ctx.AdtsLength) + if pos == len(audioContent) { break } + } - wg.Done() - }() + return +} - wg.Wait() +// readVideoPacketsFromFile 从h264 es流文件读取所有视频包 +// +func readVideoPacketsFromFile(filename string) (videoContent []byte, videoPackets []base.AvPacket) { + var err error + videoContent, err = ioutil.ReadFile(filename) + nazalog.Assert(nil, err) - lals.DelCustomizePubSession(session) -} + timestamp := float32(0) + err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) { + // 将nal数据转换为lalserver要求的格式输入 + packet := base.AvPacket{ + PayloadType: base.AvPacketPtAvc, + Timestamp: uint32(timestamp), + Payload: append(avc.NaluStartCode4, nal...), + } -func main() { - defer nazalog.Sync() + videoPackets = append(videoPackets, packet) - confFilename := parseFlag() - lals := logic.NewLalServer(func(option *logic.Option) { - option.ConfFilename = confFilename + t := avc.ParseNaluType(nal[0]) + if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei { + // noop + } else { + timestamp += float32(1000) / float32(15) // 1秒 / fps + } }) + nazalog.Assert(nil, err) - go showHowToCustomizePub(lals) - - err := lals.RunLoop() - nazalog.Infof("server manager done. err=%+v", err) + return } -func parseFlag() string { - binInfoFlag := flag.Bool("v", false, "show bin info") - cf := flag.String("c", "", "specify conf file") - flag.Parse() +// mergePackets 将音频队列和视频队列按时间戳有序合并为一个队列 +// +func mergePackets(audioPackets, videoPackets []base.AvPacket) (packets []base.AvPacket) { + var i, j int + for { + // audio数组为空,将video的剩余数据取出,然后merge结束 + if i == len(audioPackets) { + packets = append(packets, videoPackets[j:]...) + break + } - if *binInfoFlag { - _, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine()) - _, _ = fmt.Fprintln(os.Stderr, base.LalFullInfo) - os.Exit(0) + // + if j == len(videoPackets) { + packets = append(packets, audioPackets[i:]...) + break + } + + // 音频和视频都有数据,取时间戳小的 + if audioPackets[i].Timestamp < videoPackets[j].Timestamp { + packets = append(packets, audioPackets[i]) + i++ + } else { + packets = append(packets, videoPackets[j]) + j++ + } } - return *cf + return } diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 9ad16b4..0b5e096 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -8,6 +8,12 @@ package base +import ( + "encoding/hex" + "fmt" + "github.com/q191201771/naza/pkg/nazabytes" +) + type AvPacketPt int const ( @@ -17,35 +23,40 @@ const ( AvPacketPtAac AvPacketPt = 97 ) -// AvPacket -// -// 不同场景使用时,字段含义可能不同。 -// 使用AvPacket的地方,应注明各字段的含义。 -// -type AvPacket struct { - PayloadType AvPacketPt - Timestamp uint32 // TODO(chef): 改成int64 - Payload []byte -} - func (a AvPacketPt) ReadableString() string { switch a { case AvPacketPtUnknown: return "unknown" case AvPacketPtAvc: - return "avc" + return "h264" case AvPacketPtHevc: - return "hevc" + return "h265" case AvPacketPtAac: return "aac" } return "" } -func (packet AvPacket) IsAudio() bool { +// AvPacket +// +// 不同场景使用时,字段含义可能不同。 +// 使用AvPacket的地方,应注明各字段的含义。 +// +type AvPacket struct { + PayloadType AvPacketPt + Timestamp uint32 // TODO(chef): 改成int64 + Payload []byte +} + +func (packet *AvPacket) IsAudio() bool { return packet.PayloadType == AvPacketPtAac } -func (packet AvPacket) IsVideo() bool { +func (packet *AvPacket) IsVideo() bool { return packet.PayloadType == AvPacketPtAvc || packet.PayloadType == AvPacketPtHevc } + +func (packet *AvPacket) DebugString() string { + return fmt.Sprintf("[%p] type=%s, timestamp=%d, len=%d, payload=%s", + packet, packet.PayloadType.ReadableString(), packet.Timestamp, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32))) +} diff --git a/pkg/remux/avpacket_stream.go b/pkg/remux/avpacket_stream.go index 8ecc726..c2362bb 100644 --- a/pkg/remux/avpacket_stream.go +++ b/pkg/remux/avpacket_stream.go @@ -41,7 +41,7 @@ type IAvPacketStream interface { // // PayloadType: 类型,支持avc(h264),hevc(h265),aac // - // Timestamp: 时间戳,单位毫秒 + // Timestamp: 时间戳,单位毫秒。注意,是累计递增值,不是单个包的duration时长。 // // Payload: 音视频数据,格式如下 //