[fix] rtsp推流时,rtp包时间戳翻转导致的错误(比如hls一直强制切片)

pull/49/head
q191201771 4 years ago
parent 9bb3dac3d1
commit 8aea2a98ae

@ -11,6 +11,7 @@ package rtsp
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/circularqueue"
"github.com/q191201771/naza/pkg/nazalog"
)
// 处理音频和视频的时间戳:
@ -48,37 +49,38 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) {
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
// 时间戳回退了
if int64(pkt.Timestamp) < a.videoBaseTS {
nazalog.Warnf("video ts rotate. pktTS=%d, audioBaseTS=%d, videoBaseTS=%d, audioQueue=%d, videoQueue=%d",
pkt.Timestamp, a.audioBaseTS, a.videoBaseTS, a.audioQueue.Size(), a.videoQueue.Size())
a.videoBaseTS = -1
a.audioBaseTS = -1
a.PopAllByForce()
}
// 第一次
if a.videoBaseTS == -1 {
a.videoBaseTS = int64(pkt.Timestamp)
}
// 根据基准调节
pkt.Timestamp -= uint32(a.videoBaseTS)
_ = a.videoQueue.PushBack(pkt)
if a.videoQueue.Full() {
pkt, _ := a.videoQueue.Front()
_, _ = a.videoQueue.PopFront()
ppkt := pkt.(base.AVPacket)
a.onAVPacket(ppkt)
return
}
//nazalog.Debugf("AVQ v push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
_ = a.videoQueue.PushBack(pkt)
case base.AVPacketPTAAC:
if int64(pkt.Timestamp) < a.audioBaseTS {
nazalog.Warnf("audio ts rotate. pktTS=%d, audioBaseTS=%d, videoBaseTS=%d, audioQueue=%d, videoQueue=%d",
pkt.Timestamp, a.audioBaseTS, a.videoBaseTS, a.audioQueue.Size(), a.videoQueue.Size())
a.videoBaseTS = -1
a.audioBaseTS = -1
a.PopAllByForce()
}
if a.audioBaseTS == -1 {
a.audioBaseTS = int64(pkt.Timestamp)
}
pkt.Timestamp -= uint32(a.audioBaseTS)
_ = a.audioQueue.PushBack(pkt)
if a.audioQueue.Full() {
pkt, _ := a.audioQueue.Front()
_, _ = a.audioQueue.PopFront()
ppkt := pkt.(base.AVPacket)
a.onAVPacket(ppkt)
return
}
//nazalog.Debugf("AVQ a push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
} //switch loop
}
// 如果音频和视频都存在,则按序输出,直到其中一个为空
for !a.audioQueue.Empty() && !a.videoQueue.Empty() {
apkt, _ := a.audioQueue.Front()
vpkt, _ := a.videoQueue.Front()
@ -86,12 +88,55 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) {
vvpkt := vpkt.(base.AVPacket)
if aapkt.Timestamp < vvpkt.Timestamp {
_, _ = a.audioQueue.PopFront()
//nazalog.Debugf("AVQ a pop. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
a.onAVPacket(aapkt)
} else {
_, _ = a.videoQueue.PopFront()
//nazalog.Debugf("AVQ v pop. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
a.onAVPacket(vvpkt)
}
}
// 如果视频满了,则全部输出
if a.videoQueue.Full() {
nazalog.Assert(true, a.audioQueue.Empty())
a.popAllVideo()
return
}
// 如果音频满了,则全部输出
if a.audioQueue.Full() {
nazalog.Assert(true, a.videoQueue.Empty())
a.popAllAudio()
return
}
}
func (a *AVPacketQueue) PopAllByForce() {
if a.audioQueue.Empty() && a.videoQueue.Empty() {
// noop
} else if a.audioQueue.Empty() && !a.videoQueue.Empty() {
a.popAllVideo()
} else if !a.audioQueue.Empty() && a.videoQueue.Empty() {
a.popAllAudio()
}
// never reach here
nazalog.Assert(false, !a.audioQueue.Empty() && !a.videoQueue.Empty())
}
func (a *AVPacketQueue) popAllAudio() {
for !a.audioQueue.Empty() {
pkt, _ := a.audioQueue.Front()
ppkt := pkt.(base.AVPacket)
_, _ = a.audioQueue.PopFront()
a.onAVPacket(ppkt)
}
}
func (a *AVPacketQueue) popAllVideo() {
for !a.videoQueue.Empty() {
pkt, _ := a.videoQueue.Front()
ppkt := pkt.(base.AVPacket)
_, _ = a.videoQueue.PopFront()
a.onAVPacket(ppkt)
}
}

@ -0,0 +1,249 @@
package rtsp
import (
"bytes"
"fmt"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazalog"
"testing"
)
var (
diffA = uint32(23)
diffV = uint32(40)
)
var golden = []base.AVPacket{
v(0), // 注意一个小细节,音频视频相等时,视频先输出
a(0),
a(23),
v(40),
a(46),
a(69),
v(80),
a(92),
a(115),
v(120),
}
func TestAVPacketQueue(t *testing.T) {
var in []base.AVPacket
// case. 只有音频,且数量小于队列容量
oneCase(t, []base.AVPacket{
a(1),
}, nil)
// case. 只有音频,且数量大于队列容量
in = nil
for i := uint32(0); i <= maxQueueSize; i++ {
in = append(in, a(i * diffA))
}
oneCase(t, in, in[:len(in)-1])
// case. 只有视频,且数量大于队列容量,只是为了测试覆盖率
in = nil
for i := uint32(0); i <= maxQueueSize; i++ {
in = append(in, v(i * diffV))
}
oneCase(t, in, in[:len(in)-1])
// case. 最正常的数据
oneCase(t, golden, golden[:len(golden)-1])
// case. 音频和视频之间不完全有序
oneCase(t, []base.AVPacket{
a(0),
a(23),
a(46),
a(69),
v(0),
v(40),
v(80),
v(120),
a(92),
a(115),
}, golden[:len(golden)-1])
// case. 起始非0且起始不对齐
in = nil
for _, pkt := range golden {
pkt2 := pkt
if pkt2.PayloadType == base.AVPacketPTAAC {
pkt2.Timestamp += 100
} else {
pkt2.Timestamp += 10000
}
in = append(in, pkt2)
}
oneCase(t, in, golden[:len(golden)-1])
// case. 起始非0且起始不对齐且乱序
oneCase(t, []base.AVPacket{
a(0+99),
a(23+99),
a(46+99),
a(69+99),
v(0+1234),
v(40+1234),
v(80+1234),
v(120+1234),
a(92+99),
a(115+99),
}, golden[:len(golden)-1])
// case.
oneCase(t, []base.AVPacket{
a(4294967293),
v(4294967294),
a(4294967295),
}, []base.AVPacket{
v(0),
})
// case. 翻转1
// i:[ A(4294967226) V(66666) A(4294967249) V(66706) A(4294967272) A(4294967295) V(66746) A(22) A(45) V(66786) A(68) V(66826) ]
// o:[ V(0) A(0) A(23) V(40) A(46) A(69) V(80) V(0) A(0) A(23) V(40) ]
// q:[ A(46) ]
ab := uint32(4294967295-diffA*3) // max 4294967295
vb := uint32(66666)
in = []base.AVPacket{
a(ab), // 0: 0
v(vb), // 0
a(ab+diffA), // 23
v(vb+diffV), // 40
a(ab+diffA*2), // 46
a(ab+diffA*3), // 69
v(vb+diffV*2), // 80
a(ab+diffA*4), // 92 rotate
a(ab+diffA*5), // 115 -> 23
v(vb+diffV*3), // 120 -> 0
a(ab+diffA*6), // 138
v(vb+diffV*4), // 160
}
expects := [][]base.AVPacket{
nil,
{v(0)},
{v(0)},
{v(0), a(0), a(diffA)},
{v(0), a(0), a(diffA), v(diffV)},
{v(0), a(0), a(diffA), v(diffV)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0), a(0), a(diffA), v(diffV)},
}
for i := 0; i < len(in); i++ {
out, q := oneCase(t, in[:i+1], expects[i])
nazalog.Infof("-----%d", i)
nazalog.Infof("i:%s", packetsReadable(in[:i+1]))
nazalog.Infof("o:%s", packetsReadable(out))
nazalog.Infof("e:%s", packetsReadable(expects[i]))
nazalog.Infof("q:%s", packetsReadable(peekQueuePackets(q)))
}
// case. 翻转2
// i:[ V(4294967215) A(12345) A(12368) V(4294967255) A(12391) A(12414) V(4294967295) A(12437) A(12460) V(39) A(12483) V(79) A(12506) A(12529) ]
// o:[ V(0) A(0) A(23) V(40) A(46) A(69) V(80) A(92) A(115) V(0) A(0) A(23) V(40) ]
// q:[ A(46) ]
ab = uint32(12345)
vb = uint32(4294967295-diffV*2) // max 4294967295
in = []base.AVPacket{
v(vb), // 0
a(ab), // 0
a(ab+diffA), // 23
v(vb+diffV), // 40
a(ab+diffA*2), // 46
a(ab+diffA*3), // 69
v(vb+diffV*2), // 80
a(ab+diffA*4), // 92
a(ab+diffA*5), // 115
v(vb+diffV*3), // 120 rotate
a(ab+diffA*6), // 138 -> 0
v(vb+diffV*4), // 160 -> 40
a(ab+diffA*7), // 161
a(ab+diffA*8), // 184
}
expects = [][]base.AVPacket{
nil,
{v(0)},
{v(0)},
{v(0), a(0), a(diffA)},
{v(0), a(0), a(diffA), v(diffV)},
{v(0), a(0), a(diffA), v(diffV)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0), a(diffA)},
{v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0), a(diffA), v(diffV)},
}
for i := 0; i < len(in); i++ {
oneCase(t, in[:i+1], expects[i])
}
}
func a(t uint32) base.AVPacket {
return base.AVPacket{
PayloadType:base.AVPacketPTAAC,
Timestamp:t,
}
}
func v(t uint32) base.AVPacket {
return base.AVPacket{
PayloadType:base.AVPacketPTAVC,
Timestamp:t,
}
}
func oneCase(t *testing.T, in []base.AVPacket, expected []base.AVPacket) (out []base.AVPacket, q *AVPacketQueue) {
out, q = calc(in)
assert.Equal(t, expected, out)
return out, q
}
func calc(in []base.AVPacket) (out []base.AVPacket, q *AVPacketQueue) {
q = NewAVPacketQueue(func(pkt base.AVPacket) {
out = append(out, pkt)
})
for _, pkt := range in {
q.Feed(pkt)
}
return out, q
}
func packetsReadable(pkts []base.AVPacket) string {
var buf bytes.Buffer
buf.WriteString("[")
for _, pkt := range pkts {
if pkt.PayloadType == base.AVPacketPTAAC {
buf.WriteString(fmt.Sprintf(" A(%d) ", pkt.Timestamp))
} else {
buf.WriteString(fmt.Sprintf(" V(%d) ", pkt.Timestamp))
}
}
buf.WriteString("]")
return buf.String()
}
func peekQueuePackets(q *AVPacketQueue) []base.AVPacket {
var out []base.AVPacket
for i := 0; i < q.audioQueue.Size(); i++ {
pkt, _ := q.audioQueue.At(i)
ppkt := pkt.(base.AVPacket)
out = append(out, ppkt)
}
for i := 0; i < q.videoQueue.Size(); i++ {
pkt, _ := q.videoQueue.At(i)
ppkt := pkt.(base.AVPacket)
out = append(out, ppkt)
}
return out
}
Loading…
Cancel
Save