[feature]: record video fps in group for rtmp pub&pull

pull/283/head
HustCoderHu 2 years ago
parent 863ef3e110
commit 9681a530b7

@ -44,6 +44,13 @@ type StatGroup struct {
StatPub StatPub `json:"pub"`
StatSubs []StatSub `json:"subs"` // TODO(chef): [opt] 增加数量字段,因为这里不一定全部放入
StatPull StatPull `json:"pull"`
Fps []RecordPerSec `json:"in_frame_per_sec"`
}
type RecordPerSec struct {
UnixSec int64 `json:"unix_sec"`
V uint32 `json:"v"`
}
type StatSession struct {
@ -75,6 +82,11 @@ type StatPull struct {
StatSession
}
type PeriodRecord struct {
ringBuf []RecordPerSec
nRecord int
}
// ---------------------------------------------------------------------------------------------------------------------
func Session2StatPub(session ISession) StatPub {
@ -94,3 +106,61 @@ func Session2StatPull(session ISession) StatPull {
session.GetStat(),
}
}
/**
@note result s.Fps is not ordered
*/
func (s *StatGroup) GetFpsFrom(p *PeriodRecord, nowUnixSec int64) {
if s.Fps == nil || cap(s.Fps) < p.nRecord {
s.Fps = make([]RecordPerSec, p.nRecord)
} else {
s.Fps = s.Fps[0:p.nRecord]
}
nRecord := 0
p.nRecord = 0
for idx, record := range p.ringBuf {
if record.UnixSec == 0 {
continue
}
if record.UnixSec == nowUnixSec {
// value at nowUnixSec not completely recorded
p.nRecord++
continue
}
s.Fps[nRecord] = record
nRecord++
p.ringBuf[idx].UnixSec = 0
}
s.Fps = s.Fps[0:nRecord]
}
func NewPeriodRecord(bufSize int) PeriodRecord {
return PeriodRecord{
ringBuf: make([]RecordPerSec, bufSize),
nRecord: 0,
}
}
func (p *PeriodRecord) Add(unixSec int64, v uint32) {
var index int64
var record RecordPerSec
index = unixSec % int64(len(p.ringBuf))
record = p.ringBuf[index]
if record.UnixSec == unixSec {
p.ringBuf[index].V = record.V + v
} else {
if record.UnixSec == 0 {
p.nRecord++
}
p.ringBuf[index].UnixSec = unixSec
p.ringBuf[index].V = v
}
return
}
func (p *PeriodRecord) Clear() {
for idx := range p.ringBuf {
p.ringBuf[idx].UnixSec = 0
p.ringBuf[idx].V = 0
}
}

@ -0,0 +1,133 @@
package base
import (
"fmt"
"strings"
"testing"
)
func ringBufToStr(ringBuf []RecordPerSec) string {
var buf strings.Builder
for idx, record := range ringBuf {
if record.UnixSec == 0 {
continue
}
record_str := fmt.Sprintf(" [%d]:{%d,%d}", idx, record.UnixSec, record.V)
buf.WriteString(record_str)
}
return buf.String()
}
func TestPeriodRecord(t *testing.T) {
records := NewPeriodRecord(16)
expected_fps := []RecordPerSec{
{UnixSec: 1, V: 11},
{UnixSec: 3, V: 13},
{UnixSec: 9, V: 19},
{UnixSec: 16, V: 26},
{UnixSec: 17, V: 27}, // fpsRingBuf len 16, so replace 1
{UnixSec: 19, V: 29}, // replace 3
{UnixSec: 25, V: 35}, // replace 9
{UnixSec: 26, V: 36},
}
expected_n := 5
for _, record := range expected_fps {
records.Add(record.UnixSec, 1)
records.Add(record.UnixSec, 2)
records.Add(record.UnixSec, 3)
records.Add(record.UnixSec, record.V-6)
t.Logf("records.nRecord:%d", records.nRecord)
}
if records.nRecord != expected_n {
t.Fatalf("nFpsRecord not match, got:%d expect:%d, %s",
records.nRecord, expected_n, ringBufToStr(records.ringBuf))
}
nowUnixSec := int64(26)
expected_n = 4 // 26 not complete
statSess := StatGroup{}
statSess.GetFpsFrom(&records, nowUnixSec)
// { 16, 17, 19, 25 }
// 26 not complete
if len(statSess.Fps) != expected_n {
t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s",
len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps))
}
funCmpRecord := func(index int, record *RecordPerSec, expect *RecordPerSec) {
if record.UnixSec != expect.UnixSec {
t.Fatalf("index:%d UnixSec not match, got:%d expect:%d",
index, record.UnixSec, expect.UnixSec)
}
if record.V != expect.V {
t.Fatalf("index:%d V not match, got:%d expect:%d", index, record.V, expect.V)
}
}
funCmpRecord(0, &statSess.Fps[0], &expected_fps[3])
funCmpRecord(1, &statSess.Fps[1], &expected_fps[4])
funCmpRecord(2, &statSess.Fps[2], &expected_fps[5])
funCmpRecord(3, &statSess.Fps[3], &expected_fps[6])
t.Log(ringBufToStr(statSess.Fps))
t.Log("2nd period record test")
expected_fps_2nd := []RecordPerSec{
{UnixSec: 0 + 16*2, V: 10 + 16*2},
{UnixSec: 1 + 16*2, V: 11 + 16*2},
{UnixSec: 3 + 16*2, V: 3 + 16*2},
{UnixSec: 9 + 16*2, V: 9 + 16*2},
{UnixSec: 11 + 16*2, V: 11 + 16*2},
}
nowUnixSec = 11 + 16*2
expected_n = 5
// records = { 26 }
for _, record := range expected_fps_2nd {
records.Add(record.UnixSec, record.V)
}
statSess.GetFpsFrom(&records, nowUnixSec)
// {32, 33, 35, 41, 26}
// 11 + 16*2 not complete
if len(statSess.Fps) != expected_n {
t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s",
len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps))
}
funCmpRecord(0, &statSess.Fps[0], &expected_fps_2nd[0])
funCmpRecord(1, &statSess.Fps[1], &expected_fps_2nd[1])
funCmpRecord(2, &statSess.Fps[2], &expected_fps_2nd[2])
funCmpRecord(3, &statSess.Fps[3], &expected_fps_2nd[3])
funCmpRecord(4, &statSess.Fps[4], &expected_fps[7])
t.Log(ringBufToStr(statSess.Fps))
t.Log("3rd period record test")
expected_fps = []RecordPerSec{
{UnixSec: 0 + 16*3, V: 10 + 16*3},
{UnixSec: 1 + 16*3, V: 11 + 16*3},
{UnixSec: 3 + 16*3, V: 13 + 16*3},
}
nowUnixSec = 3 + 16*3 + 1
expected_n = 4
// records = { 11 + 16*2 }
for _, record := range expected_fps {
records.Add(record.UnixSec, record.V)
}
statSess.GetFpsFrom(&records, nowUnixSec)
if len(statSess.Fps) != expected_n {
t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s",
len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps))
}
funCmpRecord(0, &statSess.Fps[0], &expected_fps[0])
funCmpRecord(1, &statSess.Fps[1], &expected_fps[1])
funCmpRecord(2, &statSess.Fps[2], &expected_fps[2])
funCmpRecord(3, &statSess.Fps[3], &expected_fps_2nd[4])
t.Log(ringBufToStr(statSess.Fps))
}

@ -12,6 +12,7 @@ import (
"encoding/json"
"strings"
"sync"
"time"
"github.com/q191201771/lal/pkg/gb28181"
@ -135,6 +136,8 @@ type Group struct {
//
stat base.StatGroup
//
inVideoFpsRecords base.PeriodRecord
//
hlsCalcSessionStatIntervalSec uint32
//
psPubDumpFile *base.DumpFile
@ -166,6 +169,7 @@ func NewGroup(appName string, streamName string, config *Config, option GroupOpt
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum, config.HttpflvConfig.SingleGopMaxFrameNum),
httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum, config.HttptsConfig.SingleGopMaxFrameNum),
psPubPrevInactiveCheckTick: -1,
inVideoFpsRecords: base.NewPeriodRecord(32),
}
g.hlsCalcSessionStatIntervalSec = uint32(config.HlsConfig.FragmentDurationMs / 100) // equals to (ms/1000) * 10
@ -275,6 +279,7 @@ func (group *Group) GetStat(maxsub int) base.StatGroup {
if group.rtmpPubSession != nil {
group.stat.StatPub = base.Session2StatPub(group.rtmpPubSession)
group.stat.GetFpsFrom(&group.inVideoFpsRecords, time.Now().Unix())
} else if group.rtspPubSession != nil {
group.stat.StatPub = base.Session2StatPub(group.rtspPubSession)
} else if group.psPubSession != nil {

@ -450,6 +450,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
}
}
}
if msg.Header.MsgTypeId == base.RtmpTypeIdVideo {
group.inVideoFpsRecords.Add(nazalog.Clock.Now().Unix(), 1)
}
}
// ---------------------------------------------------------------------------------------------------------------------

@ -11,11 +11,12 @@ package logic
import (
"errors"
"fmt"
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/nazalog"
"strings"
"time"
"github.com/q191201771/lal/pkg/rtmp"
)
@ -117,6 +118,7 @@ func (group *Group) resetRelayPullSession() {
func (group *Group) getStatPull() base.StatPull {
if group.pullProxy.rtmpSession != nil {
group.stat.GetFpsFrom(&group.inVideoFpsRecords, time.Now().Unix())
return base.Session2StatPull(group.pullProxy.rtmpSession)
}
if group.pullProxy.rtspSession != nil {

Loading…
Cancel
Save