You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/app/demo/pullrtmp2pushrtmp/tunnel.go

298 lines
8.6 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazastring"
"github.com/q191201771/naza/pkg/unique"
)
// 注意,当前的策略是,当推流有多个地址时,任意一个失败就会退出整个任务
var ErrClosedByCaller = errors.New("tunnel closed by caller")
type Tunnel struct {
uk string
inUrl string
outUrlList []string
startTime time.Time
startEcChan chan ErrorCode
pullEcChan chan ErrorCode
pushEcChan chan ErrorCode
closeChan chan ErrorCode
waitChan chan ErrorCode
rtmpMsgQ chan base.RtmpMsg
pullSession *rtmp.PullSession
pushSessionList []*rtmp.PushSession
}
type ErrorCode struct {
code int // -1表示拉流失败或者结束>=0表示推流失败或者结束值对应outUrlList的下标
err error
}
// @param inUrl 拉流rtmp url地址
// @param outUrlList 推流rtmp url地址列表
//
func NewTunnel(inUrl string, outUrlList []string) *Tunnel {
var streamName string
ctx, err := base.ParseRtmpUrl(inUrl)
if err != nil {
nazalog.Errorf("parse rtmp url failed. url=%s", inUrl)
streamName = "invalid"
} else {
streamName = ctx.LastItemOfPath
}
originUk := unique.GenUniqueKey("TUNNEL")
uk := fmt.Sprintf("%s-%s", originUk, streamName)
return &Tunnel{
uk: uk,
inUrl: inUrl,
outUrlList: outUrlList,
startTime: time.Now(),
startEcChan: make(chan ErrorCode, len(outUrlList)+1),
pullEcChan: make(chan ErrorCode, 1),
pushEcChan: make(chan ErrorCode, len(outUrlList)),
closeChan: make(chan ErrorCode, 1),
waitChan: make(chan ErrorCode, len(outUrlList)+1),
rtmpMsgQ: make(chan base.RtmpMsg, 1024),
}
}
// @return err 为nil时表示任务启动成功拉流和推流通道都已成功建立并开始转推数据
// 不为nil时表示任务失败可以通过`code`得到是拉流还是推流失败
func (t *Tunnel) Start() (ret ErrorCode) {
const (
pullTimeoutMs = 10000
pushTimeoutMs = 10000
statIntervalSec = 5
)
nazalog.Infof("[%s] new tunnel. inUrl=%s, outUrlList=%+v", t.uk, t.inUrl, t.outUrlList)
defer func() {
if ret.err != nil {
t.notifyStartEc(ret)
}
go func() {
nazalog.Debugf("[%s] > main event loop.", t.uk)
debugWriteCount := 0
maxDebugWriteCount := 5
ticker := time.NewTicker(statIntervalSec * time.Second)
defer ticker.Stop()
// 最后清理所有session
defer func() {
nazalog.Debugf("[%s] < main event loop. duration=%+v", t.uk, time.Now().Sub(t.startTime))
if t.pullSession != nil {
nazalog.Infof("[%s] dispose pull session. [%s]", t.uk, t.pullSession.UniqueKey())
t.pullSession.Dispose()
}
for _, s := range t.pushSessionList {
nazalog.Infof("[%s] dispose push session. [%s]", t.uk, s.UniqueKey())
s.Dispose()
}
}()
if t.pullSession != nil {
go func() {
nazalog.Debugf("[%s] > pull event loop. %s", t.uk, t.pullSession.UniqueKey())
for {
select {
case err := <-t.pullSession.WaitChan():
t.notifyPullEc(ErrorCode{-1, err})
nazalog.Debugf("[%s] < pull event loop. %s", t.uk, t.pullSession.UniqueKey())
return
}
}
}()
}
// 将多个pushSession wait事件聚合在一起
for i, pushSession := range t.pushSessionList {
go func(ii int, s *rtmp.PushSession) {
nazalog.Debugf("[%s] > push event loop. %s", t.uk, s.UniqueKey())
for {
select {
case err := <-s.WaitChan():
nazalog.Errorf("[%s] push wait error. [%s] err=%+v", t.uk, s.UniqueKey(), err)
t.notifyPushEc(ErrorCode{ii, err})
nazalog.Debugf("[%s] < push event loop. %s", t.uk, s.UniqueKey())
return
}
}
}(i, pushSession)
}
// 主事件监听
for {
select {
case ec := <-t.startEcChan:
nazalog.Errorf("[%s] exit main event loop, <- startEcChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.pullEcChan:
nazalog.Errorf("[%s] exit main event loop, <- pullEcChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.pushEcChan:
nazalog.Errorf("[%s] exit main event loop, <- pushEcChan. err=%s", t.uk, ec.Stringify())
t.notifyWait(ec)
return
case ec := <-t.closeChan:
nazalog.Errorf("[%s] exit main event loop, <- closeChan.", t.uk)
t.notifyWait(ec)
return
case m := <-t.rtmpMsgQ:
currHeader := remux.MakeDefaultRtmpHeader(m.Header)
chunks := rtmp.Message2Chunks(m.Payload, &currHeader)
if debugWriteCount < maxDebugWriteCount {
nazalog.Infof("[%s] write. header=%+v, %+v, %s", t.uk, m.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(m.Payload, 32)))
debugWriteCount++
}
for i, pushSession := range t.pushSessionList {
err := pushSession.Write(chunks)
if err != nil {
nazalog.Errorf("[%s] exit main event loop, write error. err=%+v", t.uk, err)
t.notifyWait(ErrorCode{i, err})
return
}
}
case <-ticker.C:
t.pullSession.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick pull session stat. [%s] streamName=%s, stat=%+v",
t.uk, t.pullSession.UniqueKey(), t.pullSession.StreamName(), t.pullSession.GetStat())
for _, s := range t.pushSessionList {
s.UpdateStat(statIntervalSec)
nazalog.Debugf("[%s] tick push session stat. [%s] streamName=%s, stat=%+v",
t.uk, s.UniqueKey(), s.StreamName(), s.GetStat())
}
}
}
}()
}()
// 逐个开启push session
for i, outUrl := range t.outUrlList {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = pushTimeoutMs
})
nazalog.Infof("[%s] start push. [%s] url=%s", t.uk, pushSession.UniqueKey(), outUrl)
err := pushSession.Push(outUrl)
// 只有有一个失败就直接退出
if err != nil {
nazalog.Errorf("[%s] push error. [%s] err=%+v", t.uk, pushSession.UniqueKey(), err)
ret = ErrorCode{i, err}
return
}
nazalog.Infof("[%s] push succ. [%s]", t.uk, pushSession.UniqueKey())
// 加入的都是成功的
t.pushSessionList = append(t.pushSessionList, pushSession)
}
t.pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = pullTimeoutMs
})
nazalog.Infof("[%s] start pull. [%s] url=%s", t.uk, t.pullSession.UniqueKey(), t.inUrl)
err := t.pullSession.Pull(t.inUrl, func(msg base.RtmpMsg) {
m := msg.Clone()
t.rtmpMsgQ <- m
})
// pull失败就直接退出
if err != nil {
nazalog.Errorf("[%s] pull error. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err)
t.pullSession = nil
ret = ErrorCode{-1, err}
return
}
nazalog.Infof("[%s] pull succ. [%s]", t.uk, t.pullSession.UniqueKey())
ret = ErrorCode{0, nil}
return
}
// `Start`函数调用成功后,可调用`Wait`函数,等待任务结束
//
func (t *Tunnel) Wait() chan ErrorCode {
return t.waitChan
}
// `Start`函数调用成功后,可调用`Close`函数,主动关闭转推任务
// `Close`函数允许调用多次
//
func (t *Tunnel) Close() {
t.notifyClose()
}
func (t *Tunnel) notifyClose() {
select {
case t.closeChan <- ErrorCode{-1, ErrClosedByCaller}:
nazalog.Debugf("[%s] notifyClose.", t.uk)
default:
nazalog.Debugf("[%s] notifyClose fail, ignore.", t.uk)
}
}
func (t *Tunnel) notifyWait(ec ErrorCode) {
select {
case t.waitChan <- ec:
nazalog.Debugf("[%s] notifyWait. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyWait fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyStartEc(ec ErrorCode) {
select {
case t.startEcChan <- ec:
nazalog.Debugf("[%s] notifyStartEc. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyStartEc fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyPushEc(ec ErrorCode) {
select {
case t.pushEcChan <- ec:
nazalog.Debugf("[%s] notifyPushEc. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyPushEc fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (t *Tunnel) notifyPullEc(ec ErrorCode) {
select {
case t.pullEcChan <- ec:
nazalog.Debugf("[%s] notifyPullEc. ec=%s", t.uk, ec.Stringify())
default:
nazalog.Warnf("[%s] CHEFNOTICEME notifyPullEc fail, ignore. ec=%s", t.uk, ec.Stringify())
}
}
func (ec *ErrorCode) Stringify() string {
return fmt.Sprintf("(%d, %+v)", ec.code, ec.err)
}