[opt] demo: 优化customize pub,匀速发送流,避免卡顿

pull/154/head
q191201771 3 years ago
parent c54b7cace3
commit 09f903d5ca

@ -14,13 +14,11 @@ import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/nazalog"
"io/ioutil"
"os"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/logic"
@ -47,151 +45,169 @@ import (
// 业务方只要将流转换成lalserver所要求的格式调用相应的接口传入数据即可
//
func main() {
defer nazalog.Sync()
confFilename := parseFlag()
lals := logic.NewLalServer(func(option *logic.Option) {
option.ConfFilename = confFilename
})
// 比常规lalserver多加了这一行
go showHowToCustomizePub(lals)
err := lals.RunLoop()
nazalog.Infof("server manager done. err=%+v", err)
}
func parseFlag() string {
binInfoFlag := flag.Bool("v", false, "show bin info")
cf := flag.String("c", "", "specify conf file")
flag.Parse()
if *binInfoFlag {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
_, _ = fmt.Fprintln(os.Stderr, base.LalFullInfo)
os.Exit(0)
}
return *cf
}
func showHowToCustomizePub(lals logic.ILalServer) {
const (
customizePubStreamName = "c110"
h264filename = "/tmp/test.h264"
aacfilename = "/tmp/test.aac"
h264filename = "/tmp/test.h264"
aacfilename = "/tmp/test.aac"
audioDurationInterval = uint32(23)
videoDurationInterval = uint32(66)
customizePubStreamName = "c110"
)
time.Sleep(200 * time.Millisecond)
// 从音频和视频各自的ES流文件中读取出所有数据
// 然后将它们按时间戳排序合并到一个AvPacket数组中
audioContent, audioPackets := readAudioPacketsFromFile(aacfilename)
_, videoPackets := readVideoPacketsFromFile(h264filename)
packets := mergePackets(audioPackets, videoPackets)
// 1. 向lalserver中加入自定义的pub session
session, err := lals.AddCustomizePubSession(customizePubStreamName)
nazalog.Assert(nil, err)
// 2. 配置session
session.WithOption(func(option *remux.AvPacketStreamOption) {
option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb
})
audioContent, err := ioutil.ReadFile(aacfilename)
nazalog.Assert(nil, err)
videoContent, err := ioutil.ReadFile(h264filename)
nazalog.Assert(nil, err)
asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength])
nazalog.Assert(nil, err)
// 3. 填入aac的audio specific config信息
session.FeedAudioSpecificConfig(asc)
var (
m sync.Mutex
audios []base.AvPacket
videos []base.AvPacket
)
var reorderFeedFilterFn = func(packet base.AvPacket) {
m.Lock()
defer m.Unlock()
if packet.IsAudio() {
audios = append(audios, packet)
} else {
videos = append(videos, packet)
}
for len(audios) > 0 && len(videos) > 0 {
if audios[0].Timestamp <= videos[0].Timestamp {
session.FeedAvPacket(audios[0])
audios = audios[1:]
} else {
session.FeedAvPacket(videos[0])
videos = videos[1:]
}
// 4. 按时间戳间隔匀速发送音频和视频
startRealTime := time.Now()
startTs := uint32(0)
for i := range packets {
diffTs := time.Duration(packets[i].Timestamp - startTs)
diffReal := time.Duration(time.Now().Sub(startRealTime).Milliseconds())
//nazalog.Debugf("%d: %s, %d, %d", i, packets[i].DebugString(), diffTs, diffReal)
if diffReal < diffTs {
time.Sleep((diffTs - diffReal) * time.Millisecond)
}
session.FeedAvPacket(packets[i])
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
i := 0
timestamp := uint32(0)
for {
ctx, err := aac.NewAdtsHeaderContext(audioContent[i : i+aac.AdtsHeaderLength])
nazalog.Assert(nil, err)
packet := base.AvPacket{
PayloadType: base.AvPacketPtAac,
Timestamp: timestamp,
Payload: audioContent[i+aac.AdtsHeaderLength : i+int(ctx.AdtsLength)],
}
reorderFeedFilterFn(packet)
// 5. 所有数据发送关闭后将pub session从lal server移除
lals.DelCustomizePubSession(session)
}
i += int(ctx.AdtsLength)
// readAudioPacketsFromFile 从aac es流文件读取所有音频包
//
func readAudioPacketsFromFile(filename string) (audioContent []byte, audioPackets []base.AvPacket) {
var err error
audioContent, err = ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
timestamp += audioDurationInterval
time.Sleep(time.Duration(audioDurationInterval) * time.Millisecond)
pos := 0
timestamp := float32(0)
for {
ctx, err := aac.NewAdtsHeaderContext(audioContent[pos : pos+aac.AdtsHeaderLength])
nazalog.Assert(nil, err)
if i == len(audioContent) {
break
}
packet := base.AvPacket{
PayloadType: base.AvPacketPtAac,
Timestamp: uint32(timestamp),
Payload: audioContent[pos+aac.AdtsHeaderLength : pos+int(ctx.AdtsLength)],
}
wg.Done()
}()
go func() {
timestamp := uint32(0)
for {
// 借助lal中的一个帮助函数将es流切割成一个一个的nal
// 提示这里切割的是整个文件并且函数执行结束后并没有退出for循环换句话说流会无限循环输入到lalserver中
err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) {
// 将nal数据转换为lalserver要求的格式输入
packet := base.AvPacket{
PayloadType: base.AvPacketPtAvc,
Timestamp: timestamp,
Payload: append(avc.NaluStartCode4, nal...),
}
reorderFeedFilterFn(packet)
// 发送完后,计算时间戳,并按帧间隔时间延时发送
t := avc.ParseNaluType(nal[0])
if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei {
// noop
} else {
timestamp += videoDurationInterval
time.Sleep(time.Duration(videoDurationInterval) * time.Millisecond)
}
})
nazalog.Assert(nil, err)
audioPackets = append(audioPackets, packet)
timestamp += float32(48000*4*2) / float32(8192*2) // (frequence * bytePerSample * channel) / (packetSize * channel)
pos += int(ctx.AdtsLength)
if pos == len(audioContent) {
break
}
}
wg.Done()
}()
return
}
wg.Wait()
// readVideoPacketsFromFile 从h264 es流文件读取所有视频包
//
func readVideoPacketsFromFile(filename string) (videoContent []byte, videoPackets []base.AvPacket) {
var err error
videoContent, err = ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
lals.DelCustomizePubSession(session)
}
timestamp := float32(0)
err = avc.IterateNaluAnnexb(videoContent, func(nal []byte) {
// 将nal数据转换为lalserver要求的格式输入
packet := base.AvPacket{
PayloadType: base.AvPacketPtAvc,
Timestamp: uint32(timestamp),
Payload: append(avc.NaluStartCode4, nal...),
}
func main() {
defer nazalog.Sync()
videoPackets = append(videoPackets, packet)
confFilename := parseFlag()
lals := logic.NewLalServer(func(option *logic.Option) {
option.ConfFilename = confFilename
t := avc.ParseNaluType(nal[0])
if t == avc.NaluTypeSps || t == avc.NaluTypePps || t == avc.NaluTypeSei {
// noop
} else {
timestamp += float32(1000) / float32(15) // 1秒 / fps
}
})
nazalog.Assert(nil, err)
go showHowToCustomizePub(lals)
err := lals.RunLoop()
nazalog.Infof("server manager done. err=%+v", err)
return
}
func parseFlag() string {
binInfoFlag := flag.Bool("v", false, "show bin info")
cf := flag.String("c", "", "specify conf file")
flag.Parse()
// mergePackets 将音频队列和视频队列按时间戳有序合并为一个队列
//
func mergePackets(audioPackets, videoPackets []base.AvPacket) (packets []base.AvPacket) {
var i, j int
for {
// audio数组为空将video的剩余数据取出然后merge结束
if i == len(audioPackets) {
packets = append(packets, videoPackets[j:]...)
break
}
if *binInfoFlag {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
_, _ = fmt.Fprintln(os.Stderr, base.LalFullInfo)
os.Exit(0)
//
if j == len(videoPackets) {
packets = append(packets, audioPackets[i:]...)
break
}
// 音频和视频都有数据,取时间戳小的
if audioPackets[i].Timestamp < videoPackets[j].Timestamp {
packets = append(packets, audioPackets[i])
i++
} else {
packets = append(packets, videoPackets[j])
j++
}
}
return *cf
return
}

@ -8,6 +8,12 @@
package base
import (
"encoding/hex"
"fmt"
"github.com/q191201771/naza/pkg/nazabytes"
)
type AvPacketPt int
const (
@ -17,35 +23,40 @@ const (
AvPacketPtAac AvPacketPt = 97
)
// AvPacket
//
// 不同场景使用时,字段含义可能不同。
// 使用AvPacket的地方应注明各字段的含义。
//
type AvPacket struct {
PayloadType AvPacketPt
Timestamp uint32 // TODO(chef): 改成int64
Payload []byte
}
func (a AvPacketPt) ReadableString() string {
switch a {
case AvPacketPtUnknown:
return "unknown"
case AvPacketPtAvc:
return "avc"
return "h264"
case AvPacketPtHevc:
return "hevc"
return "h265"
case AvPacketPtAac:
return "aac"
}
return ""
}
func (packet AvPacket) IsAudio() bool {
// AvPacket
//
// 不同场景使用时,字段含义可能不同。
// 使用AvPacket的地方应注明各字段的含义。
//
type AvPacket struct {
PayloadType AvPacketPt
Timestamp uint32 // TODO(chef): 改成int64
Payload []byte
}
func (packet *AvPacket) IsAudio() bool {
return packet.PayloadType == AvPacketPtAac
}
func (packet AvPacket) IsVideo() bool {
func (packet *AvPacket) IsVideo() bool {
return packet.PayloadType == AvPacketPtAvc || packet.PayloadType == AvPacketPtHevc
}
func (packet *AvPacket) DebugString() string {
return fmt.Sprintf("[%p] type=%s, timestamp=%d, len=%d, payload=%s",
packet, packet.PayloadType.ReadableString(), packet.Timestamp, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32)))
}

@ -41,7 +41,7 @@ type IAvPacketStream interface {
//
// PayloadType: 类型支持avc(h264)hevc(h265)aac
//
// Timestamp: 时间戳,单位毫秒
// Timestamp: 时间戳,单位毫秒。注意是累计递增值不是单个包的duration时长。
//
// Payload: 音视频数据,格式如下
//

Loading…
Cancel
Save