diff --git a/pkg/base/http_sub_session.go b/pkg/base/http_sub_session.go index 62c12dd..2d58e3a 100644 --- a/pkg/base/http_sub_session.go +++ b/pkg/base/http_sub_session.go @@ -15,14 +15,14 @@ import ( "github.com/q191201771/naza/pkg/connection" ) +// TODO(chef): refactor 更名为BasicHttpSubSession 202205 + type HttpSubSession struct { HttpSubSessionOption - suffix string - conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat StatSession + suffix string + conn connection.Connection + sessionStat BasicSessionStat } type HttpSubSessionOption struct { @@ -39,11 +39,14 @@ func NewHttpSubSession(option HttpSubSessionOption) *HttpSubSession { s := &HttpSubSession{ HttpSubSessionOption: option, conn: connection.New(option.Conn, option.ConnModOption), - stat: StatSession{ - Protocol: option.Protocol, - SessionId: option.Uk, - StartTime: ReadableNowTime(), - RemoteAddr: option.Conn.RemoteAddr().String(), + sessionStat: BasicSessionStat{ + Stat: StatSession{ + SessionId: option.Uk, + Protocol: option.Protocol, + BaseType: SessionBaseTypeSubStr, + StartTime: ReadableNowTime(), + RemoteAddr: option.Conn.RemoteAddr().String(), + }, }, } return s @@ -112,9 +115,9 @@ func (session *HttpSubSession) AppName() string { func (session *HttpSubSession) StreamName() string { var suffix string switch session.Protocol { - case ProtocolHttpflv: + case SessionProtocolFlvStr: suffix = ".flv" - case ProtocolHttpts: + case SessionProtocolTsStr: suffix = ".ts" default: Log.Warnf("[%s] acquire stream name but protocol unknown.", session.Uk) @@ -126,39 +129,18 @@ func (session *HttpSubSession) RawQuery() string { return session.UrlCtx.RawQuery } -// --------------------------------------------------------------------------------------------------------------------- -// ISessionStat interface -// --------------------------------------------------------------------------------------------------------------------- +// ----- ISessionStat -------------------------------------------------------------------------------------------------- func (session *HttpSubSession) GetStat() StatSession { - currStat := session.conn.GetStat() - session.stat.ReadBytesSum = currStat.ReadBytesSum - session.stat.WroteBytesSum = currStat.WroteBytesSum - return session.stat + return session.sessionStat.GetStat() } func (session *HttpSubSession) UpdateStat(intervalSec uint32) { - currStat := session.conn.GetStat() - rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.WriteBitrate - session.prevConnStat = currStat + session.sessionStat.UpdateStatWitchConn(session.conn, intervalSec) } func (session *HttpSubSession) IsAlive() (readAlive, writeAlive bool) { - currStat := session.conn.GetStat() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - *session.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0) - *session.staleStat = currStat - return + return session.sessionStat.IsAliveWitchConn(session.conn) } // --------------------------------------------------------------------------------------------------------------------- diff --git a/pkg/base/t_session.go b/pkg/base/t_session.go index 2492272..17aff95 100644 --- a/pkg/base/t_session.go +++ b/pkg/base/t_session.go @@ -8,6 +8,11 @@ package base +import ( + "github.com/q191201771/naza/pkg/connection" + "github.com/q191201771/naza/pkg/nazalog" +) + // ----- 所有session ----- // // server.pub: rtmp(ServerSession), rtsp(PubSession) @@ -22,19 +27,158 @@ package base // --------------------------------------------------------------------------------------------------------------------- +// TODO(chef): [refactor] BasicSessionStat 放入单独的文件中 202205 + +// BasicSessionStat +// +// 两种方式,一种是通过外部的 connection.Connection 获取最新状态,一种是内部自己管理状态 +// +type BasicSessionStat struct { + Stat StatSession + + prevConnStat connection.Stat + staleStat *connection.Stat + + currConnStat connection.StatAtomic +} + +func (s *BasicSessionStat) AddReadBytes(n int) { + s.currConnStat.ReadBytesSum.Add(uint64(n)) +} + +func (s *BasicSessionStat) AddWriteBytes(n int) { + s.currConnStat.WroteBytesSum.Add(uint64(n)) +} + +func (s *BasicSessionStat) UpdateStat(intervalSec uint32) { + s.updateStat(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load(), s.Stat.BaseType, intervalSec) +} + +func (s *BasicSessionStat) UpdateStatWitchConn(conn connection.Connection, intervalSec uint32) { + currStat := conn.GetStat() + s.updateStat(currStat.ReadBytesSum, currStat.WroteBytesSum, s.Stat.BaseType, intervalSec) +} + +func (s *BasicSessionStat) GetStat() StatSession { + s.Stat.ReadBytesSum = s.currConnStat.ReadBytesSum.Load() + s.Stat.WroteBytesSum = s.currConnStat.WroteBytesSum.Load() + return s.Stat +} + +func (s *BasicSessionStat) GetStatWithConn(conn connection.Connection) StatSession { + connStat := conn.GetStat() + s.Stat.ReadBytesSum = connStat.ReadBytesSum + s.Stat.WroteBytesSum = connStat.WroteBytesSum + return s.Stat +} + +func (s *BasicSessionStat) IsAlive() (readAlive, writeAlive bool) { + return s.isAlive(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load()) +} + +func (s *BasicSessionStat) IsAliveWitchConn(conn connection.Connection) (readAlive, writeAlive bool) { + currStat := conn.GetStat() + return s.isAlive(currStat.ReadBytesSum, currStat.WroteBytesSum) +} + +func (s *BasicSessionStat) updateStat(readBytesSum, wroteBytesSum uint64, typ string, intervalSec uint32) { + rDiff := readBytesSum - s.prevConnStat.ReadBytesSum + s.Stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) + wDiff := wroteBytesSum - s.prevConnStat.WroteBytesSum + s.Stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) + + switch typ { + case SessionBaseTypePubStr, SessionBaseTypePullStr: + s.Stat.Bitrate = s.Stat.ReadBitrate + case SessionBaseTypeSubStr, SessionBaseTypePushStr: + s.Stat.Bitrate = s.Stat.WriteBitrate + default: + nazalog.Errorf("invalid session base type. type=%s", typ) + } + + s.prevConnStat.ReadBytesSum = readBytesSum + s.prevConnStat.WroteBytesSum = wroteBytesSum +} + +func (s *BasicSessionStat) isAlive(readBytesSum, wroteBytesSum uint64) (readAlive, writeAlive bool) { + if s.staleStat == nil { + s.staleStat = new(connection.Stat) + s.staleStat.ReadBytesSum = readBytesSum + s.staleStat.WroteBytesSum = wroteBytesSum + return true, true + } + + readAlive = !(readBytesSum-s.staleStat.ReadBytesSum == 0) + writeAlive = !(wroteBytesSum-s.staleStat.WroteBytesSum == 0) + s.staleStat.ReadBytesSum = readBytesSum + s.staleStat.WroteBytesSum = wroteBytesSum + return +} + +type ( + SessionProtocol int + + SessionBaseType int +) + const ( - // ProtocolRtmp StatSession.Protocol - ProtocolRtmp = "RTMP" - ProtocolRtsp = "RTSP" - ProtocolHttpflv = "FLV" - ProtocolHttpts = "TS" - - SessionBaseTypePub = "PUB" - SessionBaseTypeSub = "SUB" - SessionBaseTypePush = "PUSH" - SessionBaseTypePull = "PULL" + SessionProtocolCustomize = 1 + SessionProtocolRtmp = 2 + SessionProtocolRtsp = 3 + SessionProtocolFlv = 4 + SessionProtocolTs = 5 + + SessionBaseTypePubSub = 1 + SessionBaseTypePub = 2 + SessionBaseTypeSub = 3 + SessionBaseTypePush = 4 + SessionBaseTypePull = 5 + + SessionProtocolCustomizeStr = "CUSTOMIZE" + SessionProtocolRtmpStr = "RTMP" + SessionProtocolRtspStr = "RTSP" + SessionProtocolFlvStr = "FLV" + SessionProtocolTsStr = "TS" + + SessionBaseTypePubSubStr = "PUBSUB" + SessionBaseTypePubStr = "PUB" + SessionBaseTypeSubStr = "SUB" + SessionBaseTypePushStr = "PUSH" + SessionBaseTypePullStr = "PULL" ) +func (protocol SessionProtocol) Stringify() string { + switch protocol { + case SessionProtocolCustomize: + return SessionProtocolCustomizeStr + case SessionProtocolRtmp: + return SessionProtocolRtmpStr + case SessionProtocolRtsp: + return SessionProtocolRtspStr + case SessionProtocolFlv: + return SessionProtocolFlvStr + case SessionProtocolTs: + return SessionProtocolTsStr + } + return "INVALID" +} + +func (typ SessionBaseType) Stringify() string { + switch typ { + case SessionBaseTypePubSub: + return SessionBaseTypePubSubStr + case SessionBaseTypePub: + return SessionBaseTypePubStr + case SessionBaseTypeSub: + return SessionBaseTypeSubStr + case SessionBaseTypePush: + return SessionBaseTypePushStr + case SessionBaseTypePull: + return SessionBaseTypePullStr + } + return "INVALID" +} + type IClientSession interface { // PushSession: // Push() @@ -155,6 +299,67 @@ type IObject interface { UniqueKey() string } +type ISessionType interface { + Protocol() SessionProtocol + BaseType() SessionBaseType + + UniqueKey() string +} + +type SessionType struct { + protocol SessionProtocol + baseType SessionBaseType + uniqueKey string +} + +func NewSessionType(protocol SessionProtocol, typ SessionBaseType) SessionType { + var uk string + switch protocol { + case SessionProtocolCustomize: + if typ == SessionBaseTypePub { + uk = GenUkCustomizePubSession() + } + case SessionProtocolRtmp: + if typ == SessionBaseTypePubSub { + uk = GenUkRtmpServerSession() + } else if typ == SessionBaseTypePush { + uk = GenUkRtmpPushSession() + } else if typ == SessionBaseTypePull { + uk = GenUkRtmpPullSession() + } + case SessionProtocolRtsp: + if typ == SessionBaseTypePub { + uk = GenUkRtspPubSession() + } else if typ == SessionBaseTypeSub { + uk = GenUkRtspSubSession() + } else if typ == SessionBaseTypePush { + uk = GenUkRtspPushSession() + } else if typ == SessionBaseTypePull { + uk = GenUkRtspPullSession() + } + case SessionProtocolFlv: + if typ == SessionBaseTypeSub { + uk = GenUkFlvSubSession() + } else if typ == SessionBaseTypePull { + uk = GenUkFlvPullSession() + } + case SessionProtocolTs: + if typ == SessionBaseTypeSub { + uk = GenUkTsSubSession() + } + } + if uk == "" { + nazalog.Errorf("session type invalid. protocol=%s, typ=%s", protocol.Stringify(), typ.Stringify()) + uk = "INVALID" + } + + return SessionType{ + protocol: protocol, + baseType: typ, + uniqueKey: uk, + } +} + // TODO chef: rtmp.ClientSession修改为BaseClientSession更好些 // TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame diff --git a/pkg/base/t_stat.go b/pkg/base/t_stat.go index f4d444b..e2e9364 100644 --- a/pkg/base/t_stat.go +++ b/pkg/base/t_stat.go @@ -41,8 +41,10 @@ type StatPull struct { } type StatSession struct { - Protocol string `json:"protocol"` - SessionId string `json:"session_id"` + SessionId string `json:"session_id"` + Protocol string `json:"protocol"` + BaseType string `json:"base_type"` + RemoteAddr string `json:"remote_addr"` StartTime string `json:"start_time"` diff --git a/pkg/base/t_unique.go b/pkg/base/t_unique.go index 97ce94f..f3bb42d 100644 --- a/pkg/base/t_unique.go +++ b/pkg/base/t_unique.go @@ -11,17 +11,17 @@ package base import "github.com/q191201771/naza/pkg/unique" const ( - UkPreCustomizePubSessionContext = "CUSTOMIZEPUB" - UkPreRtmpServerSession = "RTMPPUBSUB" // 两种可能,pub或者sub - UkPreRtmpPushSession = "RTMPPUSH" - UkPreRtmpPullSession = "RTMPPULL" - UkPreRtspPubSession = "RTSPPUB" - UkPreRtspSubSession = "RTSPSUB" - UkPreRtspPushSession = "RTSPPUSH" - UkPreRtspPullSession = "RTSPPULL" - UkPreFlvSubSession = "FLVSUB" - UkPreFlvPullSession = "FLVPULL" - UkPreTsSubSession = "TSSUB" + UkPreCustomizePubSessionContext = SessionProtocolCustomizeStr + SessionBaseTypePubStr // "CUSTOMIZEPUB" + UkPreRtmpServerSession = SessionProtocolRtmpStr + SessionBaseTypePubSubStr // "RTMPPUBSUB" // 两种可能,pub或者sub + UkPreRtmpPushSession = SessionProtocolRtmpStr + SessionBaseTypePushStr // "RTMPPUSH" + UkPreRtmpPullSession = SessionProtocolRtmpStr + SessionBaseTypePullStr // "RTMPPULL" + UkPreRtspPubSession = SessionProtocolRtspStr + SessionBaseTypePubStr // "RTSPPUB" + UkPreRtspSubSession = SessionProtocolRtspStr + SessionBaseTypePubSubStr // "RTSPSUB" + UkPreRtspPushSession = SessionProtocolRtspStr + SessionBaseTypePushStr // "RTSPPUSH" + UkPreRtspPullSession = SessionProtocolRtspStr + SessionBaseTypePullStr // "RTSPPULL" + UkPreFlvSubSession = SessionProtocolFlvStr + SessionBaseTypePubSubStr // "FLVSUB" + UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL" + UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB" UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层 diff --git a/pkg/base/t_version.go b/pkg/base/t_version.go index 125170d..da10d7a 100644 --- a/pkg/base/t_version.go +++ b/pkg/base/t_version.go @@ -18,9 +18,9 @@ import "strings" // LalVersion 版本,该变量由外部脚本修改维护 const LalVersion = "v0.29.1" -const HttpApiVersion = "v0.3.1" +const HttpApiVersion = "v0.3.2" -const HttpNotifyVersion = "v0.1.1" +const HttpNotifyVersion = "v0.1.2" var ( LalLibraryName = "lal" diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 4469844..b3f75e9 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -41,10 +41,8 @@ type PullSession struct { uniqueKey string // const after ctor option PullSessionOption // const after ctor - conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession + conn connection.Connection + sessionStat base.BasicSessionStat // TODO(chef): fix 没有初始化 202205 urlCtx base.UrlContext @@ -140,40 +138,25 @@ func (session *PullSession) UniqueKey() string { return session.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + // UpdateStat 文档请参考: interface ISessionStat func (session *PullSession) UpdateStat(intervalSec uint32) { - currStat := session.conn.GetStat() - rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.ReadBitrate - session.prevConnStat = currStat + session.sessionStat.UpdateStatWitchConn(session.conn, intervalSec) } // GetStat 文档请参考: interface ISessionStat func (session *PullSession) GetStat() base.StatSession { - connStat := session.conn.GetStat() - session.stat.ReadBytesSum = connStat.ReadBytesSum - session.stat.WroteBytesSum = connStat.WroteBytesSum - return session.stat + return session.sessionStat.GetStatWithConn(session.conn) } // IsAlive 文档请参考: interface ISessionStat func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { - currStat := session.conn.GetStat() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - *session.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0) - *session.staleStat = currStat - return + return session.sessionStat.IsAliveWitchConn(session.conn) } +// --------------------------------------------------------------------------------------------------------------------- + func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error { errChan := make(chan error, 1) url := rawUrl diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 33c8661..3437e39 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -34,7 +34,7 @@ func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, webs option.WriteTimeoutMs = SubSessionWriteTimeoutMs }, Uk: uk, - Protocol: base.ProtocolHttpflv, + Protocol: base.SessionProtocolFlvStr, UrlCtx: urlCtx, IsWebSocket: isWebSocket, WebSocketKey: websocketKey, @@ -107,9 +107,7 @@ func (session *SubSession) RawQuery() string { return session.core.RawQuery() } -// --------------------------------------------------------------------------------------------------------------------- -// ISessionStat interface -// --------------------------------------------------------------------------------------------------------------------- +// ----- ISessionStat -------------------------------------------------------------------------------------------------- func (session *SubSession) UpdateStat(intervalSec uint32) { session.core.UpdateStat(intervalSec) diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index c4672e1..f836095 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -33,7 +33,7 @@ func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, webs option.WriteTimeoutMs = SubSessionWriteTimeoutMs }, Uk: uk, - Protocol: base.ProtocolHttpts, + Protocol: base.SessionProtocolTsStr, UrlCtx: urlCtx, IsWebSocket: isWebSocket, WebSocketKey: websocketKey, diff --git a/pkg/logic/group__in.go b/pkg/logic/group__in.go index 4c9a4fd..6bdef04 100644 --- a/pkg/logic/group__in.go +++ b/pkg/logic/group__in.go @@ -129,7 +129,7 @@ func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) error { var info base.PullStartInfo info.SessionId = session.UniqueKey() info.Url = session.Url() - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.RemoteAddr = session.GetStat().RemoteAddr info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -160,7 +160,7 @@ func (group *Group) AddRtspPullSession(session *rtsp.PullSession) error { var info base.PullStartInfo info.SessionId = session.UniqueKey() info.Url = session.Url() - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.RemoteAddr = session.GetStat().RemoteAddr info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -200,7 +200,7 @@ func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) { var info base.PullStopInfo info.SessionId = session.UniqueKey() info.Url = session.Url() - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.RemoteAddr = session.GetStat().RemoteAddr info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -218,7 +218,7 @@ func (group *Group) DelRtspPullSession(session *rtsp.PullSession) { var info base.PullStopInfo info.SessionId = session.UniqueKey() info.Url = session.Url() - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.RemoteAddr = session.GetStat().RemoteAddr info.AppName = session.AppName() info.StreamName = session.StreamName() diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 89a0ee6..1230fcc 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -379,7 +379,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error // TODO chef: 每次赋值都逐个拼,代码冗余,考虑直接用ISession抽离一下代码 var info base.PubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -416,7 +416,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) { var info base.PubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -434,7 +434,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error var info base.SubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -468,7 +468,7 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) { var info base.SubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtmp + info.Protocol = session.GetStat().Protocol info.AppName = session.AppName() info.StreamName = session.StreamName() info.UrlParam = session.RawQuery() @@ -487,7 +487,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) err var info base.SubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolHttpflv + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -521,7 +521,7 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) { var info base.SubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolHttpflv + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -539,7 +539,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) error var info base.SubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolHttpts + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -575,7 +575,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) { var info base.SubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolHttpts + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -603,7 +603,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error { var info base.PubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -639,7 +639,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { var info base.PubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -664,7 +664,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error var info base.SubStartInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() @@ -698,7 +698,7 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) { var info base.SubStopInfo info.ServerId = sm.config.ServerId - info.Protocol = base.ProtocolRtsp + info.Protocol = session.GetStat().Protocol info.Url = session.Url() info.AppName = session.AppName() info.StreamName = session.StreamName() diff --git a/pkg/logic/simple_auth.go b/pkg/logic/simple_auth.go index 19d37f2..4a16462 100644 --- a/pkg/logic/simple_auth.go +++ b/pkg/logic/simple_auth.go @@ -37,18 +37,18 @@ func NewSimpleAuthCtx(config SimpleAuthConfig) *SimpleAuthCtx { } func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error { - if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp || - s.config.PubRtspEnable && info.Protocol == base.ProtocolRtsp { + if s.config.PubRtmpEnable && info.Protocol == base.SessionProtocolRtmpStr || + s.config.PubRtspEnable && info.Protocol == base.SessionProtocolRtspStr { return s.check(info.StreamName, info.UrlParam) } return nil } func (s *SimpleAuthCtx) OnSubStart(info base.SubStartInfo) error { - if (s.config.SubRtmpEnable && info.Protocol == base.ProtocolRtmp) || - (s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) || - (s.config.SubHttptsEnable && info.Protocol == base.ProtocolHttpts) || - (s.config.SubRtspEnable && info.Protocol == base.ProtocolRtsp) { + if (s.config.SubRtmpEnable && info.Protocol == base.SessionProtocolRtmpStr) || + (s.config.SubHttpflvEnable && info.Protocol == base.SessionProtocolFlvStr) || + (s.config.SubHttptsEnable && info.Protocol == base.SessionProtocolTsStr) || + (s.config.SubRtspEnable && info.Protocol == base.SessionProtocolRtspStr) { return s.check(info.StreamName, info.UrlParam) } return nil diff --git a/pkg/logic/simple_auth_test.go b/pkg/logic/simple_auth_test.go index 1093963..2cdd801 100644 --- a/pkg/logic/simple_auth_test.go +++ b/pkg/logic/simple_auth_test.go @@ -27,7 +27,7 @@ func TestSimpleAuthCtx(t *testing.T) { PubRtmpEnable: true, }) var info base.PubStartInfo - info.Protocol = base.ProtocolRtmp + info.Protocol = base.SessionProtocolRtmpStr info.StreamName = "test110" info.UrlParam = "lal_secret=700997e1595a06c9ffa60ebef79105b0" diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 9a51acc..dd251ec 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -128,6 +128,8 @@ func (s *PullSession) UniqueKey() string { return s.core.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + // GetStat 文档请参考: interface ISessionStat func (s *PullSession) GetStat() base.StatSession { return s.core.GetStat() diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index ffa35ce..f04d79c 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -114,6 +114,8 @@ func (s *PushSession) UniqueKey() string { return s.core.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + // GetStat 文档请参考: interface ISessionStat func (s *PushSession) GetStat() base.StatSession { return s.core.GetStat() diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 1b9c50b..3af35c1 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -36,7 +36,7 @@ type ClientSession struct { // 只有PullSession使用 onReadRtmpAvMsg OnReadRtmpAvMsg - t ClientSessionType + t ClientSessionType // TODO(chef): refactor 使用basetype替代 202205 option ClientSessionOption packer *MessagePacker @@ -45,13 +45,12 @@ type ClientSession struct { hc IHandshakeClient conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession doResultChan chan struct{} errChan chan error hasNotifyDoResultSucc bool + sessionStat base.BasicSessionStat + debugLogReadUserCtrlMsgCount int debugLogReadUserCtrlMsgMax int @@ -106,11 +105,14 @@ type ModClientSessionOption func(option *ClientSessionOption) // NewClientSession @param t: session的类型,只能是推或者拉 func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession { var uk string + var baseType string switch t { case CstPullSession: uk = base.GenUkRtmpPullSession() + baseType = base.SessionBaseTypePullStr case CstPushSession: uk = base.GenUkRtmpPushSession() + baseType = base.SessionBaseTypePushStr } option := defaultClientSessOption @@ -134,10 +136,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) doResultChan: make(chan struct{}, 1), packer: NewMessagePacker(), chunkComposer: NewChunkComposer(), - stat: base.StatSession{ - Protocol: base.ProtocolRtmp, - SessionId: uk, - StartTime: base.ReadableNowTime(), + sessionStat: base.BasicSessionStat{ + Stat: base.StatSession{ + SessionId: uk, + Protocol: base.SessionProtocolRtmpStr, + BaseType: baseType, + StartTime: base.ReadableNowTime(), + }, }, debugLogReadUserCtrlMsgMax: 5, hc: hc, @@ -224,42 +229,22 @@ func (s *ClientSession) UniqueKey() string { return s.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + func (s *ClientSession) GetStat() base.StatSession { - connStat := s.conn.GetStat() - s.stat.ReadBytesSum = connStat.ReadBytesSum - s.stat.WroteBytesSum = connStat.WroteBytesSum - return s.stat + return s.sessionStat.GetStatWithConn(s.conn) } func (s *ClientSession) UpdateStat(intervalSec uint32) { - currStat := s.conn.GetStat() - rDiff := currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum - s.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum - s.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - switch s.t { - case CstPushSession: - s.stat.Bitrate = s.stat.WriteBitrate - case CstPullSession: - s.stat.Bitrate = s.stat.ReadBitrate - } - s.prevConnStat = currStat + s.sessionStat.UpdateStatWitchConn(s.conn, intervalSec) } func (s *ClientSession) IsAlive() (readAlive, writeAlive bool) { - currStat := s.conn.GetStat() - if s.staleStat == nil { - s.staleStat = new(connection.Stat) - *s.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-s.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-s.staleStat.WroteBytesSum == 0) - *s.staleStat = currStat - return + return s.sessionStat.IsAliveWitchConn(s.conn) } +// --------------------------------------------------------------------------------------------------------------------- + func (s *ClientSession) connect() { if err := s.tcpConnect(); err != nil { s.errChan <- err @@ -328,7 +313,7 @@ func (s *ClientSession) tcpConnect() error { Log.Infof("[%s] > tcp connect.", s.uniqueKey) var err error - s.stat.RemoteAddr = s.urlCtx.HostWithPort + s.sessionStat.Stat.RemoteAddr = s.urlCtx.HostWithPort var conn net.Conn if s.urlCtx.Scheme == "rtmps" { diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index d4feaab..ba6e5df 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -65,15 +65,13 @@ type ServerSession struct { rawQuery string //const after set observer IServerSessionObserver - t ServerSessionType + t ServerSessionType // TODO(chef): refactor 改用 sessionStat.Stat.BaseType 202205 hs HandshakeServer chunkComposer *ChunkComposer packer *MessagePacker - conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession + conn connection.Connection + sessionStat base.BasicSessionStat // only for PubSession avObserver IPubSessionObserver @@ -103,11 +101,14 @@ func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSes conn: connection.New(conn, func(option *connection.Option) { option.ReadBufSize = readBufSize }), - stat: base.StatSession{ - Protocol: base.ProtocolRtmp, - SessionId: uk, - StartTime: base.ReadableNowTime(), - RemoteAddr: conn.RemoteAddr().String(), + sessionStat: base.BasicSessionStat{ + Stat: base.StatSession{ + SessionId: uk, + Protocol: base.SessionProtocolRtmpStr, + BaseType: base.SessionBaseTypePubSubStr, + StartTime: base.ReadableNowTime(), + RemoteAddr: conn.RemoteAddr().String(), + }, }, uniqueKey: uk, observer: observer, @@ -171,42 +172,22 @@ func (s *ServerSession) UniqueKey() string { return s.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + func (s *ServerSession) UpdateStat(intervalSec uint32) { - currStat := s.conn.GetStat() - rDiff := currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum - s.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum - s.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - switch s.t { - case ServerSessionTypePub: - s.stat.Bitrate = s.stat.ReadBitrate - case ServerSessionTypeSub: - s.stat.Bitrate = s.stat.WriteBitrate - } - s.prevConnStat = currStat + s.sessionStat.UpdateStatWitchConn(s.conn, intervalSec) } func (s *ServerSession) GetStat() base.StatSession { - connStat := s.conn.GetStat() - s.stat.ReadBytesSum = connStat.ReadBytesSum - s.stat.WroteBytesSum = connStat.WroteBytesSum - return s.stat + return s.sessionStat.GetStatWithConn(s.conn) } func (s *ServerSession) IsAlive() (readAlive, writeAlive bool) { - currStat := s.conn.GetStat() - if s.staleStat == nil { - s.staleStat = new(connection.Stat) - *s.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-s.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-s.staleStat.WroteBytesSum == 0) - *s.staleStat = currStat - return + return s.sessionStat.IsAliveWitchConn(s.conn) } +// --------------------------------------------------------------------------------------------------------------------- + func (s *ServerSession) runReadLoop() error { return s.chunkComposer.RunLoop(s.conn, s.doMsg) } @@ -451,6 +432,7 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) { s.modConnProps() s.t = ServerSessionTypePub + s.sessionStat.Stat.BaseType = base.SessionBaseTypePubStr err = s.observer.OnNewRtmpPubSession(s) if err != nil { s.DisposeByObserverFlag = true @@ -493,6 +475,7 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) { s.modConnProps() s.t = ServerSessionTypeSub + s.sessionStat.Stat.BaseType = base.SessionBaseTypeSubStr err = s.observer.OnNewRtmpSubSession(s) if err != nil { s.DisposeByObserverFlag = true diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 4396889..172a4f6 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -21,7 +21,6 @@ import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/sdp" - "github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/nazanet" ) @@ -62,10 +61,7 @@ type BaseInSession struct { videoRtpChannel int videoRtcpChannel int - currConnStat connection.StatAtomic - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession + sessionStat base.BasicSessionStat mu sync.Mutex sdpCtx sdp.LogicContext // const after set @@ -88,13 +84,16 @@ type BaseInSession struct { dumpReadSr base.LogDump } -func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession { +func NewBaseInSession(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter) *BaseInSession { s := &BaseInSession{ uniqueKey: uniqueKey, - stat: base.StatSession{ - Protocol: base.ProtocolRtsp, - SessionId: uniqueKey, - StartTime: base.ReadableNowTime(), + sessionStat: base.BasicSessionStat{ + Stat: base.StatSession{ + SessionId: uniqueKey, + Protocol: base.SessionProtocolRtspStr, + BaseType: sessionBaseType, + StartTime: base.ReadableNowTime(), + }, }, cmdSession: cmdSession, waitChan: make(chan error, 1), @@ -106,8 +105,8 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba return s } -func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession { - s := NewBaseInSession(uniqueKey, cmdSession) +func NewBaseInSessionWithObserver(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession { + s := NewBaseInSession(uniqueKey, sessionBaseType, cmdSession) s.observer = observer return s } @@ -240,41 +239,22 @@ func (session *BaseInSession) WriteRtpRtcpDummy() { } } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + func (session *BaseInSession) GetStat() base.StatSession { - session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load() - session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load() - return session.stat + return session.sessionStat.GetStat() } func (session *BaseInSession) UpdateStat(intervalSec uint32) { - readBytesSum := session.currConnStat.ReadBytesSum.Load() - wroteBytesSum := session.currConnStat.WroteBytesSum.Load() - rDiff := readBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.ReadBitrate - session.prevConnStat.ReadBytesSum = readBytesSum - session.prevConnStat.WroteBytesSum = wroteBytesSum + session.sessionStat.UpdateStat(intervalSec) } func (session *BaseInSession) IsAlive() (readAlive, writeAlive bool) { - readBytesSum := session.currConnStat.ReadBytesSum.Load() - wroteBytesSum := session.currConnStat.WroteBytesSum.Load() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - session.staleStat.ReadBytesSum = readBytesSum - session.staleStat.WroteBytesSum = wroteBytesSum - return true, true - } - - readAlive = !(readBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(wroteBytesSum-session.staleStat.WroteBytesSum == 0) - session.staleStat.ReadBytesSum = readBytesSum - session.staleStat.WroteBytesSum = wroteBytesSum - return + return session.sessionStat.IsAlive() } +// --------------------------------------------------------------------------------------------------------------------- + func (session *BaseInSession) UniqueKey() string { return session.uniqueKey } @@ -323,7 +303,7 @@ func (session *BaseInSession) onReadRtcpPacket(b []byte, rAddr *net.UDPAddr, err // @param rAddr 对端地址,往对端发送数据时使用,注意,如果nil,则表示是interleaved模式,我们直接往TCP连接发数据 func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) error { - session.currConnStat.ReadBytesSum.Add(uint64(len(b))) + session.sessionStat.AddReadBytes(len(b)) if len(b) <= 0 { Log.Errorf("[%s] handleRtcpPacket but length invalid. len=%d", session.uniqueKey, len(b)) @@ -350,7 +330,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.audioRtcpChannel) } - session.currConnStat.WroteBytesSum.Add(uint64(len(b))) + session.sessionStat.AddWriteBytes(len(b)) } case session.videoSsrc.Load(): session.mu.Lock() @@ -362,7 +342,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.videoRtcpChannel) } - session.currConnStat.WroteBytesSum.Add(uint64(len(b))) + session.sessionStat.AddWriteBytes(len(b)) } default: // noop @@ -381,7 +361,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err } func (session *BaseInSession) handleRtpPacket(b []byte) error { - session.currConnStat.ReadBytesSum.Add(uint64(len(b))) + session.sessionStat.AddReadBytes(len(b)) if len(b) < rtprtcp.RtpFixedHeaderLength { Log.Errorf("[%s] handleRtpPacket but length invalid. len=%d", session.uniqueKey, len(b)) diff --git a/pkg/rtsp/base_out_session.go b/pkg/rtsp/base_out_session.go index f03b6df..f8a9801 100644 --- a/pkg/rtsp/base_out_session.go +++ b/pkg/rtsp/base_out_session.go @@ -21,7 +21,6 @@ import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/sdp" - "github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/nazanet" ) @@ -42,10 +41,7 @@ type BaseOutSession struct { videoRtpChannel int videoRtcpChannel int - stat base.StatSession - currConnStat connection.StatAtomic - prevConnStat connection.Stat - staleStat *connection.Stat + sessionStat base.BasicSessionStat // only for debug log debugLogMaxCount int @@ -58,14 +54,17 @@ type BaseOutSession struct { waitChan chan error } -func NewBaseOutSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseOutSession { +func NewBaseOutSession(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter) *BaseOutSession { s := &BaseOutSession{ uniqueKey: uniqueKey, cmdSession: cmdSession, - stat: base.StatSession{ - Protocol: base.ProtocolRtsp, - SessionId: uniqueKey, - StartTime: base.ReadableNowTime(), + sessionStat: base.BasicSessionStat{ + Stat: base.StatSession{ + SessionId: uniqueKey, + Protocol: base.SessionProtocolRtspStr, + BaseType: sessionBaseType, + StartTime: base.ReadableNowTime(), + }, }, audioRtpChannel: -1, videoRtpChannel: -1, @@ -181,51 +180,34 @@ func (session *BaseOutSession) WriteRtpPacket(packet rtprtcp.RtpPacket) error { } if err == nil { - session.currConnStat.WroteBytesSum.Add(uint64(len(packet.Raw))) + session.sessionStat.AddWriteBytes(len(packet.Raw)) } return err } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + func (session *BaseOutSession) GetStat() base.StatSession { - session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load() - session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load() - return session.stat + return session.sessionStat.GetStat() } func (session *BaseOutSession) UpdateStat(intervalSec uint32) { - readBytesSum := session.currConnStat.ReadBytesSum.Load() - wroteBytesSum := session.currConnStat.WroteBytesSum.Load() - rDiff := readBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.WriteBitrate - session.prevConnStat.ReadBytesSum = readBytesSum - session.prevConnStat.WroteBytesSum = wroteBytesSum + session.sessionStat.UpdateStat(intervalSec) } func (session *BaseOutSession) IsAlive() (readAlive, writeAlive bool) { - readBytesSum := session.currConnStat.ReadBytesSum.Load() - wroteBytesSum := session.currConnStat.WroteBytesSum.Load() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - session.staleStat.ReadBytesSum = readBytesSum - session.staleStat.WroteBytesSum = wroteBytesSum - return true, true - } - - readAlive = !(readBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(wroteBytesSum-session.staleStat.WroteBytesSum == 0) - session.staleStat.ReadBytesSum = readBytesSum - session.staleStat.WroteBytesSum = wroteBytesSum - return + return session.sessionStat.IsAlive() } +// --------------------------------------------------------------------------------------------------------------------- + func (session *BaseOutSession) UniqueKey() string { return session.uniqueKey } func (session *BaseOutSession) onReadRtpPacket(b []byte, rAddr *net.UDPAddr, err error) bool { + // TODO(chef): [fix] 在收到rtp和rtcp的地方,加入stat统计 202205 + if session.loggedReadRtpCount.Load() < int32(session.debugLogMaxCount) { Log.Debugf("[%s] LOGPACKET. read rtp=%s", session.uniqueKey, hex.Dump(nazabytes.Prefix(b, 32))) session.loggedReadRtpCount.Increment() diff --git a/pkg/rtsp/client_pull_session.go b/pkg/rtsp/client_pull_session.go index cdb88a0..b12adab 100644 --- a/pkg/rtsp/client_pull_session.go +++ b/pkg/rtsp/client_pull_session.go @@ -66,7 +66,7 @@ func NewPullSession(observer IPullSessionObserver, modOptions ...ModPullSessionO opt.DoTimeoutMs = option.PullTimeoutMs opt.OverTcp = option.OverTcp }) - baseInSession := NewBaseInSessionWithObserver(uk, s, observer) + baseInSession := NewBaseInSessionWithObserver(uk, base.SessionBaseTypePullStr, s, observer) s.baseInSession = baseInSession s.cmdSession = cmdSession Log.Infof("[%s] lifecycle new rtsp PullSession. session=%p", uk, s) @@ -179,6 +179,8 @@ func (session *PullSession) UniqueKey() string { return session.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + // GetStat 文档请参考: interface ISessionStat func (session *PullSession) GetStat() base.StatSession { stat := session.baseInSession.GetStat() @@ -196,6 +198,8 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { return session.baseInSession.IsAlive() } +// --------------------------------------------------------------------------------------------------------------------- + // OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnConnectResult() { // noop diff --git a/pkg/rtsp/client_push_session.go b/pkg/rtsp/client_push_session.go index e3767c4..84d4021 100644 --- a/pkg/rtsp/client_push_session.go +++ b/pkg/rtsp/client_push_session.go @@ -54,7 +54,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { opt.DoTimeoutMs = option.PushTimeoutMs opt.OverTcp = option.OverTcp }) - baseOutSession := NewBaseOutSession(uk, s) + baseOutSession := NewBaseOutSession(uk, base.SessionBaseTypePushStr, s) s.cmdSession = cmdSession s.baseOutSession = baseOutSession Log.Infof("[%s] lifecycle new rtsp PushSession. session=%p", uk, s) @@ -161,6 +161,8 @@ func (session *PushSession) UniqueKey() string { return session.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + // GetStat 文档请参考: interface ISessionStat func (session *PushSession) GetStat() base.StatSession { stat := session.baseOutSession.GetStat() @@ -178,6 +180,8 @@ func (session *PushSession) IsAlive() (readAlive, writeAlive bool) { return session.baseOutSession.IsAlive() } +// --------------------------------------------------------------------------------------------------------------------- + // OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnConnectResult() { // noop diff --git a/pkg/rtsp/server_command_session.go b/pkg/rtsp/server_command_session.go index e342e1a..853afb7 100644 --- a/pkg/rtsp/server_command_session.go +++ b/pkg/rtsp/server_command_session.go @@ -103,6 +103,8 @@ func (session *ServerCommandSession) RemoteAddr() string { // ----- ISessionStat -------------------------------------------------------------------------------------------------- func (session *ServerCommandSession) UpdateStat(intervalSec uint32) { + // TODO(chef): 梳理interleaved模式下,command session的ISessionStat 202205 + currStat := session.conn.GetStat() rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum session.stat.Bitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) diff --git a/pkg/rtsp/server_pub_session.go b/pkg/rtsp/server_pub_session.go index 9a825ea..58eb20b 100644 --- a/pkg/rtsp/server_pub_session.go +++ b/pkg/rtsp/server_pub_session.go @@ -36,7 +36,7 @@ func NewPubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Pu urlCtx: urlCtx, cmdSession: cmdSession, } - baseInSession := NewBaseInSession(uk, s) + baseInSession := NewBaseInSession(uk, base.SessionBaseTypePubStr, s) s.baseInSession = baseInSession Log.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, s, urlCtx.LastItemOfPath) return s @@ -93,6 +93,8 @@ func (session *PubSession) UniqueKey() string { return session.uniqueKey } +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + func (session *PubSession) GetStat() base.StatSession { stat := session.baseInSession.GetStat() stat.RemoteAddr = session.cmdSession.RemoteAddr() @@ -107,6 +109,8 @@ func (session *PubSession) IsAlive() (readAlive, writeAlive bool) { return session.baseInSession.IsAlive() } +// --------------------------------------------------------------------------------------------------------------------- + // WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession func (session *PubSession) WriteInterleavedPacket(packet []byte, channel int) error { return session.cmdSession.WriteInterleavedPacket(packet, channel) diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index e6471dd..f0f8a13 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -35,7 +35,7 @@ func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Su ShouldWaitVideoKeyFrame: true, } - baseOutSession := NewBaseOutSession(uk, s) + baseOutSession := NewBaseOutSession(uk, base.SessionBaseTypeSubStr, s) s.baseOutSession = baseOutSession Log.Infof("[%s] lifecycle new rtsp SubSession. session=%p, streamName=%s", uk, s, urlCtx.LastItemOfPath) return s