diff --git a/app/demo/dispatch/dispatch.go b/app/demo/dispatch/dispatch.go index ac03d66..57de9b7 100644 --- a/app/demo/dispatch/dispatch.go +++ b/app/demo/dispatch/dispatch.go @@ -150,7 +150,7 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) { // TODO(chef): 还没有测试新的接口start_relay_pull,只是保证可以编译通过 url := fmt.Sprintf("http://%s/api/ctrl/start_relay_pull", reqServer.ApiAddr) var b base.ApiCtrlStartRelayPullReq - b.Url = fmt.Sprintf("%s://%s/%s/%s?%s", base.ProtocolRtmp, pubServer.RtmpAddr, info.AppName, info.StreamName, config.PullSecretParam) + b.Url = fmt.Sprintf("%s://%s/%s/%s?%s", "rtmp", pubServer.RtmpAddr, info.AppName, info.StreamName, config.PullSecretParam) //b.Protocol = base.ProtocolRtmp //b.Addr = pubServer.RtmpAddr //b.AppName = info.AppName diff --git a/pkg/base/http_api_t.go b/pkg/base/t_http_api.go similarity index 98% rename from pkg/base/http_api_t.go rename to pkg/base/t_http_api.go index 2f93b3a..4153f1f 100644 --- a/pkg/base/http_api_t.go +++ b/pkg/base/t_http_api.go @@ -10,8 +10,6 @@ package base // 文档见: https://pengrl.com/lal/#/HTTPAPI -const HttpApiVersion = "v0.3.1" - const ( ErrorCodeSucc = 0 DespSucc = "succ" diff --git a/pkg/base/http_notify_t.go b/pkg/base/t_http_notify.go similarity index 97% rename from pkg/base/http_notify_t.go rename to pkg/base/t_http_notify.go index 2da6df9..4c281a2 100644 --- a/pkg/base/http_notify_t.go +++ b/pkg/base/t_http_notify.go @@ -10,8 +10,6 @@ package base // 文档见: https://pengrl.com/p/20101/ -const HttpNotifyVersion = "v0.1.1" - type SessionEventCommonInfo struct { ServerId string `json:"server_id"` diff --git a/pkg/base/rtmp_t.go b/pkg/base/t_rtmp.go similarity index 100% rename from pkg/base/rtmp_t.go rename to pkg/base/t_rtmp.go diff --git a/pkg/base/rtprtcp_t.go b/pkg/base/t_rtprtcp.go similarity index 100% rename from pkg/base/rtprtcp_t.go rename to pkg/base/t_rtprtcp.go diff --git a/pkg/base/session.go b/pkg/base/t_session.go similarity index 85% rename from pkg/base/session.go rename to pkg/base/t_session.go index b6b4c82..2492272 100644 --- a/pkg/base/session.go +++ b/pkg/base/t_session.go @@ -8,67 +8,32 @@ package base -import ( - "io" - "strings" -) - -// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame - -// group中,session Dispose表现记录 -// -// Dispose结束后回调OnDel: -// rtmp.ServerSession(包含pub和sub) 1 -// rtsp.PubSession和rtsp.SubSession 1 -// rtmp.PullSession 2 -// httpflv.SubSession 3 -// httpts.SubSession 3 -// -// -// 情况1: 协议正常走完回调OnAdd,在自身server的RunLoop结束后,回调OnDel -// 情况2: 在group中pull阻塞结束后,手动回调OnDel -// 情况3: 在logic中sub RunLoop结束后,手动回调OnDel - -// TODO(chef): 整理所有Server类型Session的生命周期管理 -// - -// - rtmp没有独立的Pub、Sub Session结构体类型,而是直接使用ServerSession -// - write失败,需要反应到loop来 -// - rtsp是否也应该上层使用Command作为代理,避免生命周期管理混乱 +// ----- 所有session ----- // -// server.pub: rtmp(), rtsp -// server.sub: rtmp(), rtsp, flv, ts, 还有一个比较特殊的hls +// server.pub: rtmp(ServerSession), rtsp(PubSession) +// server.sub: rtmp(ServerSession), rtsp(SubSession), flv(SubSession), ts(SubSession), 还有一个比较特殊的hls // -// client.push: rtmp, rtsp -// client.pull: rtmp, rtsp, flv +// client.push: rtmp(PushSession), rtsp(PushSession) +// client.pull: rtmp(PullSession), rtsp(PullSession), flv(PullSession) // -// other: rtmp.ClientSession, rtmp.ServerSession -// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSessionS +// other: rtmp.ClientSession, (rtmp.ServerSession) +// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSession // base.HttpSubSession -// ISessionUrlContext 实际测试 -// -// | | 实际url | Url() | AppName, StreamName, RawQuery | -// | - | - | - | - | -// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, | -// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 | -// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, | -// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 | -// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, | -// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 | -// | rtmp sub拉流 | 同rtmp pub | . | . | -// | rtsp sub拉流 | 同rtsp pub | . | . | -// | httpts sub拉流 | 同httpflv sub,只是末尾的.flv换成.ts,不再赘述 | . | . | +// --------------------------------------------------------------------------------------------------------------------- -// IsUseClosedConnectionError 当connection处于这些情况时,就不需要再Close了 -// TODO(chef): 临时放这 -// TODO(chef): 目前暂时没有使用,因为connection支持多次调用Close -// -func IsUseClosedConnectionError(err error) bool { - if err == io.EOF || (err != nil && strings.Contains(err.Error(), "use of closed network connection")) { - return true - } - return false -} +const ( + // ProtocolRtmp StatSession.Protocol + ProtocolRtmp = "RTMP" + ProtocolRtsp = "RTSP" + ProtocolHttpflv = "FLV" + ProtocolHttpts = "TS" + + SessionBaseTypePub = "PUB" + SessionBaseTypeSub = "SUB" + SessionBaseTypePush = "PUSH" + SessionBaseTypePull = "PULL" +) type IClientSession interface { // PushSession: @@ -191,3 +156,51 @@ type IObject interface { } // TODO chef: rtmp.ClientSession修改为BaseClientSession更好些 + +// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame + +// ----- group中,session Dispose表现记录 ----- +// +// Dispose结束后回调OnDel: +// rtmp.ServerSession(包含pub和sub) 1 +// rtsp.PubSession和rtsp.SubSession 1 +// rtmp.PullSession 2 +// httpflv.SubSession 3 +// httpts.SubSession 3 +// +// +// 情况1: 协议正常走完回调OnAdd,在自身server的RunLoop结束后,回调OnDel +// 情况2: 在group中pull阻塞结束后,手动回调OnDel +// 情况3: 在logic中sub RunLoop结束后,手动回调OnDel + +// TODO(chef): 整理所有Server类型Session的生命周期管理 +// - +// - rtmp没有独立的Pub、Sub Session结构体类型,而是直接使用ServerSession +// - write失败,需要反应到loop来 +// - rtsp是否也应该上层使用Command作为代理,避免生命周期管理混乱 +// + +// ISessionUrlContext 实际测试 +// +// | | 实际url | Url() | AppName, StreamName, RawQuery | +// | - | - | - | - | +// | rtmp pub推流 | rtmp://127.0.0.1:1935/live/test110 | 同实际url | live, test110, | +// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b, c/d/test110, p1=1&p2=2 | +// | rtsp pub推流 | rtsp://localhost:5544/live/test110 | 同实际url | live, test110, | +// | rtsp pub推流 | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 | +// | httpflv sub拉流 | http://127.0.0.1:8080/live/test110.flv | 同实际url | live, test110, | +// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | 同实际url | a/b/c/d, test110, p1=1&p2=2 | +// | rtmp sub拉流 | 同rtmp pub | . | . | +// | rtsp sub拉流 | 同rtsp pub | . | . | +// | httpts sub拉流 | 同httpflv sub,只是末尾的.flv换成.ts,不再赘述 | . | . | + +// IsUseClosedConnectionError 当connection处于这些情况时,就不需要再Close了 +// TODO(chef): 临时放这 +// TODO(chef): 目前暂时没有使用,因为connection支持多次调用Close +// +//func IsUseClosedConnectionError(err error) bool { +// if err == io.EOF || (err != nil && strings.Contains(err.Error(), "use of closed network connection")) { +// return true +// } +// return false +//} diff --git a/pkg/base/stat.go b/pkg/base/t_stat.go similarity index 94% rename from pkg/base/stat.go rename to pkg/base/t_stat.go index bb03f79..f4d444b 100644 --- a/pkg/base/stat.go +++ b/pkg/base/t_stat.go @@ -15,12 +15,6 @@ const ( // VideoCodecAvc StatGroup.VideoCodec VideoCodecAvc = "H264" VideoCodecHevc = "H265" - - // ProtocolRtmp StatSession.Protocol - ProtocolRtmp = "RTMP" - ProtocolRtsp = "RTSP" - ProtocolHttpflv = "HTTP-FLV" - ProtocolHttpts = "HTTP-TS" ) type StatGroup struct { diff --git a/pkg/base/unique.go b/pkg/base/t_unique.go similarity index 96% rename from pkg/base/unique.go rename to pkg/base/t_unique.go index de9db4d..97ce94f 100644 --- a/pkg/base/unique.go +++ b/pkg/base/t_unique.go @@ -12,17 +12,18 @@ import "github.com/q191201771/naza/pkg/unique" const ( UkPreCustomizePubSessionContext = "CUSTOMIZEPUB" - UkPreRtmpServerSession = "RTMPPUBSUB" + UkPreRtmpServerSession = "RTMPPUBSUB" // 两种可能,pub或者sub UkPreRtmpPushSession = "RTMPPUSH" UkPreRtmpPullSession = "RTMPPULL" - UkPreRtspServerCommandSession = "RTSPSRVCMD" UkPreRtspPubSession = "RTSPPUB" UkPreRtspSubSession = "RTSPSUB" UkPreRtspPushSession = "RTSPPUSH" UkPreRtspPullSession = "RTSPPULL" UkPreFlvSubSession = "FLVSUB" - UkPreTsSubSession = "TSSUB" UkPreFlvPullSession = "FLVPULL" + UkPreTsSubSession = "TSSUB" + + UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层 UkPreGroup = "GROUP" UkPreHlsMuxer = "HLSMUXER" diff --git a/pkg/base/version.go b/pkg/base/t_version.go similarity index 98% rename from pkg/base/version.go rename to pkg/base/t_version.go index ac4df30..125170d 100644 --- a/pkg/base/version.go +++ b/pkg/base/t_version.go @@ -18,6 +18,10 @@ import "strings" // LalVersion 版本,该变量由外部脚本修改维护 const LalVersion = "v0.29.1" +const HttpApiVersion = "v0.3.1" + +const HttpNotifyVersion = "v0.1.1" + var ( LalLibraryName = "lal" LalGithubRepo = "github.com/q191201771/lal" diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 9628dc4..89a0ee6 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -12,14 +12,12 @@ import ( "flag" "fmt" "github.com/q191201771/naza/pkg/nazalog" - "math" "net/http" "os" "path/filepath" "sync" "time" - "github.com/q191201771/naza/pkg/bininfo" "github.com/q191201771/naza/pkg/defertaskthread" "github.com/q191201771/lal/pkg/hls" @@ -333,123 +331,7 @@ func (sm *ServerManager) Dispose() { sm.exitChan <- struct{}{} } -func (sm *ServerManager) StatLalInfo() base.LalInfo { - var lalInfo base.LalInfo - lalInfo.BinInfo = bininfo.StringifySingleLine() - lalInfo.LalVersion = base.LalVersion - lalInfo.ApiVersion = base.HttpApiVersion - lalInfo.NotifyVersion = base.HttpNotifyVersion - lalInfo.StartTime = sm.serverStartTime - lalInfo.ServerId = sm.config.ServerId - return lalInfo -} - -func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - sm.groupManager.Iterate(func(group *Group) bool { - sgs = append(sgs, group.GetStat(math.MaxInt32)) - return true - }) - return -} - -func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup { - sm.mutex.Lock() - defer sm.mutex.Unlock() - g := sm.getGroup("", streamName) - if g == nil { - return nil - } - // copy - var ret base.StatGroup - ret = g.GetStat(math.MaxInt32) - return &ret -} - -func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (ret base.ApiCtrlStartRelayPull) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - - streamName := info.StreamName - if streamName == "" { - ctx, err := base.ParseUrl(info.Url, -1) - if err != nil { - ret.ErrorCode = base.ErrorCodeStartRelayPullFail - ret.Desp = err.Error() - return - } - streamName = ctx.LastItemOfPath - } - - // 注意,如果group不存在,我们依然relay pull - g := sm.getOrCreateGroup("", streamName) - - sessionId, err := g.StartPull(info) - if err != nil { - ret.ErrorCode = base.ErrorCodeStartRelayPullFail - ret.Desp = err.Error() - } else { - ret.ErrorCode = base.ErrorCodeSucc - ret.Desp = base.DespSucc - ret.Data.StreamName = streamName - ret.Data.SessionId = sessionId - } - return -} - -// CtrlStopRelayPull -// -// TODO(chef): 整理错误值 -// -func (sm *ServerManager) CtrlStopRelayPull(streamName string) (ret base.ApiCtrlStopRelayPull) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - - g := sm.getGroup("", streamName) - if g == nil { - ret.ErrorCode = base.ErrorCodeGroupNotFound - ret.Desp = base.DespGroupNotFound - return - } - - ret.Data.SessionId = g.StopPull() - if ret.Data.SessionId == "" { - ret.ErrorCode = base.ErrorCodeSessionNotFound - ret.Desp = base.DespSessionNotFound - return - } - - ret.ErrorCode = base.ErrorCodeSucc - ret.Desp = base.DespSucc - return -} - -// CtrlKickSession -// -// TODO(chef): refactor 不要返回http结果,返回error吧 -// -func (sm *ServerManager) CtrlKickSession(info base.ApiCtrlKickSession) base.HttpResponseBasic { - sm.mutex.Lock() - defer sm.mutex.Unlock() - g := sm.getGroup("", info.StreamName) - if g == nil { - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeGroupNotFound, - Desp: base.DespGroupNotFound, - } - } - if !g.KickSession(info.SessionId) { - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeSessionNotFound, - Desp: base.DespSessionNotFound, - } - } - return base.HttpResponseBasic{ - ErrorCode: base.ErrorCodeSucc, - Desp: base.DespSucc, - } -} +// --------------------------------------------------------------------------------------------------------------------- func (sm *ServerManager) AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error) { sm.mutex.Lock() diff --git a/pkg/logic/server_manager__api.go b/pkg/logic/server_manager__api.go new file mode 100644 index 0000000..200a05a --- /dev/null +++ b/pkg/logic/server_manager__api.go @@ -0,0 +1,129 @@ +package logic + +import ( + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/bininfo" + "math" +) + +// server_manager__api.go +// +// 支持http-api功能的部分 +// + +func (sm *ServerManager) StatLalInfo() base.LalInfo { + var lalInfo base.LalInfo + lalInfo.BinInfo = bininfo.StringifySingleLine() + lalInfo.LalVersion = base.LalVersion + lalInfo.ApiVersion = base.HttpApiVersion + lalInfo.NotifyVersion = base.HttpNotifyVersion + lalInfo.StartTime = sm.serverStartTime + lalInfo.ServerId = sm.config.ServerId + return lalInfo +} + +func (sm *ServerManager) StatAllGroup() (sgs []base.StatGroup) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + sm.groupManager.Iterate(func(group *Group) bool { + sgs = append(sgs, group.GetStat(math.MaxInt32)) + return true + }) + return +} + +func (sm *ServerManager) StatGroup(streamName string) *base.StatGroup { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup("", streamName) + if g == nil { + return nil + } + // copy + var ret base.StatGroup + ret = g.GetStat(math.MaxInt32) + return &ret +} + +func (sm *ServerManager) CtrlStartRelayPull(info base.ApiCtrlStartRelayPullReq) (ret base.ApiCtrlStartRelayPull) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + + streamName := info.StreamName + if streamName == "" { + ctx, err := base.ParseUrl(info.Url, -1) + if err != nil { + ret.ErrorCode = base.ErrorCodeStartRelayPullFail + ret.Desp = err.Error() + return + } + streamName = ctx.LastItemOfPath + } + + // 注意,如果group不存在,我们依然relay pull + g := sm.getOrCreateGroup("", streamName) + + sessionId, err := g.StartPull(info) + if err != nil { + ret.ErrorCode = base.ErrorCodeStartRelayPullFail + ret.Desp = err.Error() + } else { + ret.ErrorCode = base.ErrorCodeSucc + ret.Desp = base.DespSucc + ret.Data.StreamName = streamName + ret.Data.SessionId = sessionId + } + return +} + +// CtrlStopRelayPull +// +// TODO(chef): 整理错误值 +// +func (sm *ServerManager) CtrlStopRelayPull(streamName string) (ret base.ApiCtrlStopRelayPull) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + + g := sm.getGroup("", streamName) + if g == nil { + ret.ErrorCode = base.ErrorCodeGroupNotFound + ret.Desp = base.DespGroupNotFound + return + } + + ret.Data.SessionId = g.StopPull() + if ret.Data.SessionId == "" { + ret.ErrorCode = base.ErrorCodeSessionNotFound + ret.Desp = base.DespSessionNotFound + return + } + + ret.ErrorCode = base.ErrorCodeSucc + ret.Desp = base.DespSucc + return +} + +// CtrlKickSession +// +// TODO(chef): refactor 不要返回http结果,返回error吧 +// +func (sm *ServerManager) CtrlKickSession(info base.ApiCtrlKickSession) (ret base.HttpResponseBasic) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup("", info.StreamName) + if g == nil { + ret.ErrorCode = base.ErrorCodeGroupNotFound + ret.Desp = base.DespGroupNotFound + return + } + + if !g.KickSession(info.SessionId) { + ret.ErrorCode = base.ErrorCodeSessionNotFound + ret.Desp = base.DespSessionNotFound + return + } + + ret.ErrorCode = base.ErrorCodeSucc + ret.Desp = base.DespSucc + return +}