From 7114682eecfe2a9d5b197da5cc4ff26daef35651 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 9 Feb 2021 17:15:25 +0800 Subject: [PATCH] Timer: Apply timer(HourGlass) to server and sources --- trunk/src/app/srs_app_config.cpp | 4 +- trunk/src/app/srs_app_hourglass.hpp | 2 +- trunk/src/app/srs_app_server.cpp | 276 ++++++++++++---------------- trunk/src/app/srs_app_server.hpp | 8 + trunk/src/app/srs_app_source.cpp | 37 ++-- trunk/src/app/srs_app_source.hpp | 10 +- 6 files changed, 163 insertions(+), 174 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index bbd0ab46c..0d2575f3f 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -8282,7 +8282,7 @@ bool SrsConfig::get_heartbeat_enabled() srs_utime_t SrsConfig::get_heartbeat_interval() { - static srs_utime_t DEFAULT = (srs_utime_t)(9.9 * SRS_UTIME_SECONDS); + static srs_utime_t DEFAULT = (srs_utime_t)(10 * SRS_UTIME_SECONDS); SrsConfDirective* conf = get_heartbeart(); if (!conf) { @@ -8294,7 +8294,7 @@ srs_utime_t SrsConfig::get_heartbeat_interval() return DEFAULT; } - return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); + return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } string SrsConfig::get_heartbeat_url() diff --git a/trunk/src/app/srs_app_hourglass.hpp b/trunk/src/app/srs_app_hourglass.hpp index c297729f4..e75e040a6 100644 --- a/trunk/src/app/srs_app_hourglass.hpp +++ b/trunk/src/app/srs_app_hourglass.hpp @@ -59,7 +59,7 @@ public: // It's a complex and high-performance timer. // // Usage: -// SrsHourGlass* hg = new SrsHourGlass(handler, 100 * SRS_UTIME_MILLISECONDS); +// SrsHourGlass* hg = new SrsHourGlass("nack", handler, 100 * SRS_UTIME_MILLISECONDS); // // hg->tick(1, 300 * SRS_UTIME_MILLISECONDS); // hg->tick(2, 500 * SRS_UTIME_MILLISECONDS); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d3d520ba1..8d069161c 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -56,47 +56,6 @@ using namespace std; #include #include -// system interval in srs_utime_t, -// all resolution times should be times togother, -// for example, system-interval is x=1s(1000ms), -// then rusage can be 3*x, for instance, 3*1=3s, -// the meminfo canbe 6*x, for instance, 6*1=6s, -// for performance refine, @see: https://github.com/ossrs/srs/issues/194 -// @remark, recomment to 1000ms. -#define SRS_SYS_CYCLE_INTERVAL (1000 * SRS_UTIME_MILLISECONDS) - -// update time interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES -#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES -#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3 - -// update network devices info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES -#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES -#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3 - -// update the disk iops interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES -#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES -#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6 - -// update platform info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES -#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9 - -// update network devices info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES -#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9 - std::string srs_listener_type2string(SrsListenerType type) { switch (type) { @@ -729,6 +688,7 @@ SrsServer::SrsServer() http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id()); + timer_ = NULL; } SrsServer::~SrsServer() @@ -741,6 +701,7 @@ void SrsServer::destroy() srs_warn("start destroy server"); srs_freep(trd_); + srs_freep(timer_); dispose(); @@ -1108,10 +1069,18 @@ srs_error_t SrsServer::start() { srs_error_t err = srs_success; + if ((err = _srs_sources->initialize()) != srs_success) { + return srs_error_wrap(err, "sources"); + } + if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "start"); } + if ((err = setup_ticks()) != srs_success) { + return srs_error_wrap(err, "tick"); + } + return err; } @@ -1165,7 +1134,6 @@ srs_error_t SrsServer::cycle() return err; } - void SrsServer::on_signal(int signo) { if (signo == SRS_SIGNAL_RELOAD) { @@ -1235,138 +1203,132 @@ srs_error_t SrsServer::do_cycle() { srs_error_t err = srs_success; - // find the max loop - int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES); - - max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES); - // for asprocess. bool asprocess = _srs_config->get_asprocess(); - - // the daemon thread, update the time cache - // TODO: FIXME: use SrsHourGlass. + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "pull"); + } + if (handler && (err = handler->on_cycle()) != srs_success) { return srs_error_wrap(err, "handle callback"); } - - // the interval in config. - int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL); - - // dynamic fetch the max. - int dynamic_max = srs_max(max, heartbeat_max_resolution); - - for (int i = 0; i < dynamic_max; i++) { - if ((err = trd_->pull()) != srs_success) { - return srs_error_wrap(err, "pull"); - } - - srs_usleep(SRS_SYS_CYCLE_INTERVAL); - - // asprocess check. - if (asprocess && ::getppid() != ppid) { - return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid()); - } - - // gracefully quit for SIGINT or SIGTERM or SIGQUIT. - if (signal_fast_quit || signal_gracefully_quit) { - srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit); - return err; - } - // for gperf heap checker, - // @see: research/gperftools/heap-checker/heap_checker.cc - // if user interrupt the program, exit to check mem leak. - // but, if gperf, use reload to ensure main return normally, - // because directly exit will cause core-dump. + // asprocess check. + if (asprocess && ::getppid() != ppid) { + return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid()); + } + + // gracefully quit for SIGINT or SIGTERM or SIGQUIT. + if (signal_fast_quit || signal_gracefully_quit) { + srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit); + return err; + } + + // for gperf heap checker, + // @see: research/gperftools/heap-checker/heap_checker.cc + // if user interrupt the program, exit to check mem leak. + // but, if gperf, use reload to ensure main return normally, + // because directly exit will cause core-dump. #ifdef SRS_GPERF_MC - if (signal_gmc_stop) { - srs_warn("gmc got singal to stop server."); - return err; - } + if (signal_gmc_stop) { + srs_warn("gmc got singal to stop server."); + return err; + } #endif - - // do persistence config to file. - if (signal_persistence_config) { - signal_persistence_config = false; - srs_info("get signal to persistence config to file."); - - if ((err = _srs_config->persistence()) != srs_success) { - return srs_error_wrap(err, "config persistence to file"); - } - srs_trace("persistence config to file success."); - } - - // do reload the config. - if (signal_reload) { - signal_reload = false; - srs_info("get signal to reload the config."); - - if ((err = _srs_config->reload()) != srs_success) { - return srs_error_wrap(err, "config reload"); - } - srs_trace("reload config success."); - } - - // notice the stream sources to cycle. - if ((err = _srs_sources->cycle()) != srs_success) { - return srs_error_wrap(err, "source cycle"); - } - - // update the cache time - if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) { - srs_info("update current time cache."); - srs_update_system_time(); - } - - if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) { - srs_info("update resource info, rss."); - srs_update_system_rusage(); - } - if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) { - srs_info("update cpu info, cpu usage."); - srs_update_proc_stat(); - } - if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) { - srs_info("update disk info, disk iops."); - srs_update_disk_stat(); - } - if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) { - srs_info("update memory info, usage/free."); - srs_update_meminfo(); - } - if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) { - srs_info("update platform info, uptime/load."); - srs_update_platform_info(); - } - if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) { - srs_info("update network devices info."); - srs_update_network_devices(); - } - if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) { - srs_info("update network server kbps info."); - resample_kbps(); + + // do persistence config to file. + if (signal_persistence_config) { + signal_persistence_config = false; + srs_info("get signal to persistence config to file."); + + if ((err = _srs_config->persistence()) != srs_success) { + return srs_error_wrap(err, "config persistence to file"); } - if (_srs_config->get_heartbeat_enabled()) { - if ((i % heartbeat_max_resolution) == 0) { - srs_info("do http heartbeat, for internal server to report."); - http_heartbeat->heartbeat(); - } + srs_trace("persistence config to file success."); + } + + // do reload the config. + if (signal_reload) { + signal_reload = false; + srs_info("get signal to reload the config."); + + if ((err = _srs_config->reload()) != srs_success) { + return srs_error_wrap(err, "config reload"); } - - srs_info("server main thread loop"); + srs_trace("reload config success."); } + + srs_usleep(1 * SRS_UTIME_SECONDS); } return err; } +srs_error_t SrsServer::setup_ticks() +{ + srs_error_t err = srs_success; + + srs_freep(timer_); + timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS); + + if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(3, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(4, 6 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(5, 6 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(6, 9 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(7, 9 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if (_srs_config->get_heartbeat_enabled()) { + if ((err = timer_->tick(9, _srs_config->get_heartbeat_interval())) != srs_success) { + return srs_error_wrap(err, "tick"); + } + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + + return err; +} + +srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + switch (event) { + case 1: srs_update_system_time(); break; + case 2: srs_update_system_rusage(); break; + case 3: srs_update_proc_stat(); break; + case 4: srs_update_disk_stat(); break; + case 5: srs_update_meminfo(); break; + case 6: srs_update_platform_info(); break; + case 7: srs_update_network_devices(); break; + case 8: resample_kbps(); break; + case 9: http_heartbeat->heartbeat(); break; + } + + return err; +} + srs_error_t SrsServer::listen_rtmp() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index ea83417fc..e7f562ee4 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -38,6 +38,7 @@ #include #include #include +#include class SrsServer; class SrsHttpServeMux; @@ -262,6 +263,7 @@ public: // SRS RTMP server, initialize and listen, start connection service thread, destroy client. class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler , virtual public ISrsResourceManager, virtual public ISrsCoroutineHandler + , virtual public ISrsHourGlass { private: // TODO: FIXME: Extract an HttpApiServer. @@ -271,6 +273,7 @@ private: SrsIngester* ingester; SrsResourceManager* conn_manager; SrsCoroutine* trd_; + SrsHourGlass* timer_; private: // The pid file fd, lock the file write when server is running. // @remark the init.d script should cleanup the pid file, when stop service, @@ -342,6 +345,11 @@ private: // update the global static data, for instance, the current time, // the cpu/mem/network statistic. virtual srs_error_t do_cycle(); +// interface ISrsHourGlass +private: + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); +private: // listen at specified protocol. virtual srs_error_t listen_rtmp(); virtual srs_error_t listen_http_api(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 3d317eb2d..82b2177a1 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1689,6 +1689,7 @@ SrsSourceManager* _srs_sources = new SrsSourceManager(); SrsSourceManager::SrsSourceManager() { lock = NULL; + timer_ = NULL; } SrsSourceManager::~SrsSourceManager() @@ -1696,6 +1697,11 @@ SrsSourceManager::~SrsSourceManager() srs_mutex_destroy(lock); } +srs_error_t SrsSourceManager::initialize() +{ + return setup_ticks(); +} + srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) { srs_error_t err = srs_success; @@ -1789,28 +1795,37 @@ void SrsSourceManager::dispose() return; } -srs_error_t SrsSourceManager::cycle() +srs_error_t SrsSourceManager::setup_ticks() { - SrsContextId cid = _srs_context->get_id(); - srs_error_t err = do_cycle(); - _srs_context->set_id(cid); - + srs_error_t err = srs_success; + + srs_freep(timer_); + timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS); + + if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + return err; } -srs_error_t SrsSourceManager::do_cycle() +srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; - + std::map::iterator it; for (it = pool.begin(); it != pool.end();) { SrsSource* source = it->second; - + // Do cycle source to cleanup components, such as hls dispose. if ((err = source->cycle()) != srs_success) { return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str()); } - + // TODO: FIXME: support source cleanup. // @see https://github.com/ossrs/srs/issues/713 // @see https://github.com/ossrs/srs/issues/714 @@ -1825,7 +1840,7 @@ srs_error_t SrsSourceManager::do_cycle() _srs_context->set_id(cid); } srs_trace("cleanup die source, total=%d", (int)pool.size()); - + srs_freep(source); pool.erase(it++); } else { @@ -1835,7 +1850,7 @@ srs_error_t SrsSourceManager::do_cycle() ++it; #endif } - + return err; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4a58eb954..9f7740437 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -34,6 +34,7 @@ #include #include #include +#include class SrsFormat; class SrsRtmpFormat; @@ -450,15 +451,17 @@ public: }; // The source manager to create and refresh all stream sources. -class SrsSourceManager +class SrsSourceManager : public ISrsHourGlass { private: srs_mutex_t lock; std::map pool; + SrsHourGlass* timer_; public: SrsSourceManager(); virtual ~SrsSourceManager(); public: + virtual srs_error_t initialize(); // create source when fetch from cache failed. // @param r the client request. // @param h the event handler for source. @@ -471,9 +474,10 @@ private: public: // dispose and cycle all sources. virtual void dispose(); - virtual srs_error_t cycle(); +// interface ISrsHourGlass private: - virtual srs_error_t do_cycle(); + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); public: // when system exit, destroy th`e sources, // For gmc to analysis mem leaks.