From 8aea2a98ae0e0799552364761e71dea138bcbb65 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 27 Mar 2021 20:43:51 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20rtsp=E6=8E=A8=E6=B5=81=E6=97=B6?= =?UTF-8?q?=EF=BC=8Crtp=E5=8C=85=E6=97=B6=E9=97=B4=E6=88=B3=E7=BF=BB?= =?UTF-8?q?=E8=BD=AC=E5=AF=BC=E8=87=B4=E7=9A=84=E9=94=99=E8=AF=AF=EF=BC=88?= =?UTF-8?q?=E6=AF=94=E5=A6=82hls=E4=B8=80=E7=9B=B4=E5=BC=BA=E5=88=B6?= =?UTF-8?q?=E5=88=87=E7=89=87=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rtsp/avpacket_queue.go | 87 ++++++++--- pkg/rtsp/avpacket_queue_test.go | 249 ++++++++++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 21 deletions(-) create mode 100644 pkg/rtsp/avpacket_queue_test.go diff --git a/pkg/rtsp/avpacket_queue.go b/pkg/rtsp/avpacket_queue.go index 06d16a5..c15ebbd 100644 --- a/pkg/rtsp/avpacket_queue.go +++ b/pkg/rtsp/avpacket_queue.go @@ -11,6 +11,7 @@ package rtsp import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/circularqueue" + "github.com/q191201771/naza/pkg/nazalog" ) // 处理音频和视频的时间戳: @@ -48,37 +49,38 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) { case base.AVPacketPTAVC: fallthrough case base.AVPacketPTHEVC: + // 时间戳回退了 + if int64(pkt.Timestamp) < a.videoBaseTS { + nazalog.Warnf("video ts rotate. pktTS=%d, audioBaseTS=%d, videoBaseTS=%d, audioQueue=%d, videoQueue=%d", + pkt.Timestamp, a.audioBaseTS, a.videoBaseTS, a.audioQueue.Size(), a.videoQueue.Size()) + a.videoBaseTS = -1 + a.audioBaseTS = -1 + a.PopAllByForce() + } + // 第一次 if a.videoBaseTS == -1 { a.videoBaseTS = int64(pkt.Timestamp) } + // 根据基准调节 pkt.Timestamp -= uint32(a.videoBaseTS) - _ = a.videoQueue.PushBack(pkt) - if a.videoQueue.Full() { - pkt, _ := a.videoQueue.Front() - _, _ = a.videoQueue.PopFront() - ppkt := pkt.(base.AVPacket) - a.onAVPacket(ppkt) - return - } - //nazalog.Debugf("AVQ v push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) + _ = a.videoQueue.PushBack(pkt) case base.AVPacketPTAAC: + if int64(pkt.Timestamp) < a.audioBaseTS { + nazalog.Warnf("audio ts rotate. pktTS=%d, audioBaseTS=%d, videoBaseTS=%d, audioQueue=%d, videoQueue=%d", + pkt.Timestamp, a.audioBaseTS, a.videoBaseTS, a.audioQueue.Size(), a.videoQueue.Size()) + a.videoBaseTS = -1 + a.audioBaseTS = -1 + a.PopAllByForce() + } if a.audioBaseTS == -1 { a.audioBaseTS = int64(pkt.Timestamp) } pkt.Timestamp -= uint32(a.audioBaseTS) - _ = a.audioQueue.PushBack(pkt) - if a.audioQueue.Full() { - pkt, _ := a.audioQueue.Front() - _, _ = a.audioQueue.PopFront() - ppkt := pkt.(base.AVPacket) - a.onAVPacket(ppkt) - return - } - //nazalog.Debugf("AVQ a push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) - } //switch loop + } + // 如果音频和视频都存在,则按序输出,直到其中一个为空 for !a.audioQueue.Empty() && !a.videoQueue.Empty() { apkt, _ := a.audioQueue.Front() vpkt, _ := a.videoQueue.Front() @@ -86,12 +88,55 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) { vvpkt := vpkt.(base.AVPacket) if aapkt.Timestamp < vvpkt.Timestamp { _, _ = a.audioQueue.PopFront() - //nazalog.Debugf("AVQ a pop. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) a.onAVPacket(aapkt) } else { _, _ = a.videoQueue.PopFront() - //nazalog.Debugf("AVQ v pop. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) a.onAVPacket(vvpkt) } } + + // 如果视频满了,则全部输出 + if a.videoQueue.Full() { + nazalog.Assert(true, a.audioQueue.Empty()) + a.popAllVideo() + return + } + + // 如果音频满了,则全部输出 + if a.audioQueue.Full() { + nazalog.Assert(true, a.videoQueue.Empty()) + a.popAllAudio() + return + } +} + +func (a *AVPacketQueue) PopAllByForce() { + if a.audioQueue.Empty() && a.videoQueue.Empty() { + // noop + } else if a.audioQueue.Empty() && !a.videoQueue.Empty() { + a.popAllVideo() + } else if !a.audioQueue.Empty() && a.videoQueue.Empty() { + a.popAllAudio() + } + + // never reach here + nazalog.Assert(false, !a.audioQueue.Empty() && !a.videoQueue.Empty()) +} + +func (a *AVPacketQueue) popAllAudio() { + for !a.audioQueue.Empty() { + pkt, _ := a.audioQueue.Front() + ppkt := pkt.(base.AVPacket) + _, _ = a.audioQueue.PopFront() + a.onAVPacket(ppkt) + } +} + +func (a *AVPacketQueue) popAllVideo() { + for !a.videoQueue.Empty() { + pkt, _ := a.videoQueue.Front() + ppkt := pkt.(base.AVPacket) + _, _ = a.videoQueue.PopFront() + a.onAVPacket(ppkt) + } } diff --git a/pkg/rtsp/avpacket_queue_test.go b/pkg/rtsp/avpacket_queue_test.go new file mode 100644 index 0000000..3258ada --- /dev/null +++ b/pkg/rtsp/avpacket_queue_test.go @@ -0,0 +1,249 @@ +package rtsp + +import ( + "bytes" + "fmt" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/assert" + "github.com/q191201771/naza/pkg/nazalog" + "testing" +) + +var ( + diffA = uint32(23) + diffV = uint32(40) +) + +var golden = []base.AVPacket{ + v(0), // 注意一个小细节,音频视频相等时,视频先输出 + a(0), + a(23), + v(40), + a(46), + a(69), + v(80), + a(92), + a(115), + v(120), + } + +func TestAVPacketQueue(t *testing.T) { + var in []base.AVPacket + + // case. 只有音频,且数量小于队列容量 + oneCase(t, []base.AVPacket{ + a(1), + }, nil) + + // case. 只有音频,且数量大于队列容量 + in = nil + for i := uint32(0); i <= maxQueueSize; i++ { + in = append(in, a(i * diffA)) + } + oneCase(t, in, in[:len(in)-1]) + + // case. 只有视频,且数量大于队列容量,只是为了测试覆盖率 + in = nil + for i := uint32(0); i <= maxQueueSize; i++ { + in = append(in, v(i * diffV)) + } + oneCase(t, in, in[:len(in)-1]) + + + // case. 最正常的数据 + oneCase(t, golden, golden[:len(golden)-1]) + + // case. 音频和视频之间不完全有序 + oneCase(t, []base.AVPacket{ + a(0), + a(23), + a(46), + a(69), + v(0), + v(40), + v(80), + v(120), + a(92), + a(115), + }, golden[:len(golden)-1]) + + // case. 起始非0,且起始不对齐 + in = nil + for _, pkt := range golden { + pkt2 := pkt + if pkt2.PayloadType == base.AVPacketPTAAC { + pkt2.Timestamp += 100 + } else { + pkt2.Timestamp += 10000 + } + in = append(in, pkt2) + } + oneCase(t, in, golden[:len(golden)-1]) + + // case. 起始非0,且起始不对齐,且乱序 + oneCase(t, []base.AVPacket{ + a(0+99), + a(23+99), + a(46+99), + a(69+99), + v(0+1234), + v(40+1234), + v(80+1234), + v(120+1234), + a(92+99), + a(115+99), + }, golden[:len(golden)-1]) + + // case. + oneCase(t, []base.AVPacket{ + a(4294967293), + v(4294967294), + a(4294967295), + }, []base.AVPacket{ + v(0), + }) + + // case. 翻转1 + // i:[ A(4294967226) V(66666) A(4294967249) V(66706) A(4294967272) A(4294967295) V(66746) A(22) A(45) V(66786) A(68) V(66826) ] + // o:[ V(0) A(0) A(23) V(40) A(46) A(69) V(80) V(0) A(0) A(23) V(40) ] + // q:[ A(46) ] + ab := uint32(4294967295-diffA*3) // max 4294967295 + vb := uint32(66666) + in = []base.AVPacket{ + a(ab), // 0: 0 + v(vb), // 0 + a(ab+diffA), // 23 + v(vb+diffV), // 40 + a(ab+diffA*2), // 46 + a(ab+diffA*3), // 69 + v(vb+diffV*2), // 80 + a(ab+diffA*4), // 92 rotate + a(ab+diffA*5), // 115 -> 23 + v(vb+diffV*3), // 120 -> 0 + a(ab+diffA*6), // 138 + v(vb+diffV*4), // 160 + } + expects := [][]base.AVPacket{ + nil, + {v(0)}, + {v(0)}, + {v(0), a(0), a(diffA)}, + {v(0), a(0), a(diffA), v(diffV)}, + {v(0), a(0), a(diffA), v(diffV)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), v(0), a(0), a(diffA), v(diffV)}, + } + for i := 0; i < len(in); i++ { + out, q := oneCase(t, in[:i+1], expects[i]) + nazalog.Infof("-----%d", i) + nazalog.Infof("i:%s", packetsReadable(in[:i+1])) + nazalog.Infof("o:%s", packetsReadable(out)) + nazalog.Infof("e:%s", packetsReadable(expects[i])) + nazalog.Infof("q:%s", packetsReadable(peekQueuePackets(q))) + } + + // case. 翻转2 + // i:[ V(4294967215) A(12345) A(12368) V(4294967255) A(12391) A(12414) V(4294967295) A(12437) A(12460) V(39) A(12483) V(79) A(12506) A(12529) ] + // o:[ V(0) A(0) A(23) V(40) A(46) A(69) V(80) A(92) A(115) V(0) A(0) A(23) V(40) ] + // q:[ A(46) ] + ab = uint32(12345) + vb = uint32(4294967295-diffV*2) // max 4294967295 + in = []base.AVPacket{ + v(vb), // 0 + a(ab), // 0 + a(ab+diffA), // 23 + v(vb+diffV), // 40 + a(ab+diffA*2), // 46 + a(ab+diffA*3), // 69 + v(vb+diffV*2), // 80 + a(ab+diffA*4), // 92 + a(ab+diffA*5), // 115 + v(vb+diffV*3), // 120 rotate + a(ab+diffA*6), // 138 -> 0 + v(vb+diffV*4), // 160 -> 40 + a(ab+diffA*7), // 161 + a(ab+diffA*8), // 184 + } + expects = [][]base.AVPacket{ + nil, + {v(0)}, + {v(0)}, + {v(0), a(0), a(diffA)}, + {v(0), a(0), a(diffA), v(diffV)}, + {v(0), a(0), a(diffA), v(diffV)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0), a(diffA)}, + {v(0), a(0), a(diffA), v(diffV), a(diffA*2), a(diffA*3), v(diffV*2), a(diffA*4), a(diffA*5), v(0), a(0), a(diffA), v(diffV)}, + } + for i := 0; i < len(in); i++ { + oneCase(t, in[:i+1], expects[i]) + } +} + +func a(t uint32) base.AVPacket { + return base.AVPacket{ + PayloadType:base.AVPacketPTAAC, + Timestamp:t, + } +} + +func v(t uint32) base.AVPacket { + return base.AVPacket{ + PayloadType:base.AVPacketPTAVC, + Timestamp:t, + } +} + +func oneCase(t *testing.T, in []base.AVPacket, expected []base.AVPacket) (out []base.AVPacket, q *AVPacketQueue) { + out, q = calc(in) + assert.Equal(t, expected, out) + return out, q +} + +func calc(in []base.AVPacket) (out []base.AVPacket, q *AVPacketQueue) { + q = NewAVPacketQueue(func(pkt base.AVPacket) { + out = append(out, pkt) + }) + for _, pkt := range in { + q.Feed(pkt) + } + return out, q +} + +func packetsReadable(pkts []base.AVPacket) string { + var buf bytes.Buffer + buf.WriteString("[") + for _, pkt := range pkts { + if pkt.PayloadType == base.AVPacketPTAAC { + buf.WriteString(fmt.Sprintf(" A(%d) ", pkt.Timestamp)) + } else { + buf.WriteString(fmt.Sprintf(" V(%d) ", pkt.Timestamp)) + } + } + buf.WriteString("]") + return buf.String() +} + +func peekQueuePackets(q *AVPacketQueue) []base.AVPacket { + var out []base.AVPacket + for i := 0; i < q.audioQueue.Size(); i++ { + pkt, _ := q.audioQueue.At(i) + ppkt := pkt.(base.AVPacket) + out = append(out, ppkt) + } + for i := 0; i < q.videoQueue.Size(); i++ { + pkt, _ := q.videoQueue.At(i) + ppkt := pkt.(base.AVPacket) + out = append(out, ppkt) + } + return out +}