From 9dbd049e792f0811e12eb30be60559d2cacc3331 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 16 Jan 2020 17:56:55 +0800 Subject: [PATCH] For #1568, extract SrsSourceManager from SrsSource. --- trunk/src/app/srs_app_http_stream.cpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 2 +- trunk/src/app/srs_app_server.cpp | 4 ++-- trunk/src/app/srs_app_source.cpp | 31 +++++++++++++++++-------- trunk/src/app/srs_app_source.hpp | 33 +++++++++++++++++++-------- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index dddacdf92..2a0d8e939 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -1089,7 +1089,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle } SrsSource* s = NULL; - if ((err = SrsSource::fetch_or_create(r, server, &s)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) { return srs_error_wrap(err, "source create"); } srs_assert(s != NULL); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index f846df0dc..5ffe4b0cd 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -496,7 +496,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() // find a source to serve. SrsSource* source = NULL; - if ((err = SrsSource::fetch_or_create(req, server, &source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } srs_assert(source != NULL); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 8c6d21470..2a3039f1a 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -523,7 +523,7 @@ void SrsServer::dispose() // @remark don't dispose ingesters, for too slow. // dispose the source for hls and dvr. - SrsSource::dispose_all(); + _srs_sources->dispose(); // @remark don't dispose all connections, for too slow. @@ -952,7 +952,7 @@ srs_error_t SrsServer::do_cycle() } // notice the stream sources to cycle. - if ((err = SrsSource::cycle_all()) != srs_success) { + if ((err = _srs_sources->cycle()) != srs_success) { return srs_error_wrap(err, "source cycle"); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 39e704226..f48528015 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1635,9 +1635,17 @@ srs_error_t SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg) return vformat->on_video(msg); } -std::map SrsSource::pool; +SrsSourceManager* _srs_sources = new SrsSourceManager(); -srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) +SrsSourceManager::SrsSourceManager() +{ +} + +SrsSourceManager::~SrsSourceManager() +{ +} + +srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) { srs_error_t err = srs_success; @@ -1665,7 +1673,7 @@ srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsS return err; } -SrsSource* SrsSource::fetch(SrsRequest* r) +SrsSource* SrsSourceManager::fetch(SrsRequest* r) { SrsSource* source = NULL; @@ -1679,12 +1687,12 @@ SrsSource* SrsSource::fetch(SrsRequest* r) // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. - source->req->update_auth(r); + source->update_auth(r); return source; } -void SrsSource::dispose_all() +void SrsSourceManager::dispose() { std::map::iterator it; for (it = pool.begin(); it != pool.end(); ++it) { @@ -1694,16 +1702,16 @@ void SrsSource::dispose_all() return; } -srs_error_t SrsSource::cycle_all() +srs_error_t SrsSourceManager::cycle() { int cid = _srs_context->get_id(); - srs_error_t err = do_cycle_all(); + srs_error_t err = do_cycle(); _srs_context->set_id(cid); return err; } -srs_error_t SrsSource::do_cycle_all() +srs_error_t SrsSourceManager::do_cycle() { srs_error_t err = srs_success; @@ -1744,7 +1752,7 @@ srs_error_t SrsSource::do_cycle_all() return err; } -void SrsSource::destroy() +void SrsSourceManager::destroy() { std::map::iterator it; for (it = pool.begin(); it != pool.end(); ++it) { @@ -1994,6 +2002,11 @@ bool SrsSource::inactive() return _can_publish; } +void SrsSource::update_auth(SrsRequest* r) +{ + req->update_auth(r); +} + bool SrsSource::can_publish(bool is_edge) { if (is_edge) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 847857f33..b0696346e 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -438,32 +438,43 @@ public: virtual srs_error_t update_vsh(SrsSharedPtrMessage* msg); }; -// live streaming source. -class SrsSource : public ISrsReloadHandler +// The source manager to create and refresh all stream sources. +class SrsSourceManager { - friend class SrsOriginHub; private: - static std::map pool; + std::map pool; +public: + SrsSourceManager(); + virtual ~SrsSourceManager(); public: // create source when fetch from cache failed. // @param r the client request. // @param h the event handler for source. // @param pps the matched source, if success never be NULL. - static srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps); + virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps); private: // Get the exists source, NULL when not exists. // update the request and return the exists source. - static SrsSource* fetch(SrsRequest* r); + virtual SrsSource* fetch(SrsRequest* r); public: // dispose and cycle all sources. - static void dispose_all(); - static srs_error_t cycle_all(); + virtual void dispose(); + virtual srs_error_t cycle(); private: - static srs_error_t do_cycle_all(); + virtual srs_error_t do_cycle(); public: // when system exit, destroy the sources, // For gmc to analysis mem leaks. - static void destroy(); + virtual void destroy(); +}; + +// Global singleton instance. +extern SrsSourceManager* _srs_sources; + +// live streaming source. +class SrsSource : public ISrsReloadHandler +{ + friend class SrsOriginHub; private: // For publish, it's the publish client id. // For edge, it's the edge ingest id. @@ -531,6 +542,8 @@ public: // Whether source is inactive, which means there is no publishing stream source. // @remark For edge, it's inactive util stream has been pulled from origin. virtual bool inactive(); + // Update the authentication information in request. + virtual void update_auth(SrsRequest* r); public: virtual bool can_publish(bool is_edge); virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);