You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/logic/group__relay_pull.go

134 lines
3.3 KiB
Go

// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
// StartPull is invoked by an external command to actively trigger a relay
// pull from url.
//
// Current call sites:
//  1. e.g. the HTTP API
//
// While holding group.mutex it enables pulling with url as the source (via
// setPullUrl) and then lets pullIfNeeded decide whether a pull has to be
// started.
func (group *Group) StartPull(url string) {
	group.mutex.Lock()
	defer group.mutex.Unlock()

	group.setPullUrl(true, url)
	group.pullIfNeeded()
}
// ---------------------------------------------------------------------------------------------------------------------
// pullProxy holds the relay-pull state that the Group methods in this file
// read and update.
type pullProxy struct {
// isPulling is the "pull in progress" flag. It is written by setPullingFlag
// and read by getPullingFlag; pullIfNeeded sets it to true right before it
// starts a pull so that a second pull is not launched in parallel.
isPulling bool
// pullSession is the RTMP pull session. stopPullIfNeeded disposes it when it
// is non-nil and the group has no sub session left.
// NOTE(review): the assignment of this field is not in this part of the
// file — presumably done when the pull session is added to/removed from the
// group; confirm.
pullSession *rtmp.PullSession
}
// initRelayPull initializes the relay-pull settings of the group from the
// static relay-pull section of the configuration.
//
// When relay pull is enabled there, the pull URL is built as
// rtmp://<addr>/<appName>/<streamName>; otherwise pulling is disabled and the
// URL is left empty.
func (group *Group) initRelayPull() {
	if !group.config.RelayPullConfig.Enable {
		group.setPullUrl(false, "")
		return
	}

	url := fmt.Sprintf("rtmp://%s/%s/%s", group.config.RelayPullConfig.Addr, group.appName, group.streamName)
	group.setPullUrl(true, url)
}
// isPullEnable reports whether relay pull is currently enabled for the group,
// i.e. the value last stored through setPullUrl.
func (group *Group) isPullEnable() bool {
	return group.pullEnable
}
// setPullUrl stores both the relay-pull switch and the URL to pull from.
// It performs no locking and no validation of url.
func (group *Group) setPullUrl(enable bool, url string) {
	group.pullEnable, group.pullUrl = enable, url
}
// getPullUrl returns the URL most recently stored through setPullUrl.
// It performs no locking.
func (group *Group) getPullUrl() string {
	return group.pullUrl
}
// setPullingFlag records in pullProxy whether a relay pull is currently in
// progress.
func (group *Group) setPullingFlag(flag bool) {
	group.pullProxy.isPulling = flag
}
// getPullingFlag reports whether pullProxy is currently marked as having a
// relay pull in progress.
func (group *Group) getPullingFlag() bool {
	return group.pullProxy.isPulling
}
// pullIfNeeded decides whether the stream has to be pulled from the remote
// side to the local server and, if so, starts the pull in a new goroutine.
//
// Current call sites:
//  1. a new sub session is added
//  2. an external command, e.g. the HTTP API
//  3. a timer, e.g. the pull connection dropped and the timer re-triggers the pull
//
// Nothing is started when pulling is disabled, when there is no sub session,
// when the group already has an input session, or when a pull is already in
// progress.
func (group *Group) pullIfNeeded() {
	if !group.isPullEnable() {
		return
	}
	// Nobody is consuming the stream locally, so there is no reason to pull.
	if !group.hasSubSession() {
		return
	}
	// An input-type session already exists locally, so there is no reason to pull.
	if group.hasInSession() {
		return
	}
	// A pull is already in progress.
	if group.getPullingFlag() {
		return
	}
	group.setPullingFlag(true)

	// Snapshot the URL before the goroutine starts. StartPull rewrites the URL
	// while holding group.mutex, whereas the goroutine below runs on its own;
	// reading the field from inside it would race with such a write and could
	// pull from a different URL than the one logged here.
	pullUrl := group.getPullUrl()
	Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, pullUrl)

	go func() {
		pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
			option.PullTimeoutMs = relayPullTimeoutMs
			option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
		})

		// TODO(chef): should the data callback only be handled after Add has
		// succeeded? That would avoid the race where another in session joins
		// in the meantime.
		err := pullSession.Pull(pullUrl, group.OnReadRtmpAvMsg)
		if err != nil {
			Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
			// NOTE(review): DelRtmpPullSession is defined elsewhere; presumably
			// it also resets the pulling state so a later call can retry — confirm.
			group.DelRtmpPullSession(pullSession)
			return
		}

		// The group refused the session: just release it.
		if !group.AddRtmpPullSession(pullSession) {
			pullSession.Dispose()
			return
		}

		// Block until the pull session reports its end, then detach it.
		err = <-pullSession.WaitChan()
		Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
		group.DelRtmpPullSession(pullSession)
	}()
}
// stopPullIfNeeded decides whether the relay pull has to be stopped and, if
// so, disposes the pull session.
//
// Current call sites:
//  1. periodic check driven by a timer
func (group *Group) stopPullIfNeeded() {
	// No pull session to stop.
	if group.pullProxy.pullSession == nil {
		return
	}
	// Output-type (sub) sessions still exist, so the pull is still needed.
	if group.hasSubSession() {
		return
	}

	Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
	group.pullProxy.pullSession.Dispose()
}