From 3ffb0980f54a0f2bb9016add741f62733a947f3d Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 29 May 2017 18:34:41 +0800 Subject: [PATCH] For #906, #902, use connection manager to remove connection --- trunk/src/app/srs_app_caster_flv.cpp | 9 ++++- trunk/src/app/srs_app_caster_flv.hpp | 11 +++--- trunk/src/app/srs_app_server.cpp | 9 ++++- trunk/src/app/srs_app_server.hpp | 6 ++-- trunk/src/app/srs_app_thread.cpp | 52 ++++++++++++++++++++++++++++ trunk/src/app/srs_app_thread.hpp | 30 ++++++++++++++++ 6 files changed, 108 insertions(+), 9 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index df60f3d00..64f57835d 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -51,10 +51,13 @@ SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c) { http_mux = new SrsHttpServeMux(); output = _srs_config->get_stream_caster_output(c); + manager = new SrsCoroutineManager(); } SrsAppCasterFlv::~SrsAppCasterFlv() { + srs_freep(http_mux); + srs_freep(manager); } int SrsAppCasterFlv::initialize() @@ -65,6 +68,10 @@ int SrsAppCasterFlv::initialize() return ret; } + if ((ret = manager->start()) != ERROR_SUCCESS) { + return ret; + } + return ret; } @@ -95,7 +102,7 @@ void SrsAppCasterFlv::remove(ISrsConnection* c) // fixbug: SrsHttpConn for CasterFlv is not freed, which could cause memory leak // so, free conn which is not managed by SrsServer->conns; // @see: https://github.com/ossrs/srs/issues/826 - srs_freep(c); + manager->remove(c); } int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index dd318fb2f..dcdcee8d4 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -42,7 +42,7 @@ class SrsFlvDecoder; class SrsTcpClient; class SrsSimpleRtmpClient; -#include +#include #include #include #include @@ -52,24 +52,25 @@ class SrsSimpleRtmpClient; * the stream caster for flv stream over HTTP POST. */ class SrsAppCasterFlv : virtual public ISrsTcpHandler -, virtual public IConnectionManager, virtual public ISrsHttpHandler + , virtual public IConnectionManager, virtual public ISrsHttpHandler { private: std::string output; SrsHttpServeMux* http_mux; std::vector conns; + SrsCoroutineManager* manager; public: SrsAppCasterFlv(SrsConfDirective* c); virtual ~SrsAppCasterFlv(); public: virtual int initialize(); - // ISrsTcpHandler +// ISrsTcpHandler public: virtual int on_tcp_client(st_netfd_t stfd); - // IConnectionManager +// IConnectionManager public: virtual void remove(ISrsConnection* c); - // ISrsHttpHandler +// ISrsHttpHandler public: virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 0893536cb..192192245 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -50,6 +50,7 @@ using namespace std; #include #include #include +#include // system interval in ms, // all resolution times should be times togother, @@ -483,6 +484,7 @@ SrsServer::SrsServer() pid_fd = -1; signal_manager = NULL; + conn_manager = new SrsCoroutineManager(); handler = NULL; ppid = ::getppid(); @@ -524,6 +526,7 @@ void SrsServer::destroy() } srs_freep(signal_manager); + srs_freep(conn_manager); } void SrsServer::dispose() @@ -742,6 +745,10 @@ int SrsServer::listen() return ret; } + if ((ret = conn_manager->start()) != ERROR_SUCCESS) { + return ret; + } + return ret; } @@ -1334,7 +1341,7 @@ void SrsServer::remove(ISrsConnection* c) // all connections are created by server, // so we free it here. - srs_freep(conn); + conn_manager->remove(c); } int SrsServer::on_reload_listen() diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index c6c049693..93c01e61b 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -54,6 +54,7 @@ class SrsAppCasterFlv; #ifdef SRS_AUTO_KAFKA class SrsKafkaProducer; #endif +class SrsCoroutineManager; // listener type for server to identify the connection, // that is, use different type to process the connection. @@ -233,8 +234,8 @@ public: * start connection service thread, destroy client. */ class SrsServer : virtual public ISrsReloadHandler -, virtual public ISrsSourceHandler -, virtual public IConnectionManager + , virtual public ISrsSourceHandler + , virtual public IConnectionManager { private: // TODO: FIXME: rename to http_api @@ -244,6 +245,7 @@ private: #ifdef SRS_AUTO_INGEST SrsIngester* ingester; #endif + SrsCoroutineManager* conn_manager; private: /** * the pid file fd, lock the file write when server is running. diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index f590b07fb..0782b11ed 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -26,6 +26,58 @@ #include #include +#include +using namespace std; + +SrsCoroutineManager::SrsCoroutineManager() +{ + cond = st_cond_new(); + trd = new SrsCoroutine("manager", this); +} + +SrsCoroutineManager::~SrsCoroutineManager() +{ + srs_freep(trd); + st_cond_destroy(cond); + + clear(); +} + +int SrsCoroutineManager::start() +{ + return trd->start(); +} + +int SrsCoroutineManager::cycle() +{ + while (!trd->pull()) { + st_cond_wait(cond); + clear(); + } + + return ERROR_SUCCESS; +} + +void SrsCoroutineManager::remove(ISrsConnection* c) +{ + conns.push_back(c); + st_cond_signal(cond); +} + +void SrsCoroutineManager::clear() +{ + // To prevent thread switch when delete connection, + // we copy all connections then free one by one. + vector copy = conns; + conns.clear(); + + vector::iterator it; + for (it = copy.begin(); it != copy.end(); ++it) { + ISrsConnection* conn = *it; + srs_freep(conn); + } +} + ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() { } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index bd7664b59..44487e9b1 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -26,7 +26,37 @@ #include +#include + #include +#include + +/** + * The coroutine manager use a thread to delete a connection, which will stop the service + * thread, for example, when the RTMP connection thread cycle terminated, it will notify + * the manager(the server) to remove the connection from list of server and push it to + * the manager thread to delete it, finally the thread of connection will stop. + */ +class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public IConnectionManager +{ +private: + SrsCoroutine* trd; + std::vector conns; + st_cond_t cond; +public: + SrsCoroutineManager(); + virtual ~SrsCoroutineManager(); +public: + int start(); +// ISrsCoroutineHandler +public: + virtual int cycle(); +// IConnectionManager +public: + virtual void remove(ISrsConnection* c); +private: + void clear(); +}; /** * the one cycle thread is a thread do the cycle only one time,