[refactor] 整理代码

pull/176/head
q191201771 3 years ago
parent 1fee00c317
commit 720ca9d6ff

@ -150,7 +150,7 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
// TODO(chef): 还没有测试新的接口start_relay_pull只是保证可以编译通过
url := fmt.Sprintf("http://%s/api/ctrl/start_relay_pull", reqServer.ApiAddr)
var b base.ApiCtrlStartRelayPullReq
b.Url = fmt.Sprintf("%s://%s/%s/%s?%s", base.ProtocolRtmp, pubServer.RtmpAddr, info.AppName, info.StreamName, config.PullSecretParam)
b.Url = fmt.Sprintf("%s://%s/%s/%s?%s", "rtmp", pubServer.RtmpAddr, info.AppName, info.StreamName, config.PullSecretParam)
//b.Protocol = base.ProtocolRtmp
//b.Addr = pubServer.RtmpAddr
//b.AppName = info.AppName

@ -10,8 +10,6 @@ package base
// 文档见: https://pengrl.com/lal/#/HTTPAPI
const HttpApiVersion = "v0.3.1"
const (
ErrorCodeSucc = 0
DespSucc = "succ"

@ -10,8 +10,6 @@ package base
// 文档见: https://pengrl.com/p/20101/
const HttpNotifyVersion = "v0.1.1"
type SessionEventCommonInfo struct {
ServerId string `json:"server_id"`

@ -8,67 +8,32 @@
package base
import (
"io"
"strings"
)
// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame
// group中session Dispose表现记录
//
// Dispose结束后回调OnDel:
// rtmp.ServerSession(包含pub和sub) 1
// rtsp.PubSession和rtsp.SubSession 1
// rtmp.PullSession 2
// httpflv.SubSession 3
// httpts.SubSession 3
//
//
// 情况1: 协议正常走完回调OnAdd在自身server的RunLoop结束后回调OnDel
// 情况2: 在group中pull阻塞结束后手动回调OnDel
// 情况3: 在logic中sub RunLoop结束后手动回调OnDel
// TODO(chef): 整理所有Server类型Session的生命周期管理
// -
// - rtmp没有独立的Pub、Sub Session结构体类型而是直接使用ServerSession
// - write失败需要反应到loop来
// - rtsp是否也应该上层使用Command作为代理避免生命周期管理混乱
// ----- 所有session -----
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls
// server.pub: rtmp(ServerSession), rtsp(PubSession)
// server.sub: rtmp(ServerSession), rtsp(SubSession), flv(SubSession), ts(SubSession), 还有一个比较特殊的hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv
// client.push: rtmp(PushSession), rtsp(PushSession)
// client.pull: rtmp(PullSession), rtsp(PullSession), flv(PullSession)
//
// other: rtmp.ClientSession, rtmp.ServerSession
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSessionS
// other: rtmp.ClientSession, (rtmp.ServerSession)
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSession
// base.HttpSubSession
// ISessionUrlContext 实际测试
//
// | | 实际url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, |
// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub拉流 | 同rtmp pub | . | . |
// | rtsp sub拉流 | 同rtsp pub | . | . |
// | httpts sub拉流 | 同httpflv sub只是末尾的.flv换成.ts不再赘述 | . | . |
// ---------------------------------------------------------------------------------------------------------------------
// IsUseClosedConnectionError 当connection处于这些情况时就不需要再Close了
// TODO(chef): 临时放这
// TODO(chef): 目前暂时没有使用因为connection支持多次调用Close
//
func IsUseClosedConnectionError(err error) bool {
if err == io.EOF || (err != nil && strings.Contains(err.Error(), "use of closed network connection")) {
return true
}
return false
}
const (
// ProtocolRtmp StatSession.Protocol
ProtocolRtmp = "RTMP"
ProtocolRtsp = "RTSP"
ProtocolHttpflv = "FLV"
ProtocolHttpts = "TS"
SessionBaseTypePub = "PUB"
SessionBaseTypeSub = "SUB"
SessionBaseTypePush = "PUSH"
SessionBaseTypePull = "PULL"
)
type IClientSession interface {
// PushSession:
@ -191,3 +156,51 @@ type IObject interface {
}
// TODO chef: rtmp.ClientSession修改为BaseClientSession更好些
// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame
// ----- group中session Dispose表现记录 -----
//
// Dispose结束后回调OnDel:
// rtmp.ServerSession(包含pub和sub) 1
// rtsp.PubSession和rtsp.SubSession 1
// rtmp.PullSession 2
// httpflv.SubSession 3
// httpts.SubSession 3
//
//
// 情况1: 协议正常走完回调OnAdd在自身server的RunLoop结束后回调OnDel
// 情况2: 在group中pull阻塞结束后手动回调OnDel
// 情况3: 在logic中sub RunLoop结束后手动回调OnDel
// TODO(chef): 整理所有Server类型Session的生命周期管理
// -
// - rtmp没有独立的Pub、Sub Session结构体类型而是直接使用ServerSession
// - write失败需要反应到loop来
// - rtsp是否也应该上层使用Command作为代理避免生命周期管理混乱
//
// ISessionUrlContext 实际测试
//
// | | 实际url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, |
// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub拉流 | 同rtmp pub | . | . |
// | rtsp sub拉流 | 同rtsp pub | . | . |
// | httpts sub拉流 | 同httpflv sub只是末尾的.flv换成.ts不再赘述 | . | . |
// IsUseClosedConnectionError 当connection处于这些情况时就不需要再Close了
// TODO(chef): 临时放这
// TODO(chef): 目前暂时没有使用因为connection支持多次调用Close
//
//func IsUseClosedConnectionError(err error) bool {
// if err == io.EOF || (err != nil && strings.Contains(err.Error(), "use of closed network connection")) {
// return true
// }
// return false
//}

@ -15,12 +15,6 @@ const (
// VideoCodecAvc StatGroup.VideoCodec
VideoCodecAvc = "H264"
VideoCodecHevc = "H265"
// ProtocolRtmp StatSession.Protocol
ProtocolRtmp = "RTMP"
ProtocolRtsp = "RTSP"
ProtocolHttpflv = "HTTP-FLV"
ProtocolHttpts = "HTTP-TS"
)
type StatGroup struct {

@ -12,17 +12,18 @@ import "github.com/q191201771/naza/pkg/unique"
const (
UkPreCustomizePubSessionContext = "CUSTOMIZEPUB"
UkPreRtmpServerSession = "RTMPPUBSUB"
UkPreRtmpServerSession = "RTMPPUBSUB" // 两种可能pub或者sub
UkPreRtmpPushSession = "RTMPPUSH"
UkPreRtmpPullSession = "RTMPPULL"
UkPreRtspServerCommandSession = "RTSPSRVCMD"
UkPreRtspPubSession = "RTSPPUB"
UkPreRtspSubSession = "RTSPSUB"
UkPreRtspPushSession = "RTSPPUSH"
UkPreRtspPullSession = "RTSPPULL"
UkPreFlvSubSession = "FLVSUB"
UkPreTsSubSession = "TSSUB"
UkPreFlvPullSession = "FLVPULL"
UkPreTsSubSession = "TSSUB"
UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层
UkPreGroup = "GROUP"
UkPreHlsMuxer = "HLSMUXER"

@ -18,6 +18,10 @@ import "strings"
// LalVersion 版本,该变量由外部脚本修改维护
const LalVersion = "v0.29.1"
const HttpApiVersion = "v0.3.1"
const HttpNotifyVersion = "v0.1.1"
var (
LalLibraryName = "lal"
LalGithubRepo = "github.com/q191201771/lal"

@ -12,14 +12,12 @@ import (
"flag"
"fmt"
"github.com/q191201771/naza/pkg/nazalog"
"math"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/naza/pkg/defertaskthread"
"github.com/q191201771/lal/pkg/hls"
@ -333,123 +331,7 @@ func (sm *ServerManager) Dispose() {
sm.exitChan <- struct{}{}
}
func (sm *ServerManager) StatLalInfo() base.LalInfo {
var lalInfo base.LalInfo
lalInfo.BinInfo = bininfo.StringifySingleLine()
lalInfo.LalVersion = base.LalVersion
lalInfo.ApiVersion = base.HttpApiVersion
lalInfo.NotifyVersion = base.HttpNotifyVersion
lalInfo.StartTime = sm.serverStartTime
lalInfo.ServerId = sm.config.ServerId
return lalInfo
}
func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
sm.groupManager.Iterate(func(group *Group) bool {
sgs = append(sgs, group.GetStat(math.MaxInt32))
return true
})
return
}
func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
return nil
}
// copy
var ret base.StatGroup
ret = g.GetStat(math.MaxInt32)
return &ret
}
func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (ret base.ApiCtrlStartRelayPull) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
streamName := info.StreamName
if streamName == "" {
ctx, err := base.ParseUrl(info.Url, -1)
if err != nil {
ret.ErrorCode = base.ErrorCodeStartRelayPullFail
ret.Desp = err.Error()
return
}
streamName = ctx.LastItemOfPath
}
// 注意如果group不存在我们依然relay pull
g := sm.getOrCreateGroup("", streamName)
sessionId, err := g.StartPull(info)
if err != nil {
ret.ErrorCode = base.ErrorCodeStartRelayPullFail
ret.Desp = err.Error()
} else {
ret.ErrorCode = base.ErrorCodeSucc
ret.Desp = base.DespSucc
ret.Data.StreamName = streamName
ret.Data.SessionId = sessionId
}
return
}
// CtrlStopRelayPull
//
// TODO(chef): 整理错误值
//
func (sm *ServerManager) CtrlStopRelayPull(streamName string) (ret base.ApiCtrlStopRelayPull) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
ret.ErrorCode = base.ErrorCodeGroupNotFound
ret.Desp = base.DespGroupNotFound
return
}
ret.Data.SessionId = g.StopPull()
if ret.Data.SessionId == "" {
ret.ErrorCode = base.ErrorCodeSessionNotFound
ret.Desp = base.DespSessionNotFound
return
}
ret.ErrorCode = base.ErrorCodeSucc
ret.Desp = base.DespSucc
return
}
// CtrlKickSession
//
// TODO(chef): refactor 不要返回http结果返回error吧
//
func (sm *ServerManager) CtrlKickSession(info base.ApiCtrlKickSession) base.HttpResponseBasic {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", info.StreamName)
if g == nil {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeGroupNotFound,
Desp: base.DespGroupNotFound,
}
}
if !g.KickSession(info.SessionId) {
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeSessionNotFound,
Desp: base.DespSessionNotFound,
}
}
return base.HttpResponseBasic{
ErrorCode: base.ErrorCodeSucc,
Desp: base.DespSucc,
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (sm *ServerManager) AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error) {
sm.mutex.Lock()

@ -0,0 +1,129 @@
package logic
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bininfo"
"math"
)
// server_manager__api.go
//
// 支持http-api功能的部分
//
func (sm *ServerManager) StatLalInfo() base.LalInfo {
var lalInfo base.LalInfo
lalInfo.BinInfo = bininfo.StringifySingleLine()
lalInfo.LalVersion = base.LalVersion
lalInfo.ApiVersion = base.HttpApiVersion
lalInfo.NotifyVersion = base.HttpNotifyVersion
lalInfo.StartTime = sm.serverStartTime
lalInfo.ServerId = sm.config.ServerId
return lalInfo
}
func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
sm.groupManager.Iterate(func(group *Group) bool {
sgs = append(sgs, group.GetStat(math.MaxInt32))
return true
})
return
}
func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
return nil
}
// copy
var ret base.StatGroup
ret = g.GetStat(math.MaxInt32)
return &ret
}
func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (ret base.ApiCtrlStartRelayPull) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
streamName := info.StreamName
if streamName == "" {
ctx, err := base.ParseUrl(info.Url, -1)
if err != nil {
ret.ErrorCode = base.ErrorCodeStartRelayPullFail
ret.Desp = err.Error()
return
}
streamName = ctx.LastItemOfPath
}
// 注意如果group不存在我们依然relay pull
g := sm.getOrCreateGroup("", streamName)
sessionId, err := g.StartPull(info)
if err != nil {
ret.ErrorCode = base.ErrorCodeStartRelayPullFail
ret.Desp = err.Error()
} else {
ret.ErrorCode = base.ErrorCodeSucc
ret.Desp = base.DespSucc
ret.Data.StreamName = streamName
ret.Data.SessionId = sessionId
}
return
}
// CtrlStopRelayPull
//
// TODO(chef): 整理错误值
//
func (sm *ServerManager) CtrlStopRelayPull(streamName string) (ret base.ApiCtrlStopRelayPull) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", streamName)
if g == nil {
ret.ErrorCode = base.ErrorCodeGroupNotFound
ret.Desp = base.DespGroupNotFound
return
}
ret.Data.SessionId = g.StopPull()
if ret.Data.SessionId == "" {
ret.ErrorCode = base.ErrorCodeSessionNotFound
ret.Desp = base.DespSessionNotFound
return
}
ret.ErrorCode = base.ErrorCodeSucc
ret.Desp = base.DespSucc
return
}
// CtrlKickSession
//
// TODO(chef): refactor 不要返回http结果返回error吧
//
func (sm *ServerManager) CtrlKickSession(info base.ApiCtrlKickSession) (ret base.HttpResponseBasic) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("", info.StreamName)
if g == nil {
ret.ErrorCode = base.ErrorCodeGroupNotFound
ret.Desp = base.DespGroupNotFound
return
}
if !g.KickSession(info.SessionId) {
ret.ErrorCode = base.ErrorCodeSessionNotFound
ret.Desp = base.DespSessionNotFound
return
}
ret.ErrorCode = base.ErrorCodeSucc
ret.Desp = base.DespSucc
return
}
Loading…
Cancel
Save