[feat] 为rtmp pub推流添加静音AAC音频(可动态检测是否需要添加;配置文件中可开启或关闭这个功能) (#56)

pull/114/head
q191201771 3 years ago
parent 03c459a024
commit 2c913f41e3

1
.gitignore vendored

@ -1,3 +1,4 @@
delay.txt
profile.out
coverage.html
*.aac

@ -1,11 +1,13 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.2",
"conf_version": "v0.2.3",
"rtmp": {
"enable": true,
"addr": ":1935",
"gop_num": 0,
"merge_write_size": 0
"merge_write_size": 0,
"add_dummy_audio_enable": false,
"add_dummy_audio_wait_audio_ms": 150
},
"default_http": {
"http_listen_addr": ":8080",

@ -48,8 +48,8 @@ func MakeAudioDataSeqHeaderWithAsc(asc []byte) (out []byte, err error) {
return nil, ErrAac
}
// 注意前两个字节是SequenceHeaderContext后面跟着asc
out = make([]byte, 2+len(asc))
// <spec-video_file_format_spec_v10.pdf>, <Audio tags, AUDIODATA>, <page 10/48>
out[0] = 0xaf
out[1] = 0
copy(out[2:], asc)

@ -54,6 +54,8 @@ type RtmpConfig struct {
Addr string `json:"addr"`
GopNum int `json:"gop_num"`
MergeWriteSize int `json:"merge_write_size"`
AddDummyAudioEnable bool `json:"add_dummy_audio_enable"`
AddDummyAudioWaitAudioMs int `json:"add_dummy_audio_wait_audio_ms"`
}
type DefaultHttpConfig struct {

@ -0,0 +1,211 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"math"
)
// Processing stages of DummyAudioFilter. A filter starts in the analysis stage
// and then moves to either the normal stage or the dummy stage.
const (
	// dummyAudioFilterStageAnalysis: input is buffered while the filter
	// decides whether the stream carries audio.
	dummyAudioFilterStageAnalysis = 1

	// dummyAudioFilterStageNormal: the stream has its own audio, messages
	// are forwarded untouched.
	dummyAudioFilterStageNormal = 2

	// dummyAudioFilterStageDummy: no audio showed up in time, silent audio
	// is generated and interleaved with the video.
	dummyAudioFilterStageDummy = 3
)
// DummyAudioFilter sits in front of an rtmp message consumer. It forwards the
// stream unchanged when the stream carries audio, and otherwise interleaves
// generated silent AAC audio messages with the video.
type DummyAudioFilter struct {
	// uk is the unique key used as prefix in log lines.
	uk string

	// waitAudioMs is how much video time (in milliseconds) may elapse without
	// any audio before the filter starts generating silent audio.
	waitAudioMs int

	// onPop receives every message the filter outputs; may be nil.
	onPop rtmp.OnReadRtmpAvMsg

	// stage is one of the dummyAudioFilterStage* constants.
	stage int

	// earlyStageQueue holds copies of the messages received during the
	// analysis stage, in arrival order.
	earlyStageQueue []base.RtmpMsg

	// firstVideoTs is the timestamp of the first non-seq-header video message,
	// prevAudioTs the timestamp of the last generated silent audio frame.
	// math.MaxUint32 means "not set yet" for both.
	firstVideoTs, prevAudioTs uint32

	// audioCount is the number of silent audio frames generated so far.
	audioCount int
}
// NewDummyAudioFilter creates a filter that checks whether the incoming rtmp
// stream contains audio. If it does, the stream is passed through as is; if it
// does not, silent audio data is generated and mixed into the stream.
//
// @param uk          unique key used as prefix in log lines
// @param waitAudioMs how long to wait for audio data; once this much video
//                    time has passed without any audio, silent audio is generated
// @param onPop       output callback. Note that every callback is invoked
//                    synchronously from within the input function call
//
func NewDummyAudioFilter(uk string, waitAudioMs int, onPop rtmp.OnReadRtmpAvMsg) *DummyAudioFilter {
	filter := new(DummyAudioFilter)
	filter.uk = uk
	filter.waitAudioMs = waitAudioMs
	filter.onPop = onPop
	filter.stage = dummyAudioFilterStageAnalysis
	// math.MaxUint32 is the "not set yet" marker for both timestamps.
	filter.firstVideoTs = math.MaxUint32
	filter.prevAudioTs = math.MaxUint32
	return filter
}
// OnReadRtmpAvMsg feeds msg into the filter. It is an alias of Feed under the
// callback-style method name.
func (filter *DummyAudioFilter) OnReadRtmpAvMsg(msg base.RtmpMsg) {
	filter.Feed(msg)
}
// Feed pushes one input message into the filter and dispatches it to the
// handler of the stage the filter is in when the call starts.
func (filter *DummyAudioFilter) Feed(msg base.RtmpMsg) {
	//nazalog.Debugf("trace_DummyAudioFilter, push. header=%+v, payload=%s, stage=%d", msg.Header, hex.EncodeToString(nazastring.SubSliceSafety(msg.Payload, 8)), filter.stage)

	switch filter.stage {
	case dummyAudioFilterStageAnalysis:
		filter.handleAnalysisStage(msg)
	case dummyAudioFilterStageNormal:
		filter.handleNormalStage(msg)
	case dummyAudioFilterStageDummy:
		filter.handleDummyStage(msg)
	default:
		// Unknown stage: the message is dropped.
	}
}
// handleAnalysisStage handles a message during the initial stage, in which the
// filter works out whether the stream contains audio.
//
// Messages are buffered until either an audio message arrives (the buffer is
// flushed unchanged and the filter enters the normal stage) or the video
// timestamps have advanced by at least waitAudioMs since the first video frame
// (the buffer is replayed through handleDummyStage and the filter enters the
// dummy stage). Message types other than metadata, audio and video are ignored
// in this stage.
func (filter *DummyAudioFilter) handleAnalysisStage(msg base.RtmpMsg) {
	switch msg.Header.MsgTypeId {
	case base.RtmpTypeIdMetadata:
		// Metadata goes straight into the queue.
		filter.cache(msg)
	case base.RtmpTypeIdAudio:
		// The original stream has audio: flush everything cached so far and
		// enter the normal stage.
		for i := range filter.earlyStageQueue {
			filter.onPopProxy(filter.earlyStageQueue[i])
		}
		// The queue is never read again once the analysis stage is over, so
		// release it instead of keeping the cloned messages alive for the
		// whole lifetime of the filter.
		filter.clearCache()
		filter.onPopProxy(msg)
		filter.stage = dummyAudioFilterStageNormal
	case base.RtmpTypeIdVideo:
		// Check whether the accumulated video duration reached the threshold.

		// The timestamp of a seq header may not be in line with the video
		// frames (0 or some other special value), so it is queued and skipped
		// without taking part in the duration calculation.
		if msg.IsVideoKeySeqHeader() {
			filter.cache(msg)
			return
		}

		// Remember the timestamp of the first video frame.
		if filter.firstVideoTs == math.MaxUint32 {
			filter.cache(msg)
			filter.firstVideoTs = msg.Header.TimestampAbs
			return
		}

		// Threshold not reached yet.
		// NOTE(review): this is an unsigned subtraction, so a timestamp lower
		// than firstVideoTs wraps around to a large value and is treated as
		// "threshold reached".
		if msg.Header.TimestampAbs-filter.firstVideoTs < uint32(filter.waitAudioMs) {
			filter.cache(msg)
			return
		}

		// Threshold reached: switch to the dummy stage first, then replay the
		// cached messages followed by the current one through it.
		nazalog.Debugf("[%s] start make dummy audio.", filter.uk)
		filter.stage = dummyAudioFilterStageDummy
		for i := range filter.earlyStageQueue {
			filter.handleDummyStage(filter.earlyStageQueue[i])
		}
		filter.clearCache()
		filter.handleDummyStage(msg)
	}
}
// handleNormalStage handles a message when the original stream contains
// audio: the message is forwarded unchanged.
func (filter *DummyAudioFilter) handleNormalStage(msg base.RtmpMsg) {
	filter.onPopProxy(msg)
}
// handleDummyStage handles a message when the original stream contains no
// audio and silent audio is being generated.
//
// Audio messages are dropped, metadata is forwarded as is. A video seq header
// is preceded by a generated audio seq header with the same timestamp. Any
// other message is preceded by as many silent audio frames as are needed to
// bring the generated audio up to the message's timestamp.
func (filter *DummyAudioFilter) handleDummyStage(msg base.RtmpMsg) {
	switch msg.Header.MsgTypeId {
	case base.RtmpTypeIdAudio:
		// Silent frames are already being produced and their encoding
		// parameters may differ from the real audio, so the original audio
		// data can only be filtered out.
		nazalog.Warnf("[%s] recv audio but we are making dummy audio.", filter.uk)
		return
	case base.RtmpTypeIdMetadata:
		filter.onPopProxy(msg)
		return
	}

	if msg.IsVideoKeySeqHeader() {
		// TODO(chef): the timestamp here could be decreased by 1, but mind the boundary conditions
		seqHeader := filter.makeAudioSeqHeader(msg.Header.TimestampAbs)
		filter.onPopProxy(seqHeader)
		filter.onPopProxy(msg)
		return
	}

	// No silent frame produced yet: emit the first one at this message's
	// timestamp.
	if filter.prevAudioTs == math.MaxUint32 {
		firstTs := msg.Header.TimestampAbs
		frame := filter.makeOneAudio(firstTs)
		filter.onPopProxy(frame)
		filter.onPopProxy(msg)
		filter.prevAudioTs = firstTs
		return
	}

	// Emit silent frames until the next one would lie past this message. The
	// frame duration has to be re-read on every iteration because it depends
	// on the number of frames produced so far.
	limit := msg.Header.TimestampAbs
	for {
		nextTs := filter.prevAudioTs + filter.calcAudioDurationMs()
		if nextTs > limit {
			break
		}
		frame := filter.makeOneAudio(nextTs)
		filter.onPopProxy(frame)
		filter.prevAudioTs = nextTs
	}
	filter.onPopProxy(msg)
}
// cache appends a copy of msg (obtained via msg.Clone) to the analysis-stage
// queue.
func (filter *DummyAudioFilter) cache(msg base.RtmpMsg) {
	cloned := msg.Clone()
	filter.earlyStageQueue = append(filter.earlyStageQueue, cloned)
}
// clearCache drops the analysis-stage queue, releasing the messages it holds.
func (filter *DummyAudioFilter) clearCache() {
	filter.earlyStageQueue = nil
}
// onPopProxy hands msg to the output callback; it is a no-op when no callback
// was configured.
func (filter *DummyAudioFilter) onPopProxy(msg base.RtmpMsg) {
	//nazalog.Debugf("trace_DummyAudioFilter, onPopProxy. header=%+v, payload=%s, stage=%d", msg.Header, hex.EncodeToString(nazastring.SubSliceSafety(msg.Payload, 8)), filter.stage)
	if filter.onPop == nil {
		return
	}
	filter.onPop(msg)
}
// makeAudioSeqHeader builds the audio seq header message that announces the
// generated silent audio, stamped with ts.
func (filter *DummyAudioFilter) makeAudioSeqHeader(ts uint32) base.RtmpMsg {
	// aac (LC), 48000 Hz, stereo, fltp
	payload := []byte{0xaf, 0x00, 0x11, 0x90}

	var msg base.RtmpMsg
	msg.Header.Csid = rtmp.CsidAudio
	msg.Header.MsgLen = uint32(len(payload))
	msg.Header.MsgTypeId = base.RtmpTypeIdAudio
	msg.Header.MsgStreamId = rtmp.Msid1
	msg.Header.TimestampAbs = ts
	msg.Payload = payload
	return msg
}
// makeOneAudio builds one silent audio message stamped with ts and counts it
// in audioCount.
func (filter *DummyAudioFilter) makeOneAudio(ts uint32) base.RtmpMsg {
	filter.audioCount++

	// The first 2 bytes are the header info that precedes the audio data,
	// the following 6 bytes are the silent AAC packet.
	payload := []byte{0xaf, 0x01, 0x21, 0x10, 0x04, 0x60, 0x8c, 0x1c}

	var msg base.RtmpMsg
	msg.Header.Csid = rtmp.CsidAudio
	msg.Header.MsgLen = uint32(len(payload))
	msg.Header.MsgTypeId = base.RtmpTypeIdAudio
	msg.Header.MsgStreamId = rtmp.Msid1
	msg.Header.TimestampAbs = ts
	msg.Payload = payload
	return msg
}
// calcAudioDurationMs returns the distance in milliseconds between the last
// generated silent frame and the next one. It depends on how many frames were
// generated so far: 21 when audioCount%3 is 1 or 2, 22 otherwise, i.e. 64 ms
// for every three frames.
// NOTE(review): presumably this approximates the ~21.33 ms of one AAC frame at
// 48000 Hz with integer timestamps - confirm.
func (filter *DummyAudioFilter) calcAudioDurationMs() uint32 {
	switch filter.audioCount % 3 {
	case 1, 2:
		return 21
	default:
		return 22
	}
}

@ -0,0 +1,129 @@
package logic_test
import (
"encoding/hex"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazalog"
"strconv"
"strings"
"testing"
)
// TestDummyAudioFilter feeds three recorded message sequences through the
// filter and checks what comes out after each input message.
func TestDummyAudioFilter(t *testing.T) {
	// case1: a stream that has both audio and video
	{
		in := []base.RtmpMsg{
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:378 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d40"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:48 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0}, payload=17000000"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:7 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af001210"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:26 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af01de04"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:1170 MsgTypeId:9 MsgStreamId:1 TimestampAbs:23}, payload=17010000"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:23}, payload=af012110"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:46}, payload=af012120"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:849 MsgTypeId:8 MsgStreamId:1 TimestampAbs:69}, payload=af01214c"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:372 MsgTypeId:9 MsgStreamId:1 TimestampAbs:90}, payload=27010000"),
		}

		var out []base.RtmpMsg
		collect := func(msg base.RtmpMsg) {
			out = append(out, msg)
		}
		filter := logic.NewDummyAudioFilter("test1", 150, collect)

		// Metadata and the video seq header are held back.
		for _, m := range in[:2] {
			filter.Feed(m)
			assert.Equal(t, nil, out)
		}
		// From the first audio message on, everything fed so far has come out.
		for fed := 3; fed <= len(in); fed++ {
			filter.Feed(in[fed-1])
			assert.Equal(t, in[:fed], out)
		}
	}

	// case2: a stream that has video only
	{
		in := []base.RtmpMsg{
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:269 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d4073657444"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:48 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0}, payload=1700000000016400"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:1170 MsgTypeId:9 MsgStreamId:1 TimestampAbs:23}, payload=1701000000000002"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:372 MsgTypeId:9 MsgStreamId:1 TimestampAbs:90}, payload=2701000000000001"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:1226 MsgTypeId:9 MsgStreamId:1 TimestampAbs:156}, payload=2701000000000004"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:1541 MsgTypeId:9 MsgStreamId:1 TimestampAbs:223}, payload=2701000000000005"),
			helperUnpackRtmpMsg("header={Csid:6 MsgLen:1931 MsgTypeId:9 MsgStreamId:1 TimestampAbs:290}, payload=2701000000000005"),
		}

		var out []base.RtmpMsg
		collect := func(msg base.RtmpMsg) {
			out = append(out, msg)
		}
		filter := logic.NewDummyAudioFilter("test1", 150, collect)

		// Nothing comes out while the filter is still waiting for audio.
		for _, m := range in[:5] {
			filter.Feed(m)
			assert.Equal(t, nil, out)
		}

		// The wait is over: the buffered messages come out with silent audio mixed in.
		filter.Feed(in[5])
		assert.Equal(t, 17, len(out))
		assert.Equal(t, in[0], out[0])
		assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:4 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af001190"), out[1])
		assert.Equal(t, in[1], out[2])
		assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:215}, payload=af01211004608c1c"), out[15])
		assert.Equal(t, in[5], out[16])

		filter.Feed(in[6])
		assert.Equal(t, 21, len(out))
		assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:236}, payload=af01211004608c1c"), out[17])
		assert.Equal(t, in[6], out[20])
	}

	// case3: a stream that has audio only
	{
		in := []base.RtmpMsg{
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:278 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d4073657444"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:7 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af00121056e500"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:26 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af01de04004c6176"),
			helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:23}, payload=af01211004608c1c"),
		}

		var out []base.RtmpMsg
		collect := func(msg base.RtmpMsg) {
			out = append(out, msg)
		}
		filter := logic.NewDummyAudioFilter("test1", 150, collect)

		// Metadata alone is held back.
		filter.Feed(in[0])
		assert.Equal(t, nil, out)

		// Every audio message releases everything fed so far.
		for fed := 2; fed <= 4; fed++ {
			filter.Feed(in[fed-1])
			assert.Equal(t, in[:fed], out)
		}
	}
}
// helperUnpackRtmpMsg rebuilds a base.RtmpMsg from a line in the format of the
// filter's trace log.
//
// @param logstr e.g. "header={Csid:4 MsgLen:378 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d40"
//
// It panics with a descriptive message when one of the expected fields cannot
// be located in logstr. Number and hex conversion errors are checked with
// nazalog.Assert.
func helperUnpackRtmpMsg(logstr string) base.RtmpMsg {
	// fetchItemFn returns the text that follows the first occurrence of prefix
	// in str, up to (not including) the next occurrence of suffix. An empty
	// suffix means "up to the end of str".
	var fetchItemFn = func(str string, prefix string, suffix string) string {
		b := strings.Index(str, prefix)
		if b < 0 {
			// Without this check the -1 would be used as an offset and yield
			// an unrelated piece of str.
			panic("helperUnpackRtmpMsg: " + strconv.Quote(prefix) + " not found in " + strconv.Quote(str))
		}
		rest := str[b+len(prefix):]
		if suffix == "" {
			return rest
		}
		// Search behind the prefix, so that a suffix occurring inside the
		// prefix itself cannot be picked up.
		e := strings.Index(rest, suffix)
		if e < 0 {
			panic("helperUnpackRtmpMsg: " + strconv.Quote(suffix) + " not found after " + strconv.Quote(prefix) + " in " + strconv.Quote(str))
		}
		return rest[:e]
	}

	// fetchIntItemFn is fetchItemFn followed by a decimal integer conversion.
	var fetchIntItemFn = func(str string, prefix string, suffix string) int {
		ret, err := strconv.Atoi(fetchItemFn(str, prefix, suffix))
		nazalog.Assert(nil, err)
		return ret
	}

	var header base.RtmpHeader
	header.Csid = fetchIntItemFn(logstr, "Csid:", " ")
	header.MsgLen = uint32(fetchIntItemFn(logstr, "MsgLen:", " "))
	header.MsgTypeId = uint8(fetchIntItemFn(logstr, "MsgTypeId:", " "))
	header.MsgStreamId = fetchIntItemFn(logstr, "MsgStreamId:", " ")
	header.TimestampAbs = uint32(fetchIntItemFn(logstr, "TimestampAbs:", "}"))

	// Everything after "payload=" is the hex encoded payload.
	hexStr := fetchItemFn(logstr, "payload=", "")
	payload, err := hex.DecodeString(hexStr)
	nazalog.Assert(nil, err)

	return base.RtmpMsg{
		Header:  header,
		Payload: payload,
	}
}

@ -77,6 +77,8 @@ type Group struct {
// rtmp pub/pull使用
rtmpGopCache *GopCache
httpflvGopCache *GopCache
// rtmp pub使用
dummyAudioFilter *DummyAudioFilter
// rtmp sub使用
rtmpBufWriter base.IBufWriter // TODO(chef): 后面可以在业务层加一个定时Flush
// mpegts使用
@ -317,7 +319,14 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) bool {
)
}
// TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能
if config.RtmpConfig.AddDummyAudioEnable {
// TODO(chef): 从整体控制和锁关系来说应该让pub的数据回调到group中进锁后再让数据流入filter
group.dummyAudioFilter = NewDummyAudioFilter(group.UniqueKey, config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg)
session.SetPubSessionObserver(group.dummyAudioFilter)
} else {
session.SetPubSessionObserver(group)
}
return true
}
@ -640,6 +649,7 @@ func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) {
group.rtmpPubSession = nil
group.rtmp2RtspRemuxer = nil
group.dummyAudioFilter = nil
group.delIn()
}
@ -1041,7 +1051,7 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
nazalog.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {

Loading…
Cancel
Save