diff --git a/app/demo/calcrtmpdelay/calcrtmpdelay.go b/app/demo/calcrtmpdelay/calcrtmpdelay.go new file mode 100644 index 0000000..87a40a7 --- /dev/null +++ b/app/demo/calcrtmpdelay/calcrtmpdelay.go @@ -0,0 +1,134 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "flag" + "fmt" + "os" + "sync" + "time" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazamd5" +) + +const detailFilename = "delay.txt" + +func main() { + tagKey2writeTime := make(map[string]time.Time) + var delays []int64 + var mu sync.Mutex + + _ = nazalog.Init(func(option *nazalog.Option) { + option.AssertBehavior = nazalog.AssertFatal + }) + filename, pushURL, pullURL := parseFlag() + + tags, err := httpflv.ReadAllTagsFromFLVFile(filename) + nazalog.Assert(nil, err) + nazalog.Infof("read tags from flv file succ. len of tags=%d", len(tags)) + + pushSession := rtmp.NewPushSession() + err = pushSession.Push(pushURL) + nazalog.Assert(nil, err) + nazalog.Info("push succ.") + //defer pushSession.Dispose() + + pullSession := rtmp.NewPullSession() + err = pullSession.Pull(pullURL, func(msg base.RTMPMsg) { + tagKey := nazamd5.MD5(msg.Payload) + mu.Lock() + t, exist := tagKey2writeTime[tagKey] + if !exist { + nazalog.Errorf("tag key not exist.") + } else { + delay := time.Now().Sub(t).Milliseconds() + delays = append(delays, delay) + delete(tagKey2writeTime, tagKey) + } + mu.Unlock() + }) + nazalog.Assert(nil, err) + nazalog.Info("pull succ.") + //defer pullSession.Dispose() + + go func() { + for { + time.Sleep(5 * time.Second) + pushSession.UpdateStat(1) + pullSession.UpdateStat(1) + nazalog.Debugf("stat bitrate. push=%+v, pull=%+v", pushSession.GetStat().Bitrate, pullSession.GetStat().Bitrate) + } + }() + + prevTS := int64(-1) + for _, tag := range tags { + h := remux.FLVTagHeader2RTMPHeader(tag.Header) + chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) + + if prevTS >= 0 && int64(h.TimestampAbs) > prevTS { + diff := int64(h.TimestampAbs) - prevTS + time.Sleep(time.Duration(diff) * time.Millisecond) + } + prevTS = int64(h.TimestampAbs) + + mu.Lock() + tagKey := nazamd5.MD5(tag.Raw[11 : 11+h.MsgLen]) + if _, exist := tagKey2writeTime[tagKey]; exist { + nazalog.Errorf("tag key already exist. key=%s", tagKey) + } + tagKey2writeTime[tagKey] = time.Now() + mu.Unlock() + + err = pushSession.AsyncWrite(chunks) + nazalog.Assert(nil, err) + //nazalog.Debugf("sent. %d", i) + } + + min := int64(2147483647) + max := int64(0) + avg := int64(0) + sum := int64(0) + fp, _ := os.Create(detailFilename) + defer fp.Close() + for _, d := range delays { + if d < min { + min = d + } + if d > max { + max = d + } + sum += d + _, _ = fp.WriteString(fmt.Sprintf("%d\n", d)) + } + if len(delays) > 0 { + avg = sum / int64(len(delays)) + } + nazalog.Debugf("len(tagKey2writeTime)=%d, delays(len=%d, avg=%d, min=%d, max=%d), detailFilename=%s", len(tagKey2writeTime), len(delays), avg, min, max, detailFilename) +} + +func parseFlag() (filename, pushURL, pullURL string) { + f := flag.String("f", "", "specify flv file") + i := flag.String("i", "", "specify rtmp pull url") + o := flag.String("o", "", "specify rtmp push url") + flag.Parse() + if *f == "" || *i == "" || *o == "" { + flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + %s -f test.flv -i rtmp://127.0.0.1:1935/live/test -o rtmp://127.0.0.1:1935/live/test +`, os.Args[0]) + base.OSExitAndWaitPressIfWindows(1) + } + return *f, *i, *o +} diff --git a/app/demo/analysehls/analysehls.go b/app/demo/learnhls/learnhls.go similarity index 100% rename from app/demo/analysehls/analysehls.go rename to app/demo/learnhls/learnhls.go diff --git a/app/demo/pullrtmp/pullrtmp.go b/app/demo/pullrtmp/pullrtmp.go index 645204a..e966b86 100644 --- a/app/demo/pullrtmp/pullrtmp.go +++ b/app/demo/pullrtmp/pullrtmp.go @@ -15,6 +15,8 @@ import ( "strconv" "strings" "sync" + "sync/atomic" + "time" "github.com/q191201771/lal/pkg/remux" @@ -39,13 +41,22 @@ import ( // -o string // specify ouput flv file // Example: -// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test -o out.flv -// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test -n 1000 -// ./bin/pullrtmp -i rtmp://127.0.0.1:19350/live/test_{i} -n 1000 +// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test -o out.flv +// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test -n 1000 +// ./bin/pullrtmp -i rtmp://127.0.0.1:1935/live/test_{i} -n 1000 + +var aliveSessionCount int32 func main() { urlTmpl, fileNameTmpl, num := parseFlag() - urls, filenames := connect(urlTmpl, fileNameTmpl, num) + urls, filenames := collect(urlTmpl, fileNameTmpl, num) + + go func() { + for { + nazalog.Debugf("alive session:%d", atomic.LoadInt32(&aliveSessionCount)) + time.Sleep(1 * time.Second) + } + }() var wg sync.WaitGroup wg.Add(num) @@ -53,9 +64,12 @@ func main() { go func(index int) { pull(urls[index], filenames[index]) wg.Done() + atomic.AddInt32(&aliveSessionCount, -1) }(i) } wg.Wait() + time.Sleep(1 * time.Second) + nazalog.Info("bye.") } func pull(url string, filename string) { @@ -87,12 +101,17 @@ func pull(url string, filename string) { nazalog.Assert(nil, err) } }) - nazalog.Assert(nil, err) + if err != nil { + nazalog.Errorf("pull failed. err=%v", err) + return + } + atomic.AddInt32(&aliveSessionCount, 1) + err = <-session.Wait() nazalog.Debug(err) } -func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) { +func collect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) { for i := 0; i < num; i++ { url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1) urls = append(urls, url) @@ -105,14 +124,14 @@ func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filen func parseFlag() (urlTmpl string, fileNameTmpl string, num int) { i := flag.String("i", "", "specify pull rtmp url") o := flag.String("o", "", "specify ouput flv file") - n := flag.Int("n", 1, "num of pull connection") + n := flag.Int("n", 1, "specify num of pull connection") flag.Parse() if *i == "" { flag.Usage() _, _ = fmt.Fprintf(os.Stderr, `Example: - %s -i rtmp://127.0.0.1:19350/live/test -o out.flv - %s -i rtmp://127.0.0.1:19350/live/test -n 1000 - %s -i rtmp://127.0.0.1:19350/live/test_{i} -n 1000 + %s -i rtmp://127.0.0.1:1935/live/test -o out.flv + %s -i rtmp://127.0.0.1:1935/live/test -n 1000 + %s -i rtmp://127.0.0.1:1935/live/test_{i} -n 1000 `, os.Args[0], os.Args[0], os.Args[0]) base.OSExitAndWaitPressIfWindows(1) } diff --git a/app/demo/pullrtmp2pushrtmp/tunnel.go b/app/demo/pullrtmp2pushrtmp/tunnel.go index ace7c79..d9a6665 100644 --- a/app/demo/pullrtmp2pushrtmp/tunnel.go +++ b/app/demo/pullrtmp2pushrtmp/tunnel.go @@ -11,27 +11,32 @@ package main import ( "encoding/hex" "errors" + "fmt" "time" - "github.com/q191201771/lal/pkg/remux" - "github.com/q191201771/naza/pkg/nazastring" - "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazastring" "github.com/q191201771/naza/pkg/unique" ) +// 注意,当前的策略是,当推流有多个地址时,任意一个失败就会退出整个任务 + var ErrClosedByCaller = errors.New("tunnel closed by caller") type Tunnel struct { - uk string - inURL string - outURLList []string - pushECChan chan ErrorCode - closeChan chan struct{} - waitChan chan ErrorCode - rtmpMsgQ chan base.RTMPMsg + uk string + inURL string + outURLList []string + startTime time.Time + startECChan chan ErrorCode + pullECChan chan ErrorCode + pushECChan chan ErrorCode + closeChan chan ErrorCode + waitChan chan ErrorCode + rtmpMsgQ chan base.RTMPMsg pullSession *rtmp.PullSession pushSessionList []*rtmp.PushSession @@ -46,20 +51,34 @@ type ErrorCode struct { // @param outURLList 推流rtmp url地址列表 // func NewTunnel(inURL string, outURLList []string) *Tunnel { + var streamName string + ctx, err := base.ParseRTMPURL(inURL) + if err != nil { + nazalog.Errorf("parse rtmp url failed. url=%s", inURL) + streamName = "invalid" + } else { + streamName = ctx.LastItemOfPath + } + originUK := unique.GenUniqueKey("TUNNEL") + uk := fmt.Sprintf("%s-%s", originUK, streamName) + return &Tunnel{ - uk: unique.GenUniqueKey("TUNNEL"), - inURL: inURL, - outURLList: outURLList, - pushECChan: make(chan ErrorCode, len(outURLList)), - closeChan: make(chan struct{}, 1), - waitChan: make(chan ErrorCode, len(outURLList)+1), - rtmpMsgQ: make(chan base.RTMPMsg, 1024), + uk: uk, + inURL: inURL, + outURLList: outURLList, + startTime: time.Now(), + startECChan: make(chan ErrorCode, len(outURLList)+1), + pullECChan: make(chan ErrorCode, 1), + pushECChan: make(chan ErrorCode, len(outURLList)), + closeChan: make(chan ErrorCode, 1), + waitChan: make(chan ErrorCode, len(outURLList)+1), + rtmpMsgQ: make(chan base.RTMPMsg, 1024), } } // @return err 为nil时,表示任务启动成功,拉流和推流通道都已成功建立,并开始转推数据 // 不为nil时,表示任务失败,可以通过`code`得到是拉流还是推流失败 -func (t *Tunnel) Start() ErrorCode { +func (t *Tunnel) Start() (ret ErrorCode) { const ( pullTimeoutMS = 10000 pushTimeoutMS = 10000 @@ -68,6 +87,112 @@ func (t *Tunnel) Start() ErrorCode { nazalog.Infof("[%s] new tunnel. inURL=%s, outURLList=%+v", t.uk, t.inURL, t.outURLList) + defer func() { + if ret.err != nil { + t.notifyStartEC(ret) + } + + go func() { + nazalog.Debugf("[%s] > main event loop.", t.uk) + debugWriteCount := 0 + maxDebugWriteCount := 5 + ticker := time.NewTicker(statIntervalSec * time.Second) + defer ticker.Stop() + + // 最后清理所有session + defer func() { + nazalog.Debugf("[%s] < main event loop. duration=%+v", t.uk, time.Now().Sub(t.startTime)) + if t.pullSession != nil { + nazalog.Infof("[%s] dispose pull session. [%s]", t.uk, t.pullSession.UniqueKey()) + t.pullSession.Dispose() + } + + for _, s := range t.pushSessionList { + nazalog.Infof("[%s] dispose push session. [%s]", t.uk, s.UniqueKey()) + s.Dispose() + } + }() + + if t.pullSession != nil { + go func() { + nazalog.Debugf("[%s] > pull event loop. %s", t.uk, t.pullSession.UniqueKey()) + for { + select { + case err := <-t.pullSession.Wait(): + t.notifyPullEC(ErrorCode{-1, err}) + nazalog.Debugf("[%s] < pull event loop. %s", t.uk, t.pullSession.UniqueKey()) + return + } + } + }() + } + + // 将多个pushSession wait事件聚合在一起 + for i, pushSession := range t.pushSessionList { + go func(ii int, s *rtmp.PushSession) { + nazalog.Debugf("[%s] > push event loop. %s", t.uk, s.UniqueKey()) + for { + select { + case err := <-s.Wait(): + nazalog.Errorf("[%s] push wait error. [%s] err=%+v", t.uk, s.UniqueKey(), err) + t.notifyPushEC(ErrorCode{ii, err}) + nazalog.Debugf("[%s] < push event loop. %s", t.uk, s.UniqueKey()) + return + } + } + }(i, pushSession) + } + + // 主事件监听 + for { + select { + case ec := <-t.startECChan: + nazalog.Errorf("[%s] exit main event loop, <- startECChan. err=%s", t.uk, ec.Stringify()) + t.notifyWait(ec) + return + case ec := <-t.pullECChan: + nazalog.Errorf("[%s] exit main event loop, <- pullECChan. err=%s", t.uk, ec.Stringify()) + t.notifyWait(ec) + return + case ec := <-t.pushECChan: + nazalog.Errorf("[%s] exit main event loop, <- pushECChan. err=%s", t.uk, ec.Stringify()) + t.notifyWait(ec) + return + case ec := <-t.closeChan: + nazalog.Errorf("[%s] exit main event loop, <- closeChan.", t.uk) + t.notifyWait(ec) + return + case m := <-t.rtmpMsgQ: + currHeader := remux.MakeDefaultRTMPHeader(m.Header) + chunks := rtmp.Message2Chunks(m.Payload, &currHeader) + if debugWriteCount < maxDebugWriteCount { + nazalog.Infof("[%s] write. header=%+v, %+v, %s", t.uk, m.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(m.Payload, 32))) + debugWriteCount++ + } + + for i, pushSession := range t.pushSessionList { + err := pushSession.AsyncWrite(chunks) + if err != nil { + nazalog.Errorf("[%s] exit main event loop, write error. err=%+v", t.uk, err) + t.notifyWait(ErrorCode{i, err}) + return + } + } + case <-ticker.C: + t.pullSession.UpdateStat(statIntervalSec) + nazalog.Debugf("[%s] tick pull session stat. [%s] streamName=%s, stat=%+v", + t.uk, t.pullSession.UniqueKey(), t.pullSession.StreamName(), t.pullSession.GetStat()) + for _, s := range t.pushSessionList { + s.UpdateStat(statIntervalSec) + nazalog.Debugf("[%s] tick push session stat. [%s] streamName=%s, stat=%+v", + t.uk, s.UniqueKey(), s.StreamName(), s.GetStat()) + } + } + } + }() + }() + + // 逐个开启push session for i, outURL := range t.outURLList { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.PushTimeoutMS = pushTimeoutMS @@ -75,24 +200,16 @@ func (t *Tunnel) Start() ErrorCode { nazalog.Infof("[%s] start push. [%s] url=%s", t.uk, pushSession.UniqueKey(), outURL) err := pushSession.Push(outURL) + // 只有有一个失败就直接退出 if err != nil { nazalog.Errorf("[%s] push error. [%s] err=%+v", t.uk, pushSession.UniqueKey(), err) - return ErrorCode{i, err} + ret = ErrorCode{i, err} + return } nazalog.Infof("[%s] push succ. [%s]", t.uk, pushSession.UniqueKey()) + // 加入的都是成功的 t.pushSessionList = append(t.pushSessionList, pushSession) - - go func(ii int, u string, s *rtmp.PushSession) { - for { - select { - case err := <-s.Wait(): - nazalog.Errorf("[%s] push wait error. [%s] err=%+v", t.uk, s.UniqueKey(), err) - t.pushECChan <- ErrorCode{ii, err} - return - } - } - }(i, outURL, pushSession) } t.pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { @@ -104,87 +221,77 @@ func (t *Tunnel) Start() ErrorCode { m := msg.Clone() t.rtmpMsgQ <- m }) + // pull失败就直接退出 if err != nil { nazalog.Errorf("[%s] pull error. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err) - return ErrorCode{-1, err} + t.pullSession = nil + ret = ErrorCode{-1, err} + return } nazalog.Infof("[%s] pull succ. [%s]", t.uk, t.pullSession.UniqueKey()) - go func() { - debugWriteCount := 0 - maxDebugWriteCount := 5 - ticker := time.NewTicker(statIntervalSec * time.Second) - defer ticker.Stop() - - defer func() { - if t.pullSession != nil { - nazalog.Infof("[%s] dispose pull session. [%s]", t.uk, t.pullSession.UniqueKey()) - t.pullSession.Dispose() - } - - for _, s := range t.pushSessionList { - nazalog.Infof("[%s] dispose push session. [%s]", t.uk, s.UniqueKey()) - s.Dispose() - } - }() - - for { - select { - case err := <-t.pullSession.Wait(): - nazalog.Errorf("[%s] <- pull wait. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err) - t.waitChan <- ErrorCode{-1, err} - return - case ec := <-t.pushECChan: - nazalog.Errorf("[%s] <- pushECChan. err=%+v", t.uk, ec) - t.waitChan <- ec - return - case <-t.closeChan: - nazalog.Errorf("[%s] <- closeChan.", t.uk) - t.waitChan <- ErrorCode{-1, ErrClosedByCaller} - return - case m := <-t.rtmpMsgQ: - currHeader := remux.MakeDefaultRTMPHeader(m.Header) - chunks := rtmp.Message2Chunks(m.Payload, &currHeader) - if debugWriteCount < maxDebugWriteCount { - nazalog.Infof("[%s] write. header=%+v, %+v, %s", t.uk, m.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(m.Payload, 32))) - debugWriteCount++ - } - - for i, pushSession := range t.pushSessionList { - err := pushSession.AsyncWrite(chunks) - if err != nil { - nazalog.Errorf("[%s] write error. err=%+v", t.uk, err) - t.waitChan <- ErrorCode{i, err} - } - } - case <-ticker.C: - t.pullSession.UpdateStat(statIntervalSec) - nazalog.Debugf("[%s] tick pull session stat. [%s] streamName=%s, stat=%+v", - t.uk, t.pullSession.UniqueKey(), t.pullSession.StreamName(), t.pullSession.GetStat()) - for _, s := range t.pushSessionList { - s.UpdateStat(statIntervalSec) - nazalog.Debugf("[%s] tick push session stat. [%s] streamName=%s, stat=%+v", - t.uk, s.UniqueKey(), s.StreamName(), s.GetStat()) - } - } - } - }() - - return ErrorCode{0, nil} + ret = ErrorCode{0, nil} + return } // `Start`函数调用成功后,可调用`Wait`函数,等待任务结束 -// `Start`函数调用失败后,请不要调用`Wait`函数,否则行为未定义 // func (t *Tunnel) Wait() chan ErrorCode { return t.waitChan } // `Start`函数调用成功后,可调用`Close`函数,主动关闭转推任务 -// `Start`函数调用失败后,请不要调用`Close`函数,否则行为未定义 // `Close`函数允许调用多次 // func (t *Tunnel) Close() { - t.closeChan <- struct{}{} + t.notifyClose() +} + +func (t *Tunnel) notifyClose() { + select { + case t.closeChan <- ErrorCode{-1, ErrClosedByCaller}: + nazalog.Debugf("[%s] notifyClose.", t.uk) + default: + nazalog.Debugf("[%s] notifyClose fail, ignore.", t.uk) + } +} + +func (t *Tunnel) notifyWait(ec ErrorCode) { + select { + case t.waitChan <- ec: + nazalog.Debugf("[%s] notifyWait. ec=%s", t.uk, ec.Stringify()) + default: + nazalog.Warnf("[%s] CHEFNOTICEME notifyWait fail, ignore. ec=%s", t.uk, ec.Stringify()) + } +} + +func (t *Tunnel) notifyStartEC(ec ErrorCode) { + select { + case t.startECChan <- ec: + nazalog.Debugf("[%s] notifyStartEC. ec=%s", t.uk, ec.Stringify()) + default: + nazalog.Warnf("[%s] CHEFNOTICEME notifyStartEC fail, ignore. ec=%s", t.uk, ec.Stringify()) + } +} + +func (t *Tunnel) notifyPushEC(ec ErrorCode) { + select { + case t.pushECChan <- ec: + nazalog.Debugf("[%s] notifyPushEC. ec=%s", t.uk, ec.Stringify()) + default: + nazalog.Warnf("[%s] CHEFNOTICEME notifyPushEC fail, ignore. ec=%s", t.uk, ec.Stringify()) + } +} + +func (t *Tunnel) notifyPullEC(ec ErrorCode) { + select { + case t.pullECChan <- ec: + nazalog.Debugf("[%s] notifyPullEC. ec=%s", t.uk, ec.Stringify()) + default: + nazalog.Warnf("[%s] CHEFNOTICEME notifyPullEC fail, ignore. ec=%s", t.uk, ec.Stringify()) + } +} +func (ec *ErrorCode) Stringify() string { + return fmt.Sprintf("(%d, %+v)", ec.code, ec.err) } diff --git a/app/demo/pushrtmp/pushrtmp.go b/app/demo/pushrtmp/pushrtmp.go index 6201d99..9e54534 100644 --- a/app/demo/pushrtmp/pushrtmp.go +++ b/app/demo/pushrtmp/pushrtmp.go @@ -11,7 +11,6 @@ package main import ( "flag" "fmt" - "io" "os" "strconv" "strings" @@ -68,7 +67,12 @@ func main() { urls := collect(urlTmpl, num) - tags := readAllTag(filename) + tags, err := httpflv.ReadAllTagsFromFLVFile(filename) + if err != nil { + nazalog.Errorf("read tags from flv file failed. err=%+v", err) + os.Exit(0) + } + nazalog.Infof("read all tag done. tag num=%d", len(tags)) go func() { for { @@ -91,41 +95,6 @@ func main() { nazalog.Info("bye.") } -// readAllTag 预读取 flv 文件中的所有 tag,缓存在内存中 -func readAllTag(filename string) (ret []httpflv.Tag) { - var ffr httpflv.FLVFileReader - err := ffr.Open(filename) - if err != nil { - nazalog.Errorf("open file failed. file=%s, err=%v", filename, err) - os.Exit(1) - } - nazalog.Infof("open succ. filename=%s", filename) - - for { - tag, err := ffr.ReadTag() - if err == io.EOF { - nazalog.Info("EOF") - break - } - if err != nil { - nazalog.Errorf("read file tag error. tag num=%d, err=%v", len(ret), err) - break - } - if tag.IsMetadata() { - nazalog.Debugf("M %d", tag.Header.Timestamp) - } else if tag.IsVideoKeySeqHeader() { - nazalog.Debugf("V SH %d", tag.Header.Timestamp) - } else if tag.IsVideoKeyNALU() { - nazalog.Debugf("V K %d", tag.Header.Timestamp) - } else if tag.IsAACSeqHeader() { - nazalog.Debugf("A SH %d", tag.Header.Timestamp) - } - ret = append(ret, tag) - } - nazalog.Infof("read all tag done. tag num=%d", len(ret)) - return -} - func push(tags []httpflv.Tag, urls []string, isRecursive bool) { var sessionList []*rtmp.PushSession @@ -282,9 +251,9 @@ func parseFlag() (filename string, urlTmpl string, num int, isRecursive bool, lo if *i == "" || *o == "" { flag.Usage() _, _ = fmt.Fprintf(os.Stderr, `Example: - %s -i test.flv -o rtmp://127.0.0.1:19350/live/test - %s -i test.flv -o rtmp://127.0.0.1:19350/live/test -r - %s -i test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000 + %s -i test.flv -o rtmp://127.0.0.1:1935/live/test + %s -i test.flv -o rtmp://127.0.0.1:1935/live/test -r + %s -i test.flv -o rtmp://127.0.0.1:1935/live/test_{i} -r -n 1000 `, os.Args[0], os.Args[0], os.Args[0]) base.OSExitAndWaitPressIfWindows(1) } diff --git a/conf/onlyrtmp.conf.json b/conf/onlyrtmp.conf.json new file mode 100644 index 0000000..d3b9e72 --- /dev/null +++ b/conf/onlyrtmp.conf.json @@ -0,0 +1,69 @@ +{ + "rtmp": { + "enable": true, + "addr": ":1935", + "gop_num": 0 + }, + "httpflv": { + "enable": false, + "sub_listen_addr": ":8080", + "enable_https": false, + "https_addr": ":4433", + "https_cert_file": "./conf/cert.pem", + "https_key_file": "./conf/key.pem", + "gop_num": 0 + }, + "hls": { + "enable": false, + "sub_listen_addr": ":8081", + "out_path": "/tmp/lal/hls/", + "fragment_duration_ms": 3000, + "fragment_num": 6, + "cleanup_flag": true + }, + "httpts": { + "enable": false, + "sub_listen_addr": ":8082" + }, + "rtsp": { + "enable": false, + "addr": ":5544" + }, + "relay_push": { + "enable": false, + "addr_list":[ + ] + }, + "relay_pull": { + "enable": false, + "addr": "" + }, + "http_api": { + "enable": false, + "addr": ":8083" + }, + "server_id": "1", + "http_notify": { + "enable": false, + "update_interval_sec": 5, + "on_server_start": "http://127.0.0.1:10101/on_server_start", + "on_update": "http://127.0.0.1:10101/on_update", + "on_pub_start": "http://127.0.0.1:10101/on_pub_start", + "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", + "on_sub_start": "http://127.0.0.1:10101/on_sub_start", + "on_sub_stop": "http://127.0.0.1:10101/on_sub_stop", + "on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect" + }, + "pprof": { + "enable": true, + "addr": ":8084" + }, + "log": { + "level": 1, + "filename": "./logs/lalserver.log", + "is_to_stdout": true, + "is_rotate_daily": true, + "short_file_flag": true, + "assert_behavior": 1 + } +} diff --git a/pkg/httpflv/util.go b/pkg/httpflv/util.go new file mode 100644 index 0000000..22613c5 --- /dev/null +++ b/pkg/httpflv/util.go @@ -0,0 +1,34 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package httpflv + +import "io" + +func ReadAllTagsFromFLVFile(filename string) ([]Tag, error) { + var tags []Tag + + var ffr FLVFileReader + err := ffr.Open(filename) + if err != nil { + return nil, err + } + + for { + tag, err := ffr.ReadTag() + if err != nil { + if err == io.EOF { + return tags, nil + } else { + return tags, err + } + } + tags = append(tags, tag) + } + // never reach here +}