lal/pkg/logic/group.go

324 lines
8.6 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"sync"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
// TODO chef: group可以考虑搞个协程
type Group struct {
UniqueKey string
appName string
streamName string
exitChan chan struct{}
mutex sync.Mutex
pubSession *rtmp.ServerSession
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
hlsMuxer *hls.Muxer
gopCache *GOPCache
httpflvGopCache *GOPCache
relayPushList []*RelayPush
}
func NewGroup(appName string, streamName string) *Group {
uk := unique.GenUniqueKey("GROUP")
nazalog.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName)
var relayPushList []*RelayPush
if config.RelayPushConfig.Enable {
for _, addr := range config.RelayPushConfig.AddrList {
url := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
relayPush := NewRelayPush(url)
relayPushList = append(relayPushList, relayPush)
}
}
return &Group{
UniqueKey: uk,
appName: appName,
streamName: streamName,
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum),
relayPushList: relayPushList,
}
}
func (group *Group) RunLoop() {
<-group.exitChan
}
func (group *Group) Dispose() {
nazalog.Infof("lifecycle dispose group. [%s]", group.UniqueKey)
group.exitChan <- struct{}{}
group.mutex.Lock()
defer group.mutex.Unlock()
if group.pubSession != nil {
group.pubSession.Dispose()
group.pubSession = nil
}
for session := range group.rtmpSubSessionSet {
session.Dispose()
}
group.rtmpSubSessionSet = nil
for session := range group.httpflvSubSessionSet {
session.Dispose()
}
group.httpflvSubSessionSet = nil
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.hlsMuxer = nil
}
if config.RelayPushConfig.Enable {
for _, rp := range group.relayPushList {
rp.Dispose()
}
}
}
func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
nazalog.Debugf("add PubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
if group.pubSession != nil {
nazalog.Errorf("PubSession already exist in group. [%s] old=%s, new=%s", group.UniqueKey, group.pubSession.UniqueKey, session.UniqueKey)
return false
}
group.pubSession = session
if config.HLSConfig.Enable {
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig)
group.hlsMuxer.Start()
}
if config.RelayPushConfig.Enable {
for _, rp := range group.relayPushList {
rp.Start()
}
}
session.SetPubSessionObserver(group)
return true
}
func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
nazalog.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.pubSession = nil
if config.HLSConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.hlsMuxer = nil
}
if config.RelayPushConfig.Enable {
for _, rp := range group.relayPushList {
rp.Stop()
}
}
group.gopCache.Clear()
group.httpflvGopCache.Clear()
}
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
nazalog.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// TODO chef: 多长没有拉流session存在的功能
//group.turnToEmptyTick = 0
}
func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) {
nazalog.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.rtmpSubSessionSet, session)
}
func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) {
nazalog.Debugf("add httpflv SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
session.WriteHTTPResponseHeader()
session.WriteFLVHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
}
func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
nazalog.Debugf("del httpflv SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.httpflvSubSessionSet, session)
}
func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.pubSession == nil && len(group.rtmpSubSessionSet) == 0 && len(group.httpflvSubSessionSet) == 0
}
func (group *Group) IsInExist() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.pubSession != nil
}
// PubSession or PullSession
func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
p := make([]byte, len(msg.Payload))
copy(p, msg.Payload)
msg.Payload = p
//nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1])
group.broadcastRTMP(msg)
if config.HLSConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRTMPMessage(msg)
}
}
func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
var (
lcd LazyChunkDivider
lrm2ft LazyRTMPMsg2FLVTag
)
// # 1. 设置好用于发送的 rtmp 头部信息
currHeader := Trans.MakeDefaultRTMPHeader(msg.Header)
// TODO 这行代码是否放到 MakeDefaultRTMPHeader 中
currHeader.MsgLen = uint32(len(msg.Payload))
// # 2. 懒初始化rtmp chunk切片以及httpflv转换
lcd.Init(msg.Payload, &currHeader)
lrm2ft.Init(msg)
// # 3. 广播。遍历所有 rtmp sub session转发数据
for session := range group.rtmpSubSessionSet {
// ## 3.1. 如果是新的 sub session发送已缓存的信息
if session.IsFresh {
// TODO 头信息和full gop也可以在SubSession刚加入时发送
if group.gopCache.Metadata != nil {
_ = session.AsyncWrite(group.gopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
_ = session.AsyncWrite(group.gopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
_ = session.AsyncWrite(group.gopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
_ = session.AsyncWrite(item)
}
}
session.IsFresh = false
}
// ## 3.2. 转发本次数据
_ = session.AsyncWrite(lcd.Get())
}
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
if config.RelayPushConfig.Enable {
for _, rp := range group.relayPushList {
if !rp.Connected() {
continue
}
if rp.IsFresh {
if group.gopCache.Metadata != nil {
_ = rp.AsyncWrite(group.gopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
_ = rp.AsyncWrite(group.gopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
_ = rp.AsyncWrite(group.gopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
_ = rp.AsyncWrite(item)
}
}
rp.IsFresh = false
}
_ = rp.AsyncWrite(lcd.Get())
}
}
// # 4. 广播。遍历所有 httpflv sub session转发数据
for session := range group.httpflvSubSessionSet {
if session.IsFresh {
if group.httpflvGopCache.Metadata != nil {
session.WriteRawPacket(group.httpflvGopCache.Metadata)
}
if group.httpflvGopCache.VideoSeqHeader != nil {
session.WriteRawPacket(group.httpflvGopCache.VideoSeqHeader)
}
if group.httpflvGopCache.AACSeqHeader != nil {
session.WriteRawPacket(group.httpflvGopCache.AACSeqHeader)
}
for i := 0; i < group.httpflvGopCache.GetGOPCount(); i++ {
for _, item := range group.httpflvGopCache.GetGOPDataAt(i) {
session.WriteRawPacket(item)
}
}
session.IsFresh = false
}
session.WriteRawPacket(lrm2ft.Get())
}
// # 5. 缓存关键信息以及gop
if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get)
}
if config.HTTPFLVConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
}