From 323d6393d1f3b2992c611a7c9ace8baebeba6c67 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Wed, 26 Jun 2024 09:42:40 +0800 Subject: [PATCH] =?UTF-8?q?[refactor]=20=E6=89=80=E6=9C=89client=E7=B1=BB?= =?UTF-8?q?=E5=9E=8Bsession=E9=83=BD=E5=AE=9E=E7=8E=B0IClientSessionLifecy?= =?UTF-8?q?cle=E6=8E=A5=E5=8F=A3=E4=B8=AD=E7=9A=84Start=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/demo/benchrtmpconnect/benchrtmpconnect.go | 2 +- app/demo/calcrtmpdelay/calcrtmpdelay.go | 4 ++-- app/demo/pullrtmp/pullrtmp.go | 2 +- app/demo/pullrtmp2hls/pullrtmp2hls.go | 2 +- app/demo/pullrtmp2pushrtmp/stream_exist.go | 2 +- app/demo/pullrtmp2pushrtmp/tunnel.go | 4 ++-- .../pullrtmp2pushrtsp/pullrtmp2pushrtsp.go | 2 +- app/demo/pullrtsp/pullrtsp.go | 2 +- .../pullrtsp2pushrtmp/pullrtsp2pushrtmp.go | 4 ++-- .../pullrtsp2pushrtsp/pullrtsp2pushrtsp.go | 2 +- app/demo/pushrtmp/pushrtmp.go | 2 +- pkg/base/t_session.go | 23 +++++++++++++++---- pkg/innertest/innertest.go | 6 ++--- pkg/logic/group__relay_pull.go | 4 ++-- pkg/logic/group__relay_push.go | 2 +- pkg/rtmp/client_pull_session.go | 11 ++++++--- pkg/rtmp/client_push_session.go | 9 ++++++-- pkg/rtmp/client_session.go | 4 ++-- pkg/rtsp/client_command_session.go | 2 +- pkg/rtsp/client_pull_session.go | 11 ++++++--- pkg/rtsp/client_push_session.go | 2 +- 21 files changed, 66 insertions(+), 36 deletions(-) diff --git a/app/demo/benchrtmpconnect/benchrtmpconnect.go b/app/demo/benchrtmpconnect/benchrtmpconnect.go index dfd58ea..1a62d75 100644 --- a/app/demo/benchrtmpconnect/benchrtmpconnect.go +++ b/app/demo/benchrtmpconnect/benchrtmpconnect.go @@ -59,7 +59,7 @@ func main() { option.HandshakeComplexFlag = false }) b := time.Now() - err := pullSession.Pull(u) + err := pullSession.Start(u) e := time.Now() cost := e.Sub(b).Milliseconds() // 耗时不够1毫秒,我们将值取整到1毫秒,并打印更精确的实际耗时 diff --git a/app/demo/calcrtmpdelay/calcrtmpdelay.go b/app/demo/calcrtmpdelay/calcrtmpdelay.go index 394547d..e77dc4d 100644 --- a/app/demo/calcrtmpdelay/calcrtmpdelay.go +++ b/app/demo/calcrtmpdelay/calcrtmpdelay.go @@ -66,7 +66,7 @@ func main() { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.PushTimeoutMs = 10000 }) - err := pushSession.Push(pushUrl) + err := pushSession.Start(pushUrl) if err != nil { nazalog.Fatalf("push rtmp failed. url=%s, err=%+v", pushUrl, err) } @@ -105,7 +105,7 @@ func main() { rtmpPullSession = rtmp.NewPullSession().WithOnReadRtmpAvMsg(func(msg base.RtmpMsg) { handleReadPayloadFn(msg.Payload) }) - err = rtmpPullSession.Pull(pullUrl) + err = rtmpPullSession.Start(pullUrl) if err != nil { nazalog.Fatalf("pull rtmp failed. err=%+v", err) } diff --git a/app/demo/pullrtmp/pullrtmp.go b/app/demo/pullrtmp/pullrtmp.go index 6b5bd0c..18e4bc3 100644 --- a/app/demo/pullrtmp/pullrtmp.go +++ b/app/demo/pullrtmp/pullrtmp.go @@ -104,7 +104,7 @@ func pull(url string, filename string) { } }) - err = session.Pull(url) + err = session.Start(url) if err != nil { nazalog.Errorf("pull failed. err=%+v", err) return diff --git a/app/demo/pullrtmp2hls/pullrtmp2hls.go b/app/demo/pullrtmp2hls/pullrtmp2hls.go index 12eb637..1a844bb 100644 --- a/app/demo/pullrtmp2hls/pullrtmp2hls.go +++ b/app/demo/pullrtmp2hls/pullrtmp2hls.go @@ -54,7 +54,7 @@ func main() { option.PullTimeoutMs = 10000 option.ReadAvTimeoutMs = 10000 }).WithOnReadRtmpAvMsg(rtmp2Mpegts.FeedRtmpMessage) - err = pullSession.Pull(url) + err = pullSession.Start(url) if err != nil { nazalog.Fatalf("pull rtmp failed. err=%+v", err) diff --git a/app/demo/pullrtmp2pushrtmp/stream_exist.go b/app/demo/pullrtmp2pushrtmp/stream_exist.go index 9dd1a06..3a039eb 100644 --- a/app/demo/pullrtmp2pushrtmp/stream_exist.go +++ b/app/demo/pullrtmp2pushrtmp/stream_exist.go @@ -51,7 +51,7 @@ func StreamExist(url string) error { defer s.Dispose() go func() { - err := s.Pull(url) + err := s.Start(url) if err != nil { errChan <- err } diff --git a/app/demo/pullrtmp2pushrtmp/tunnel.go b/app/demo/pullrtmp2pushrtmp/tunnel.go index a4b3b2d..584708f 100644 --- a/app/demo/pullrtmp2pushrtmp/tunnel.go +++ b/app/demo/pullrtmp2pushrtmp/tunnel.go @@ -201,7 +201,7 @@ func (t *Tunnel) Start() (ret ErrorCode) { }) nazalog.Infof("[%s] start push. [%s] url=%s", t.uk, pushSession.UniqueKey(), outUrl) - err := pushSession.Push(outUrl) + err := pushSession.Start(outUrl) // 只有有一个失败就直接退出 if err != nil { nazalog.Errorf("[%s] push error. [%s] err=%+v", t.uk, pushSession.UniqueKey(), err) @@ -223,7 +223,7 @@ func (t *Tunnel) Start() (ret ErrorCode) { }) nazalog.Infof("[%s] start pull. [%s] url=%s", t.uk, t.pullSession.UniqueKey(), t.inUrl) - err := t.pullSession.Pull(t.inUrl) + err := t.pullSession.Start(t.inUrl) // pull失败就直接退出 if err != nil { nazalog.Errorf("[%s] pull error. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err) diff --git a/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go index a4f11e7..e881d32 100644 --- a/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go +++ b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go @@ -52,7 +52,7 @@ func main() { pullSession := rtmp.NewPullSession().WithOnReadRtmpAvMsg(remuxer.FeedRtmpMsg) nazalog.Info("start pull.") - err := pullSession.Pull(inRtmpUrl) + err := pullSession.Start(inRtmpUrl) nazalog.Assert(nil, err) nazalog.Info("pull succ.") diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index 3b9787f..5f005d5 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -86,7 +86,7 @@ func main() { option.OverTcp = overTcp != 0 }) - err = pullSession.Pull(inUrl) + err = pullSession.Start(inUrl) nazalog.Assert(nil, err) go func() { diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index 1815e72..7362613 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -36,7 +36,7 @@ func main() { option.WriteAvTimeoutMs = 10000 }) - err := pushSession.Push(outUrl) + err := pushSession.Start(outUrl) nazalog.Assert(nil, err) defer pushSession.Dispose() @@ -49,7 +49,7 @@ func main() { option.OverTcp = overTcp != 0 }) - err = pullSession.Pull(inUrl) + err = pullSession.Start(inUrl) nazalog.Assert(nil, err) defer pullSession.Dispose() diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go index 640e8bc..6b4aa7a 100644 --- a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -65,7 +65,7 @@ func (r *RtspTunnel) Start() error { option.PullTimeoutMs = 10000 option.OverTcp = r.pullOverTcp }) - if err := r.pullSession.Pull(r.pullUrl); err != nil { + if err := r.pullSession.Start(r.pullUrl); err != nil { nazalog.Errorf("[%s] start pull failed. err=%+v, url=%s", r.uniqueKey, err, r.pullUrl) return err } diff --git a/app/demo/pushrtmp/pushrtmp.go b/app/demo/pushrtmp/pushrtmp.go index 46489a0..0380884 100644 --- a/app/demo/pushrtmp/pushrtmp.go +++ b/app/demo/pushrtmp/pushrtmp.go @@ -96,7 +96,7 @@ func push(tags []httpflv.Tag, url string, isRecursive bool) { option.WriteChanSize = 0 }) - if err := ps.Push(url); err != nil { + if err := ps.Start(url); err != nil { nazalog.Errorf("push failed. err=%v", err) return } diff --git a/pkg/base/t_session.go b/pkg/base/t_session.go index a33d6dc..74ed867 100644 --- a/pkg/base/t_session.go +++ b/pkg/base/t_session.go @@ -95,17 +95,32 @@ type IServerSession interface { // --------------------------------------------------------------------------------------------------------------------- +// IClientSessionLifecycle +// +// 常规正确调用流程: +// +// New -> WithXXX -> Start -> Read/Write -> Dispose +// +// Start之前,不调用Read, Write, WaitChan函数 +// Start, Read, Write返回失败或者WaitChan返回错误时,直接调用Dispose,之后尽量不再使用这个session type IClientSessionLifecycle interface { - // Dispose 主动关闭session时调用 + Start(rawUrl string) error + + // Dispose // - // 注意,只有Start(具体session的Start类型函数一般命令为Push和Pull)成功后的session才能调用,否则行为未定义 + // 关闭session,主要是在主动关闭时调用。 // - // Dispose可在任意协程内调用 + // - 可以在任意协程内调用。 + // - 可以调用多次。 // - // 注意,目前Dispose允许调用多次,但是未来可能不对该表现做保证 + // - 理论上可以在Start之前调用,但请尽量避免。 + // - 不能和Start并发调用。 + // - Start失败后可以不调用Dispose。 // // Dispose后,调用Write函数将返回错误 // + // - TODO 如果Read或Write函数返回错误,可以不调用Dispose。 + // // @return 可以通过返回值判断调用Dispose前,session是否已经被关闭了 TODO(chef) 这个返回值没有太大意义,后面可能会删掉 // Dispose() error diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 0978859..7481a17 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -220,7 +220,7 @@ func entry() { assert.Equal(t, nil, err) rtmpPullTagCount.Increment() }) - err := rtmpPullSession.Pull(rtmpPullUrl) + err := rtmpPullSession.Start(rtmpPullUrl) Log.Assert(nil, err) err = <-rtmpPullSession.WaitChan() Log.Debug(err) @@ -262,7 +262,7 @@ func entry() { rtspPullSession = rtsp.NewPullSession(&rtspPullObserver, func(option *rtsp.PullSessionOption) { option.PullTimeoutMs = 10000 }) - err := rtspPullSession.Pull(rtspPullUrl) + err := rtspPullSession.Start(rtspPullUrl) assert.Equal(t, nil, err) entryWaitGroup.Done() }() @@ -273,7 +273,7 @@ func entry() { option.WriteBufSize = 4096 //option.WriteChanSize = 1024 }) - err = pushSession.Push(pushUrl) + err = pushSession.Start(pushUrl) assert.Equal(t, nil, err) for _, tag := range tags { diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go index 2cebeb3..8d91049 100644 --- a/pkg/logic/group__relay_pull.go +++ b/pkg/logic/group__relay_pull.go @@ -249,7 +249,7 @@ func (group *Group) pullIfNeeded() (string, error) { go func(rtPullUrl string, rtIsPullByRtmp bool, rtRtmpSession *rtmp.PullSession, rtRtspSession *rtsp.PullSession) { if rtIsPullByRtmp { // TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session - err := rtRtmpSession.Pull(rtPullUrl) + err := rtRtmpSession.Start(rtPullUrl) if err != nil { Log.Errorf("[%s] relay pull fail. err=%v", rtRtmpSession.UniqueKey(), err) group.DelRtmpPullSession(rtRtmpSession) @@ -262,7 +262,7 @@ func (group *Group) pullIfNeeded() (string, error) { return } - err := rtRtspSession.Pull(rtPullUrl) + err := rtRtspSession.Start(rtPullUrl) if err != nil { Log.Errorf("[%s] relay pull fail. err=%v", rtRtspSession.UniqueKey(), err) group.DelRtspPullSession(rtRtspSession) diff --git a/pkg/logic/group__relay_push.go b/pkg/logic/group__relay_push.go index 6bd717c..2e511c5 100644 --- a/pkg/logic/group__relay_push.go +++ b/pkg/logic/group__relay_push.go @@ -100,7 +100,7 @@ func (group *Group) startPushIfNeeded() { option.PushTimeoutMs = RelayPushTimeoutMs option.WriteAvTimeoutMs = RelayPushWriteAvTimeoutMs }) - err := pushSession.Push(u2) + err := pushSession.Start(u2) if err != nil { Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) group.DelRtmpPushSession(u, pushSession) diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index d823e20..c682ce7 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -72,7 +72,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { // WithOnPullSucc Pull成功 // -// 如果你想保证绝对时序,在 WithOnReadRtmpAvMsg 回调音视频数据前,做一些操作,那么使用这个回调替代 Pull 返回成功 +// 如果你想保证绝对时序,在 WithOnReadRtmpAvMsg 回调音视频数据前,做一些操作,那么使用这个回调替代 Start 返回成功 func (s *PullSession) WithOnPullSucc(onPullResult func()) *PullSession { s.core.onDoResult = onPullResult return s @@ -96,9 +96,14 @@ func (s *PullSession) WithOnReadRtmpAvMsg(onReadRtmpAvMsg OnReadRtmpAvMsg) *Pull return s } -// Pull 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误 +// Start 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误 +func (s *PullSession) Start(rawUrl string) error { + return s.core.Start(rawUrl) +} + +// Pull deprecated. use Start instead. func (s *PullSession) Pull(rawUrl string) error { - return s.core.Do(rawUrl) + return s.Start(rawUrl) } // --------------------------------------------------------------------------------------------------------------------- diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index 69c3d2d..ea6b135 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -64,9 +64,14 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { } } -// Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误 +// Start 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误 +func (s *PushSession) Start(rawUrl string) error { + return s.core.Start(rawUrl) +} + +// Push deprecated. use Start instead. func (s *PushSession) Push(rawUrl string) error { - return s.core.Do(rawUrl) + return s.Start(rawUrl) } // Write 发送数据 diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 21c9c82..015d0be 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -142,8 +142,8 @@ func NewClientSession(sessionType base.SessionType, modOptions ...ModClientSessi return s } -// Do 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 -func (s *ClientSession) Do(rawUrl string) error { +// Start 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 +func (s *ClientSession) Start(rawUrl string) error { Log.Debugf("[%s] Do. url=%s", s.UniqueKey(), rawUrl) var ( diff --git a/pkg/rtsp/client_command_session.go b/pkg/rtsp/client_command_session.go index 426f935..fefd70d 100644 --- a/pkg/rtsp/client_command_session.go +++ b/pkg/rtsp/client_command_session.go @@ -108,7 +108,7 @@ func (session *ClientCommandSession) InitWithSdp(sdpCtx sdp.LogicContext) { session.sdpCtx = sdpCtx } -func (session *ClientCommandSession) Do(rawUrl string) error { +func (session *ClientCommandSession) Start(rawUrl string) error { var ( ctx context.Context cancel context.CancelFunc diff --git a/pkg/rtsp/client_pull_session.go b/pkg/rtsp/client_pull_session.go index 83eb655..28a7ecd 100644 --- a/pkg/rtsp/client_pull_session.go +++ b/pkg/rtsp/client_pull_session.go @@ -74,10 +74,10 @@ func (session *PullSession) WithOnDescribeResponse(onDescribeResponse func()) *P return session } -// Pull 阻塞直到和对端完成拉流前,握手部分的工作(也即收到RTSP Play response),或者发生错误 -func (session *PullSession) Pull(rawUrl string) error { +// Start 阻塞直到和对端完成拉流前,握手部分的工作(也即收到RTSP Play response),或者发生错误 +func (session *PullSession) Start(rawUrl string) error { Log.Debugf("[%s] pull. url=%s", session.UniqueKey(), rawUrl) - if err := session.cmdSession.Do(rawUrl); err != nil { + if err := session.cmdSession.Start(rawUrl); err != nil { _ = session.dispose(err) return err } @@ -128,6 +128,11 @@ func (session *PullSession) Pull(rawUrl string) error { return nil } +// Pull deprecated. use Start instead. +func (session *PullSession) Pull(rawUrl string) error { + return session.Start(rawUrl) +} + func (session *PullSession) GetSdp() sdp.LogicContext { return session.baseInSession.GetSdp() } diff --git a/pkg/rtsp/client_push_session.go b/pkg/rtsp/client_push_session.go index 763aff0..c40a26e 100644 --- a/pkg/rtsp/client_push_session.go +++ b/pkg/rtsp/client_push_session.go @@ -201,7 +201,7 @@ func (session *PushSession) push(rawUrl string) error { Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl) session.cmdSession.InitWithSdp(*session.sdpCtx) session.baseOutSession.InitWithSdp(*session.sdpCtx) - if err := session.cmdSession.Do(rawUrl); err != nil { + if err := session.cmdSession.Start(rawUrl); err != nil { _ = session.dispose(err) return err }