[opt] 允许没有输入流时先创建rtsp SubSession

pull/197/head
q191201771
parent a9b78c078d
commit 44dd886b38

@ -13,6 +13,8 @@ import (
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
@ -118,6 +120,9 @@ func Entry(tt *testing.T) {
}
func entry() {
var entryWaitGroup sync.WaitGroup // 用于等待所有协程结束
entryWaitGroup.Add(4)
Log.Debugf("> innertest")
if _, err := os.Lstat(confFilename); err != nil {
@ -132,6 +137,7 @@ func entry() {
httpflvPullTagCount.Store(0)
rtmpPullTagCount.Store(0)
httptsSize.Store(0)
rtspPullAvPacketCount.Store(0)
hls.Clock = mock.NewFakeClock()
hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.UTC))
httpts.SubSessionWriteChanSize = 0
@ -143,9 +149,6 @@ func entry() {
})
config := sm.Config()
//Log.Init(func(option *nazalog.Option) {
// option.Level = nazalog.LevelLogNothing
//})
_ = os.RemoveAll(config.HlsConfig.OutPath)
go sm.RunLoop()
@ -200,6 +203,7 @@ func entry() {
rtmpPullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.ReadAvTimeoutMs = 10000
option.ReadBufSize = 0
option.ReuseReadMessageBufferFlag = false
}).WithOnReadRtmpAvMsg(func(msg base.RtmpMsg) {
tag := remux.RtmpMsg2FlvTag(msg)
err := rtmpWriter.WriteTag(*tag)
@ -210,6 +214,8 @@ func entry() {
Log.Assert(nil, err)
err = <-rtmpPullSession.WaitChan()
Log.Debug(err)
entryWaitGroup.Done()
}()
go func() {
@ -225,6 +231,8 @@ func entry() {
Log.Assert(nil, err)
flvErr = <-httpflvPullSession.WaitChan()
Log.Debug(flvErr)
entryWaitGroup.Done()
}()
go func() {
@ -232,25 +240,21 @@ func entry() {
_ = ioutil.WriteFile(wTsPullFileName, b, 0666)
assert.Equal(t, goldenHttptsLenList[mode], len(b))
assert.Equal(t, goldenHttptsMd5List[mode], nazamd5.Md5(b))
entryWaitGroup.Done()
}()
time.Sleep(100 * time.Millisecond)
// TODO(chef): [test] [2021.12.25] rtsp sub测试 由于rtsp sub不支持没有pub时sub只能sub失败后重试所以没有验证收到的数据
// TODO(chef): [test] rtsp sub没有验证收到的数据因为即使是先sub它还有一个数据到来后才能完成信令交互的逻辑 202206
// TODO(chef): [perf] [2021.12.25] rtmp推rtsp拉的性能。开启rtsp pull后rtmp pull的总时长增加了
go func() {
for {
rtspPullAvPacketCount.Store(0)
var rtspPullObserver RtspPullObserver
rtspPullSession = rtsp.NewPullSession(&rtspPullObserver, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 500
})
err := rtspPullSession.Pull(rtspPullUrl)
Log.Debug(err)
if rtspSdpCtx.RawSdp != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
var rtspPullObserver RtspPullObserver
rtspPullSession = rtsp.NewPullSession(&rtspPullObserver, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 10000
})
err := rtspPullSession.Pull(rtspPullUrl)
assert.Equal(t, nil, err)
entryWaitGroup.Done()
}()
time.Sleep(100 * time.Millisecond)
@ -284,7 +288,8 @@ func entry() {
break
}
nazalog.Debugf("%d(%d, %d) %d(%d)",
fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), goldenHttptsLenList[mode], httptsSize.Load())
fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(),
goldenHttptsLenList[mode], httptsSize.Load())
time.Sleep(100 * time.Millisecond)
}
@ -301,10 +306,13 @@ func entry() {
//_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1)
sm.Dispose()
entryWaitGroup.Wait()
Log.Debugf("tag count. in=%d, out httpflv=%d, out rtmp=%d, out rtsp=%d",
fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), rtspPullAvPacketCount.Load())
compareFile()
assert.Equal(t, strings.ReplaceAll(goldenRtspSdpList[mode], "\n", "\r\n"), string(rtspSdpCtx.RawSdp))
}
func compareFile() {
@ -598,3 +606,45 @@ innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
}
var goldenRtspSdpList = []string{
`v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:lal 0.30.1
m=video 0 RTP/AVP 96
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAFqyyAUBf8uAiAAADAAIAAAMAPB4sXJA=,aOvDyyLA; profile-level-id=640016
a=control:streamid=0
m=audio 0 RTP/AVP 97
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=121056e500
a=control:streamid=1
`,
`v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:lal 0.30.1
m=audio 0 RTP/AVP 97
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=121056e500
a=control:streamid=0
`,
`v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:lal 0.30.1
m=video 0 RTP/AVP 96
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAFqyyAUBf8uAiAAADAAIAAAMAPB4sXJA=,aOvDyyLA; profile-level-id=640016
a=control:streamid=0
`,
}

@ -63,10 +63,11 @@ type Group struct {
// mpegts使用
patpmt []byte
// sub
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
httptsSubSessionSet map[*httpts.SubSession]struct{}
rtspSubSessionSet map[*rtsp.SubSession]struct{}
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
httptsSubSessionSet map[*httpts.SubSession]struct{}
rtspSubSessionSet map[*rtsp.SubSession]struct{}
waitRtspSubSessionSet map[*rtsp.SubSession]struct{}
// push
pushEnable bool
url2PushProxy map[string]*pushProxy
@ -93,14 +94,15 @@ func NewGroup(appName string, streamName string, config *Config, observer IGroup
stat: base.StatGroup{
StreamName: streamName,
},
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
httptsSubSessionSet: make(map[*httpts.SubSession]struct{}),
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum),
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum),
httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum),
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
httptsSubSessionSet: make(map[*httpts.SubSession]struct{}),
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
waitRtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum),
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum),
httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum),
}
g.initRelayPushByConfig()
@ -164,6 +166,10 @@ func (group *Group) Dispose() {
session.Dispose()
}
group.rtspSubSessionSet = nil
for session := range group.waitRtspSubSessionSet {
session.Dispose()
}
group.waitRtspSubSessionSet = nil
for session := range group.httpflvSubSessionSet {
session.Dispose()
@ -231,6 +237,13 @@ func (group *Group) GetStat(maxsub int) base.StatGroup {
}
group.stat.StatSubs = append(group.stat.StatSubs, base.Session2StatSub(s))
}
for s := range group.waitRtspSubSessionSet {
statSubCount++
if statSubCount > maxsub {
break
}
group.stat.StatSubs = append(group.stat.StatSubs, base.Session2StatSub(s))
}
return group.stat
}
@ -281,6 +294,12 @@ func (group *Group) KickSession(sessionId string) bool {
return true
}
}
for s := range group.waitRtspSubSessionSet {
if s.UniqueKey() == sessionId {
s.Dispose()
return true
}
}
} else {
Log.Errorf("[%s] kick session while session id format invalid. %s", group.UniqueKey, sessionId)
}
@ -319,7 +338,8 @@ func (group *Group) OutSessionNum() int {
pushNum++
}
}
return len(group.rtmpSubSessionSet) + len(group.rtspSubSessionSet) + len(group.httpflvSubSessionSet) + len(group.httptsSubSessionSet) + pushNum
return len(group.rtmpSubSessionSet) + len(group.rtspSubSessionSet) + len(group.waitRtspSubSessionSet) +
len(group.httpflvSubSessionSet) + len(group.httptsSubSessionSet) + pushNum
}
// ---------------------------------------------------------------------------------------------------------------------
@ -356,6 +376,12 @@ func (group *Group) disposeInactiveSessions() {
session.Dispose()
}
}
for session := range group.waitRtspSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
}
for session := range group.httpflvSubSessionSet {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
@ -403,6 +429,9 @@ func (group *Group) updateAllSessionStat() {
for session := range group.rtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for session := range group.waitRtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for _, item := range group.url2PushProxy {
session := item.pushSession
if item.isPushing && session != nil {
@ -419,7 +448,8 @@ func (group *Group) hasSubSession() bool {
return len(group.rtmpSubSessionSet) != 0 ||
len(group.httpflvSubSessionSet) != 0 ||
len(group.httptsSubSessionSet) != 0 ||
len(group.rtspSubSessionSet) != 0
len(group.rtspSubSessionSet) != 0 ||
len(group.waitRtspSubSessionSet) != 0
}
func (group *Group) hasPushSession() bool {

@ -50,6 +50,7 @@ func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.sdpCtx = &sdpCtx
group.feedWaitRtspSubSessions()
if group.rtsp2RtmpRemuxer != nil {
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}
@ -121,6 +122,7 @@ func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
//
func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx
group.feedWaitRtspSubSessions()
}
// onRtpPacketFromRemux ...
@ -506,3 +508,11 @@ func (group *Group) writev2RtmpSubSessions(bs net.Buffers) {
_ = session.Writev(bs)
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) feedWaitRtspSubSessions() {
for session := range group.waitRtspSubSessionSet {
session.FeedSdp(*group.sdpCtx)
}
}

@ -61,23 +61,26 @@ func (group *Group) AddHttptsSubSession(session *httpts.SubSession) {
}
func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
Log.Debugf("[%s] [%s] rtsp sub describe.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO(chef): 应该有等待机制,而不是直接关闭
if group.sdpCtx == nil {
Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]",
group.UniqueKey, session.UniqueKey())
return false, nil
}
Log.Warnf("[%s] [%s] rtsp subSession describe but sdp not exist.", group.UniqueKey, session.UniqueKey())
group.waitRtspSubSessionSet[session] = struct{}{}
return true, nil
}
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
Log.Debugf("[%s] [%s] rtsp sub play.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.waitRtspSubSessionSet, session)
group.rtspSubSessionSet[session] = struct{}{}
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false

@ -308,6 +308,10 @@ func (sm *ServerManager) Dispose() {
sm.rtmpServer.Dispose()
}
if sm.rtspServer != nil {
sm.rtspServer.Dispose()
}
if sm.httpServerManager != nil {
sm.httpServerManager.Dispose()
}
@ -569,27 +573,32 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
return group.HandleNewRtspSubSessionDescribe(session)
}
func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2SubStartInfo(session)
if err := sm.simpleAuthCtx.OnSubStart(info); err != nil {
return err
return false, nil
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.HandleNewRtspSubSessionPlay(session)
ok, sdp = group.HandleNewRtspSubSessionDescribe(session)
if !ok {
return
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnSubStart(info)
return
}
func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
group.HandleNewRtspSubSessionPlay(session)
return nil
}

@ -33,9 +33,15 @@ type IServerCommandSessionObserver interface {
// OnNewRtspSubSessionDescribe
//
// @brief Describe阶段回调
// @return ok 如果返回false则表示上层要强制关闭这个拉流请求
// @return sdp
// Describe阶段回调
//
// 上层的几种逻辑对应的返回值的组合情况:
//
// 1. 强制关闭这个session`ok`设置为false
// 2. 当前有sdp在回调中设置sdp让session按正常逻辑往下走`ok`设置为true`sdp`设置为对应的值
// 3. 当前没有sdp后续在回调外通过 ServerCommandSession.FeedSdp 可以让session按正常逻辑往下走`ok`设置为true`sdp`设置为nil
//
// TODO(chef): bool参数类型统一为error类型 202206
//
OnNewRtspSubSessionDescribe(session *SubSession) (ok bool, sdp []byte)
@ -59,6 +65,8 @@ type ServerCommandSession struct {
pubSession *PubSession
subSession *SubSession
describeSeq string // only for sub session
}
func NewServerCommandSession(observer IServerCommandSessionObserver, conn net.Conn, authConf ServerAuthConfig) *ServerCommandSession {
@ -87,7 +95,7 @@ func (session *ServerCommandSession) Dispose() error {
}
func (session *ServerCommandSession) FeedSdp(b []byte) {
session.feedSdp(b)
}
// WriteInterleavedPacket
@ -273,6 +281,8 @@ func (session *ServerCommandSession) handleDescribe(requestCtx nazahttp.HttpReqM
return err
}
session.describeSeq = requestCtx.Headers.Get(HeaderCSeq)
session.subSession = NewSubSession(urlCtx, session)
Log.Infof("[%s] link new SubSession. [%s]", session.uniqueKey, session.subSession.UniqueKey())
ok, rawSdp := session.observer.OnNewRtspSubSessionDescribe(session.subSession)
@ -281,11 +291,18 @@ func (session *ServerCommandSession) handleDescribe(requestCtx nazahttp.HttpReqM
return base.ErrRtspClosedByObserver
}
if rawSdp != nil {
return session.feedSdp(rawSdp)
}
return nil
}
func (session *ServerCommandSession) feedSdp(rawSdp []byte) error {
sdpCtx, _ := sdp.ParseSdp2LogicContext(rawSdp)
session.subSession.InitWithSdp(sdpCtx)
resp := PackResponseDescribe(requestCtx.Headers.Get(HeaderCSeq), string(rawSdp))
_, err = session.conn.Write([]byte(resp))
resp := PackResponseDescribe(session.describeSeq, string(rawSdp))
_, err := session.conn.Write([]byte(resp))
return err
}

@ -9,11 +9,10 @@
package rtsp
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazaerrors"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazanet"
)
@ -38,6 +37,14 @@ func NewSubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *Su
return s
}
// FeedSdp 供上层调用
//
func (session *SubSession) FeedSdp(sdpCtx sdp.LogicContext) {
session.cmdSession.FeedSdp(sdpCtx.RawSdp)
}
// InitWithSdp 供 ServerCommandSession 调用
//
func (session *SubSession) InitWithSdp(sdpCtx sdp.LogicContext) {
session.baseOutSession.InitWithSdp(sdpCtx)
}

Loading…
Cancel
Save