From 64705d1cc878b51b3ea039b47f050d42f8e8ac53 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 19 Sep 2020 10:30:05 +0800 Subject: [PATCH] RTC: Refine resouce management --- trunk/src/app/srs_app_caster_flv.cpp | 6 +- trunk/src/app/srs_app_caster_flv.hpp | 10 +- trunk/src/app/srs_app_conn.cpp | 133 ++++++++++++++++++------ trunk/src/app/srs_app_conn.hpp | 69 ++++++++---- trunk/src/app/srs_app_gb28181.cpp | 7 +- trunk/src/app/srs_app_gb28181.hpp | 3 +- trunk/src/app/srs_app_gb28181_sip.cpp | 5 + trunk/src/app/srs_app_gb28181_sip.hpp | 1 + trunk/src/app/srs_app_http_api.cpp | 7 +- trunk/src/app/srs_app_http_api.hpp | 5 +- trunk/src/app/srs_app_http_conn.cpp | 9 +- trunk/src/app/srs_app_http_conn.hpp | 7 +- trunk/src/app/srs_app_rtc_conn.cpp | 52 ++++++--- trunk/src/app/srs_app_rtc_conn.hpp | 13 ++- trunk/src/app/srs_app_rtc_server.cpp | 63 ++++------- trunk/src/app/srs_app_rtc_server.hpp | 10 +- trunk/src/app/srs_app_rtmp_conn.cpp | 5 + trunk/src/app/srs_app_rtmp_conn.hpp | 3 + trunk/src/app/srs_app_rtsp.cpp | 12 ++- trunk/src/app/srs_app_rtsp.hpp | 8 +- trunk/src/app/srs_app_server.cpp | 4 +- trunk/src/app/srs_app_server.hpp | 10 +- trunk/src/app/srs_app_st.cpp | 6 +- trunk/src/app/srs_app_st.hpp | 6 +- trunk/src/protocol/srs_service_conn.cpp | 16 ++- trunk/src/protocol/srs_service_conn.hpp | 35 +++++-- trunk/src/utest/srs_utest_rtc.cpp | 4 +- trunk/src/utest/srs_utest_service.cpp | 14 ++- trunk/src/utest/srs_utest_service.hpp | 2 + 29 files changed, 356 insertions(+), 169 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 92362a90f..482def5aa 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -49,7 +49,7 @@ SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c) { http_mux = new SrsHttpServeMux(); output = _srs_config->get_stream_caster_output(c); - manager = new SrsConnectionManager(); + manager = new SrsResourceManager("CasterFLV"); } SrsAppCasterFlv::~SrsAppCasterFlv() @@ -95,7 +95,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) return err; } -void SrsAppCasterFlv::remove(ISrsConnection* c) +void SrsAppCasterFlv::remove(ISrsResource* c) { SrsTcpConnection* conn = dynamic_cast(c); @@ -141,7 +141,7 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa return err; } -SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) { sdk = NULL; pprint = SrsPithyPrint::create_caster(); diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 37d56b31a..aee43b382 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -48,13 +48,13 @@ class SrsSimpleRtmpClient; // The stream caster for flv stream over HTTP POST. class SrsAppCasterFlv : virtual public ISrsTcpHandler - , virtual public IConnectionManager, virtual public ISrsHttpHandler + , virtual public ISrsResourceManager, virtual public ISrsHttpHandler { private: std::string output; SrsHttpServeMux* http_mux; std::vector conns; - SrsConnectionManager* manager; + SrsResourceManager* manager; public: SrsAppCasterFlv(SrsConfDirective* c); virtual ~SrsAppCasterFlv(); @@ -63,9 +63,9 @@ public: // Interface ISrsTcpHandler public: virtual srs_error_t on_tcp_client(srs_netfd_t stfd); -// Interface IConnectionManager +// Interface ISrsResourceManager public: - virtual void remove(ISrsConnection* c); + virtual void remove(ISrsResource* c); // Interface ISrsHttpHandler public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); @@ -79,7 +79,7 @@ private: SrsPithyPrint* pprint; SrsSimpleRtmpClient* sdk; public: - SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); + SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); virtual ~SrsDynamicHttpConn(); public: virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg); diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 443a4980d..a097a2cab 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -31,28 +31,44 @@ using namespace std; #include #include #include +#include -SrsConnectionManager::SrsConnectionManager() +ISrsDisposingHandler::ISrsDisposingHandler() { +} + +ISrsDisposingHandler::~ISrsDisposingHandler() +{ +} + +SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose) +{ + verbose_ = verbose; + label_ = label; cond = srs_cond_new(); - trd = new SrsSTCoroutine("manager", this); + trd = NULL; } -SrsConnectionManager::~SrsConnectionManager() +SrsResourceManager::~SrsResourceManager() { - srs_cond_signal(cond); - trd->stop(); + if (trd) { + srs_cond_signal(cond); + trd->stop(); - srs_freep(trd); - srs_cond_destroy(cond); + srs_freep(trd); + srs_cond_destroy(cond); + } clear(); } -srs_error_t SrsConnectionManager::start() +srs_error_t SrsResourceManager::start() { srs_error_t err = srs_success; + cid_ = _srs_context->generate_id(); + trd = new SrsSTCoroutine("manager", this, cid_); + if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "conn manager"); } @@ -60,20 +76,22 @@ srs_error_t SrsConnectionManager::start() return err; } -bool SrsConnectionManager::empty() +bool SrsResourceManager::empty() { return conns_.empty(); } -size_t SrsConnectionManager::size() +size_t SrsResourceManager::size() { return conns_.size(); } -srs_error_t SrsConnectionManager::cycle() +srs_error_t SrsResourceManager::cycle() { srs_error_t err = srs_success; + srs_trace("%s connection manager run", label_.c_str()); + while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "conn manager"); @@ -91,67 +109,110 @@ srs_error_t SrsConnectionManager::cycle() return err; } -void SrsConnectionManager::add(ISrsConnection* conn) +void SrsResourceManager::add(ISrsResource* conn) { if (std::find(conns_.begin(), conns_.end(), conn) == conns_.end()) { conns_.push_back(conn); } } -void SrsConnectionManager::add_with_id(const std::string& id, ISrsConnection* conn) +void SrsResourceManager::add_with_id(const std::string& id, ISrsResource* conn) { add(conn); conns_id_.insert(make_pair(id, conn)); } -void SrsConnectionManager::add_with_name(const std::string& name, ISrsConnection* conn) +void SrsResourceManager::add_with_name(const std::string& name, ISrsResource* conn) { add(conn); conns_name_.insert(make_pair(name, conn)); } -ISrsConnection* SrsConnectionManager::at(int index) +ISrsResource* SrsResourceManager::at(int index) { - return conns_.at(index); + return (index < (int)conns_.size())? conns_.at(index) : NULL; } -ISrsConnection* SrsConnectionManager::find_by_id(std::string id) +ISrsResource* SrsResourceManager::find_by_id(std::string id) { - map::iterator it = conns_id_.find(id); + map::iterator it = conns_id_.find(id); return (it != conns_id_.end())? it->second : NULL; } -ISrsConnection* SrsConnectionManager::find_by_name(std::string name) +ISrsResource* SrsResourceManager::find_by_name(std::string name) { - map::iterator it = conns_name_.find(name); + map::iterator it = conns_name_.find(name); return (it != conns_name_.end())? it->second : NULL; } -void SrsConnectionManager::remove(ISrsConnection* c) +void SrsResourceManager::subscribe(ISrsDisposingHandler* h) +{ + if (std::find(handlers_.begin(), handlers_.end(), h) == handlers_.end()) { + handlers_.push_back(h); + } +} + +void SrsResourceManager::unsubscribe(ISrsDisposingHandler* h) +{ + vector::iterator it = find(handlers_.begin(), handlers_.end(), h); + if (it != handlers_.end()) { + handlers_.erase(it); + } +} + +void SrsResourceManager::remove(ISrsResource* c) { + SrsContextRestore(cid_); + if (verbose_) { + _srs_context->set_id(c->get_id()); + srs_trace("before dispose resource(%s), zombies=%d", c->desc().c_str(), (int)zombies_.size()); + } + if (std::find(zombies_.begin(), zombies_.end(), c) == zombies_.end()) { zombies_.push_back(c); - srs_cond_signal(cond); } + + for (int i = 0; i < (int)handlers_.size(); i++) { + ISrsDisposingHandler* h = handlers_.at(i); + h->on_before_dispose(c); + } + + srs_cond_signal(cond); } -void SrsConnectionManager::clear() +void SrsResourceManager::clear() { + if (zombies_.empty()) { + return; + } + + SrsContextRestore(cid_); + if (verbose_) { + srs_trace("clear zombies=%d connections", (int)zombies_.size()); + } + // To prevent thread switch when delete connection, // we copy all connections then free one by one. - vector copy; + vector copy; copy.swap(zombies_); - vector::iterator it; + vector::iterator it; for (it = copy.begin(); it != copy.end(); ++it) { - ISrsConnection* conn = *it; + ISrsResource* conn = *it; + + if (verbose_) { + _srs_context->set_id(conn->get_id()); + srs_trace("disposing resource(%s), zombies=%d/%d", conn->desc().c_str(), + (int)copy.size(), (int)zombies_.size()); + } + dispose(conn); } } -void SrsConnectionManager::dispose(ISrsConnection* c) +void SrsResourceManager::dispose(ISrsResource* c) { - for (map::iterator it = conns_name_.begin(); it != conns_name_.end();) { + for (map::iterator it = conns_name_.begin(); it != conns_name_.end();) { if (c != it->second) { ++it; } else { @@ -160,7 +221,7 @@ void SrsConnectionManager::dispose(ISrsConnection* c) } } - for (map::iterator it = conns_id_.begin(); it != conns_id_.end();) { + for (map::iterator it = conns_id_.begin(); it != conns_id_.end();) { if (c != it->second) { ++it; } else { @@ -169,15 +230,20 @@ void SrsConnectionManager::dispose(ISrsConnection* c) } } - vector::iterator it = std::find(conns_.begin(), conns_.end(), c); + vector::iterator it = std::find(conns_.begin(), conns_.end(), c); if (it != conns_.end()) { conns_.erase(it); } + for (int i = 0; i < (int)handlers_.size(); i++) { + ISrsDisposingHandler* h = handlers_.at(i); + h->on_disposing(c); + } + srs_freep(c); } -SrsTcpConnection::SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport) +SrsTcpConnection::SrsTcpConnection(ISrsResourceManager* cm, srs_netfd_t c, string cip, int cport) { manager = cm; stfd = c; @@ -357,6 +423,11 @@ string SrsTcpConnection::remote_ip() return ip; } +const SrsContextId& SrsTcpConnection::get_id() +{ + return trd->cid(); +} + void SrsTcpConnection::expire() { trd->interrupt(); diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 9c0e5167d..7ae7fc610 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -37,24 +37,43 @@ class SrsWallClock; -// The connection manager remove connection and delete it asynchronously. -class SrsConnectionManager : virtual public ISrsCoroutineHandler, virtual public IConnectionManager +// Hooks for connection manager, to handle the event when disposing connections. +class ISrsDisposingHandler +{ +public: + ISrsDisposingHandler(); + virtual ~ISrsDisposingHandler(); +public: + // When before disposing resource, trigger when manager.remove(c), sync API. + virtual void on_before_dispose(ISrsResource* c) = 0; + // When disposing resource, async API, c is freed after it. + virtual void on_disposing(ISrsResource* c) = 0; +}; + +// The resource manager remove resource and delete it asynchronously. +class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public ISrsResourceManager { +private: + std::string label_; + SrsContextId cid_; + bool verbose_; private: SrsCoroutine* trd; srs_cond_t cond; + // Callback handlers. + std::vector handlers_; // The zombie connections, we will delete it asynchronously. - std::vector zombies_; + std::vector zombies_; private: // The connections without any id. - std::vector conns_; - // The connections with connection id. - std::map conns_id_; - // The connections with connection name. - std::map conns_name_; + std::vector conns_; + // The connections with resource id. + std::map conns_id_; + // The connections with resource name. + std::map conns_name_; public: - SrsConnectionManager(); - virtual ~SrsConnectionManager(); + SrsResourceManager(const std::string& label, bool verbose = false); + virtual ~SrsResourceManager(); public: srs_error_t start(); bool empty(); @@ -63,18 +82,21 @@ public: public: virtual srs_error_t cycle(); public: - void add(ISrsConnection* conn); - void add_with_id(const std::string& id, ISrsConnection* conn); - void add_with_name(const std::string& name, ISrsConnection* conn); - ISrsConnection* at(int index); - ISrsConnection* find_by_id(std::string id); - ISrsConnection* find_by_name(std::string name); -// Interface IConnectionManager + void add(ISrsResource* conn); + void add_with_id(const std::string& id, ISrsResource* conn); + void add_with_name(const std::string& name, ISrsResource* conn); + ISrsResource* at(int index); + ISrsResource* find_by_id(std::string id); + ISrsResource* find_by_name(std::string name); +public: + void subscribe(ISrsDisposingHandler* h); + void unsubscribe(ISrsDisposingHandler* h); +// Interface ISrsResourceManager public: - virtual void remove(ISrsConnection* c); + virtual void remove(ISrsResource* c); private: void clear(); - void dispose(ISrsConnection* c); + void dispose(ISrsResource* c); }; // The basic connection of SRS, for TCP based protocols, @@ -88,7 +110,7 @@ protected: // when thread stop, the connection will be delete by server. SrsCoroutine* trd; // The manager object to manage the connection. - IConnectionManager* manager; + ISrsResourceManager* manager; // The underlayer st fd handler. srs_netfd_t stfd; // The ip and port of client. @@ -106,7 +128,7 @@ protected: // for current connection to log self create time and calculate the living time. int64_t create_time; public: - SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip, int cport); + SrsTcpConnection(ISrsResourceManager* cm, srs_netfd_t c, std::string cip, int cport); virtual ~SrsTcpConnection(); // Interface ISrsKbpsDelta public: @@ -136,8 +158,11 @@ public: // Get the srs id which identify the client. // TODO: FIXME: Rename to cid. virtual SrsContextId srs_id(); - // Get the remote ip of peer. +// Interface ISrsConnection. +public: virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); +public: // Set connection to expired. virtual void expire(); protected: diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 1ef8d58af..d0110198c 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -1011,6 +1011,11 @@ std::string SrsGb28181RtmpMuxer::remote_ip() return ""; } +std::string SrsGb28181RtmpMuxer::desc() +{ + return "GBConn"; +} + std::string SrsGb28181RtmpMuxer::get_channel_id() { return channel_id; @@ -1890,7 +1895,7 @@ SrsGb28181Manger::SrsGb28181Manger(SrsServer *s, SrsConfDirective* c) // TODO: FIXME: support reload. server = s; config = new SrsGb28181Config(c); - manager = new SrsConnectionManager(); + manager = new SrsResourceManager("GB28181"); } SrsGb28181Manger::~SrsGb28181Manger() diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 1c4f6e07a..37e36418e 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -348,6 +348,7 @@ private: public: virtual srs_error_t cycle(); virtual std::string remote_ip(); + virtual std::string desc(); public: virtual srs_error_t on_rtp_video(SrsSimpleStream* stream, int64_t dts); virtual srs_error_t on_rtp_audio(SrsSimpleStream* stream, int64_t dts, int type); @@ -473,7 +474,7 @@ private: std::map rtp_pool; std::map rtmpmuxers_ssrc; std::map rtmpmuxers; - SrsConnectionManager* manager; + SrsResourceManager* manager; SrsGb28181SipService* sip_service; SrsServer* server; public: diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index 45d8f493b..a1ca6c5a8 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -301,6 +301,11 @@ std::string SrsGb28181SipSession::remote_ip() return _peer_ip; } +std::string SrsGb28181SipSession::desc() +{ + return "SipConn"; +} + srs_error_t SrsGb28181SipSession::cycle() { srs_error_t err = do_cycle(); diff --git a/trunk/src/app/srs_app_gb28181_sip.hpp b/trunk/src/app/srs_app_gb28181_sip.hpp index bb22613e7..f4aa7432e 100644 --- a/trunk/src/app/srs_app_gb28181_sip.hpp +++ b/trunk/src/app/srs_app_gb28181_sip.hpp @@ -144,6 +144,7 @@ public: public: virtual srs_error_t cycle(); virtual std::string remote_ip(); + virtual std::string desc(); private: virtual srs_error_t do_cycle(); }; diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 04347022f..fea77eca3 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1674,7 +1674,7 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } #endif -SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) +SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsTcpConnection(cm, fd, cip, port) { mux = m; @@ -1692,6 +1692,11 @@ SrsHttpApi::~SrsHttpApi() _srs_config->unsubscribe(this); } +std::string SrsHttpApi::desc() +{ + return "HttpConn"; +} + void SrsHttpApi::remark(int64_t* in, int64_t* out) { // TODO: FIXME: implements it diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 00035b222..02a94c3ab 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -261,8 +261,11 @@ private: SrsHttpCorsMux* cors; SrsHttpServeMux* mux; public: - SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); + SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpApi(); +// Interface ISrsResource. +public: + virtual std::string desc(); // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index c8e1cdb1d..a0ca69125 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -59,7 +59,7 @@ using namespace std; #include #include -SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) +SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsTcpConnection(cm, fd, cip, port) { parser = new SrsHttpParser(); @@ -73,6 +73,11 @@ SrsHttpConn::~SrsHttpConn() srs_freep(cors); } +std::string SrsHttpConn::desc() +{ + return "HttpConn"; +} + void SrsHttpConn::remark(int64_t* in, int64_t* out) { // TODO: FIXME: implements it @@ -185,7 +190,7 @@ srs_error_t SrsHttpConn::on_reload_http_stream_crossdomain() return err; } -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) { } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 9bc296c58..a545fa817 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -62,8 +62,11 @@ protected: ISrsHttpServeMux* http_mux; SrsHttpCorsMux* cors; public: - SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); + SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpConn(); +// Interface ISrsResource. +public: + virtual std::string desc(); // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); @@ -89,7 +92,7 @@ public: class SrsResponseOnlyHttpConn : public SrsHttpConn { public: - SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); + SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsResponseOnlyHttpConn(); public: // Directly read a HTTP request message. diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d8f03e26c..06de1fe87 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1603,10 +1603,14 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) nn_simulate_player_nack_drop = 0; pp_address_change = new SrsErrorPithyPrint(); pli_epp = new SrsErrorPithyPrint(); + + _srs_rtc_manager->subscribe(this); } SrsRtcConnection::~SrsRtcConnection() { + _srs_rtc_manager->unsubscribe(this); + srs_freep(timer_); // Cleanup publishers. @@ -1640,6 +1644,31 @@ SrsRtcConnection::~SrsRtcConnection() srs_freep(pli_epp); } +void SrsRtcConnection::on_before_dispose(ISrsResource* c) +{ + if (disposing_) { + return; + } + + SrsRtcConnection* session = dynamic_cast(c); + if (session == this) { + disposing_ = true; + } + + if (session && session == this) { + _srs_context->set_id(cid_); + srs_trace("RTC: session detach from [%s](%s), disposing=%d", c->get_id().c_str(), + c->desc().c_str(), disposing_); + } +} + +void SrsRtcConnection::on_disposing(ISrsResource* c) +{ + if (disposing_) { + return; + } +} + SrsSdp* SrsRtcConnection::get_local_sdp() { return &local_sdp; @@ -1688,12 +1717,14 @@ vector SrsRtcConnection::peer_addresses() return addresses; } -string SrsRtcConnection::remote_ip() +const SrsContextId& SrsRtcConnection::get_id() { - if (sendonly_skt) { - return sendonly_skt->get_peer_ip(); - } - return ""; + return cid_; +} + +std::string SrsRtcConnection::desc() +{ + return "RtcConn"; } void SrsRtcConnection::switch_to_context() @@ -2126,17 +2157,14 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc) { srs_error_t err = srs_success; - SrsRtcConnection* session = this; - // CN(Close Notify) is sent when client close the PeerConnection. if (type == "warning" && desc == "CN") { SrsContextRestore(_srs_context->get_id()); - session->switch_to_context(); - - string username = session->username(); - srs_trace("RTC: session DTLS alert, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str()); + switch_to_context(); - server_->dispose(session); + srs_trace("RTC: session destroy by DTLS alert, username=%s, summary: %s", + username_.c_str(), stat_->summary().c_str()); + _srs_rtc_manager->remove(this); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 561ad2459..bd805b8ab 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -416,7 +417,8 @@ public: }; // A RTC Peer Connection, SDP level object. -class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsConnection +class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsResource + , virtual public ISrsDisposingHandler { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; @@ -469,6 +471,10 @@ private: public: SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid); virtual ~SrsRtcConnection(); +// interface ISrsDisposingHandler +public: + virtual void on_before_dispose(ISrsResource* c); + virtual void on_disposing(ISrsResource* c); public: // TODO: FIXME: save only connection info. SrsSdp* get_local_sdp(); @@ -482,9 +488,10 @@ public: std::string username(); // Get all addresses client used. std::vector peer_addresses(); -// interface ISrsConnection +// Interface ISrsResource. public: - virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); + virtual std::string desc(); public: void switch_to_context(); const SrsContextId& context_id(); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 855ad1309..92affba4c 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -216,7 +216,6 @@ SrsRtcServer::SrsRtcServer() { handler = NULL; hijacker = NULL; - manager = new SrsConnectionManager(); timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); } @@ -231,8 +230,6 @@ SrsRtcServer::~SrsRtcServer() srs_freep(listener); } } - - srs_freep(manager); } srs_error_t SrsRtcServer::initialize() @@ -251,10 +248,6 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "black hole"); } - if ((err = manager->start()) != srs_success) { - return srs_error_wrap(err, "start manager"); - } - srs_trace("RTC server init ok"); return err; @@ -311,7 +304,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) SrsRtcConnection* session = NULL; if (true) { - ISrsConnection* conn = manager->find_by_id(peer_id); + ISrsResource* conn = _srs_rtc_manager->find_by_id(peer_id); if (conn) { // Switch to the session to write logs to the context. session = dynamic_cast(conn); @@ -458,7 +451,7 @@ srs_error_t SrsRtcServer::do_create_session( local_ufrag = srs_random_str(8); username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); - if (!manager->find_by_name(username)) { + if (!_srs_rtc_manager->find_by_name(username)) { break; } } @@ -504,7 +497,7 @@ srs_error_t SrsRtcServer::do_create_session( } // We allows username is optional, but it never empty here. - manager->add_with_name(username, session); + _srs_rtc_manager->add_with_name(username, session); return err; } @@ -567,7 +560,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* } // We allows username is optional, but it never empty here. - manager->add_with_name(username, session); + _srs_rtc_manager->add_with_name(username, session); session->set_remote_sdp(remote_sdp); session->set_state(WAITING_STUN); @@ -575,37 +568,14 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* return err; } -void SrsRtcServer::dispose(SrsRtcConnection* session) -{ - if (session->disposing_) { - return; - } - - destroy(session); - - if (handler) { - handler->on_timeout(session); - } -} - -void SrsRtcServer::destroy(SrsRtcConnection* session) -{ - if (session->disposing_) { - return; - } - session->disposing_ = true; - - manager->remove(session); -} - void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) { - manager->add_with_id(peer_id, session); + _srs_rtc_manager->add_with_id(peer_id, session); } SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username) { - ISrsConnection* conn = manager->find_by_name(username); + ISrsResource* conn = _srs_rtc_manager->find_by_name(username); return dynamic_cast(conn); } @@ -614,11 +584,9 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic srs_error_t err = srs_success; // Check all sessions and dispose the dead sessions. - for (int i = 0; i < (int)manager->size(); i++) { - SrsRtcConnection* session = dynamic_cast(manager->at(i)); - srs_assert(session); - - if (!session->is_stun_timeout()) { + for (int i = 0; i < (int)_srs_rtc_manager->size(); i++) { + SrsRtcConnection* session = dynamic_cast(_srs_rtc_manager->at(i)); + if (!session || !session->is_stun_timeout() || session->disposing_) { continue; } @@ -626,10 +594,11 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic session->switch_to_context(); string username = session->username(); - srs_trace("RTC: session STUN timeout, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str()); + srs_trace("RTC: session destroy by timeout, username=%s, summary: %s", username.c_str(), + session->stat_->summary().c_str()); - // Destroy session and notify the handler. - dispose(session); + // Use manager to free session and notify other objects. + _srs_rtc_manager->remove(session); } return err; @@ -672,6 +641,10 @@ srs_error_t RtcServerAdapter::run() return srs_error_wrap(err, "listen api"); } + if ((err = _srs_rtc_manager->start()) != srs_success) { + return srs_error_wrap(err, "start manager"); + } + return err; } @@ -679,3 +652,5 @@ void RtcServerAdapter::stop() { } +SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true); + diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index d1bf3890e..e1f7d7397 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -40,7 +40,7 @@ class SrsRtcConnection; class SrsRequest; class SrsSdp; class SrsRtcStream; -class SrsConnectionManager; +class SrsResourceManager; // The UDP black hole, for developer to use wireshark to catch plaintext packets. // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, @@ -92,7 +92,6 @@ private: std::vector listeners; ISrsRtcServerHandler* handler; ISrsRtcServerHijacker* hijacker; - SrsConnectionManager* manager; public: SrsRtcServer(); virtual ~SrsRtcServer(); @@ -123,10 +122,6 @@ public: // We start offering, create_session2 to generate offer, setup_session2 to handle answer. srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession); srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp); - // Destroy the session and notify the callback. - void dispose(SrsRtcConnection* session); - // Destroy the session from server, without notify callback. - void destroy(SrsRtcConnection* session); public: void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); public: @@ -150,5 +145,8 @@ public: virtual void stop(); }; +// Manager for RTC connections. +extern SrsResourceManager* _srs_rtc_manager; + #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 6145231a8..ae38ebcce 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -140,6 +140,11 @@ SrsRtmpConn::~SrsRtmpConn() srs_freep(security); } +std::string SrsRtmpConn::desc() +{ + return "RtmpConn"; +} + void SrsRtmpConn::dispose() { SrsTcpConnection::dispose(); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index ccafe09d8..be9f04bc7 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -119,6 +119,9 @@ private: public: SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip, int port); virtual ~SrsRtmpConn(); +// Interface ISrsResource. +public: + virtual std::string desc(); public: virtual void dispose(); protected: diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 59ca3c7ff..577cb8679 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -256,6 +256,16 @@ std::string SrsRtspConn::remote_ip() return ""; } +std::string SrsRtspConn::desc() +{ + return "RtspConn"; +} + +const SrsContextId& SrsRtspConn::get_id() +{ + return _srs_context->get_id(); +} + srs_error_t SrsRtspConn::do_cycle() { srs_error_t err = srs_success; @@ -712,7 +722,7 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) output = _srs_config->get_stream_caster_output(c); local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); - manager = new SrsConnectionManager(); + manager = new SrsResourceManager("CasterRTSP"); } SrsRtspCaster::~SrsRtspCaster() diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index bd3c418a7..ca79a359e 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -51,7 +51,7 @@ class SrsAudioFrame; class SrsSimpleStream; class SrsPithyPrint; class SrsSimpleRtmpClient; -class SrsConnectionManager; +class SrsResourceManager; // A rtp connection which transport a stream. class SrsRtpConn: public ISrsUdpHandler @@ -144,7 +144,11 @@ public: virtual ~SrsRtspConn(); public: virtual srs_error_t serve(); +// Interface ISrsConnection. +public: virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); + virtual std::string desc(); private: virtual srs_error_t do_cycle(); // internal methods @@ -182,7 +186,7 @@ private: std::map used_ports; private: std::vector clients; - SrsConnectionManager* manager; + SrsResourceManager* manager; public: SrsRtspCaster(SrsConfDirective* c); virtual ~SrsRtspCaster(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 761ad6b5c..279758c22 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -656,7 +656,7 @@ SrsServer::SrsServer() pid_fd = -1; signal_manager = new SrsSignalManager(this); - conn_manager = new SrsConnectionManager(); + conn_manager = new SrsResourceManager("RTMP/API"); handler = NULL; ppid = ::getppid(); @@ -1575,7 +1575,7 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon return err; } -void SrsServer::remove(ISrsConnection* c) +void SrsServer::remove(ISrsResource* c) { SrsTcpConnection* conn = dynamic_cast(c); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 7d8677bc8..4fd4fdbae 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -53,7 +53,7 @@ class SrsUdpListener; class SrsTcpListener; class SrsAppCasterFlv; class SrsRtspCaster; -class SrsConnectionManager; +class SrsResourceManager; class SrsGb28181Caster; @@ -241,7 +241,7 @@ public: }; // SRS RTMP server, initialize and listen, start connection service thread, destroy client. -class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public IConnectionManager +class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public ISrsResourceManager { private: // TODO: FIXME: rename to http_api @@ -249,7 +249,7 @@ private: SrsHttpServer* http_server; SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; - SrsConnectionManager* conn_manager; + SrsResourceManager* conn_manager; private: // The pid file fd, lock the file write when server is running. // @remark the init.d script should cleanup the pid file, when stop service, @@ -342,12 +342,12 @@ public: virtual SrsHttpServeMux* api_server(); private: virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn); -// Interface IConnectionManager +// Interface ISrsResourceManager public: // A callback for connection to remove itself. // When connection thread cycle terminated, callback this to delete connection. // @see SrsTcpConnection.on_thread_stop(). - virtual void remove(ISrsConnection* c); + virtual void remove(ISrsResource* c); // Interface ISrsReloadHandler. public: virtual srs_error_t on_reload_listen(); diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 5fc7fe528..71b6111d2 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -74,9 +74,9 @@ srs_error_t SrsDummyCoroutine::pull() return srs_error_new(ERROR_THREAD_DUMMY, "dummy pull"); } -SrsContextId SrsDummyCoroutine::cid() +const SrsContextId& SrsDummyCoroutine::cid() { - return SrsContextId(); + return _srs_context->get_id(); } _ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create; @@ -189,7 +189,7 @@ srs_error_t SrsSTCoroutine::pull() return srs_error_copy(trd_err); } -SrsContextId SrsSTCoroutine::cid() +const SrsContextId& SrsSTCoroutine::cid() { return cid_; } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 2db3ccf56..8931d1931 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -81,7 +81,7 @@ public: // @return a copy of error, which should be freed by user. // NULL if not terminated and user should pull again. virtual srs_error_t pull() = 0; - virtual SrsContextId cid() = 0; + virtual const SrsContextId& cid() = 0; }; // An empty coroutine, user can default to this object before create any real coroutine. @@ -96,7 +96,7 @@ public: virtual void stop(); virtual void interrupt(); virtual srs_error_t pull(); - virtual SrsContextId cid(); + virtual const SrsContextId& cid(); }; // For utest to mock the thread create. @@ -156,7 +156,7 @@ public: // @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted. virtual srs_error_t pull(); // Get the context id of thread. - virtual SrsContextId cid(); + virtual const SrsContextId& cid(); private: virtual srs_error_t cycle(); static void* pfn(void* arg); diff --git a/trunk/src/protocol/srs_service_conn.cpp b/trunk/src/protocol/srs_service_conn.cpp index 1269acdca..66b23a49e 100644 --- a/trunk/src/protocol/srs_service_conn.cpp +++ b/trunk/src/protocol/srs_service_conn.cpp @@ -23,19 +23,27 @@ #include -ISrsConnection::ISrsConnection() +ISrsResource::ISrsResource() { } -ISrsConnection::~ISrsConnection() +ISrsResource::~ISrsResource() +{ +} + +ISrsResourceManager::ISrsResourceManager() { } -IConnectionManager::IConnectionManager() +ISrsResourceManager::~ISrsResourceManager() { } -IConnectionManager::~IConnectionManager() +ISrsConnection::ISrsConnection() +{ +} + +ISrsConnection::~ISrsConnection() { } diff --git a/trunk/src/protocol/srs_service_conn.hpp b/trunk/src/protocol/srs_service_conn.hpp index 806acb8dd..c4c2e2602 100644 --- a/trunk/src/protocol/srs_service_conn.hpp +++ b/trunk/src/protocol/srs_service_conn.hpp @@ -28,26 +28,39 @@ #include -// The connection interface for all HTTP/RTMP/RTSP object. -class ISrsConnection +// The resource managed by ISrsResourceManager. +class ISrsResource { public: - ISrsConnection(); - virtual ~ISrsConnection(); + ISrsResource(); + virtual ~ISrsResource(); public: - // Get remote ip address. - virtual std::string remote_ip() = 0; + // Get the context id of connection. + virtual const SrsContextId& get_id() = 0; + // The resource description, optional. + virtual std::string desc() = 0; }; -// The manager for connection. -class IConnectionManager +// The manager for resource. +class ISrsResourceManager { public: - IConnectionManager(); - virtual ~IConnectionManager(); + ISrsResourceManager(); + virtual ~ISrsResourceManager(); public: // Remove then free the specified connection. - virtual void remove(ISrsConnection* c) = 0; + virtual void remove(ISrsResource* c) = 0; +}; + +// The connection interface for all HTTP/RTMP/RTSP object. +class ISrsConnection : public ISrsResource +{ +public: + ISrsConnection(); + virtual ~ISrsConnection(); +public: + // Get remote ip address. + virtual std::string remote_ip() = 0; }; #endif diff --git a/trunk/src/utest/srs_utest_rtc.cpp b/trunk/src/utest/srs_utest_rtc.cpp index 96bf960a6..e2625ce99 100644 --- a/trunk/src/utest/srs_utest_rtc.cpp +++ b/trunk/src/utest/srs_utest_rtc.cpp @@ -43,7 +43,7 @@ VOID TEST(KernelRTCTest, ConnectionManagerTest) // Normal scenario, free object by manager. if (true) { - SrsConnectionManager manager; + SrsResourceManager manager; HELPER_EXPECT_SUCCESS(manager.start()); EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); @@ -58,7 +58,7 @@ VOID TEST(KernelRTCTest, ConnectionManagerTest) // Coroutine switch context, signal is lost. if (true) { - SrsConnectionManager manager; + SrsResourceManager manager; HELPER_EXPECT_SUCCESS(manager.start()); EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty()); diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 7cace63cc..3b54f9a6f 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -54,6 +54,16 @@ MockSrsConnection::~MockSrsConnection() } } +const SrsContextId& MockSrsConnection::get_id() +{ + return _srs_context->get_id(); +} + +std::string MockSrsConnection::desc() +{ + return "Mock"; +} + std::string MockSrsConnection::remote_ip() { return "127.0.0.1"; @@ -1371,7 +1381,7 @@ VOID TEST(HTTPClientTest, HTTPClientUtility) } } -class MockConnectionManager : public IConnectionManager +class MockConnectionManager : public ISrsResourceManager { public: MockConnectionManager() { @@ -1379,7 +1389,7 @@ public: virtual ~MockConnectionManager() { } public: - virtual void remove(ISrsConnection* /*c*/) { + virtual void remove(ISrsResource* /*c*/) { } }; diff --git a/trunk/src/utest/srs_utest_service.hpp b/trunk/src/utest/srs_utest_service.hpp index d60254a2b..96a0c01ab 100644 --- a/trunk/src/utest/srs_utest_service.hpp +++ b/trunk/src/utest/srs_utest_service.hpp @@ -40,6 +40,8 @@ public: public: MockSrsConnection(); virtual ~MockSrsConnection(); + virtual const SrsContextId& get_id(); + virtual std::string desc(); virtual std::string remote_ip(); };