You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/logic/group__relay_push.go

128 lines
3.4 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
// TODO(chef): [refactor] 参照relay pull整体重构一次relay push 202205
func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = session
}
}
func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = nil
group.url2PushProxy[url].isPushing = false
}
}
// ---------------------------------------------------------------------------------------------------------------------
type pushProxy struct {
isPushing bool
pushSession *rtmp.PushSession
}
func (group *Group) initRelayPushByConfig() {
enable := group.config.RelayPushConfig.Enable
addrList := group.config.RelayPushConfig.AddrList
appName := group.appName
streamName := group.streamName
url2PushProxy := make(map[string]*pushProxy)
if enable {
for _, addr := range addrList {
pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
url2PushProxy[pushUrl] = &pushProxy{
isPushing: false,
pushSession: nil,
}
}
}
group.pushEnable = group.config.RelayPushConfig.Enable
group.url2PushProxy = url2PushProxy
}
// startPushIfNeeded 必要时进行replay push转推
func (group *Group) startPushIfNeeded() {
// push转推功能没开
if !group.pushEnable {
return
}
// 没有pub发布者
// TODO(chef): [refactor] 判断所有pub是否存在的方式 202208
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
}
v.isPushing = true
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = RelayPushTimeoutMs
option.WriteAvTimeoutMs = RelayPushWriteAvTimeoutMs
})
err := pushSession.Start(u2)
if err != nil {
Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
}
func (group *Group) stopPushIfNeeded() {
if !group.pushEnable {
return
}
for _, v := range group.url2PushProxy {
if v.pushSession != nil {
v.pushSession.Dispose()
}
v.pushSession = nil
}
}