diff --git a/app/demo/calcrtmpdelay/calcrtmpdelay.go b/app/demo/calcrtmpdelay/calcrtmpdelay.go index 89950a4..d1e596b 100644 --- a/app/demo/calcrtmpdelay/calcrtmpdelay.go +++ b/app/demo/calcrtmpdelay/calcrtmpdelay.go @@ -24,7 +24,7 @@ import ( "github.com/q191201771/naza/pkg/nazamd5" ) -const detailFilename = "delay.txt" +const outDetailFilename = "delay.txt" type PullType int @@ -54,24 +54,18 @@ func main() { }) defer nazalog.Sync() + var mu sync.Mutex tagKey2writeTime := make(map[string]time.Time) var delays []int64 - var mu sync.Mutex filename, pushUrl, pullUrl, pullType := parseFlag() nazalog.Infof("parse flag succ. filename=%s, pushUrl=%s, pullUrl=%s, pullType=%s", filename, pushUrl, pullUrl, pullType.Readable()) - tags, err := httpflv.ReadAllTagsFromFlvFile(filename) - if err != nil { - nazalog.Fatalf("read tags from flv file failed. err=%+v", err) - } - nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags)) - pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.PushTimeoutMs = 10000 }) - err = pushSession.Push(pushUrl) + err := pushSession.Push(pushUrl) if err != nil { nazalog.Fatalf("push rtmp failed. url=%s, err=%+v", pushUrl, err) } @@ -135,17 +129,14 @@ func main() { } }() - prevTs := int64(-1) - for _, tag := range tags { + // 读取flv文件 + flvFilePump := httpflv.NewFileFilePump(func(option *httpflv.FlvFilePumpOption) { + option.IsRecursive = false + }) + err = flvFilePump.Pump(filename, func(tag httpflv.Tag) bool { h := remux.FlvTagHeader2RtmpHeader(tag.Header) chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) - if prevTs >= 0 && int64(h.TimestampAbs) > prevTs { - diff := int64(h.TimestampAbs) - prevTs - time.Sleep(time.Duration(diff) * time.Millisecond) - } - prevTs = int64(h.TimestampAbs) - mu.Lock() tagKey := nazamd5.Md5(tag.Raw[11 : 11+h.MsgLen]) if _, exist := tagKey2writeTime[tagKey]; exist { @@ -157,18 +148,24 @@ func main() { err = pushSession.Write(chunks) if err != nil { nazalog.Fatalf("write failed. err=%+v", err) + return false } - //nazalog.Debugf("sent. %d", i) + return true + }) + if err != nil { + nazalog.Fatalf("pump flv file failed. err=%+v", err) } + _ = pushSession.Flush() time.Sleep(300 * time.Millisecond) + // 信息分析汇总输出 min := int64(2147483647) max := int64(0) avg := int64(0) sum := int64(0) - fp, _ := os.Create(detailFilename) - defer fp.Close() + outDetailFp, _ := os.Create(outDetailFilename) + defer outDetailFp.Close() for _, d := range delays { if d < min { min = d @@ -177,12 +174,12 @@ func main() { max = d } sum += d - _, _ = fp.WriteString(fmt.Sprintf("%d\n", d)) + _, _ = outDetailFp.WriteString(fmt.Sprintf("%d\n", d)) } if len(delays) > 0 { avg = sum / int64(len(delays)) } - nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, detailFilename) + nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, outDetailFilename) } func parseFlag() (filename, pushUrl, pullUrl string, pullType PullType) { diff --git a/app/demo/pushrtmp/pushrtmp.go b/app/demo/pushrtmp/pushrtmp.go index ae1909f..9cb5641 100644 --- a/app/demo/pushrtmp/pushrtmp.go +++ b/app/demo/pushrtmp/pushrtmp.go @@ -59,8 +59,9 @@ func main() { urls := collect(urlTmpl, num) tags, err := httpflv.ReadAllTagsFromFlvFile(filename) - if err != nil { - nazalog.Fatalf("read tags from flv file failed. err=%+v", err) + if err != nil || len(tags) == 0 { + nazalog.Fatalf("read tags from flv file failed. len=%d, err=%+v", len(tags), err) + return } nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags)) @@ -75,7 +76,7 @@ func main() { wg.Add(len(urls)) for _, url := range urls { go func(u string) { - push(tags, []string{u}, isRecursive) + push(tags, u, isRecursive) wg.Done() atomic.AddInt32(&aliveSessionCount, -1) }(url) @@ -85,141 +86,33 @@ func main() { nazalog.Info("< main.") } -func push(tags []httpflv.Tag, urls []string, isRecursive bool) { - var sessionList []*rtmp.PushSession +func push(tags []httpflv.Tag, url string, isRecursive bool) { + ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.PushTimeoutMs = 5000 + option.WriteAvTimeoutMs = 10000 + }) - if len(tags) == 0 || len(urls) == 0 { + if err := ps.Push(url); err != nil { + nazalog.Errorf("push failed. err=%v", err) return } + atomic.AddInt32(&aliveSessionCount, 1) - var err error - - for i := range urls { - ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = 5000 - option.WriteAvTimeoutMs = 10000 - }) - - err = ps.Push(urls[i]) - if err != nil { - nazalog.Errorf("push failed. err=%v", err) - continue - } - atomic.AddInt32(&aliveSessionCount, 1) - - nazalog.Infof("push succ. url=%s", urls[i]) - sessionList = append(sessionList, ps) - } - check(sessionList) - - var totalBaseTs uint32 // 每轮最后更新 - var prevTs uint32 // 上一个tag - var hasReadThisBaseTs bool - var thisBaseTs uint32 // 每轮第一个tag - var hasTraceFirstTagTs bool - var firstTagTs uint32 // 所有轮第一个tag - var firstTagTick int64 // 所有轮第一个tag的物理发送时间 - - // 1. 保证metadata只在最初发送一次 - // 2. 多轮,时间戳会翻转,需要处理,让它线性增长 - - // 多轮,一个循环代表一次完整文件的发送 - for i := 0; ; i++ { - nazalog.Infof(" > round. i=%d, totalBaseTs=%d, prevTs=%d, thisBaseTs=%d", - i, totalBaseTs, prevTs, thisBaseTs) - - hasReadThisBaseTs = false - - // 一轮,遍历文件的所有tag数据 - for _, tag := range tags { - h := remux.FlvTagHeader2RtmpHeader(tag.Header) - - // metadata只发送一次 - if tag.IsMetadata() { - if totalBaseTs == 0 { - h.TimestampAbs = 0 - chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) - send(sessionList, chunks) - } else { - // noop - } - continue - } - - if hasReadThisBaseTs { - // 本轮非第一个tag - - // 之前已经读到了这轮读文件的base值,ts要减去base - h.TimestampAbs = tag.Header.Timestamp - thisBaseTs + totalBaseTs - } else { - // 本轮第一个tag - - // 设置base,ts设置为上一轮读文件的值 - thisBaseTs = tag.Header.Timestamp - h.TimestampAbs = totalBaseTs - hasReadThisBaseTs = true - } + nazalog.Infof("push succ. url=%s", url) - if h.TimestampAbs < prevTs { - // ts比上一个包的还小,直接设置为上一包的值,并且不sleep直接发送 - h.TimestampAbs = prevTs - nazalog.Errorf("this tag timestamp less than prev timestamp. h.TimestampAbs=%d, prevTs=%d", h.TimestampAbs, prevTs) - } + flvFilePump := httpflv.NewFileFilePump(func(option *httpflv.FlvFilePumpOption) { + option.IsRecursive = isRecursive + }) + _ = flvFilePump.PumpWithTags(tags, func(tag httpflv.Tag) bool { + h := remux.FlvTagHeader2RtmpHeader(tag.Header) + chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) - chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) - - if hasTraceFirstTagTs { - // 所有轮的非第一个tag - - // 当前距离第一个tag的物理发送时间,以及距离第一个tag的时间戳 - // 如果物理时间短,就睡眠相应的时间 - n := time.Now().UnixNano() / 1000000 - diffTick := n - firstTagTick - diffTs := h.TimestampAbs - firstTagTs - if diffTick < int64(diffTs) { - time.Sleep(time.Duration(int64(diffTs)-diffTick) * time.Millisecond) - } - } else { - // 所有轮的第一个tag - - // 记录所有轮的第一个tag的物理发送时间,以及数据的时间戳 - firstTagTick = time.Now().UnixNano() / 1000000 - firstTagTs = h.TimestampAbs - hasTraceFirstTagTs = true - } - - send(sessionList, chunks) - - prevTs = h.TimestampAbs - } // tags for loop - - totalBaseTs = prevTs + 1 - - if !isRecursive { - break - } - } -} - -func send(sessionList []*rtmp.PushSession, b []byte) { - var s []*rtmp.PushSession - for _, ps := range sessionList { - if err := ps.Write(b); err != nil { + if err := ps.Write(chunks); err != nil { nazalog.Errorf("write data error. err=%v", err) - continue + return false } - s = append(s, ps) - } - sessionList = s - - check(sessionList) -} - -func check(sessionList []*rtmp.PushSession) { - if len(sessionList) == 0 { - nazalog.Errorf("all push session dead.") - os.Exit(1) - } + return true + }) } func collect(urlTmpl string, num int) (urls []string) { diff --git a/pkg/httpflv/flv_file_pump.go b/pkg/httpflv/flv_file_pump.go new file mode 100644 index 0000000..820b761 --- /dev/null +++ b/pkg/httpflv/flv_file_pump.go @@ -0,0 +1,143 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package httpflv + +import ( + "time" + + "github.com/q191201771/naza/pkg/nazalog" +) + +// 读取flv文件,将tag按时间戳间隔缓慢(类似于ffmpeg的-re)返回 + +type FlvFilePumpOption struct { + IsRecursive bool // 如果为true,则循环返回文件内容(类似于ffmpeg的-stream_loop -1) +} + +var defaultFlvFilePumpOption = FlvFilePumpOption{ + IsRecursive: false, +} + +type FlvFilePump struct { + option FlvFilePumpOption +} + +type ModFlvFilePumpOption func(option *FlvFilePumpOption) + +func NewFileFilePump(modOptions ...ModFlvFilePumpOption) *FlvFilePump { + option := defaultFlvFilePumpOption + for _, fn := range modOptions { + fn(&option) + } + + return &FlvFilePump{option: option} +} + +type OnPumpFlvTag func(tag Tag) bool + +// @param onFlvTag 如果回调中返回false,则停止Pump +// +func (f *FlvFilePump) Pump(filename string, onFlvTag OnPumpFlvTag) error { + // 一次性将文件所有内容读入内存,后续不再读取文件 + tags, err := ReadAllTagsFromFlvFile(filename) + if err != nil { + return err + } + + return f.PumpWithTags(tags, onFlvTag) +} + +// @return error 暂时只做预留,目前只会返回nil +// +func (f *FlvFilePump) PumpWithTags(tags []Tag, onFlvTag OnPumpFlvTag) error { + var totalBaseTs uint32 // 整体的基础时间戳。每轮最后更新 + + var hasReadThisBaseTs bool + var thisBaseTs uint32 // 每一轮的第一个tag时间戳 + + var prevTagTs uint32 // 上一个tag的时间戳 + + var hasReadTotalFirstTag bool + var totalFirstTagTs uint32 // 第一轮的第一个tag的时间戳 + var totalFirstTagTick int64 // 第一轮的第一个tag的物理时间 + + const addTsBetweenRound = 1 + + // 循环一次,代表遍历文件一次 + for roundIndex := 0; ; roundIndex++ { + nazalog.Debugf("new round. index=%d", roundIndex) + + hasReadThisBaseTs = false + + // 遍历所有tag数据 + for _, tag := range tags { + // metadata只在第一轮发送一次 + if tag.IsMetadata() { + if totalBaseTs == 0 { + tag.Header.Timestamp = 0 + if !onFlvTag(tag) { + return nil + } + } + continue + } + + // 修改时间戳 + // 使得不同轮依然线性增长 + if !hasReadThisBaseTs { + // 本轮第一个tag + + thisBaseTs = tag.Header.Timestamp + hasReadThisBaseTs = true + + tag.Header.Timestamp = totalBaseTs + } else { + tag.Header.Timestamp = totalBaseTs + tag.Header.Timestamp - thisBaseTs + } + + // 修改时间戳 + // 如果时间戳比前一个tag的还小,可能发生了跳跃,我们直接设置为上一包的值+1,然后不sleep直接发送 + if tag.Header.Timestamp < prevTagTs { + tag.Header.Timestamp = prevTagTs + 1 + } + + if hasReadTotalFirstTag { + // 当前时间戳与第一轮的第一个tag的时间戳差值 + diffTs := tag.Header.Timestamp - totalFirstTagTs + + // 当前物理时间与第一轮的第一个tag的物理时间差值 + diffTick := time.Now().UnixNano()/1000000 - totalFirstTagTick + + // 如果还没到物理时间差值,就sleep + if diffTick < int64(diffTs) { + time.Sleep(time.Duration(int64(diffTs)-diffTick) * time.Millisecond) + } + } else { + // 第一轮的第一个tag,记录下来 + + totalFirstTagTick = time.Now().UnixNano() / 1000000 + totalFirstTagTs = tag.Header.Timestamp + hasReadTotalFirstTag = true + } + + if !onFlvTag(tag) { + return nil + } + + prevTagTs = tag.Header.Timestamp + } + + totalBaseTs = prevTagTs + addTsBetweenRound + + if !f.option.IsRecursive { + break + } + } + return nil +}