Timer: Extract shared FastTimer to use one timer for all connections

pull/2357/head
winlin 4 years ago committed by Winlin
parent 659e173e15
commit 8747dd6630

@ -27,11 +27,22 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_timer = new SrsPps();
extern SrsPps* _srs_pps_clock_15ms;
extern SrsPps* _srs_pps_clock_20ms;
extern SrsPps* _srs_pps_clock_25ms;
extern SrsPps* _srs_pps_clock_30ms;
extern SrsPps* _srs_pps_clock_35ms;
extern SrsPps* _srs_pps_clock_40ms;
extern SrsPps* _srs_pps_clock_80ms;
extern SrsPps* _srs_pps_clock_160ms;
extern SrsPps* _srs_pps_timer_s;
ISrsHourGlass::ISrsHourGlass()
{
}
@ -89,6 +100,14 @@ srs_error_t SrsHourGlass::tick(int event, srs_utime_t interval)
return err;
}
void SrsHourGlass::untick(int event)
{
map<int, srs_utime_t>::iterator it = ticks.find(event);
if (it != ticks.end()) {
ticks.erase(it);
}
}
srs_error_t SrsHourGlass::cycle()
{
srs_error_t err = srs_success;
@ -119,3 +138,128 @@ srs_error_t SrsHourGlass::cycle()
return err;
}
ISrsFastTimer::ISrsFastTimer()
{
}
ISrsFastTimer::~ISrsFastTimer()
{
}
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution)
{
timer_ = new SrsHourGlass(label, this, resolution);
}
SrsFastTimer::~SrsFastTimer()
{
srs_freep(timer_);
}
srs_error_t SrsFastTimer::start()
{
srs_error_t err = srs_success;
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
return err;
}
void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer)
{
static int g_event = 0;
int event = g_event++;
// TODO: FIXME: Error leak. Change tick to void in future.
timer_->tick(event, interval);
handlers_[event] = timer;
}
void SrsFastTimer::unsubscribe(ISrsFastTimer* timer)
{
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end();) {
if (it->second != timer) {
++it;
continue;
}
handlers_.erase(it++);
int event = it->first;
timer_->untick(event);
}
}
srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end(); ++it) {
ISrsFastTimer* timer = it->second;
if (event != it->first) {
continue;
}
if ((err = timer->on_timer(interval, tick)) != srs_success) {
return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms",
event, srsu2msi(interval), srsu2msi(tick));
}
break;
}
return err;
}
SrsClockWallMonitor::SrsClockWallMonitor()
{
}
SrsClockWallMonitor::~SrsClockWallMonitor()
{
}
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
static srs_utime_t clock = 0;
srs_utime_t now = srs_update_system_time();
if (!clock) {
clock = now;
return err;
}
srs_utime_t elapsed = now - clock;
clock = now;
if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_15ms->sugar;
} else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_20ms->sugar;
} else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_25ms->sugar;
} else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_30ms->sugar;
} else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_35ms->sugar;
} else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_40ms->sugar;
} else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_80ms->sugar;
} else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_160ms->sugar;
} else {
++_srs_pps_timer_s->sugar;
}
return err;
}

@ -68,7 +68,7 @@ public:
//
// // The hg will create a thread for timer.
// hg->start();
class SrsHourGlass : virtual public ISrsCoroutineHandler
class SrsHourGlass : public ISrsCoroutineHandler
{
private:
std::string label_;
@ -96,10 +96,55 @@ public:
// @param interval the interval in srs_utime_t of tick.
virtual srs_error_t tick(srs_utime_t interval);
virtual srs_error_t tick(int event, srs_utime_t interval);
// Remove the tick by event.
void untick(int event);
public:
// Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.
virtual srs_error_t cycle();
};
// The handler for fast timer.
class ISrsFastTimer
{
public:
ISrsFastTimer();
virtual ~ISrsFastTimer();
public:
// Tick when timer is active.
virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0;
};
// The fast timer, shared by objects, for high performance.
// For example, we should never start a timer for each connection or publisher or player,
// instead, we should start only one fast timer in server.
class SrsFastTimer : public ISrsHourGlass
{
private:
SrsHourGlass* timer_;
std::map<int, ISrsFastTimer*> handlers_;
public:
SrsFastTimer(std::string label, srs_utime_t resolution);
virtual ~SrsFastTimer();
public:
srs_error_t start();
public:
void subscribe(srs_utime_t interval, ISrsFastTimer* timer);
void unsubscribe(ISrsFastTimer* timer);
// Interface ISrsHourGlass
private:
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
};
// To monitor the system wall clock timer deviation.
class SrsClockWallMonitor : public ISrsFastTimer
{
public:
SrsClockWallMonitor();
virtual ~SrsClockWallMonitor();
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
};
#endif

@ -208,11 +208,18 @@ SrsServer* SrsServerAdapter::instance()
SrsHybridServer::SrsHybridServer()
{
// Note that the timer depends on other global variables,
// so we MUST never create it in constructor.
timer_ = NULL;
clock_monitor_ = new SrsClockWallMonitor();
}
SrsHybridServer::~SrsHybridServer()
{
srs_freep(clock_monitor_);
srs_freep(timer_);
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
@ -235,10 +242,20 @@ srs_error_t SrsHybridServer::initialize()
return srs_error_wrap(err, "initialize st failed");
}
if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
// Create global shared timer.
timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
// Start the timer first.
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// The hybrid server start a timer, do routines of hybrid server.
timer_->subscribe(5 * SRS_UTIME_SECONDS, this);
// A monitor to check the clock wall deviation, per clock tick.
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
@ -289,67 +306,15 @@ SrsServerAdapter* SrsHybridServer::srs()
return NULL;
}
srs_error_t SrsHybridServer::setup_ticks()
SrsFastTimer* SrsHybridServer::timer()
{
srs_error_t err = srs_success;
// Start timer for system global works.
timer_ = new SrsHourGlass("hybrid", this, 20 * SRS_UTIME_MILLISECONDS);
if ((err = timer_->tick(1, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(2, 5 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start");
return timer_;
}
return err;
}
srs_error_t SrsHybridServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
// Update system wall clock.
if (event == 1) {
static srs_utime_t clock = 0;
srs_utime_t now = srs_update_system_time();
if (!clock) {
clock = now;
return err;
}
srs_utime_t elapsed = now - clock;
clock = now;
if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_15ms->sugar;
} else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_20ms->sugar;
} else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_25ms->sugar;
} else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_30ms->sugar;
} else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_35ms->sugar;
} else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_40ms->sugar;
} else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_80ms->sugar;
} else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) {
++_srs_pps_clock_160ms->sugar;
} else {
++_srs_pps_timer_s->sugar;
}
return err;
}
// Show statistics for RTC server.
SrsProcSelfStat* u = srs_get_self_proc_stat();
// Resident Set Size: number of pages the process has in real memory.

@ -64,11 +64,12 @@ public:
};
// The hybrid server manager.
class SrsHybridServer : public ISrsHourGlass
class SrsHybridServer : public ISrsFastTimer
{
private:
std::vector<ISrsHybridServer*> servers;
SrsHourGlass* timer_;
SrsFastTimer* timer_;
SrsClockWallMonitor* clock_monitor_;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
@ -80,10 +81,10 @@ public:
virtual void stop();
public:
virtual SrsServerAdapter* srs();
// interface ISrsHourGlass
SrsFastTimer* timer();
// interface ISrsFastTimer
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
};
extern SrsHybridServer* _srs_hybrid;

Loading…
Cancel
Save