[refator] package httpflv: 新增FlvFilePump,可循环匀速读取flv文件

pull/95/head
q191201771 4 years ago
parent 293fd4d607
commit 6c6999d7f1

@ -24,7 +24,7 @@ import (
"github.com/q191201771/naza/pkg/nazamd5"
)
const detailFilename = "delay.txt"
const outDetailFilename = "delay.txt"
type PullType int
@ -54,24 +54,18 @@ func main() {
})
defer nazalog.Sync()
var mu sync.Mutex
tagKey2writeTime := make(map[string]time.Time)
var delays []int64
var mu sync.Mutex
filename, pushUrl, pullUrl, pullType := parseFlag()
nazalog.Infof("parse flag succ. filename=%s, pushUrl=%s, pullUrl=%s, pullType=%s",
filename, pushUrl, pullUrl, pullType.Readable())
tags, err := httpflv.ReadAllTagsFromFlvFile(filename)
if err != nil {
nazalog.Fatalf("read tags from flv file failed. err=%+v", err)
}
nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags))
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = 10000
})
err = pushSession.Push(pushUrl)
err := pushSession.Push(pushUrl)
if err != nil {
nazalog.Fatalf("push rtmp failed. url=%s, err=%+v", pushUrl, err)
}
@ -135,17 +129,14 @@ func main() {
}
}()
prevTs := int64(-1)
for _, tag := range tags {
// 读取flv文件
flvFilePump := httpflv.NewFileFilePump(func(option *httpflv.FlvFilePumpOption) {
option.IsRecursive = false
})
err = flvFilePump.Pump(filename, func(tag httpflv.Tag) bool {
h := remux.FlvTagHeader2RtmpHeader(tag.Header)
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
if prevTs >= 0 && int64(h.TimestampAbs) > prevTs {
diff := int64(h.TimestampAbs) - prevTs
time.Sleep(time.Duration(diff) * time.Millisecond)
}
prevTs = int64(h.TimestampAbs)
mu.Lock()
tagKey := nazamd5.Md5(tag.Raw[11 : 11+h.MsgLen])
if _, exist := tagKey2writeTime[tagKey]; exist {
@ -157,18 +148,24 @@ func main() {
err = pushSession.Write(chunks)
if err != nil {
nazalog.Fatalf("write failed. err=%+v", err)
return false
}
//nazalog.Debugf("sent. %d", i)
return true
})
if err != nil {
nazalog.Fatalf("pump flv file failed. err=%+v", err)
}
_ = pushSession.Flush()
time.Sleep(300 * time.Millisecond)
// 信息分析汇总输出
min := int64(2147483647)
max := int64(0)
avg := int64(0)
sum := int64(0)
fp, _ := os.Create(detailFilename)
defer fp.Close()
outDetailFp, _ := os.Create(outDetailFilename)
defer outDetailFp.Close()
for _, d := range delays {
if d < min {
min = d
@ -177,12 +174,12 @@ func main() {
max = d
}
sum += d
_, _ = fp.WriteString(fmt.Sprintf("%d\n", d))
_, _ = outDetailFp.WriteString(fmt.Sprintf("%d\n", d))
}
if len(delays) > 0 {
avg = sum / int64(len(delays))
}
nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, detailFilename)
nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, outDetailFilename)
}
func parseFlag() (filename, pushUrl, pullUrl string, pullType PullType) {

@ -59,8 +59,9 @@ func main() {
urls := collect(urlTmpl, num)
tags, err := httpflv.ReadAllTagsFromFlvFile(filename)
if err != nil {
nazalog.Fatalf("read tags from flv file failed. err=%+v", err)
if err != nil || len(tags) == 0 {
nazalog.Fatalf("read tags from flv file failed. len=%d, err=%+v", len(tags), err)
return
}
nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags))
@ -75,7 +76,7 @@ func main() {
wg.Add(len(urls))
for _, url := range urls {
go func(u string) {
push(tags, []string{u}, isRecursive)
push(tags, u, isRecursive)
wg.Done()
atomic.AddInt32(&aliveSessionCount, -1)
}(url)
@ -85,141 +86,33 @@ func main() {
nazalog.Info("< main.")
}
func push(tags []httpflv.Tag, urls []string, isRecursive bool) {
var sessionList []*rtmp.PushSession
if len(tags) == 0 || len(urls) == 0 {
return
}
var err error
for i := range urls {
func push(tags []httpflv.Tag, url string, isRecursive bool) {
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = 5000
option.WriteAvTimeoutMs = 10000
})
err = ps.Push(urls[i])
if err != nil {
if err := ps.Push(url); err != nil {
nazalog.Errorf("push failed. err=%v", err)
continue
return
}
atomic.AddInt32(&aliveSessionCount, 1)
nazalog.Infof("push succ. url=%s", urls[i])
sessionList = append(sessionList, ps)
}
check(sessionList)
var totalBaseTs uint32 // 每轮最后更新
var prevTs uint32 // 上一个tag
var hasReadThisBaseTs bool
var thisBaseTs uint32 // 每轮第一个tag
var hasTraceFirstTagTs bool
var firstTagTs uint32 // 所有轮第一个tag
var firstTagTick int64 // 所有轮第一个tag的物理发送时间
// 1. 保证metadata只在最初发送一次
// 2. 多轮,时间戳会翻转,需要处理,让它线性增长
nazalog.Infof("push succ. url=%s", url)
// 多轮,一个循环代表一次完整文件的发送
for i := 0; ; i++ {
nazalog.Infof(" > round. i=%d, totalBaseTs=%d, prevTs=%d, thisBaseTs=%d",
i, totalBaseTs, prevTs, thisBaseTs)
hasReadThisBaseTs = false
// 一轮遍历文件的所有tag数据
for _, tag := range tags {
flvFilePump := httpflv.NewFileFilePump(func(option *httpflv.FlvFilePumpOption) {
option.IsRecursive = isRecursive
})
_ = flvFilePump.PumpWithTags(tags, func(tag httpflv.Tag) bool {
h := remux.FlvTagHeader2RtmpHeader(tag.Header)
// metadata只发送一次
if tag.IsMetadata() {
if totalBaseTs == 0 {
h.TimestampAbs = 0
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
send(sessionList, chunks)
} else {
// noop
}
continue
}
if hasReadThisBaseTs {
// 本轮非第一个tag
// 之前已经读到了这轮读文件的base值ts要减去base
h.TimestampAbs = tag.Header.Timestamp - thisBaseTs + totalBaseTs
} else {
// 本轮第一个tag
// 设置basets设置为上一轮读文件的值
thisBaseTs = tag.Header.Timestamp
h.TimestampAbs = totalBaseTs
hasReadThisBaseTs = true
}
if h.TimestampAbs < prevTs {
// ts比上一个包的还小直接设置为上一包的值并且不sleep直接发送
h.TimestampAbs = prevTs
nazalog.Errorf("this tag timestamp less than prev timestamp. h.TimestampAbs=%d, prevTs=%d", h.TimestampAbs, prevTs)
}
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
if hasTraceFirstTagTs {
// 所有轮的非第一个tag
// 当前距离第一个tag的物理发送时间以及距离第一个tag的时间戳
// 如果物理时间短,就睡眠相应的时间
n := time.Now().UnixNano() / 1000000
diffTick := n - firstTagTick
diffTs := h.TimestampAbs - firstTagTs
if diffTick < int64(diffTs) {
time.Sleep(time.Duration(int64(diffTs)-diffTick) * time.Millisecond)
}
} else {
// 所有轮的第一个tag
// 记录所有轮的第一个tag的物理发送时间以及数据的时间戳
firstTagTick = time.Now().UnixNano() / 1000000
firstTagTs = h.TimestampAbs
hasTraceFirstTagTs = true
}
send(sessionList, chunks)
prevTs = h.TimestampAbs
} // tags for loop
totalBaseTs = prevTs + 1
if !isRecursive {
break
}
}
}
func send(sessionList []*rtmp.PushSession, b []byte) {
var s []*rtmp.PushSession
for _, ps := range sessionList {
if err := ps.Write(b); err != nil {
if err := ps.Write(chunks); err != nil {
nazalog.Errorf("write data error. err=%v", err)
continue
}
s = append(s, ps)
}
sessionList = s
check(sessionList)
}
func check(sessionList []*rtmp.PushSession) {
if len(sessionList) == 0 {
nazalog.Errorf("all push session dead.")
os.Exit(1)
return false
}
return true
})
}
func collect(urlTmpl string, num int) (urls []string) {

@ -0,0 +1,143 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv
import (
"time"
"github.com/q191201771/naza/pkg/nazalog"
)
// 读取flv文件将tag按时间戳间隔缓慢类似于ffmpeg的-re返回
type FlvFilePumpOption struct {
IsRecursive bool // 如果为true则循环返回文件内容类似于ffmpeg的-stream_loop -1
}
var defaultFlvFilePumpOption = FlvFilePumpOption{
IsRecursive: false,
}
type FlvFilePump struct {
option FlvFilePumpOption
}
type ModFlvFilePumpOption func(option *FlvFilePumpOption)
func NewFileFilePump(modOptions ...ModFlvFilePumpOption) *FlvFilePump {
option := defaultFlvFilePumpOption
for _, fn := range modOptions {
fn(&option)
}
return &FlvFilePump{option: option}
}
type OnPumpFlvTag func(tag Tag) bool
// @param onFlvTag 如果回调中返回false则停止Pump
//
func (f *FlvFilePump) Pump(filename string, onFlvTag OnPumpFlvTag) error {
// 一次性将文件所有内容读入内存,后续不再读取文件
tags, err := ReadAllTagsFromFlvFile(filename)
if err != nil {
return err
}
return f.PumpWithTags(tags, onFlvTag)
}
// @return error 暂时只做预留目前只会返回nil
//
func (f *FlvFilePump) PumpWithTags(tags []Tag, onFlvTag OnPumpFlvTag) error {
var totalBaseTs uint32 // 整体的基础时间戳。每轮最后更新
var hasReadThisBaseTs bool
var thisBaseTs uint32 // 每一轮的第一个tag时间戳
var prevTagTs uint32 // 上一个tag的时间戳
var hasReadTotalFirstTag bool
var totalFirstTagTs uint32 // 第一轮的第一个tag的时间戳
var totalFirstTagTick int64 // 第一轮的第一个tag的物理时间
const addTsBetweenRound = 1
// 循环一次,代表遍历文件一次
for roundIndex := 0; ; roundIndex++ {
nazalog.Debugf("new round. index=%d", roundIndex)
hasReadThisBaseTs = false
// 遍历所有tag数据
for _, tag := range tags {
// metadata只在第一轮发送一次
if tag.IsMetadata() {
if totalBaseTs == 0 {
tag.Header.Timestamp = 0
if !onFlvTag(tag) {
return nil
}
}
continue
}
// 修改时间戳
// 使得不同轮依然线性增长
if !hasReadThisBaseTs {
// 本轮第一个tag
thisBaseTs = tag.Header.Timestamp
hasReadThisBaseTs = true
tag.Header.Timestamp = totalBaseTs
} else {
tag.Header.Timestamp = totalBaseTs + tag.Header.Timestamp - thisBaseTs
}
// 修改时间戳
// 如果时间戳比前一个tag的还小可能发生了跳跃我们直接设置为上一包的值+1然后不sleep直接发送
if tag.Header.Timestamp < prevTagTs {
tag.Header.Timestamp = prevTagTs + 1
}
if hasReadTotalFirstTag {
// 当前时间戳与第一轮的第一个tag的时间戳差值
diffTs := tag.Header.Timestamp - totalFirstTagTs
// 当前物理时间与第一轮的第一个tag的物理时间差值
diffTick := time.Now().UnixNano()/1000000 - totalFirstTagTick
// 如果还没到物理时间差值就sleep
if diffTick < int64(diffTs) {
time.Sleep(time.Duration(int64(diffTs)-diffTick) * time.Millisecond)
}
} else {
// 第一轮的第一个tag记录下来
totalFirstTagTick = time.Now().UnixNano() / 1000000
totalFirstTagTs = tag.Header.Timestamp
hasReadTotalFirstTag = true
}
if !onFlvTag(tag) {
return nil
}
prevTagTs = tag.Header.Timestamp
}
totalBaseTs = prevTagTs + addTsBetweenRound
if !f.option.IsRecursive {
break
}
}
return nil
}
Loading…
Cancel
Save