[refactor] 将所有session的ISessionStat的实现聚合到BasicSessionStat

pull/180/head
q191201771 3 years ago
parent 56033db6e8
commit 5ef6241749

@ -15,14 +15,14 @@ import (
"github.com/q191201771/naza/pkg/connection"
)
// TODO(chef): refactor 更名为BasicHttpSubSession 202205
type HttpSubSession struct {
HttpSubSessionOption
suffix string
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat StatSession
suffix string
conn connection.Connection
sessionStat BasicSessionStat
}
type HttpSubSessionOption struct {
@ -39,11 +39,14 @@ func NewHttpSubSession(option HttpSubSessionOption) *HttpSubSession {
s := &HttpSubSession{
HttpSubSessionOption: option,
conn: connection.New(option.Conn, option.ConnModOption),
stat: StatSession{
Protocol: option.Protocol,
SessionId: option.Uk,
StartTime: ReadableNowTime(),
RemoteAddr: option.Conn.RemoteAddr().String(),
sessionStat: BasicSessionStat{
Stat: StatSession{
SessionId: option.Uk,
Protocol: option.Protocol,
BaseType: SessionBaseTypeSubStr,
StartTime: ReadableNowTime(),
RemoteAddr: option.Conn.RemoteAddr().String(),
},
},
}
return s
@ -112,9 +115,9 @@ func (session *HttpSubSession) AppName() string {
func (session *HttpSubSession) StreamName() string {
var suffix string
switch session.Protocol {
case ProtocolHttpflv:
case SessionProtocolFlvStr:
suffix = ".flv"
case ProtocolHttpts:
case SessionProtocolTsStr:
suffix = ".ts"
default:
Log.Warnf("[%s] acquire stream name but protocol unknown.", session.Uk)
@ -126,39 +129,18 @@ func (session *HttpSubSession) RawQuery() string {
return session.UrlCtx.RawQuery
}
// ---------------------------------------------------------------------------------------------------------------------
// ISessionStat interface
// ---------------------------------------------------------------------------------------------------------------------
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *HttpSubSession) GetStat() StatSession {
currStat := session.conn.GetStat()
session.stat.ReadBytesSum = currStat.ReadBytesSum
session.stat.WroteBytesSum = currStat.WroteBytesSum
return session.stat
return session.sessionStat.GetStat()
}
func (session *HttpSubSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.WriteBitrate
session.prevConnStat = currStat
session.sessionStat.UpdateStatWitchConn(session.conn, intervalSec)
}
func (session *HttpSubSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
*session.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0)
*session.staleStat = currStat
return
return session.sessionStat.IsAliveWitchConn(session.conn)
}
// ---------------------------------------------------------------------------------------------------------------------

@ -8,6 +8,11 @@
package base
import (
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog"
)
// ----- 所有session -----
//
// server.pub: rtmp(ServerSession), rtsp(PubSession)
@ -22,19 +27,158 @@ package base
// ---------------------------------------------------------------------------------------------------------------------
// TODO(chef): [refactor] BasicSessionStat 放入单独的文件中 202205
// BasicSessionStat
//
// 两种方式,一种是通过外部的 connection.Connection 获取最新状态,一种是内部自己管理状态
//
type BasicSessionStat struct {
Stat StatSession
prevConnStat connection.Stat
staleStat *connection.Stat
currConnStat connection.StatAtomic
}
func (s *BasicSessionStat) AddReadBytes(n int) {
s.currConnStat.ReadBytesSum.Add(uint64(n))
}
func (s *BasicSessionStat) AddWriteBytes(n int) {
s.currConnStat.WroteBytesSum.Add(uint64(n))
}
func (s *BasicSessionStat) UpdateStat(intervalSec uint32) {
s.updateStat(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load(), s.Stat.BaseType, intervalSec)
}
func (s *BasicSessionStat) UpdateStatWitchConn(conn connection.Connection, intervalSec uint32) {
currStat := conn.GetStat()
s.updateStat(currStat.ReadBytesSum, currStat.WroteBytesSum, s.Stat.BaseType, intervalSec)
}
func (s *BasicSessionStat) GetStat() StatSession {
s.Stat.ReadBytesSum = s.currConnStat.ReadBytesSum.Load()
s.Stat.WroteBytesSum = s.currConnStat.WroteBytesSum.Load()
return s.Stat
}
func (s *BasicSessionStat) GetStatWithConn(conn connection.Connection) StatSession {
connStat := conn.GetStat()
s.Stat.ReadBytesSum = connStat.ReadBytesSum
s.Stat.WroteBytesSum = connStat.WroteBytesSum
return s.Stat
}
func (s *BasicSessionStat) IsAlive() (readAlive, writeAlive bool) {
return s.isAlive(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load())
}
func (s *BasicSessionStat) IsAliveWitchConn(conn connection.Connection) (readAlive, writeAlive bool) {
currStat := conn.GetStat()
return s.isAlive(currStat.ReadBytesSum, currStat.WroteBytesSum)
}
func (s *BasicSessionStat) updateStat(readBytesSum, wroteBytesSum uint64, typ string, intervalSec uint32) {
rDiff := readBytesSum - s.prevConnStat.ReadBytesSum
s.Stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := wroteBytesSum - s.prevConnStat.WroteBytesSum
s.Stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
switch typ {
case SessionBaseTypePubStr, SessionBaseTypePullStr:
s.Stat.Bitrate = s.Stat.ReadBitrate
case SessionBaseTypeSubStr, SessionBaseTypePushStr:
s.Stat.Bitrate = s.Stat.WriteBitrate
default:
nazalog.Errorf("invalid session base type. type=%s", typ)
}
s.prevConnStat.ReadBytesSum = readBytesSum
s.prevConnStat.WroteBytesSum = wroteBytesSum
}
func (s *BasicSessionStat) isAlive(readBytesSum, wroteBytesSum uint64) (readAlive, writeAlive bool) {
if s.staleStat == nil {
s.staleStat = new(connection.Stat)
s.staleStat.ReadBytesSum = readBytesSum
s.staleStat.WroteBytesSum = wroteBytesSum
return true, true
}
readAlive = !(readBytesSum-s.staleStat.ReadBytesSum == 0)
writeAlive = !(wroteBytesSum-s.staleStat.WroteBytesSum == 0)
s.staleStat.ReadBytesSum = readBytesSum
s.staleStat.WroteBytesSum = wroteBytesSum
return
}
type (
SessionProtocol int
SessionBaseType int
)
const (
// ProtocolRtmp StatSession.Protocol
ProtocolRtmp = "RTMP"
ProtocolRtsp = "RTSP"
ProtocolHttpflv = "FLV"
ProtocolHttpts = "TS"
SessionBaseTypePub = "PUB"
SessionBaseTypeSub = "SUB"
SessionBaseTypePush = "PUSH"
SessionBaseTypePull = "PULL"
SessionProtocolCustomize = 1
SessionProtocolRtmp = 2
SessionProtocolRtsp = 3
SessionProtocolFlv = 4
SessionProtocolTs = 5
SessionBaseTypePubSub = 1
SessionBaseTypePub = 2
SessionBaseTypeSub = 3
SessionBaseTypePush = 4
SessionBaseTypePull = 5
SessionProtocolCustomizeStr = "CUSTOMIZE"
SessionProtocolRtmpStr = "RTMP"
SessionProtocolRtspStr = "RTSP"
SessionProtocolFlvStr = "FLV"
SessionProtocolTsStr = "TS"
SessionBaseTypePubSubStr = "PUBSUB"
SessionBaseTypePubStr = "PUB"
SessionBaseTypeSubStr = "SUB"
SessionBaseTypePushStr = "PUSH"
SessionBaseTypePullStr = "PULL"
)
func (protocol SessionProtocol) Stringify() string {
switch protocol {
case SessionProtocolCustomize:
return SessionProtocolCustomizeStr
case SessionProtocolRtmp:
return SessionProtocolRtmpStr
case SessionProtocolRtsp:
return SessionProtocolRtspStr
case SessionProtocolFlv:
return SessionProtocolFlvStr
case SessionProtocolTs:
return SessionProtocolTsStr
}
return "INVALID"
}
func (typ SessionBaseType) Stringify() string {
switch typ {
case SessionBaseTypePubSub:
return SessionBaseTypePubSubStr
case SessionBaseTypePub:
return SessionBaseTypePubStr
case SessionBaseTypeSub:
return SessionBaseTypeSubStr
case SessionBaseTypePush:
return SessionBaseTypePushStr
case SessionBaseTypePull:
return SessionBaseTypePullStr
}
return "INVALID"
}
type IClientSession interface {
// PushSession:
// Push()
@ -155,6 +299,67 @@ type IObject interface {
UniqueKey() string
}
type ISessionType interface {
Protocol() SessionProtocol
BaseType() SessionBaseType
UniqueKey() string
}
type SessionType struct {
protocol SessionProtocol
baseType SessionBaseType
uniqueKey string
}
func NewSessionType(protocol SessionProtocol, typ SessionBaseType) SessionType {
var uk string
switch protocol {
case SessionProtocolCustomize:
if typ == SessionBaseTypePub {
uk = GenUkCustomizePubSession()
}
case SessionProtocolRtmp:
if typ == SessionBaseTypePubSub {
uk = GenUkRtmpServerSession()
} else if typ == SessionBaseTypePush {
uk = GenUkRtmpPushSession()
} else if typ == SessionBaseTypePull {
uk = GenUkRtmpPullSession()
}
case SessionProtocolRtsp:
if typ == SessionBaseTypePub {
uk = GenUkRtspPubSession()
} else if typ == SessionBaseTypeSub {
uk = GenUkRtspSubSession()
} else if typ == SessionBaseTypePush {
uk = GenUkRtspPushSession()
} else if typ == SessionBaseTypePull {
uk = GenUkRtspPullSession()
}
case SessionProtocolFlv:
if typ == SessionBaseTypeSub {
uk = GenUkFlvSubSession()
} else if typ == SessionBaseTypePull {
uk = GenUkFlvPullSession()
}
case SessionProtocolTs:
if typ == SessionBaseTypeSub {
uk = GenUkTsSubSession()
}
}
if uk == "" {
nazalog.Errorf("session type invalid. protocol=%s, typ=%s", protocol.Stringify(), typ.Stringify())
uk = "INVALID"
}
return SessionType{
protocol: protocol,
baseType: typ,
uniqueKey: uk,
}
}
// TODO chef: rtmp.ClientSession修改为BaseClientSession更好些
// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame

@ -41,8 +41,10 @@ type StatPull struct {
}
type StatSession struct {
Protocol string `json:"protocol"`
SessionId string `json:"session_id"`
SessionId string `json:"session_id"`
Protocol string `json:"protocol"`
BaseType string `json:"base_type"`
RemoteAddr string `json:"remote_addr"`
StartTime string `json:"start_time"`

@ -11,17 +11,17 @@ package base
import "github.com/q191201771/naza/pkg/unique"
const (
UkPreCustomizePubSessionContext = "CUSTOMIZEPUB"
UkPreRtmpServerSession = "RTMPPUBSUB" // 两种可能pub或者sub
UkPreRtmpPushSession = "RTMPPUSH"
UkPreRtmpPullSession = "RTMPPULL"
UkPreRtspPubSession = "RTSPPUB"
UkPreRtspSubSession = "RTSPSUB"
UkPreRtspPushSession = "RTSPPUSH"
UkPreRtspPullSession = "RTSPPULL"
UkPreFlvSubSession = "FLVSUB"
UkPreFlvPullSession = "FLVPULL"
UkPreTsSubSession = "TSSUB"
UkPreCustomizePubSessionContext = SessionProtocolCustomizeStr + SessionBaseTypePubStr // "CUSTOMIZEPUB"
UkPreRtmpServerSession = SessionProtocolRtmpStr + SessionBaseTypePubSubStr // "RTMPPUBSUB" // 两种可能pub或者sub
UkPreRtmpPushSession = SessionProtocolRtmpStr + SessionBaseTypePushStr // "RTMPPUSH"
UkPreRtmpPullSession = SessionProtocolRtmpStr + SessionBaseTypePullStr // "RTMPPULL"
UkPreRtspPubSession = SessionProtocolRtspStr + SessionBaseTypePubStr // "RTSPPUB"
UkPreRtspSubSession = SessionProtocolRtspStr + SessionBaseTypePubSubStr // "RTSPSUB"
UkPreRtspPushSession = SessionProtocolRtspStr + SessionBaseTypePushStr // "RTSPPUSH"
UkPreRtspPullSession = SessionProtocolRtspStr + SessionBaseTypePullStr // "RTSPPULL"
UkPreFlvSubSession = SessionProtocolFlvStr + SessionBaseTypePubSubStr // "FLVSUB"
UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL"
UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB"
UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层

@ -18,9 +18,9 @@ import "strings"
// LalVersion 版本,该变量由外部脚本修改维护
const LalVersion = "v0.29.1"
const HttpApiVersion = "v0.3.1"
const HttpApiVersion = "v0.3.2"
const HttpNotifyVersion = "v0.1.1"
const HttpNotifyVersion = "v0.1.2"
var (
LalLibraryName = "lal"

@ -41,10 +41,8 @@ type PullSession struct {
uniqueKey string // const after ctor
option PullSessionOption // const after ctor
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
conn connection.Connection
sessionStat base.BasicSessionStat // TODO(chef): fix 没有初始化 202205
urlCtx base.UrlContext
@ -140,40 +138,25 @@ func (session *PullSession) UniqueKey() string {
return session.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
// UpdateStat 文档请参考: interface ISessionStat
func (session *PullSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.ReadBitrate
session.prevConnStat = currStat
session.sessionStat.UpdateStatWitchConn(session.conn, intervalSec)
}
// GetStat 文档请参考: interface ISessionStat
func (session *PullSession) GetStat() base.StatSession {
connStat := session.conn.GetStat()
session.stat.ReadBytesSum = connStat.ReadBytesSum
session.stat.WroteBytesSum = connStat.WroteBytesSum
return session.stat
return session.sessionStat.GetStatWithConn(session.conn)
}
// IsAlive 文档请参考: interface ISessionStat
func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
*session.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0)
*session.staleStat = currStat
return
return session.sessionStat.IsAliveWitchConn(session.conn)
}
// ---------------------------------------------------------------------------------------------------------------------
func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error {
errChan := make(chan error, 1)
url := rawUrl

@ -34,7 +34,7 @@ func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, webs
option.WriteTimeoutMs = SubSessionWriteTimeoutMs
},
Uk: uk,
Protocol: base.ProtocolHttpflv,
Protocol: base.SessionProtocolFlvStr,
UrlCtx: urlCtx,
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,
@ -107,9 +107,7 @@ func (session *SubSession) RawQuery() string {
return session.core.RawQuery()
}
// ---------------------------------------------------------------------------------------------------------------------
// ISessionStat interface
// ---------------------------------------------------------------------------------------------------------------------
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *SubSession) UpdateStat(intervalSec uint32) {
session.core.UpdateStat(intervalSec)

@ -33,7 +33,7 @@ func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, webs
option.WriteTimeoutMs = SubSessionWriteTimeoutMs
},
Uk: uk,
Protocol: base.ProtocolHttpts,
Protocol: base.SessionProtocolTsStr,
UrlCtx: urlCtx,
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,

@ -129,7 +129,7 @@ func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) error {
var info base.PullStartInfo
info.SessionId = session.UniqueKey()
info.Url = session.Url()
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.RemoteAddr = session.GetStat().RemoteAddr
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -160,7 +160,7 @@ func (group *Group) AddRtspPullSession(session *rtsp.PullSession) error {
var info base.PullStartInfo
info.SessionId = session.UniqueKey()
info.Url = session.Url()
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.RemoteAddr = session.GetStat().RemoteAddr
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -200,7 +200,7 @@ func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) {
var info base.PullStopInfo
info.SessionId = session.UniqueKey()
info.Url = session.Url()
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.RemoteAddr = session.GetStat().RemoteAddr
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -218,7 +218,7 @@ func (group *Group) DelRtspPullSession(session *rtsp.PullSession) {
var info base.PullStopInfo
info.SessionId = session.UniqueKey()
info.Url = session.Url()
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.RemoteAddr = session.GetStat().RemoteAddr
info.AppName = session.AppName()
info.StreamName = session.StreamName()

@ -379,7 +379,7 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error
// TODO chef: 每次赋值都逐个拼代码冗余考虑直接用ISession抽离一下代码
var info base.PubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -416,7 +416,7 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
var info base.PubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -434,7 +434,7 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -468,7 +468,7 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
var info base.SubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtmp
info.Protocol = session.GetStat().Protocol
info.AppName = session.AppName()
info.StreamName = session.StreamName()
info.UrlParam = session.RawQuery()
@ -487,7 +487,7 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) err
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpflv
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -521,7 +521,7 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
var info base.SubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpflv
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -539,7 +539,7 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) error
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpts
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -575,7 +575,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
var info base.SubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolHttpts
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -603,7 +603,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error {
var info base.PubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -639,7 +639,7 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
var info base.PubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -664,7 +664,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error
var info base.SubStartInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()
@ -698,7 +698,7 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
var info base.SubStopInfo
info.ServerId = sm.config.ServerId
info.Protocol = base.ProtocolRtsp
info.Protocol = session.GetStat().Protocol
info.Url = session.Url()
info.AppName = session.AppName()
info.StreamName = session.StreamName()

@ -37,18 +37,18 @@ func NewSimpleAuthCtx(config SimpleAuthConfig) *SimpleAuthCtx {
}
func (s *SimpleAuthCtx) OnPubStart(info base.PubStartInfo) error {
if s.config.PubRtmpEnable && info.Protocol == base.ProtocolRtmp ||
s.config.PubRtspEnable && info.Protocol == base.ProtocolRtsp {
if s.config.PubRtmpEnable && info.Protocol == base.SessionProtocolRtmpStr ||
s.config.PubRtspEnable && info.Protocol == base.SessionProtocolRtspStr {
return s.check(info.StreamName, info.UrlParam)
}
return nil
}
func (s *SimpleAuthCtx) OnSubStart(info base.SubStartInfo) error {
if (s.config.SubRtmpEnable && info.Protocol == base.ProtocolRtmp) ||
(s.config.SubHttpflvEnable && info.Protocol == base.ProtocolHttpflv) ||
(s.config.SubHttptsEnable && info.Protocol == base.ProtocolHttpts) ||
(s.config.SubRtspEnable && info.Protocol == base.ProtocolRtsp) {
if (s.config.SubRtmpEnable && info.Protocol == base.SessionProtocolRtmpStr) ||
(s.config.SubHttpflvEnable && info.Protocol == base.SessionProtocolFlvStr) ||
(s.config.SubHttptsEnable && info.Protocol == base.SessionProtocolTsStr) ||
(s.config.SubRtspEnable && info.Protocol == base.SessionProtocolRtspStr) {
return s.check(info.StreamName, info.UrlParam)
}
return nil

@ -27,7 +27,7 @@ func TestSimpleAuthCtx(t *testing.T) {
PubRtmpEnable: true,
})
var info base.PubStartInfo
info.Protocol = base.ProtocolRtmp
info.Protocol = base.SessionProtocolRtmpStr
info.StreamName = "test110"
info.UrlParam = "lal_secret=700997e1595a06c9ffa60ebef79105b0"

@ -128,6 +128,8 @@ func (s *PullSession) UniqueKey() string {
return s.core.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
// GetStat 文档请参考: interface ISessionStat
func (s *PullSession) GetStat() base.StatSession {
return s.core.GetStat()

@ -114,6 +114,8 @@ func (s *PushSession) UniqueKey() string {
return s.core.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
// GetStat 文档请参考: interface ISessionStat
func (s *PushSession) GetStat() base.StatSession {
return s.core.GetStat()

@ -36,7 +36,7 @@ type ClientSession struct {
// 只有PullSession使用
onReadRtmpAvMsg OnReadRtmpAvMsg
t ClientSessionType
t ClientSessionType // TODO(chef): refactor 使用basetype替代 202205
option ClientSessionOption
packer *MessagePacker
@ -45,13 +45,12 @@ type ClientSession struct {
hc IHandshakeClient
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
doResultChan chan struct{}
errChan chan error
hasNotifyDoResultSucc bool
sessionStat base.BasicSessionStat
debugLogReadUserCtrlMsgCount int
debugLogReadUserCtrlMsgMax int
@ -106,11 +105,14 @@ type ModClientSessionOption func(option *ClientSessionOption)
// NewClientSession @param t: session的类型只能是推或者拉
func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession {
var uk string
var baseType string
switch t {
case CstPullSession:
uk = base.GenUkRtmpPullSession()
baseType = base.SessionBaseTypePullStr
case CstPushSession:
uk = base.GenUkRtmpPushSession()
baseType = base.SessionBaseTypePushStr
}
option := defaultClientSessOption
@ -134,10 +136,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
doResultChan: make(chan struct{}, 1),
packer: NewMessagePacker(),
chunkComposer: NewChunkComposer(),
stat: base.StatSession{
Protocol: base.ProtocolRtmp,
SessionId: uk,
StartTime: base.ReadableNowTime(),
sessionStat: base.BasicSessionStat{
Stat: base.StatSession{
SessionId: uk,
Protocol: base.SessionProtocolRtmpStr,
BaseType: baseType,
StartTime: base.ReadableNowTime(),
},
},
debugLogReadUserCtrlMsgMax: 5,
hc: hc,
@ -224,42 +229,22 @@ func (s *ClientSession) UniqueKey() string {
return s.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (s *ClientSession) GetStat() base.StatSession {
connStat := s.conn.GetStat()
s.stat.ReadBytesSum = connStat.ReadBytesSum
s.stat.WroteBytesSum = connStat.WroteBytesSum
return s.stat
return s.sessionStat.GetStatWithConn(s.conn)
}
func (s *ClientSession) UpdateStat(intervalSec uint32) {
currStat := s.conn.GetStat()
rDiff := currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum
s.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum
s.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
switch s.t {
case CstPushSession:
s.stat.Bitrate = s.stat.WriteBitrate
case CstPullSession:
s.stat.Bitrate = s.stat.ReadBitrate
}
s.prevConnStat = currStat
s.sessionStat.UpdateStatWitchConn(s.conn, intervalSec)
}
func (s *ClientSession) IsAlive() (readAlive, writeAlive bool) {
currStat := s.conn.GetStat()
if s.staleStat == nil {
s.staleStat = new(connection.Stat)
*s.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-s.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-s.staleStat.WroteBytesSum == 0)
*s.staleStat = currStat
return
return s.sessionStat.IsAliveWitchConn(s.conn)
}
// ---------------------------------------------------------------------------------------------------------------------
func (s *ClientSession) connect() {
if err := s.tcpConnect(); err != nil {
s.errChan <- err
@ -328,7 +313,7 @@ func (s *ClientSession) tcpConnect() error {
Log.Infof("[%s] > tcp connect.", s.uniqueKey)
var err error
s.stat.RemoteAddr = s.urlCtx.HostWithPort
s.sessionStat.Stat.RemoteAddr = s.urlCtx.HostWithPort
var conn net.Conn
if s.urlCtx.Scheme == "rtmps" {

@ -65,15 +65,13 @@ type ServerSession struct {
rawQuery string //const after set
observer IServerSessionObserver
t ServerSessionType
t ServerSessionType // TODO(chef): refactor 改用 sessionStat.Stat.BaseType 202205
hs HandshakeServer
chunkComposer *ChunkComposer
packer *MessagePacker
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
conn connection.Connection
sessionStat base.BasicSessionStat
// only for PubSession
avObserver IPubSessionObserver
@ -103,11 +101,14 @@ func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSes
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
}),
stat: base.StatSession{
Protocol: base.ProtocolRtmp,
SessionId: uk,
StartTime: base.ReadableNowTime(),
RemoteAddr: conn.RemoteAddr().String(),
sessionStat: base.BasicSessionStat{
Stat: base.StatSession{
SessionId: uk,
Protocol: base.SessionProtocolRtmpStr,
BaseType: base.SessionBaseTypePubSubStr,
StartTime: base.ReadableNowTime(),
RemoteAddr: conn.RemoteAddr().String(),
},
},
uniqueKey: uk,
observer: observer,
@ -171,42 +172,22 @@ func (s *ServerSession) UniqueKey() string {
return s.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (s *ServerSession) UpdateStat(intervalSec uint32) {
currStat := s.conn.GetStat()
rDiff := currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum
s.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum
s.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
switch s.t {
case ServerSessionTypePub:
s.stat.Bitrate = s.stat.ReadBitrate
case ServerSessionTypeSub:
s.stat.Bitrate = s.stat.WriteBitrate
}
s.prevConnStat = currStat
s.sessionStat.UpdateStatWitchConn(s.conn, intervalSec)
}
func (s *ServerSession) GetStat() base.StatSession {
connStat := s.conn.GetStat()
s.stat.ReadBytesSum = connStat.ReadBytesSum
s.stat.WroteBytesSum = connStat.WroteBytesSum
return s.stat
return s.sessionStat.GetStatWithConn(s.conn)
}
func (s *ServerSession) IsAlive() (readAlive, writeAlive bool) {
currStat := s.conn.GetStat()
if s.staleStat == nil {
s.staleStat = new(connection.Stat)
*s.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-s.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-s.staleStat.WroteBytesSum == 0)
*s.staleStat = currStat
return
return s.sessionStat.IsAliveWitchConn(s.conn)
}
// ---------------------------------------------------------------------------------------------------------------------
func (s *ServerSession) runReadLoop() error {
return s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
@ -451,6 +432,7 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
s.modConnProps()
s.t = ServerSessionTypePub
s.sessionStat.Stat.BaseType = base.SessionBaseTypePubStr
err = s.observer.OnNewRtmpPubSession(s)
if err != nil {
s.DisposeByObserverFlag = true
@ -493,6 +475,7 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
s.modConnProps()
s.t = ServerSessionTypeSub
s.sessionStat.Stat.BaseType = base.SessionBaseTypeSubStr
err = s.observer.OnNewRtmpSubSession(s)
if err != nil {
s.DisposeByObserverFlag = true

@ -21,7 +21,6 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazanet"
)
@ -62,10 +61,7 @@ type BaseInSession struct {
videoRtpChannel int
videoRtcpChannel int
currConnStat connection.StatAtomic
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
sessionStat base.BasicSessionStat
mu sync.Mutex
sdpCtx sdp.LogicContext // const after set
@ -88,13 +84,16 @@ type BaseInSession struct {
dumpReadSr base.LogDump
}
func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession {
func NewBaseInSession(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter) *BaseInSession {
s := &BaseInSession{
uniqueKey: uniqueKey,
stat: base.StatSession{
Protocol: base.ProtocolRtsp,
SessionId: uniqueKey,
StartTime: base.ReadableNowTime(),
sessionStat: base.BasicSessionStat{
Stat: base.StatSession{
SessionId: uniqueKey,
Protocol: base.SessionProtocolRtspStr,
BaseType: sessionBaseType,
StartTime: base.ReadableNowTime(),
},
},
cmdSession: cmdSession,
waitChan: make(chan error, 1),
@ -106,8 +105,8 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba
return s
}
func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession {
s := NewBaseInSession(uniqueKey, cmdSession)
func NewBaseInSessionWithObserver(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession {
s := NewBaseInSession(uniqueKey, sessionBaseType, cmdSession)
s.observer = observer
return s
}
@ -240,41 +239,22 @@ func (session *BaseInSession) WriteRtpRtcpDummy() {
}
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *BaseInSession) GetStat() base.StatSession {
session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load()
session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load()
return session.stat
return session.sessionStat.GetStat()
}
func (session *BaseInSession) UpdateStat(intervalSec uint32) {
readBytesSum := session.currConnStat.ReadBytesSum.Load()
wroteBytesSum := session.currConnStat.WroteBytesSum.Load()
rDiff := readBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.ReadBitrate
session.prevConnStat.ReadBytesSum = readBytesSum
session.prevConnStat.WroteBytesSum = wroteBytesSum
session.sessionStat.UpdateStat(intervalSec)
}
func (session *BaseInSession) IsAlive() (readAlive, writeAlive bool) {
readBytesSum := session.currConnStat.ReadBytesSum.Load()
wroteBytesSum := session.currConnStat.WroteBytesSum.Load()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
session.staleStat.ReadBytesSum = readBytesSum
session.staleStat.WroteBytesSum = wroteBytesSum
return true, true
}
readAlive = !(readBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(wroteBytesSum-session.staleStat.WroteBytesSum == 0)
session.staleStat.ReadBytesSum = readBytesSum
session.staleStat.WroteBytesSum = wroteBytesSum
return
return session.sessionStat.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
func (session *BaseInSession) UniqueKey() string {
return session.uniqueKey
}
@ -323,7 +303,7 @@ func (session *BaseInSession) onReadRtcpPacket(b []byte, rAddr *net.UDPAddr, err
// @param rAddr 对端地址往对端发送数据时使用注意如果nil则表示是interleaved模式我们直接往TCP连接发数据
func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) error {
session.currConnStat.ReadBytesSum.Add(uint64(len(b)))
session.sessionStat.AddReadBytes(len(b))
if len(b) <= 0 {
Log.Errorf("[%s] handleRtcpPacket but length invalid. len=%d", session.uniqueKey, len(b))
@ -350,7 +330,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err
} else {
_ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.audioRtcpChannel)
}
session.currConnStat.WroteBytesSum.Add(uint64(len(b)))
session.sessionStat.AddWriteBytes(len(b))
}
case session.videoSsrc.Load():
session.mu.Lock()
@ -362,7 +342,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err
} else {
_ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.videoRtcpChannel)
}
session.currConnStat.WroteBytesSum.Add(uint64(len(b)))
session.sessionStat.AddWriteBytes(len(b))
}
default:
// noop
@ -381,7 +361,7 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err
}
func (session *BaseInSession) handleRtpPacket(b []byte) error {
session.currConnStat.ReadBytesSum.Add(uint64(len(b)))
session.sessionStat.AddReadBytes(len(b))
if len(b) < rtprtcp.RtpFixedHeaderLength {
Log.Errorf("[%s] handleRtpPacket but length invalid. len=%d", session.uniqueKey, len(b))

@ -21,7 +21,6 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazanet"
)
@ -42,10 +41,7 @@ type BaseOutSession struct {
videoRtpChannel int
videoRtcpChannel int
stat base.StatSession
currConnStat connection.StatAtomic
prevConnStat connection.Stat
staleStat *connection.Stat
sessionStat base.BasicSessionStat
// only for debug log
debugLogMaxCount int
@ -58,14 +54,17 @@ type BaseOutSession struct {
waitChan chan error
}
func NewBaseOutSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseOutSession {
func NewBaseOutSession(uniqueKey string, sessionBaseType string, cmdSession IInterleavedPacketWriter) *BaseOutSession {
s := &BaseOutSession{
uniqueKey: uniqueKey,
cmdSession: cmdSession,
stat: base.StatSession{
Protocol: base.ProtocolRtsp,
SessionId: uniqueKey,
StartTime: base.ReadableNowTime(),
sessionStat: base.BasicSessionStat{
Stat: base.StatSession{
SessionId: uniqueKey,
Protocol: base.SessionProtocolRtspStr,
BaseType: sessionBaseType,
StartTime: base.ReadableNowTime(),
},
},
audioRtpChannel: -1,
videoRtpChannel: -1,
@ -181,51 +180,34 @@ func (session *BaseOutSession) WriteRtpPacket(packet rtprtcp.RtpPacket) error {
}
if err == nil {
session.currConnStat.WroteBytesSum.Add(uint64(len(packet.Raw)))
session.sessionStat.AddWriteBytes(len(packet.Raw))
}
return err
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *BaseOutSession) GetStat() base.StatSession {
session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load()
session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load()
return session.stat
return session.sessionStat.GetStat()
}
func (session *BaseOutSession) UpdateStat(intervalSec uint32) {
readBytesSum := session.currConnStat.ReadBytesSum.Load()
wroteBytesSum := session.currConnStat.WroteBytesSum.Load()
rDiff := readBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.WriteBitrate
session.prevConnStat.ReadBytesSum = readBytesSum
session.prevConnStat.WroteBytesSum = wroteBytesSum
session.sessionStat.UpdateStat(intervalSec)
}
func (session *BaseOutSession) IsAlive() (readAlive, writeAlive bool) {
readBytesSum := session.currConnStat.ReadBytesSum.Load()
wroteBytesSum := session.currConnStat.WroteBytesSum.Load()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
session.staleStat.ReadBytesSum = readBytesSum
session.staleStat.WroteBytesSum = wroteBytesSum
return true, true
}
readAlive = !(readBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(wroteBytesSum-session.staleStat.WroteBytesSum == 0)
session.staleStat.ReadBytesSum = readBytesSum
session.staleStat.WroteBytesSum = wroteBytesSum
return
return session.sessionStat.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
func (session *BaseOutSession) UniqueKey() string {
return session.uniqueKey
}
func (session *BaseOutSession) onReadRtpPacket(b []byte, rAddr *net.UDPAddr, err error) bool {
// TODO(chef): [fix] 在收到rtp和rtcp的地方加入stat统计 202205
if session.loggedReadRtpCount.Load() < int32(session.debugLogMaxCount) {
Log.Debugf("[%s] LOGPACKET. read rtp=%s", session.uniqueKey, hex.Dump(nazabytes.Prefix(b, 32)))
session.loggedReadRtpCount.Increment()

@ -66,7 +66,7 @@ func NewPullSession(observer IPullSessionObserver, modOptions ...ModPullSessionO
opt.DoTimeoutMs = option.PullTimeoutMs
opt.OverTcp = option.OverTcp
})
baseInSession := NewBaseInSessionWithObserver(uk, s, observer)
baseInSession := NewBaseInSessionWithObserver(uk, base.SessionBaseTypePullStr, s, observer)
s.baseInSession = baseInSession
s.cmdSession = cmdSession
Log.Infof("[%s] lifecycle new rtsp PullSession. session=%p", uk, s)
@ -179,6 +179,8 @@ func (session *PullSession) UniqueKey() string {
return session.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
// GetStat 文档请参考: interface ISessionStat
func (session *PullSession) GetStat() base.StatSession {
stat := session.baseInSession.GetStat()
@ -196,6 +198,8 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
// OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnConnectResult() {
// noop

@ -54,7 +54,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
opt.DoTimeoutMs = option.PushTimeoutMs
opt.OverTcp = option.OverTcp
})
baseOutSession := NewBaseOutSession(uk, s)
baseOutSession := NewBaseOutSession(uk, base.SessionBaseTypePushStr, s)
s.cmdSession = cmdSession
s.baseOutSession = baseOutSession
Log.Infof("[%s] lifecycle new rtsp PushSession. session=%p", uk, s)
@ -161,6 +161,8 @@ func (session *PushSession) UniqueKey() string {
return session.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
// GetStat 文档请参考: interface ISessionStat
func (session *PushSession) GetStat() base.StatSession {
stat := session.baseOutSession.GetStat()
@ -178,6 +180,8 @@ func (session *PushSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseOutSession.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
// OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnConnectResult() {
// noop

@ -103,6 +103,8 @@ func (session *ServerCommandSession) RemoteAddr() string {
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *ServerCommandSession) UpdateStat(intervalSec uint32) {
// TODO(chef): 梳理interleaved模式下command session的ISessionStat 202205
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.Bitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))

@ -36,7 +36,7 @@ func NewPubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Pu
urlCtx: urlCtx,
cmdSession: cmdSession,
}
baseInSession := NewBaseInSession(uk, s)
baseInSession := NewBaseInSession(uk, base.SessionBaseTypePubStr, s)
s.baseInSession = baseInSession
Log.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, s, urlCtx.LastItemOfPath)
return s
@ -93,6 +93,8 @@ func (session *PubSession) UniqueKey() string {
return session.uniqueKey
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *PubSession) GetStat() base.StatSession {
stat := session.baseInSession.GetStat()
stat.RemoteAddr = session.cmdSession.RemoteAddr()
@ -107,6 +109,8 @@ func (session *PubSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession
func (session *PubSession) WriteInterleavedPacket(packet []byte, channel int) error {
return session.cmdSession.WriteInterleavedPacket(packet, channel)

@ -35,7 +35,7 @@ func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Su
ShouldWaitVideoKeyFrame: true,
}
baseOutSession := NewBaseOutSession(uk, s)
baseOutSession := NewBaseOutSession(uk, base.SessionBaseTypeSubStr, s)
s.baseOutSession = baseOutSession
Log.Infof("[%s] lifecycle new rtsp SubSession. session=%p, streamName=%s", uk, s, urlCtx.LastItemOfPath)
return s

Loading…
Cancel
Save