You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/demo/flvfile2rtmppush/flvfile2rtmppush.go

220 lines
5.6 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bininfo"
log "github.com/q191201771/naza/pkg/nazalog"
)
// RTMP推流客户端读取本地FLV文件使用RTMP协议推送出去
//
// 支持循环推送文件推送完毕后可循环推送RTMP push 流并不断开)
// 支持推送多路流相当于一个RTMP推流压测工具
//
// Usage of ./bin/flvfile2rtmppush:
// -i string
// specify flv file
// -n int
// num of push connection (default 1)
// -o string
// specify rtmp push url
// -r recursive push if reach end of file
// -v show bin info
// Example:
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000
func main() {
log.Info(bininfo.StringifySingleLine())
filename, urlTmpl, num, isRecursive := parseFlag()
urls := collect(urlTmpl, num)
tags := readAllTag(filename)
log.Debug(urls, num)
push(tags, urls, isRecursive)
log.Info("bye.")
}
// readAllTag 预读取 flv 文件中的所有 tag缓存在内存中
func readAllTag(filename string) (ret []httpflv.Tag) {
var ffr httpflv.FLVFileReader
err := ffr.Open(filename)
log.Assert(nil, err)
log.Infof("open succ. filename=%s", filename)
for {
tag, err := ffr.ReadTag()
if err == io.EOF {
log.Info("EOF")
break
}
if err != nil {
log.Info(err)
break
}
if tag.IsMetadata() {
log.Debugf("M %d", tag.Header.Timestamp)
} else if tag.IsVideoKeySeqHeader() {
log.Debugf("V SH %d", tag.Header.Timestamp)
} else if tag.IsVideoKeyNalu() {
log.Debugf("V K %d", tag.Header.Timestamp)
} else if tag.IsAACSeqHeader() {
log.Debugf("A SH %d", tag.Header.Timestamp)
}
ret = append(ret, tag)
}
log.Infof("read all tag done. num=%d", len(ret))
return
}
func push(tags []httpflv.Tag, urls []string, isRecursive bool) {
if len(tags) == 0 || len(urls) == 0 {
return
}
var err error
var psList []*rtmp.PushSession
for i := range urls {
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = 3000
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 10000
})
err = ps.Push(urls[i])
log.Assert(nil, err)
log.Infof("push succ. url=%s", urls[i])
psList = append(psList, ps)
}
var totalBaseTS uint32
var prevTS uint32
var hasReadThisBaseTS bool
var thisBaseTS uint32
var hasTraceFirstTagTS bool
var firstTagTS uint32
var firstTagTick int64
for i := 0; ; i++ {
log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d",
i, totalBaseTS, prevTS, thisBaseTS)
hasReadThisBaseTS = false
for _, tag := range tags {
h := logic.Trans.FLVTagHeader2RTMPHeader(tag.Header)
if tag.IsMetadata() {
if totalBaseTS == 0 {
// 第一个metadata直接发送
h.TimestampAbs = 0
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
for _, ps := range psList {
err = ps.AsyncWrite(chunks)
log.Assert(nil, err)
}
} else {
// noop
}
continue
}
if hasReadThisBaseTS {
// 之前已经读到了这轮读文件的base值ts要减去base
h.TimestampAbs = tag.Header.Timestamp - thisBaseTS + totalBaseTS
} else {
// 设置basets设置为上一轮读文件的值
thisBaseTS = tag.Header.Timestamp
h.TimestampAbs = totalBaseTS
hasReadThisBaseTS = true
}
if h.TimestampAbs < prevTS {
// ts比上一个包的还小直接设置为上一包的值并且不sleep直接发送
h.TimestampAbs = prevTS
}
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
if hasTraceFirstTagTS {
n := time.Now().UnixNano() / 1000000
diffTick := n - firstTagTick
diffTS := h.TimestampAbs - firstTagTS
if diffTick < int64(diffTS) {
time.Sleep(time.Duration(int64(diffTS)-diffTick) * time.Millisecond)
}
} else {
firstTagTick = time.Now().UnixNano() / 1000000
firstTagTS = h.TimestampAbs
hasTraceFirstTagTS = true
}
for _, ps := range psList {
err = ps.AsyncWrite(chunks)
log.Assert(nil, err)
}
prevTS = h.TimestampAbs
}
totalBaseTS = prevTS + 1
if !isRecursive {
break
}
}
}
func collect(urlTmpl string, num int) (urls []string) {
for i := 0; i < num; i++ {
url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1)
urls = append(urls, url)
}
return
}
func parseFlag() (filename string, urlTmpl string, num int, isRecursive bool) {
v := flag.Bool("v", false, "show bin info")
i := flag.String("i", "", "specify flv file")
o := flag.String("o", "", "specify rtmp push url")
r := flag.Bool("r", false, "recursive push if reach end of file")
n := flag.Int("n", 1, "num of push connection")
flag.Parse()
if *v {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
os.Exit(1)
}
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000
`)
os.Exit(1)
}
return *i, *o, *n, *r
}