From 96a5c7b1abb3252e796fc2298e4d6793998478f2 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 3 May 2014 22:59:21 +0800 Subject: [PATCH] fix mem leak of encoder, edge and source. add destroy for gmc to detect mem leak. to 0.9.89 --- trunk/src/app/srs_app_conn.cpp | 86 ++++++++++++++--------------- trunk/src/app/srs_app_conn.hpp | 11 ++-- trunk/src/app/srs_app_edge.cpp | 3 + trunk/src/app/srs_app_encoder.cpp | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 2 + trunk/src/app/srs_app_server.cpp | 72 ++++++++++++++++-------- trunk/src/app/srs_app_source.cpp | 21 ++++--- trunk/src/app/srs_app_source.hpp | 5 ++ trunk/src/app/srs_app_thread.cpp | 5 ++ trunk/src/app/srs_app_thread.hpp | 25 ++++++--- trunk/src/core/srs_core.hpp | 2 +- 11 files changed, 144 insertions(+), 89 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 3e0352231..8e520712d 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -35,26 +35,59 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd) server = srs_server; stfd = client_stfd; connection_id = 0; + pthread = new SrsThread(this, 0); } SrsConnection::~SrsConnection() { - srs_freepa(ip); - srs_close_stfd(stfd); + stop(); } int SrsConnection::start() +{ + return pthread->start(); +} + +int SrsConnection::cycle() { int ret = ERROR_SUCCESS; - if (st_thread_create(cycle_thread, this, 0, 0) == NULL) { - ret = ERROR_ST_CREATE_CYCLE_THREAD; - srs_error("st_thread_create conn cycle thread error. ret=%d", ret); - return ret; + _srs_context->generate_id(); + connection_id = _srs_context->get_id(); + + ret = do_cycle(); + + // if socket io error, set to closed. + if (srs_is_client_gracefully_close(ret)) { + ret = ERROR_SOCKET_CLOSED; } - srs_verbose("create st conn cycle thread success."); - return ret; + // success. + if (ret == ERROR_SUCCESS) { + srs_trace("client process normally finished. ret=%d", ret); + } + + // client close peer. + if (ret == ERROR_SOCKET_CLOSED) { + srs_warn("client disconnect peer. ret=%d", ret); + } + + // set loop to stop to quit. + pthread->stop_loop(); + + return ERROR_SUCCESS; +} + +void SrsConnection::on_thread_stop() +{ + server->remove(this); +} + +void SrsConnection::stop() +{ + srs_close_stfd(stfd); + srs_freep(pthread); + srs_freepa(ip); } int SrsConnection::get_peer_ip() @@ -92,40 +125,3 @@ int SrsConnection::get_peer_ip() return ret; } -void SrsConnection::cycle() -{ - int ret = ERROR_SUCCESS; - - _srs_context->generate_id(); - connection_id = _srs_context->get_id(); - - ret = do_cycle(); - - // if socket io error, set to closed. - if (srs_is_client_gracefully_close(ret)) { - ret = ERROR_SOCKET_CLOSED; - } - - // success. - if (ret == ERROR_SUCCESS) { - srs_trace("client process normally finished. ret=%d", ret); - } - - // client close peer. - if (ret == ERROR_SOCKET_CLOSED) { - srs_warn("client disconnect peer. ret=%d", ret); - } - - server->remove(this); -} - -void* SrsConnection::cycle_thread(void* arg) -{ - SrsConnection* conn = (SrsConnection*)arg; - srs_assert(conn != NULL); - - conn->cycle(); - - return NULL; -} - diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 07b2ad9d2..9a6f34c61 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -31,10 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include class SrsServer; -class SrsConnection +class SrsConnection : public ISrsThreadHandler { +private: + SrsThread* pthread; protected: char* ip; SrsServer* server; @@ -45,13 +48,13 @@ public: virtual ~SrsConnection(); public: virtual int start(); + virtual int cycle(); + virtual void on_thread_stop(); protected: virtual int do_cycle() = 0; + virtual void stop(); protected: virtual int get_peer_ip(); -private: - virtual void cycle(); - static void* cycle_thread(void* arg); }; #endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 5aa6d0cac..9658da2cf 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -335,6 +335,9 @@ SrsEdgeForwarder::SrsEdgeForwarder() SrsEdgeForwarder::~SrsEdgeForwarder() { stop(); + + srs_freep(pthread); + srs_freep(queue); } void SrsEdgeForwarder::set_queue_size(double queue_size) diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 3d4c19d9b..66f3f0bec 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -53,6 +53,7 @@ SrsEncoder::~SrsEncoder() on_unpublish(); srs_freep(pthread); + srs_freep(pithy_print); } int SrsEncoder::on_publish(SrsRequest* req) diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d613db2ed..53f25ad25 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -79,6 +79,8 @@ SrsRtmpConn::~SrsRtmpConn() { _srs_config->unsubscribe(this); + stop(); + srs_freep(req); srs_freep(res); srs_freep(rtmp); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 5d03ed506..b6f6733d2 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_INGEST #include #endif +#include #define SERVER_LISTEN_BACKLOG 512 @@ -251,6 +252,13 @@ int SrsSignalManager::start() sa.sa_flags = 0; sigaction(SIGINT, &sa, NULL); + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGUSR2, &sa, NULL); + + srs_trace("signal installed"); + return pthread->start(); } @@ -316,27 +324,17 @@ SrsServer::~SrsServer() void SrsServer::destroy() { - _srs_config->unsubscribe(this); + srs_warn("start destroy server"); - if (true) { - std::vector::iterator it; - for (it = conns.begin(); it != conns.end(); ++it) { - SrsConnection* conn = *it; - srs_freep(conn); - } - conns.clear(); - } + _srs_config->unsubscribe(this); close_listeners(SrsListenerRtmpStream); close_listeners(SrsListenerHttpApi); close_listeners(SrsListenerHttpStream); - - if (pid_fd > 0) { - ::close(pid_fd); - pid_fd = -1; - } - - srs_freep(signal_manager); + +#ifdef SRS_AUTO_INGEST + ingester->stop(); +#endif #ifdef SRS_AUTO_HTTP_API srs_freep(http_api_handler); @@ -348,6 +346,27 @@ void SrsServer::destroy() #ifdef SRS_AUTO_INGEST srs_freep(ingester); #endif + + if (pid_fd > 0) { + ::close(pid_fd); + pid_fd = -1; + } + + srs_freep(signal_manager); + + for (std::vector::iterator it = conns.begin(); it != conns.end();) { + SrsConnection* conn = *it; + + // remove the connection, then free it, + // for the free will remove itself from server, + // when erased here, the remove of server will ignore. + it = conns.erase(it); + + srs_freep(conn); + } + conns.clear(); + + SrsSource::destroy(); } int SrsServer::initialize() @@ -540,12 +559,14 @@ int SrsServer::cycle() ret = do_cycle(); -#ifdef SRS_AUTO_INGEST - ingester->stop(); -#endif - destroy(); +#ifdef SRS_AUTO_GPERF_MC + srs_warn("sleep a long time for system st-threads to cleanup."); + st_usleep(3 * 1000 * 1000); + srs_warn("system quit"); +#endif + return ret; } @@ -553,10 +574,14 @@ void SrsServer::remove(SrsConnection* conn) { std::vector::iterator it = std::find(conns.begin(), conns.end(), conn); - if (it != conns.end()) { - conns.erase(it); + // removed by destroy, ignore. + if (it == conns.end()) { + srs_warn("server moved connection, ignore."); + return; } + conns.erase(it); + srs_info("conn removed. conns=%d", (int)conns.size()); // all connections are created by server, @@ -571,7 +596,7 @@ void SrsServer::on_signal(int signo) return; } - if (signo == SIGINT) { + if (signo == SIGINT || signo == SIGUSR2) { #ifdef SRS_AUTO_GPERF_MC srs_trace("gmc is on, main cycle will terminate normally."); signal_gmc_stop = true; @@ -611,6 +636,7 @@ int SrsServer::do_cycle() // because directly exit will cause core-dump. #ifdef SRS_AUTO_GPERF_MC if (signal_gmc_stop) { + srs_warn("gmc got singal to stop server."); return ret; } #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 23efd4e4e..5c9aeca1e 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -437,6 +437,16 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource) return ret; } +void SrsSource::destroy() +{ + std::map::iterator it; + for (it = pool.begin(); it != pool.end(); ++it) { + SrsSource* source = it->second; + srs_freep(source); + } + pool.clear(); +} + SrsSource::SrsSource(SrsRequest* req) { _req = req->copy(); @@ -468,14 +478,9 @@ SrsSource::~SrsSource() { _srs_config->unsubscribe(this); - if (true) { - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - srs_freep(consumer); - } - consumers.clear(); - } + // never free the consumers, + // for all consumers are auto free. + consumers.clear(); if (true) { std::vector::iterator it; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index a492dac49..914c4335f 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -222,6 +222,11 @@ public: * @remark stream_url should without port and schema. */ static int find(SrsRequest* req, SrsSource** ppsource); + /** + * when system exit, destroy the sources, + * for gmc to analysis mem leaks. + */ + static void destroy(); private: // deep copy of client request. SrsRequest* _req; diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index e19f064f7..cec5e4959 100644 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -121,6 +121,11 @@ bool SrsThread::can_loop() return loop; } +void SrsThread::stop_loop() +{ + loop = false; +} + void SrsThread::thread_cycle() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index fef83aa54..46ceefc9e 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -49,19 +49,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * when thread interrupt, the socket maybe not got EINT, * espectially on st_usleep(), so the cycle must check the loop, * when handler->cycle() has loop itself, for example: -* handler->cycle() is: -* while (true): -* st_usleep(0); -* if (read_from_socket(skt) < 0) break; +* while (true): +* st_usleep(0); +* if (read_from_socket(skt) < 0) break; * if thread stop when read_from_socket, it's ok, the loop will break, * but when thread stop interrupt the s_usleep(0), then the loop is * death loop. * in a word, the handler->cycle() must: -* handler->cycle() is: -* while (pthread->can_loop()): -* st_usleep(0); -* if (read_from_socket(skt) < 0) break; +* while (pthread->can_loop()): +* st_usleep(0); +* if (read_from_socket(skt) < 0) break; * check the loop, then it works. +* +* in the thread itself, that is the cycle method, +* if itself want to terminate the thread, should never use stop(), +* but use stop_loop() to set the loop to false and terminate normally. */ class ISrsThreadHandler { @@ -117,12 +119,19 @@ public: * @remark user can stop multiple times, ignore if already stopped. */ virtual void stop(); +public: /** * whether the thread should loop, * used for handler->cycle() which has a loop method, * to check this method, break if false. */ virtual bool can_loop(); + /** + * for the loop thread to stop the loop. + * other thread can directly use stop() to stop loop and wait for quit. + * this stop loop method only set loop to false. + */ + virtual void stop_loop(); private: virtual void thread_cycle(); static void* thread_fun(void* arg); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 5c4aeda11..4155dbf1e 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "88" +#define VERSION_REVISION "89" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs"