diff --git a/pkg/base/t_http_an__.go b/pkg/base/t_http_an__.go index 15f3f25..d3314be 100644 --- a/pkg/base/t_http_an__.go +++ b/pkg/base/t_http_an__.go @@ -44,6 +44,13 @@ type StatGroup struct { StatPub StatPub `json:"pub"` StatSubs []StatSub `json:"subs"` // TODO(chef): [opt] 增加数量字段,因为这里不一定全部放入 StatPull StatPull `json:"pull"` + + Fps []RecordPerSec `json:"in_frame_per_sec"` +} + +type RecordPerSec struct { + UnixSec int64 `json:"unix_sec"` + V uint32 `json:"v"` } type StatSession struct { @@ -75,6 +82,11 @@ type StatPull struct { StatSession } +type PeriodRecord struct { + ringBuf []RecordPerSec + nRecord int +} + // --------------------------------------------------------------------------------------------------------------------- func Session2StatPub(session ISession) StatPub { @@ -94,3 +106,61 @@ func Session2StatPull(session ISession) StatPull { session.GetStat(), } } + +/** + @note result s.Fps is not ordered +*/ +func (s *StatGroup) GetFpsFrom(p *PeriodRecord, nowUnixSec int64) { + if s.Fps == nil || cap(s.Fps) < p.nRecord { + s.Fps = make([]RecordPerSec, p.nRecord) + } else { + s.Fps = s.Fps[0:p.nRecord] + } + nRecord := 0 + p.nRecord = 0 + for idx, record := range p.ringBuf { + if record.UnixSec == 0 { + continue + } + if record.UnixSec == nowUnixSec { + // value at nowUnixSec not completely recorded + p.nRecord++ + continue + } + s.Fps[nRecord] = record + nRecord++ + p.ringBuf[idx].UnixSec = 0 + } + s.Fps = s.Fps[0:nRecord] +} + +func NewPeriodRecord(bufSize int) PeriodRecord { + return PeriodRecord{ + ringBuf: make([]RecordPerSec, bufSize), + nRecord: 0, + } +} + +func (p *PeriodRecord) Add(unixSec int64, v uint32) { + var index int64 + var record RecordPerSec + index = unixSec % int64(len(p.ringBuf)) + record = p.ringBuf[index] + if record.UnixSec == unixSec { + p.ringBuf[index].V = record.V + v + } else { + if record.UnixSec == 0 { + p.nRecord++ + } + p.ringBuf[index].UnixSec = unixSec + p.ringBuf[index].V = v + } + return +} + +func (p *PeriodRecord) Clear() { + for idx := range p.ringBuf { + p.ringBuf[idx].UnixSec = 0 + p.ringBuf[idx].V = 0 + } +} diff --git a/pkg/base/t_http_an__test.go b/pkg/base/t_http_an__test.go new file mode 100644 index 0000000..fe8274c --- /dev/null +++ b/pkg/base/t_http_an__test.go @@ -0,0 +1,133 @@ +package base + +import ( + "fmt" + "strings" + "testing" +) + +func ringBufToStr(ringBuf []RecordPerSec) string { + var buf strings.Builder + for idx, record := range ringBuf { + if record.UnixSec == 0 { + continue + } + record_str := fmt.Sprintf(" [%d]:{%d,%d}", idx, record.UnixSec, record.V) + buf.WriteString(record_str) + } + return buf.String() +} + +func TestPeriodRecord(t *testing.T) { + records := NewPeriodRecord(16) + + expected_fps := []RecordPerSec{ + {UnixSec: 1, V: 11}, + {UnixSec: 3, V: 13}, + {UnixSec: 9, V: 19}, + {UnixSec: 16, V: 26}, + {UnixSec: 17, V: 27}, // fpsRingBuf len 16, so replace 1 + {UnixSec: 19, V: 29}, // replace 3 + {UnixSec: 25, V: 35}, // replace 9 + {UnixSec: 26, V: 36}, + } + expected_n := 5 + + for _, record := range expected_fps { + records.Add(record.UnixSec, 1) + records.Add(record.UnixSec, 2) + records.Add(record.UnixSec, 3) + records.Add(record.UnixSec, record.V-6) + t.Logf("records.nRecord:%d", records.nRecord) + } + + if records.nRecord != expected_n { + t.Fatalf("nFpsRecord not match, got:%d expect:%d, %s", + records.nRecord, expected_n, ringBufToStr(records.ringBuf)) + } + + nowUnixSec := int64(26) + expected_n = 4 // 26 not complete + statSess := StatGroup{} + statSess.GetFpsFrom(&records, nowUnixSec) + // { 16, 17, 19, 25 } + // 26 not complete + if len(statSess.Fps) != expected_n { + t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s", + len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps)) + } + + funCmpRecord := func(index int, record *RecordPerSec, expect *RecordPerSec) { + if record.UnixSec != expect.UnixSec { + t.Fatalf("index:%d UnixSec not match, got:%d expect:%d", + index, record.UnixSec, expect.UnixSec) + } + if record.V != expect.V { + t.Fatalf("index:%d V not match, got:%d expect:%d", index, record.V, expect.V) + } + } + + funCmpRecord(0, &statSess.Fps[0], &expected_fps[3]) + funCmpRecord(1, &statSess.Fps[1], &expected_fps[4]) + funCmpRecord(2, &statSess.Fps[2], &expected_fps[5]) + funCmpRecord(3, &statSess.Fps[3], &expected_fps[6]) + t.Log(ringBufToStr(statSess.Fps)) + + t.Log("2nd period record test") + + expected_fps_2nd := []RecordPerSec{ + {UnixSec: 0 + 16*2, V: 10 + 16*2}, + {UnixSec: 1 + 16*2, V: 11 + 16*2}, + {UnixSec: 3 + 16*2, V: 3 + 16*2}, + {UnixSec: 9 + 16*2, V: 9 + 16*2}, + {UnixSec: 11 + 16*2, V: 11 + 16*2}, + } + nowUnixSec = 11 + 16*2 + expected_n = 5 + // records = { 26 } + for _, record := range expected_fps_2nd { + records.Add(record.UnixSec, record.V) + } + statSess.GetFpsFrom(&records, nowUnixSec) + + // {32, 33, 35, 41, 26} + // 11 + 16*2 not complete + if len(statSess.Fps) != expected_n { + t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s", + len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps)) + } + funCmpRecord(0, &statSess.Fps[0], &expected_fps_2nd[0]) + funCmpRecord(1, &statSess.Fps[1], &expected_fps_2nd[1]) + funCmpRecord(2, &statSess.Fps[2], &expected_fps_2nd[2]) + funCmpRecord(3, &statSess.Fps[3], &expected_fps_2nd[3]) + funCmpRecord(4, &statSess.Fps[4], &expected_fps[7]) + + t.Log(ringBufToStr(statSess.Fps)) + + t.Log("3rd period record test") + + expected_fps = []RecordPerSec{ + {UnixSec: 0 + 16*3, V: 10 + 16*3}, + {UnixSec: 1 + 16*3, V: 11 + 16*3}, + {UnixSec: 3 + 16*3, V: 13 + 16*3}, + } + nowUnixSec = 3 + 16*3 + 1 + expected_n = 4 + + // records = { 11 + 16*2 } + for _, record := range expected_fps { + records.Add(record.UnixSec, record.V) + } + statSess.GetFpsFrom(&records, nowUnixSec) + + if len(statSess.Fps) != expected_n { + t.Fatalf("len(statSess.Fps) not match, got:%d expect:%d, %s", + len(statSess.Fps), expected_n, ringBufToStr(statSess.Fps)) + } + + funCmpRecord(0, &statSess.Fps[0], &expected_fps[0]) + funCmpRecord(1, &statSess.Fps[1], &expected_fps[1]) + funCmpRecord(2, &statSess.Fps[2], &expected_fps[2]) + funCmpRecord(3, &statSess.Fps[3], &expected_fps_2nd[4]) + t.Log(ringBufToStr(statSess.Fps)) +} diff --git a/pkg/logic/group__.go b/pkg/logic/group__.go index dc4e18d..c9d9e90 100644 --- a/pkg/logic/group__.go +++ b/pkg/logic/group__.go @@ -12,6 +12,7 @@ import ( "encoding/json" "strings" "sync" + "time" "github.com/q191201771/lal/pkg/gb28181" @@ -135,6 +136,8 @@ type Group struct { // stat base.StatGroup // + inVideoFpsRecords base.PeriodRecord + // hlsCalcSessionStatIntervalSec uint32 // psPubDumpFile *base.DumpFile @@ -166,6 +169,7 @@ func NewGroup(appName string, streamName string, config *Config, option GroupOpt httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum, config.HttpflvConfig.SingleGopMaxFrameNum), httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum, config.HttptsConfig.SingleGopMaxFrameNum), psPubPrevInactiveCheckTick: -1, + inVideoFpsRecords: base.NewPeriodRecord(32), } g.hlsCalcSessionStatIntervalSec = uint32(config.HlsConfig.FragmentDurationMs / 100) // equals to (ms/1000) * 10 @@ -275,6 +279,7 @@ func (group *Group) GetStat(maxsub int) base.StatGroup { if group.rtmpPubSession != nil { group.stat.StatPub = base.Session2StatPub(group.rtmpPubSession) + group.stat.GetFpsFrom(&group.inVideoFpsRecords, time.Now().Unix()) } else if group.rtspPubSession != nil { group.stat.StatPub = base.Session2StatPub(group.rtspPubSession) } else if group.psPubSession != nil { diff --git a/pkg/logic/group__core_streaming.go b/pkg/logic/group__core_streaming.go index df8b46a..e7a1af5 100644 --- a/pkg/logic/group__core_streaming.go +++ b/pkg/logic/group__core_streaming.go @@ -450,6 +450,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { } } } + if msg.Header.MsgTypeId == base.RtmpTypeIdVideo { + group.inVideoFpsRecords.Add(nazalog.Clock.Now().Unix(), 1) + } } // --------------------------------------------------------------------------------------------------------------------- diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go index c4d1182..b68f30d 100644 --- a/pkg/logic/group__relay_pull.go +++ b/pkg/logic/group__relay_pull.go @@ -11,11 +11,12 @@ package logic import ( "errors" "fmt" + "strings" + "time" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/rtsp" "github.com/q191201771/naza/pkg/nazalog" - "strings" - "time" "github.com/q191201771/lal/pkg/rtmp" ) @@ -117,6 +118,7 @@ func (group *Group) resetRelayPullSession() { func (group *Group) getStatPull() base.StatPull { if group.pullProxy.rtmpSession != nil { + group.stat.GetFpsFrom(&group.inVideoFpsRecords, time.Now().Unix()) return base.Session2StatPull(group.pullProxy.rtmpSession) } if group.pullProxy.rtspSession != nil {