Timer: Apply timer(HourGlass) to server and sources

pull/2199/head
winlin 4 years ago
parent 77cffd3e04
commit 7114682eec

@ -8282,7 +8282,7 @@ bool SrsConfig::get_heartbeat_enabled()
srs_utime_t SrsConfig::get_heartbeat_interval()
{
static srs_utime_t DEFAULT = (srs_utime_t)(9.9 * SRS_UTIME_SECONDS);
static srs_utime_t DEFAULT = (srs_utime_t)(10 * SRS_UTIME_SECONDS);
SrsConfDirective* conf = get_heartbeart();
if (!conf) {
@ -8294,7 +8294,7 @@ srs_utime_t SrsConfig::get_heartbeat_interval()
return DEFAULT;
}
return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
string SrsConfig::get_heartbeat_url()

@ -59,7 +59,7 @@ public:
// It's a complex and high-performance timer.
//
// Usage:
// SrsHourGlass* hg = new SrsHourGlass(handler, 100 * SRS_UTIME_MILLISECONDS);
// SrsHourGlass* hg = new SrsHourGlass("nack", handler, 100 * SRS_UTIME_MILLISECONDS);
//
// hg->tick(1, 300 * SRS_UTIME_MILLISECONDS);
// hg->tick(2, 500 * SRS_UTIME_MILLISECONDS);

@ -56,47 +56,6 @@ using namespace std;
#include <srs_app_gb28181.hpp>
#include <srs_app_gb28181_sip.hpp>
// system interval in srs_utime_t,
// all resolution times should be times togother,
// for example, system-interval is x=1s(1000ms),
// then rusage can be 3*x, for instance, 3*1=3s,
// the meminfo canbe 6*x, for instance, 6*1=6s,
// for performance refine, @see: https://github.com/ossrs/srs/issues/194
// @remark, recomment to 1000ms.
#define SRS_SYS_CYCLE_INTERVAL (1000 * SRS_UTIME_MILLISECONDS)
// update time interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES
#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES
#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3
// update network devices info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES
#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3
// update the disk iops interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES
#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES
#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6
// update platform info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES
#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9
// update network devices info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9
std::string srs_listener_type2string(SrsListenerType type)
{
switch (type) {
@ -729,6 +688,7 @@ SrsServer::SrsServer()
http_heartbeat = new SrsHttpHeartbeat();
ingester = new SrsIngester();
trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id());
timer_ = NULL;
}
SrsServer::~SrsServer()
@ -741,6 +701,7 @@ void SrsServer::destroy()
srs_warn("start destroy server");
srs_freep(trd_);
srs_freep(timer_);
dispose();
@ -1108,10 +1069,18 @@ srs_error_t SrsServer::start()
{
srs_error_t err = srs_success;
if ((err = _srs_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
}
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
return err;
}
@ -1165,7 +1134,6 @@ srs_error_t SrsServer::cycle()
return err;
}
void SrsServer::on_signal(int signo)
{
if (signo == SRS_SIGNAL_RELOAD) {
@ -1235,138 +1203,132 @@ srs_error_t SrsServer::do_cycle()
{
srs_error_t err = srs_success;
// find the max loop
int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
// for asprocess.
bool asprocess = _srs_config->get_asprocess();
// the daemon thread, update the time cache
// TODO: FIXME: use SrsHourGlass.
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if (handler && (err = handler->on_cycle()) != srs_success) {
return srs_error_wrap(err, "handle callback");
}
// the interval in config.
int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
// dynamic fetch the max.
int dynamic_max = srs_max(max, heartbeat_max_resolution);
for (int i = 0; i < dynamic_max; i++) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
srs_usleep(SRS_SYS_CYCLE_INTERVAL);
// asprocess check.
if (asprocess && ::getppid() != ppid) {
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit || signal_gracefully_quit) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
// asprocess check.
if (asprocess && ::getppid() != ppid) {
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit || signal_gracefully_quit) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_GPERF_MC
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return err;
}
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return err;
}
#endif
// do persistence config to file.
if (signal_persistence_config) {
signal_persistence_config = false;
srs_info("get signal to persistence config to file.");
if ((err = _srs_config->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal to reload the config.");
if ((err = _srs_config->reload()) != srs_success) {
return srs_error_wrap(err, "config reload");
}
srs_trace("reload config success.");
}
// notice the stream sources to cycle.
if ((err = _srs_sources->cycle()) != srs_success) {
return srs_error_wrap(err, "source cycle");
}
// update the cache time
if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
srs_info("update current time cache.");
srs_update_system_time();
}
if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
srs_info("update resource info, rss.");
srs_update_system_rusage();
}
if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update cpu info, cpu usage.");
srs_update_proc_stat();
}
if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update disk info, disk iops.");
srs_update_disk_stat();
}
if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
srs_info("update memory info, usage/free.");
srs_update_meminfo();
}
if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
srs_info("update platform info, uptime/load.");
srs_update_platform_info();
}
if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
srs_info("update network devices info.");
srs_update_network_devices();
}
if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
srs_info("update network server kbps info.");
resample_kbps();
// do persistence config to file.
if (signal_persistence_config) {
signal_persistence_config = false;
srs_info("get signal to persistence config to file.");
if ((err = _srs_config->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
if (_srs_config->get_heartbeat_enabled()) {
if ((i % heartbeat_max_resolution) == 0) {
srs_info("do http heartbeat, for internal server to report.");
http_heartbeat->heartbeat();
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal to reload the config.");
if ((err = _srs_config->reload()) != srs_success) {
return srs_error_wrap(err, "config reload");
}
srs_info("server main thread loop");
srs_trace("reload config success.");
}
srs_usleep(1 * SRS_UTIME_SECONDS);
}
return err;
}
srs_error_t SrsServer::setup_ticks()
{
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS);
if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(3, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(4, 6 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(5, 6 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(6, 9 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(7, 9 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if (_srs_config->get_heartbeat_enabled()) {
if ((err = timer_->tick(9, _srs_config->get_heartbeat_interval())) != srs_success) {
return srs_error_wrap(err, "tick");
}
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
switch (event) {
case 1: srs_update_system_time(); break;
case 2: srs_update_system_rusage(); break;
case 3: srs_update_proc_stat(); break;
case 4: srs_update_disk_stat(); break;
case 5: srs_update_meminfo(); break;
case 6: srs_update_platform_info(); break;
case 7: srs_update_network_devices(); break;
case 8: resample_kbps(); break;
case 9: http_heartbeat->heartbeat(); break;
}
return err;
}
srs_error_t SrsServer::listen_rtmp()
{
srs_error_t err = srs_success;

@ -38,6 +38,7 @@
#include <srs_service_st.hpp>
#include <srs_app_gb28181.hpp>
#include <srs_app_gb28181_sip.hpp>
#include <srs_app_hourglass.hpp>
class SrsServer;
class SrsHttpServeMux;
@ -262,6 +263,7 @@ public:
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler
, virtual public ISrsResourceManager, virtual public ISrsCoroutineHandler
, virtual public ISrsHourGlass
{
private:
// TODO: FIXME: Extract an HttpApiServer.
@ -271,6 +273,7 @@ private:
SrsIngester* ingester;
SrsResourceManager* conn_manager;
SrsCoroutine* trd_;
SrsHourGlass* timer_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
@ -342,6 +345,11 @@ private:
// update the global static data, for instance, the current time,
// the cpu/mem/network statistic.
virtual srs_error_t do_cycle();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
private:
// listen at specified protocol.
virtual srs_error_t listen_rtmp();
virtual srs_error_t listen_http_api();

@ -1689,6 +1689,7 @@ SrsSourceManager* _srs_sources = new SrsSourceManager();
SrsSourceManager::SrsSourceManager()
{
lock = NULL;
timer_ = NULL;
}
SrsSourceManager::~SrsSourceManager()
@ -1696,6 +1697,11 @@ SrsSourceManager::~SrsSourceManager()
srs_mutex_destroy(lock);
}
srs_error_t SrsSourceManager::initialize()
{
return setup_ticks();
}
srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
srs_error_t err = srs_success;
@ -1789,28 +1795,37 @@ void SrsSourceManager::dispose()
return;
}
srs_error_t SrsSourceManager::cycle()
srs_error_t SrsSourceManager::setup_ticks()
{
SrsContextId cid = _srs_context->get_id();
srs_error_t err = do_cycle();
_srs_context->set_id(cid);
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsSourceManager::do_cycle()
srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((err = source->cycle()) != srs_success) {
return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str());
}
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
// @see https://github.com/ossrs/srs/issues/714
@ -1825,7 +1840,7 @@ srs_error_t SrsSourceManager::do_cycle()
_srs_context->set_id(cid);
}
srs_trace("cleanup die source, total=%d", (int)pool.size());
srs_freep(source);
pool.erase(it++);
} else {
@ -1835,7 +1850,7 @@ srs_error_t SrsSourceManager::do_cycle()
++it;
#endif
}
return err;
}

@ -34,6 +34,7 @@
#include <srs_app_reload.hpp>
#include <srs_core_performance.hpp>
#include <srs_service_st.hpp>
#include <srs_app_hourglass.hpp>
class SrsFormat;
class SrsRtmpFormat;
@ -450,15 +451,17 @@ public:
};
// The source manager to create and refresh all stream sources.
class SrsSourceManager
class SrsSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map<std::string, SrsSource*> pool;
SrsHourGlass* timer_;
public:
SrsSourceManager();
virtual ~SrsSourceManager();
public:
virtual srs_error_t initialize();
// create source when fetch from cache failed.
// @param r the client request.
// @param h the event handler for source.
@ -471,9 +474,10 @@ private:
public:
// dispose and cycle all sources.
virtual void dispose();
virtual srs_error_t cycle();
// interface ISrsHourGlass
private:
virtual srs_error_t do_cycle();
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// when system exit, destroy th`e sources,
// For gmc to analysis mem leaks.

Loading…
Cancel
Save