1. [feat] 新增app/demo/calcrtmpdelay,可用于测量rtmp服务器的转发延时 2. [refactor] 重构app/demo/pullrtmp2pushrtmp的对象管理逻辑

pull/49/head
q191201771 4 years ago
parent fd0f71f9af
commit 4a48518f5a

@ -0,0 +1,134 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"os"
"sync"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazamd5"
)
const detailFilename = "delay.txt"
func main() {
tagKey2writeTime := make(map[string]time.Time)
var delays []int64
var mu sync.Mutex
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
filename, pushURL, pullURL := parseFlag()
tags, err := httpflv.ReadAllTagsFromFLVFile(filename)
nazalog.Assert(nil, err)
nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags))
pushSession := rtmp.NewPushSession()
err = pushSession.Push(pushURL)
nazalog.Assert(nil, err)
nazalog.Info("push succ.")
//defer pushSession.Dispose()
pullSession := rtmp.NewPullSession()
err = pullSession.Pull(pullURL, func(msg base.RTMPMsg) {
tagKey := nazamd5.MD5(msg.Payload)
mu.Lock()
t, exist := tagKey2writeTime[tagKey]
if !exist {
nazalog.Errorf("tag key not exist.")
} else {
delay := time.Now().Sub(t).Milliseconds()
delays = append(delays, delay)
delete(tagKey2writeTime, tagKey)
}
mu.Unlock()
})
nazalog.Assert(nil, err)
nazalog.Info("pull succ.")
//defer pullSession.Dispose()
go func() {
for {
time.Sleep(5 * time.Second)
pushSession.UpdateStat(1)
pullSession.UpdateStat(1)
nazalog.Debugf("stat bitrate. push=%+v, pull=%+v", pushSession.GetStat().Bitrate, pullSession.GetStat().Bitrate)
}
}()
prevTS := int64(-1)
for _, tag := range tags {
h := remux.FLVTagHeader2RTMPHeader(tag.Header)
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
if prevTS >= 0 && int64(h.TimestampAbs) > prevTS {
diff := int64(h.TimestampAbs) - prevTS
time.Sleep(time.Duration(diff) * time.Millisecond)
}
prevTS = int64(h.TimestampAbs)
mu.Lock()
tagKey := nazamd5.MD5(tag.Raw[11 : 11+h.MsgLen])
if _, exist := tagKey2writeTime[tagKey]; exist {
nazalog.Errorf("tag key already exist. key=%s", tagKey)
}
tagKey2writeTime[tagKey] = time.Now()
mu.Unlock()
err = pushSession.AsyncWrite(chunks)
nazalog.Assert(nil, err)
//nazalog.Debugf("sent. %d", i)
}
min := int64(2147483647)
max := int64(0)
avg := int64(0)
sum := int64(0)
fp, _ := os.Create(detailFilename)
defer fp.Close()
for _, d := range delays {
if d < min {
min = d
}
if d > max {
max = d
}
sum += d
_, _ = fp.WriteString(fmt.Sprintf("%d\n", d))
}
if len(delays) > 0 {
avg = sum / int64(len(delays))
}
nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, detailFilename)
}
func parseFlag() (filename, pushURL, pullURL string) {
f := flag.String("f", "", "specify flv file")
i := flag.String("i", "", "specify rtmp pull url")
o := flag.String("o", "", "specify rtmp push url")
flag.Parse()
if *f == "" || *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -f test.flv -i rtmp://127.0.0.1:1935/live/test -o rtmp://127.0.0.1:1935/live/test
`, os.Args[0])
base.OSExitAndWaitPressIfWindows(1)
}
return *f, *i, *o
}

@ -15,6 +15,8 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/q191201771/lal/pkg/remux"
@ -39,13 +41,22 @@ import (
// -o string
// specify ouput flv file
// Example:
// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test -o out.flv
// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test -n 1000
// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test_{i} -n 1000
// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test -o out.flv
// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test -n 1000
// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test_{i} -n 1000
var aliveSessionCount int32
func main() {
urlTmpl, fileNameTmpl, num := parseFlag()
urls, filenames := connect(urlTmpl, fileNameTmpl, num)
urls, filenames := collect(urlTmpl, fileNameTmpl, num)
go func() {
for {
nazalog.Debugf("alive session:%d", atomic.LoadInt32(&aliveSessionCount))
time.Sleep(1 * time.Second)
}
}()
var wg sync.WaitGroup
wg.Add(num)
@ -53,9 +64,12 @@ func main() {
go func(index int) {
pull(urls[index], filenames[index])
wg.Done()
atomic.AddInt32(&aliveSessionCount, -1)
}(i)
}
wg.Wait()
time.Sleep(1 * time.Second)
nazalog.Info("bye.")
}
func pull(url string, filename string) {
@ -87,12 +101,17 @@ func pull(url string, filename string) {
nazalog.Assert(nil, err)
}
})
nazalog.Assert(nil, err)
if err != nil {
nazalog.Errorf("pull failed. err=%v", err)
return
}
atomic.AddInt32(&aliveSessionCount, 1)
err = <-session.Wait()
nazalog.Debug(err)
}
func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) {
func collect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) {
for i := 0; i < num; i++ {
url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1)
urls = append(urls, url)
@ -105,14 +124,14 @@ func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filen
func parseFlag() (urlTmpl string, fileNameTmpl string, num int) {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify ouput flv file")
n := flag.Int("n", 1, "num of pull connection")
n := flag.Int("n", 1, "specify num of pull connection")
flag.Parse()
if *i == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp://127.0.0.1:19350/live/test -o out.flv
%s -i rtmp://127.0.0.1:19350/live/test -n 1000
%s -i rtmp://127.0.0.1:19350/live/test_{i} -n 1000
%s -i rtmp://127.0.0.1:1935/live/test -o out.flv
%s -i rtmp://127.0.0.1:1935/live/test -n 1000
%s -i rtmp://127.0.0.1:1935/live/test_{i} -n 1000
`, os.Args[0], os.Args[0], os.Args[0])
base.OSExitAndWaitPressIfWindows(1)
}

@ -11,27 +11,32 @@ package main
import (
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/nazastring"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazastring"
"github.com/q191201771/naza/pkg/unique"
)
// 注意,当前的策略是,当推流有多个地址时,任意一个失败就会退出整个任务
var ErrClosedByCaller = errors.New("tunnel closed by caller")
type Tunnel struct {
uk string
inURL string
outURLList []string
pushECChan chan ErrorCode
closeChan chan struct{}
waitChan chan ErrorCode
rtmpMsgQ chan base.RTMPMsg
uk string
inURL string
outURLList []string
startTime time.Time
startECChan chan ErrorCode
pullECChan chan ErrorCode
pushECChan chan ErrorCode
closeChan chan ErrorCode
waitChan chan ErrorCode
rtmpMsgQ chan base.RTMPMsg
pullSession *rtmp.PullSession
pushSessionList []*rtmp.PushSession
@ -46,20 +51,34 @@ type ErrorCode struct {
// @param outURLList 推流rtmp url地址列表
//
func NewTunnel(inURL string, outURLList []string) *Tunnel {
var streamName string
ctx, err := base.ParseRTMPURL(inURL)
if err != nil {
nazalog.Errorf("parse rtmp url failed. url=%s", inURL)
streamName = "invalid"
} else {
streamName = ctx.LastItemOfPath
}
originUK := unique.GenUniqueKey("TUNNEL")
uk := fmt.Sprintf("%s-%s", originUK, streamName)
return &Tunnel{
uk: unique.GenUniqueKey("TUNNEL"),
inURL: inURL,
outURLList: outURLList,
pushECChan: make(chan ErrorCode, len(outURLList)),
closeChan: make(chan struct{}, 1),
waitChan: make(chan ErrorCode, len(outURLList)+1),
rtmpMsgQ: make(chan base.RTMPMsg, 1024),
uk: uk,
inURL: inURL,
outURLList: outURLList,
startTime: time.Now(),
startECChan: make(chan ErrorCode, len(outURLList)+1),
pullECChan: make(chan ErrorCode, 1),
pushECChan: make(chan ErrorCode, len(outURLList)),
closeChan: make(chan ErrorCode, 1),
waitChan: make(chan ErrorCode, len(outURLList)+1),
rtmpMsgQ: make(chan base.RTMPMsg, 1024),
}
}
// @return err 为nil时表示任务启动成功拉流和推流通道都已成功建立并开始转推数据
// 不为nil时表示任务失败可以通过`code`得到是拉流还是推流失败
func (t *Tunnel) Start() ErrorCode {
func (t *Tunnel) Start() (ret ErrorCode) {
const (
pullTimeoutMS = 10000
pushTimeoutMS = 10000
@ -68,6 +87,112 @@ func (t *Tunnel) Start() ErrorCode {
nazalog.Infof("[%s] new tunnel. inURL=%s, outURLList=%+v", t.uk, t.inURL, t.outURLList)
defer func() {
if ret.err != nil {
t.notifyStartEC(ret)
}
go func() {
nazalog.Debugf("[%s] > main event loop.", t.uk)
debugWriteCount := 0
maxDebugWriteCount := 5
ticker := time.NewTicker(statIntervalSec * time.Second)
defer ticker.Stop()
// 最后清理所有session
defer func() {
nazalog.Debugf("[%s] < main event loop. duration=%+v", t.uk, time.Now().Sub(t.startTime))
if t.pullSession != nil {
nazalog.Infof("[%s] dispose pull session. [%s]", t.uk, t.pullSession.UniqueKey())
t.pullSession.Dispose()
}
for _, s := range t.pushSessionList {
nazalog.Infof("[%s] dispose push session. [%s]", t.uk, s.UniqueKey())
s.Dispose()
}
}()
if t.pullSession != nil {
go func() {
nazalog.Debugf("[%s] > pull event loop. %s", t.uk, t.pullSession.UniqueKey())
for {
select {
case err := <-t.pullSession.Wait():
t.notifyPullEC(ErrorCode{-1, err})
nazalog.Debugf("[%s] < pull event loop. %s", t.uk, t.pullSession.UniqueKey())
return
}
}
}()
}
// 将多个pushSession wait事件聚合在一起
for i, pushSession := range t.pushSessionList {
go func(ii int, s *rtmp.PushSession) {
nazalog.Debugf("[%s] > push event loop. %s", t.uk, s.UniqueKey())
for {
select {
case err := <-s.Wait():
nazalog.Errorf("[%s] push wait error. [%s] err=%+v", t.uk, s.UniqueKey(), err)
t.notifyPushEC(ErrorCode{ii, err})
nazalog.Debugf("[%s] < push event loop. %s", t.uk, s.UniqueKey())
return
}
}
}(i, pushSession)
}
// 主事件监听
for {
select {
case ec := <-t.startECChan:
nazalog.Errorf("[%s] exit main event loop, <- startECChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.pullECChan:
nazalog.Errorf("[%s] exit main event loop, <- pullECChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.pushECChan:
nazalog.Errorf("[%s] exit main event loop, <- pushECChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.closeChan:
nazalog.Errorf("[%s] exit main event loop, <- closeChan.", t.uk)
t.notifyWait(ec)
return
case m := <-t.rtmpMsgQ:
currHeader := remux.MakeDefaultRTMPHeader(m.Header)
chunks := rtmp.Message2Chunks(m.Payload, &currHeader)
if debugWriteCount < maxDebugWriteCount {
nazalog.Infof("[%s] write. header=%+v, %+v, %s", t.uk, m.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(m.Payload, 32)))
debugWriteCount++
}
for i, pushSession := range t.pushSessionList {
err := pushSession.AsyncWrite(chunks)
if err != nil {
nazalog.Errorf("[%s] exit main event loop, write error. err=%+v", t.uk, err)
t.notifyWait(ErrorCode{i, err})
return
}
}
case <-ticker.C:
t.pullSession.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick pull session stat. [%s] streamName=%s, stat=%+v",
t.uk, t.pullSession.UniqueKey(), t.pullSession.StreamName(), t.pullSession.GetStat())
for _, s := range t.pushSessionList {
s.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick push session stat. [%s] streamName=%s, stat=%+v",
t.uk, s.UniqueKey(), s.StreamName(), s.GetStat())
}
}
}
}()
}()
// 逐个开启push session
for i, outURL := range t.outURLList {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMS = pushTimeoutMS
@ -75,24 +200,16 @@ func (t *Tunnel) Start() ErrorCode {
nazalog.Infof("[%s] start push. [%s] url=%s", t.uk, pushSession.UniqueKey(), outURL)
err := pushSession.Push(outURL)
// 只有有一个失败就直接退出
if err != nil {
nazalog.Errorf("[%s] push error. [%s] err=%+v", t.uk, pushSession.UniqueKey(), err)
return ErrorCode{i, err}
ret = ErrorCode{i, err}
return
}
nazalog.Infof("[%s] push succ. [%s]", t.uk, pushSession.UniqueKey())
// 加入的都是成功的
t.pushSessionList = append(t.pushSessionList, pushSession)
go func(ii int, u string, s *rtmp.PushSession) {
for {
select {
case err := <-s.Wait():
nazalog.Errorf("[%s] push wait error. [%s] err=%+v", t.uk, s.UniqueKey(), err)
t.pushECChan <- ErrorCode{ii, err}
return
}
}
}(i, outURL, pushSession)
}
t.pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
@ -104,87 +221,77 @@ func (t *Tunnel) Start() ErrorCode {
m := msg.Clone()
t.rtmpMsgQ <- m
})
// pull失败就直接退出
if err != nil {
nazalog.Errorf("[%s] pull error. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err)
return ErrorCode{-1, err}
t.pullSession = nil
ret = ErrorCode{-1, err}
return
}
nazalog.Infof("[%s] pull succ. [%s]", t.uk, t.pullSession.UniqueKey())
go func() {
debugWriteCount := 0
maxDebugWriteCount := 5
ticker := time.NewTicker(statIntervalSec * time.Second)
defer ticker.Stop()
defer func() {
if t.pullSession != nil {
nazalog.Infof("[%s] dispose pull session. [%s]", t.uk, t.pullSession.UniqueKey())
t.pullSession.Dispose()
}
for _, s := range t.pushSessionList {
nazalog.Infof("[%s] dispose push session. [%s]", t.uk, s.UniqueKey())
s.Dispose()
}
}()
for {
select {
case err := <-t.pullSession.Wait():
nazalog.Errorf("[%s] <- pull wait. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err)
t.waitChan <- ErrorCode{-1, err}
return
case ec := <-t.pushECChan:
nazalog.Errorf("[%s] <- pushECChan. err=%+v", t.uk, ec)
t.waitChan <- ec
return
case <-t.closeChan:
nazalog.Errorf("[%s] <- closeChan.", t.uk)
t.waitChan <- ErrorCode{-1, ErrClosedByCaller}
return
case m := <-t.rtmpMsgQ:
currHeader := remux.MakeDefaultRTMPHeader(m.Header)
chunks := rtmp.Message2Chunks(m.Payload, &currHeader)
if debugWriteCount < maxDebugWriteCount {
nazalog.Infof("[%s] write. header=%+v, %+v, %s", t.uk, m.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(m.Payload, 32)))
debugWriteCount++
}
for i, pushSession := range t.pushSessionList {
err := pushSession.AsyncWrite(chunks)
if err != nil {
nazalog.Errorf("[%s] write error. err=%+v", t.uk, err)
t.waitChan <- ErrorCode{i, err}
}
}
case <-ticker.C:
t.pullSession.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick pull session stat. [%s] streamName=%s, stat=%+v",
t.uk, t.pullSession.UniqueKey(), t.pullSession.StreamName(), t.pullSession.GetStat())
for _, s := range t.pushSessionList {
s.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick push session stat. [%s] streamName=%s, stat=%+v",
t.uk, s.UniqueKey(), s.StreamName(), s.GetStat())
}
}
}
}()
return ErrorCode{0, nil}
ret = ErrorCode{0, nil}
return
}
// `Start`函数调用成功后,可调用`Wait`函数,等待任务结束
// `Start`函数调用失败后,请不要调用`Wait`函数,否则行为未定义
//
func (t *Tunnel) Wait() chan ErrorCode {
return t.waitChan
}
// `Start`函数调用成功后,可调用`Close`函数,主动关闭转推任务
// `Start`函数调用失败后,请不要调用`Close`函数,否则行为未定义
// `Close`函数允许调用多次
//
func (t *Tunnel) Close() {
t.closeChan <- struct{}{}
t.notifyClose()
}
func (t *Tunnel) notifyClose() {
select {
case t.closeChan <- ErrorCode{-1, ErrClosedByCaller}:
nazalog.Debugf("[%s] notifyClose.", t.uk)
default:
nazalog.Debugf("[%s] notifyClose fail, ignore.", t.uk)
}
}
func (t *Tunnel) notifyWait(ec ErrorCode) {
select {
case t.waitChan <- ec:
nazalog.Debugf("[%s] notifyWait. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyWait fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyStartEC(ec ErrorCode) {
select {
case t.startECChan <- ec:
nazalog.Debugf("[%s] notifyStartEC. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyStartEC fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyPushEC(ec ErrorCode) {
select {
case t.pushECChan <- ec:
nazalog.Debugf("[%s] notifyPushEC. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyPushEC fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyPullEC(ec ErrorCode) {
select {
case t.pullECChan <- ec:
nazalog.Debugf("[%s] notifyPullEC. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyPullEC fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (ec *ErrorCode) Stringify() string {
return fmt.Sprintf("(%d, %+v)", ec.code, ec.err)
}

@ -11,7 +11,6 @@ package main
import (
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
@ -68,7 +67,12 @@ func main() {
urls := collect(urlTmpl, num)
tags := readAllTag(filename)
tags, err := httpflv.ReadAllTagsFromFLVFile(filename)
if err != nil {
nazalog.Errorf("read tags from flv file failed. err=%+v", err)
os.Exit(0)
}
nazalog.Infof("read all tag done. tag num=%d", len(tags))
go func() {
for {
@ -91,41 +95,6 @@ func main() {
nazalog.Info("bye.")
}
// readAllTag 预读取 flv 文件中的所有 tag缓存在内存中
func readAllTag(filename string) (ret []httpflv.Tag) {
var ffr httpflv.FLVFileReader
err := ffr.Open(filename)
if err != nil {
nazalog.Errorf("open file failed. file=%s, err=%v", filename, err)
os.Exit(1)
}
nazalog.Infof("open succ. filename=%s", filename)
for {
tag, err := ffr.ReadTag()
if err == io.EOF {
nazalog.Info("EOF")
break
}
if err != nil {
nazalog.Errorf("read file tag error. tag num=%d, err=%v", len(ret), err)
break
}
if tag.IsMetadata() {
nazalog.Debugf("M %d", tag.Header.Timestamp)
} else if tag.IsVideoKeySeqHeader() {
nazalog.Debugf("V SH %d", tag.Header.Timestamp)
} else if tag.IsVideoKeyNALU() {
nazalog.Debugf("V K %d", tag.Header.Timestamp)
} else if tag.IsAACSeqHeader() {
nazalog.Debugf("A SH %d", tag.Header.Timestamp)
}
ret = append(ret, tag)
}
nazalog.Infof("read all tag done. tag num=%d", len(ret))
return
}
func push(tags []httpflv.Tag, urls []string, isRecursive bool) {
var sessionList []*rtmp.PushSession
@ -282,9 +251,9 @@ func parseFlag() (filename string, urlTmpl string, num int, isRecursive bool, lo
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i test.flv -o rtmp://127.0.0.1:19350/live/test
%s -i test.flv -o rtmp://127.0.0.1:19350/live/test -r
%s -i test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000
%s -i test.flv -o rtmp://127.0.0.1:1935/live/test
%s -i test.flv -o rtmp://127.0.0.1:1935/live/test -r
%s -i test.flv -o rtmp://127.0.0.1:1935/live/test_{i} -r -n 1000
`, os.Args[0], os.Args[0], os.Args[0])
base.OSExitAndWaitPressIfWindows(1)
}

@ -0,0 +1,69 @@
{
"rtmp": {
"enable": true,
"addr": ":1935",
"gop_num": 0
},
"httpflv": {
"enable": false,
"sub_listen_addr": ":8080",
"enable_https": false,
"https_addr": ":4433",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem",
"gop_num": 0
},
"hls": {
"enable": false,
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
"cleanup_flag": true
},
"httpts": {
"enable": false,
"sub_listen_addr": ":8082"
},
"rtsp": {
"enable": false,
"addr": ":5544"
},
"relay_push": {
"enable": false,
"addr_list":[
]
},
"relay_pull": {
"enable": false,
"addr": ""
},
"http_api": {
"enable": false,
"addr": ":8083"
},
"server_id": "1",
"http_notify": {
"enable": false,
"update_interval_sec": 5,
"on_server_start": "http://127.0.0.1:10101/on_server_start",
"on_update": "http://127.0.0.1:10101/on_update",
"on_pub_start": "http://127.0.0.1:10101/on_pub_start",
"on_pub_stop": "http://127.0.0.1:10101/on_pub_stop",
"on_sub_start": "http://127.0.0.1:10101/on_sub_start",
"on_sub_stop": "http://127.0.0.1:10101/on_sub_stop",
"on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect"
},
"pprof": {
"enable": true,
"addr": ":8084"
},
"log": {
"level": 1,
"filename": "./logs/lalserver.log",
"is_to_stdout": true,
"is_rotate_daily": true,
"short_file_flag": true,
"assert_behavior": 1
}
}

@ -0,0 +1,34 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv
import "io"
func ReadAllTagsFromFLVFile(filename string) ([]Tag, error) {
var tags []Tag
var ffr FLVFileReader
err := ffr.Open(filename)
if err != nil {
return nil, err
}
for {
tag, err := ffr.ReadTag()
if err != nil {
if err == io.EOF {
return tags, nil
} else {
return tags, err
}
}
tags = append(tags, tag)
}
// never reach here
}
Loading…
Cancel
Save