[refactor]:将gop缓存的数据结构由切片改为环形队列

pull/8/head
lqq 5 years ago committed by yoko
parent 9630c5b3a9
commit 720085efaa

@ -20,7 +20,6 @@ import (
type LazyChunkDivider struct {
message []byte
header *rtmp.Header
chunks []byte
}
@ -56,21 +55,17 @@ func (l *LazyRTMPMsg2FLVTag) Get() []byte {
type GOPCache struct {
t string
uniqueKey string
gopNum int
Metadata []byte
VideoSeqHeader []byte
AACSeqHeader []byte
// TODO 考虑优化成环形队列
gopList []*GOP
gopCircularQue *gopCircularQueue
}
func NewGopCache(t string, uniqueKey string, gopNum int) *GOPCache {
return &GOPCache{
t: t,
uniqueKey: uniqueKey,
gopNum: gopNum,
gopCircularQue: NewGopCircularQueue(gopNum),
}
}
@ -95,43 +90,37 @@ func (gc *GOPCache) Feed(msg rtmp.AVMsg, lg LazyGet) {
return
}
}
if gc.gopNum != 0 {
if gc.gopCircularQue.Cap() != 0 {
if msg.IsVideoKeyNalu() {
var gop GOP
gop.Feed(msg, lg())
gc.gopList = append(gc.gopList, &gop)
if len(gc.gopList) > gc.gopNum {
gc.gopList = gc.gopList[1:]
}
gc.gopCircularQue.Enqueue(&gop)
} else {
if len(gc.gopList) > 0 {
gc.gopList[len(gc.gopList)-1].Feed(msg, lg())
gop := gc.gopCircularQue.Back()
if gop != nil {
gop.Feed(msg, lg())
}
}
}
}
func (gc *GOPCache) GetFullGOP() []*GOP {
if len(gc.gopList) < 2 {
return nil
}
func (gc *GOPCache) GetGopLen() int{
return gc.gopCircularQue.Len()
}
return gc.gopList[:len(gc.gopList)-2]
func (gc *GOPCache) GetGopAt(pos int) *GOP {
return gc.gopCircularQue.At(pos)
}
func (gc *GOPCache) GetLastGOP() *GOP {
if len(gc.gopList) < 1 {
return nil
}
return gc.gopList[len(gc.gopList)-1]
func (gc *GOPCache) LastGOP() *GOP {
return gc.gopCircularQue.Back()
}
func (gc *GOPCache) Clear() {
gc.Metadata = nil
gc.VideoSeqHeader = nil
gc.AACSeqHeader = nil
gc.gopList = nil
gc.gopCircularQue.Clear()
}
type GOP struct {
@ -141,3 +130,111 @@ type GOP struct {
func (g *GOP) Feed(msg rtmp.AVMsg, b []byte) {
g.data = append(g.data, b)
}
type gopCircularQueue struct {
buf []*GOP
size int
first int
last int
}
func NewGopCircularQueue(cap int) *gopCircularQueue {
if cap < 0 {
panic("negative cap argument in NewGopCircularQueue")
}
size := cap + 1
return &gopCircularQueue{
buf: make([]*GOP, size, size),
size: size,
first: 0,
last: 0,
}
}
//Enqueue 入队
func (gcq *gopCircularQueue) Enqueue(gop *GOP) {
if gcq.Full() {
//队列满了,抛弃队首元素
gcq.firstInc()
}
gcq.buf[gcq.last] = gop
gcq.lastInc()
}
//Dequeue 队首元素出队
func (gcq *gopCircularQueue) Dequeue() *GOP{
if gcq.Empty() {
return nil
}
//把first return
gop := gcq.buf[gcq.first]
gcq.firstInc()
return gop
}
func (gcq *gopCircularQueue) Full() bool {
return (gcq.last + 1) % gcq.size == gcq.first
}
func (gcq *gopCircularQueue) Empty() bool{
return gcq.first == gcq.last
}
//Len 获取元素的个数
func (gcq *gopCircularQueue) Len() int{
var l int
if gcq.first > gcq.last {
l = gcq.last + gcq.size - gcq.first
} else {
l = gcq.last - gcq.first
}
return l
}
//Cap 获取队列的容量
func (gcq *gopCircularQueue) Cap() int {
return gcq.size - 1
}
//Front 获取队首元素,不出队
func (gcq *gopCircularQueue) Front() *GOP {
if gcq.Empty() {
return nil
}
return gcq.buf[gcq.first]
}
//Back 获取队尾元素,不出队
func (gcq *gopCircularQueue) Back() *GOP {
if gcq.Empty() {
return nil
}
return gcq.buf[(gcq.last + gcq.size - 1) % gcq.size]
}
//At 获取第pos个元素
func (gcq *gopCircularQueue) At(pos int) *GOP {
if pos >= gcq.Len() || pos < 0 {
return nil
}
return gcq.buf[(pos + gcq.first) % gcq.size]
}
func (gcq *gopCircularQueue) Clear() {
gcq.first = 0
gcq.last = 0
}
func (gcq *gopCircularQueue) lastInc() {
gcq.last = (gcq.last + 1) % gcq.size
}
func (gcq *gopCircularQueue) firstInc() {
gcq.first = (gcq.first + 1) % gcq.size
}

@ -0,0 +1,371 @@
package logic
import (
`github.com/q191201771/naza/pkg/assert`
`testing`
)
func TestGopCircularQueueCap0(t *testing.T) {
var gcqCap int = 0
gcq := NewGopCircularQueue(gcqCap)
//fl
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
assert.Equal(t, nil, gcq.Dequeue())
//fl
tGOP := &GOP{
data:nil,
}
gcq.Enqueue(tGOP)
//fg
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
//弹出一个后
assert.Equal(t, nil, gcq.Dequeue())
//fg
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, 0, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
assert.Equal(t, nil, gcq.Dequeue())
//放入2个元素后
gcq.Enqueue(tGOP)
gcq.Enqueue(tGOP)
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
assert.Equal(t, nil, gcq.Dequeue())
}
func TestGopCircularQueue(t *testing.T) {
var gcqCap int = 3
//元素为个数为0
//fl,_, _, _
gcq := NewGopCircularQueue(gcqCap)
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
//gop :=
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
assert.Equal(t, nil, gcq.Dequeue())
//fl,, _, _
tGOP := &GOP{
data:[][]byte{[]byte("t")},
}
gcq.Enqueue(tGOP)
//放入一个元素后
//tf,l, _, _
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 1, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, tGOP, gcq.At(0))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, tGOP, gcq.Front())
assert.Equal(t, tGOP, gcq.Back())
//在弹出一个元素后
//tf,l, _,_
assert.Equal(t, tGOP, gcq.Dequeue())
//_,fl, _,_
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
assert.Equal(t, nil, gcq.Dequeue())
//_,fl, _,_
GOP0 := &GOP{
data:[][]byte{[]byte("0")},
}
gcq.Enqueue(GOP0)
//_,f0,l_,_
GOP1 := &GOP{
data:[][]byte{[]byte("1")},
}
gcq.Enqueue(GOP1)
//_,f0, 1, l
//验证遍历情况
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 2, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP0, gcq.At(0))
assert.Equal(t, GOP1, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, GOP0, gcq.Front())
assert.Equal(t, GOP1, gcq.Back())
GOP2 := &GOP{
data:[][]byte{[]byte("2")},
}
gcq.Enqueue(GOP2)
//l,f0, 1, 2
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP0, gcq.At(0))
assert.Equal(t, GOP1, gcq.At(1))
assert.Equal(t, GOP2, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, GOP0, gcq.Front())
assert.Equal(t, GOP2, gcq.Back())
//
GOP3 := &GOP{
data:[][]byte{[]byte("3")},
}
gcq.Enqueue(GOP3)
//3, l, f1, 2
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP1, gcq.At(0))
assert.Equal(t, GOP2, gcq.At(1))
assert.Equal(t, GOP3, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, GOP1, gcq.Front())
assert.Equal(t, GOP3, gcq.Back())
//3, l, f1, 2
assert.Equal(t, GOP1, gcq.Dequeue())
//出队后
//3, l, _, f2
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 2, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP2, gcq.At(0))
assert.Equal(t, GOP3, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP2, gcq.Front())
assert.Equal(t, GOP3, gcq.Back())
//3, l, _, f2
GOP4 := &GOP{
data:[][]byte{[]byte("4")},
}
gcq.Enqueue(GOP4)
//入队后
//3, 4, l, f2
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP2, gcq.At(0))
assert.Equal(t, GOP3, gcq.At(1))
assert.Equal(t, GOP4, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP2, gcq.Front())
assert.Equal(t, GOP4, gcq.Back())
//3, 4, l, 2f
assert.Equal(t, GOP2, gcq.Dequeue())
//3f, 4, l, _
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 2, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP3, gcq.At(0))
assert.Equal(t, GOP4, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP3, gcq.Front())
assert.Equal(t, GOP4, gcq.Back())
//3f, 4, l, _
GOP5 := &GOP{
data:[][]byte{[]byte("5")},
}
gcq.Enqueue(GOP5)
//3f, 4, 5, l
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP3, gcq.At(0))
assert.Equal(t, GOP4, gcq.At(1))
assert.Equal(t, GOP5, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP3, gcq.Front())
assert.Equal(t, GOP5, gcq.Back())
//3f, 4, 5, l
assert.Equal(t, GOP3, gcq.Dequeue())
//_, 4f, 5, l
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 2, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP4, gcq.At(0))
assert.Equal(t, GOP5, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP4, gcq.Front())
assert.Equal(t, GOP5, gcq.Back())
//_, 4f, 5, l
assert.Equal(t, GOP4, gcq.Dequeue())
//_, _, 5f, l
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 1, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP5, gcq.At(0))
assert.Equal(t, nil, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP5, gcq.Front())
assert.Equal(t, GOP5, gcq.Back())
//_, _, 5f, l
assert.Equal(t, GOP5, gcq.Dequeue())
//_, _, _, fl
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
//_, _, _, fl
GOP6 := &GOP{
data:[][]byte{[]byte("6")},
}
gcq.Enqueue(GOP6)
//l, _, _, f6
GOP7 := &GOP{
data:[][]byte{[]byte("7")},
}
gcq.Enqueue(GOP7)
//7, l, _, f6
GOP8 := &GOP{
data:[][]byte{[]byte("8")},
}
gcq.Enqueue(GOP8)
//7, 8, l, f6
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP6, gcq.At(0))
assert.Equal(t, GOP7, gcq.At(1))
assert.Equal(t, GOP8, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP6, gcq.Front())
assert.Equal(t, GOP8, gcq.Back())
GOP9 := &GOP{
data:[][]byte{[]byte("9")},
}
gcq.Enqueue(GOP9)
//f7, 8, 9, l
assert.Equal(t, false, gcq.Empty())
assert.Equal(t, true, gcq.Full())
assert.Equal(t, 3, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, GOP7, gcq.At(0))
assert.Equal(t, GOP8, gcq.At(1))
assert.Equal(t, GOP9, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, GOP7, gcq.Front())
assert.Equal(t, GOP9, gcq.Back())
//f7, 8, 9, l
assert.Equal(t, GOP7, gcq.Dequeue())
//_, f8, 9, l
assert.Equal(t, GOP8, gcq.Dequeue())
//_, _, f9, l
assert.Equal(t, GOP9, gcq.Dequeue())
//_, _, _, fl
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
//_, _, _, fl
assert.Equal(t, nil, gcq.Dequeue())
//_, _, _, fl
assert.Equal(t, true, gcq.Empty())
assert.Equal(t, false, gcq.Full())
assert.Equal(t, 0, gcq.Len())
assert.Equal(t, gcqCap, gcq.Cap())
assert.Equal(t, nil, gcq.At(0))
assert.Equal(t, nil, gcq.At(1))
assert.Equal(t, nil, gcq.At(2))
assert.Equal(t, nil, gcq.At(3))
assert.Equal(t, nil, gcq.At(4))
assert.Equal(t, nil, gcq.Front())
assert.Equal(t, nil, gcq.Back())
}

@ -193,13 +193,13 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
if group.gopCache.AACSeqHeader != nil {
_ = session.AsyncWrite(group.gopCache.AACSeqHeader)
}
fullGOP := group.gopCache.GetFullGOP()
for _, gop := range fullGOP {
for _, item := range gop.data {
for i := 0; i < group.gopCache.GetGopLen();i++ {
for _, item := range group.gopCache.GetGopAt(i).data {
_ = session.AsyncWrite(item)
}
}
lastGOP := group.gopCache.GetLastGOP()
lastGOP := group.gopCache.LastGOP()
if lastGOP != nil {
for _, item := range lastGOP.data {
_ = session.AsyncWrite(item)
@ -224,13 +224,14 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
if group.httpflvGopCache.AACSeqHeader != nil {
session.WriteRawPacket(group.httpflvGopCache.AACSeqHeader)
}
fullGOP := group.httpflvGopCache.GetFullGOP()
for _, gop := range fullGOP {
for _, item := range gop.data {
for i := 0; i < group.httpflvGopCache.GetGopLen(); i++ {
for _, item := range group.httpflvGopCache.GetGopAt(i).data {
session.WriteRawPacket(item)
}
}
lastGOP := group.httpflvGopCache.GetLastGOP()
lastGOP := group.httpflvGopCache.LastGOP()
if lastGOP != nil {
for _, item := range lastGOP.data {
session.WriteRawPacket(item)

Loading…
Cancel
Save