diff --git a/pkg/logic/gop_cache.go b/pkg/logic/gop_cache.go index 5fb76bc..0ada1f3 100644 --- a/pkg/logic/gop_cache.go +++ b/pkg/logic/gop_cache.go @@ -20,7 +20,6 @@ import ( type LazyChunkDivider struct { message []byte header *rtmp.Header - chunks []byte } @@ -56,21 +55,17 @@ func (l *LazyRTMPMsg2FLVTag) Get() []byte { type GOPCache struct { t string uniqueKey string - gopNum int - Metadata []byte VideoSeqHeader []byte AACSeqHeader []byte - - // TODO 考虑优化成环形队列 - gopList []*GOP + gopCircularQue *gopCircularQueue } func NewGopCache(t string, uniqueKey string, gopNum int) *GOPCache { return &GOPCache{ t: t, uniqueKey: uniqueKey, - gopNum: gopNum, + gopCircularQue: NewGopCircularQueue(gopNum), } } @@ -95,43 +90,37 @@ func (gc *GOPCache) Feed(msg rtmp.AVMsg, lg LazyGet) { return } } - - if gc.gopNum != 0 { + if gc.gopCircularQue.Cap() != 0 { if msg.IsVideoKeyNalu() { var gop GOP gop.Feed(msg, lg()) - gc.gopList = append(gc.gopList, &gop) - if len(gc.gopList) > gc.gopNum { - gc.gopList = gc.gopList[1:] - } + gc.gopCircularQue.Enqueue(&gop) } else { - if len(gc.gopList) > 0 { - gc.gopList[len(gc.gopList)-1].Feed(msg, lg()) + gop := gc.gopCircularQue.Back() + if gop != nil { + gop.Feed(msg, lg()) } } } } -func (gc *GOPCache) GetFullGOP() []*GOP { - if len(gc.gopList) < 2 { - return nil - } +func (gc *GOPCache) GetGopLen() int{ + return gc.gopCircularQue.Len() +} - return gc.gopList[:len(gc.gopList)-2] +func (gc *GOPCache) GetGopAt(pos int) *GOP { + return gc.gopCircularQue.At(pos) } -func (gc *GOPCache) GetLastGOP() *GOP { - if len(gc.gopList) < 1 { - return nil - } - return gc.gopList[len(gc.gopList)-1] +func (gc *GOPCache) LastGOP() *GOP { + return gc.gopCircularQue.Back() } func (gc *GOPCache) Clear() { gc.Metadata = nil gc.VideoSeqHeader = nil gc.AACSeqHeader = nil - gc.gopList = nil + gc.gopCircularQue.Clear() } type GOP struct { @@ -141,3 +130,111 @@ type GOP struct { func (g *GOP) Feed(msg rtmp.AVMsg, b []byte) { g.data = append(g.data, b) } + +type gopCircularQueue struct { + buf []*GOP + size int + first int + last int +} + +func NewGopCircularQueue(cap int) *gopCircularQueue { + + if cap < 0 { + panic("negative cap argument in NewGopCircularQueue") + } + + size := cap + 1 + return &gopCircularQueue{ + buf: make([]*GOP, size, size), + size: size, + first: 0, + last: 0, + } +} + +//Enqueue 入队 +func (gcq *gopCircularQueue) Enqueue(gop *GOP) { + if gcq.Full() { + //队列满了,抛弃队首元素 + gcq.firstInc() + } + gcq.buf[gcq.last] = gop + gcq.lastInc() +} + +//Dequeue 队首元素出队 +func (gcq *gopCircularQueue) Dequeue() *GOP{ + if gcq.Empty() { + return nil + } + + //把first return + gop := gcq.buf[gcq.first] + gcq.firstInc() + + return gop +} + +func (gcq *gopCircularQueue) Full() bool { + return (gcq.last + 1) % gcq.size == gcq.first +} + +func (gcq *gopCircularQueue) Empty() bool{ + return gcq.first == gcq.last +} + +//Len 获取元素的个数 +func (gcq *gopCircularQueue) Len() int{ + var l int + if gcq.first > gcq.last { + l = gcq.last + gcq.size - gcq.first + } else { + l = gcq.last - gcq.first + } + + return l +} + +//Cap 获取队列的容量 +func (gcq *gopCircularQueue) Cap() int { + return gcq.size - 1 +} + +//Front 获取队首元素,不出队 +func (gcq *gopCircularQueue) Front() *GOP { + if gcq.Empty() { + return nil + } + return gcq.buf[gcq.first] +} + +//Back 获取队尾元素,不出队 +func (gcq *gopCircularQueue) Back() *GOP { + if gcq.Empty() { + return nil + } + return gcq.buf[(gcq.last + gcq.size - 1) % gcq.size] +} + +//At 获取第pos个元素 +func (gcq *gopCircularQueue) At(pos int) *GOP { + if pos >= gcq.Len() || pos < 0 { + return nil + } + return gcq.buf[(pos + gcq.first) % gcq.size] +} + +func (gcq *gopCircularQueue) Clear() { + gcq.first = 0 + gcq.last = 0 + +} + +func (gcq *gopCircularQueue) lastInc() { + gcq.last = (gcq.last + 1) % gcq.size +} + +func (gcq *gopCircularQueue) firstInc() { + gcq.first = (gcq.first + 1) % gcq.size +} diff --git a/pkg/logic/gop_cache_test.go b/pkg/logic/gop_cache_test.go new file mode 100644 index 0000000..bd8656b --- /dev/null +++ b/pkg/logic/gop_cache_test.go @@ -0,0 +1,371 @@ +package logic + +import ( + `github.com/q191201771/naza/pkg/assert` + `testing` +) + +func TestGopCircularQueueCap0(t *testing.T) { + var gcqCap int = 0 + + gcq := NewGopCircularQueue(gcqCap) + + //fl + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + assert.Equal(t, nil, gcq.Dequeue()) + + + //fl + tGOP := &GOP{ + data:nil, + } + gcq.Enqueue(tGOP) + //fg + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + + + //弹出一个后 + assert.Equal(t, nil, gcq.Dequeue()) + //fg + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, 0, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + assert.Equal(t, nil, gcq.Dequeue()) + + //放入2个元素后 + gcq.Enqueue(tGOP) + gcq.Enqueue(tGOP) + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + assert.Equal(t, nil, gcq.Dequeue()) +} + +func TestGopCircularQueue(t *testing.T) { + var gcqCap int = 3 + + //元素为个数为0 + //fl,_, _, _ + gcq := NewGopCircularQueue(gcqCap) + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + //gop := + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + assert.Equal(t, nil, gcq.Dequeue()) + + //fl,, _, _ + tGOP := &GOP{ + data:[][]byte{[]byte("t")}, + } + gcq.Enqueue(tGOP) + //放入一个元素后 + //tf,l, _, _ + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 1, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, tGOP, gcq.At(0)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, tGOP, gcq.Front()) + assert.Equal(t, tGOP, gcq.Back()) + + //在弹出一个元素后 + //tf,l, _,_ + assert.Equal(t, tGOP, gcq.Dequeue()) + //_,fl, _,_ + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + assert.Equal(t, nil, gcq.Dequeue()) + + //_,fl, _,_ + GOP0 := &GOP{ + data:[][]byte{[]byte("0")}, + } + gcq.Enqueue(GOP0) + //_,f0,l_,_ + GOP1 := &GOP{ + data:[][]byte{[]byte("1")}, + } + gcq.Enqueue(GOP1) + //_,f0, 1, l + //验证遍历情况 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 2, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP0, gcq.At(0)) + assert.Equal(t, GOP1, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, GOP0, gcq.Front()) + assert.Equal(t, GOP1, gcq.Back()) + + GOP2 := &GOP{ + data:[][]byte{[]byte("2")}, + } + gcq.Enqueue(GOP2) + //l,f0, 1, 2 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP0, gcq.At(0)) + assert.Equal(t, GOP1, gcq.At(1)) + assert.Equal(t, GOP2, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, GOP0, gcq.Front()) + assert.Equal(t, GOP2, gcq.Back()) + + // + GOP3 := &GOP{ + data:[][]byte{[]byte("3")}, + } + gcq.Enqueue(GOP3) + //3, l, f1, 2 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP1, gcq.At(0)) + assert.Equal(t, GOP2, gcq.At(1)) + assert.Equal(t, GOP3, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, GOP1, gcq.Front()) + assert.Equal(t, GOP3, gcq.Back()) + + + + //3, l, f1, 2 + assert.Equal(t, GOP1, gcq.Dequeue()) + //出队后 + //3, l, _, f2 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 2, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP2, gcq.At(0)) + assert.Equal(t, GOP3, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP2, gcq.Front()) + assert.Equal(t, GOP3, gcq.Back()) + + + + //3, l, _, f2 + GOP4 := &GOP{ + data:[][]byte{[]byte("4")}, + } + gcq.Enqueue(GOP4) + //入队后 + //3, 4, l, f2 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP2, gcq.At(0)) + assert.Equal(t, GOP3, gcq.At(1)) + assert.Equal(t, GOP4, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP2, gcq.Front()) + assert.Equal(t, GOP4, gcq.Back()) + + //3, 4, l, 2f + assert.Equal(t, GOP2, gcq.Dequeue()) + //3f, 4, l, _ + + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 2, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP3, gcq.At(0)) + assert.Equal(t, GOP4, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP3, gcq.Front()) + assert.Equal(t, GOP4, gcq.Back()) + + //3f, 4, l, _ + GOP5 := &GOP{ + data:[][]byte{[]byte("5")}, + } + gcq.Enqueue(GOP5) + //3f, 4, 5, l + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP3, gcq.At(0)) + assert.Equal(t, GOP4, gcq.At(1)) + assert.Equal(t, GOP5, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP3, gcq.Front()) + assert.Equal(t, GOP5, gcq.Back()) + + + + //3f, 4, 5, l + assert.Equal(t, GOP3, gcq.Dequeue()) + //_, 4f, 5, l + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 2, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP4, gcq.At(0)) + assert.Equal(t, GOP5, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP4, gcq.Front()) + assert.Equal(t, GOP5, gcq.Back()) + + //_, 4f, 5, l + assert.Equal(t, GOP4, gcq.Dequeue()) + //_, _, 5f, l + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 1, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP5, gcq.At(0)) + assert.Equal(t, nil, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP5, gcq.Front()) + assert.Equal(t, GOP5, gcq.Back()) + + //_, _, 5f, l + assert.Equal(t, GOP5, gcq.Dequeue()) + //_, _, _, fl + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + + + //_, _, _, fl + GOP6 := &GOP{ + data:[][]byte{[]byte("6")}, + } + gcq.Enqueue(GOP6) + //l, _, _, f6 + GOP7 := &GOP{ + data:[][]byte{[]byte("7")}, + } + gcq.Enqueue(GOP7) + //7, l, _, f6 + GOP8 := &GOP{ + data:[][]byte{[]byte("8")}, + } + gcq.Enqueue(GOP8) + //7, 8, l, f6 + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP6, gcq.At(0)) + assert.Equal(t, GOP7, gcq.At(1)) + assert.Equal(t, GOP8, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP6, gcq.Front()) + assert.Equal(t, GOP8, gcq.Back()) + + + GOP9 := &GOP{ + data:[][]byte{[]byte("9")}, + } + gcq.Enqueue(GOP9) + //f7, 8, 9, l + assert.Equal(t, false, gcq.Empty()) + assert.Equal(t, true, gcq.Full()) + assert.Equal(t, 3, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, GOP7, gcq.At(0)) + assert.Equal(t, GOP8, gcq.At(1)) + assert.Equal(t, GOP9, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, GOP7, gcq.Front()) + assert.Equal(t, GOP9, gcq.Back()) + + //f7, 8, 9, l + assert.Equal(t, GOP7, gcq.Dequeue()) + //_, f8, 9, l + assert.Equal(t, GOP8, gcq.Dequeue()) + //_, _, f9, l + assert.Equal(t, GOP9, gcq.Dequeue()) + //_, _, _, fl + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) + + + //_, _, _, fl + assert.Equal(t, nil, gcq.Dequeue()) + //_, _, _, fl + assert.Equal(t, true, gcq.Empty()) + assert.Equal(t, false, gcq.Full()) + assert.Equal(t, 0, gcq.Len()) + assert.Equal(t, gcqCap, gcq.Cap()) + assert.Equal(t, nil, gcq.At(0)) + assert.Equal(t, nil, gcq.At(1)) + assert.Equal(t, nil, gcq.At(2)) + assert.Equal(t, nil, gcq.At(3)) + assert.Equal(t, nil, gcq.At(4)) + assert.Equal(t, nil, gcq.Front()) + assert.Equal(t, nil, gcq.Back()) +} diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 3a89769..95b2dc6 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -193,13 +193,13 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { if group.gopCache.AACSeqHeader != nil { _ = session.AsyncWrite(group.gopCache.AACSeqHeader) } - fullGOP := group.gopCache.GetFullGOP() - for _, gop := range fullGOP { - for _, item := range gop.data { + for i := 0; i < group.gopCache.GetGopLen();i++ { + for _, item := range group.gopCache.GetGopAt(i).data { _ = session.AsyncWrite(item) } } - lastGOP := group.gopCache.GetLastGOP() + + lastGOP := group.gopCache.LastGOP() if lastGOP != nil { for _, item := range lastGOP.data { _ = session.AsyncWrite(item) @@ -224,13 +224,14 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { if group.httpflvGopCache.AACSeqHeader != nil { session.WriteRawPacket(group.httpflvGopCache.AACSeqHeader) } - fullGOP := group.httpflvGopCache.GetFullGOP() - for _, gop := range fullGOP { - for _, item := range gop.data { + + for i := 0; i < group.httpflvGopCache.GetGopLen(); i++ { + for _, item := range group.httpflvGopCache.GetGopAt(i).data { session.WriteRawPacket(item) } } - lastGOP := group.httpflvGopCache.GetLastGOP() + + lastGOP := group.httpflvGopCache.LastGOP() if lastGOP != nil { for _, item := range lastGOP.data { session.WriteRawPacket(item)