Refine get_queue_length and set_queue_size in time unit

pull/1651/head
winlin 6 years ago
parent e3983b3513
commit c1b64ba24f

@ -4459,9 +4459,9 @@ bool SrsConfig::get_mix_correct(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0()); return SRS_CONF_PERFER_FALSE(conf->arg0());
} }
double SrsConfig::get_queue_length(string vhost) srs_utime_t SrsConfig::get_queue_length(string vhost)
{ {
static double DEFAULT = SRS_PERF_PLAY_QUEUE; static srs_utime_t DEFAULT = SRS_PERF_PLAY_QUEUE;
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);
if (!conf) { if (!conf) {
@ -4478,7 +4478,7 @@ double SrsConfig::get_queue_length(string vhost)
return DEFAULT; return DEFAULT;
} }
return ::atoi(conf->arg0().c_str()); return srs_utime_t(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
} }
bool SrsConfig::get_refer_enabled(string vhost) bool SrsConfig::get_refer_enabled(string vhost)

@ -729,11 +729,11 @@ public:
*/ */
virtual bool get_mix_correct(std::string vhost); virtual bool get_mix_correct(std::string vhost);
/** /**
* get the cache queue length, in seconds. * get the cache queue length, in srs_utime_t.
* when exceed the queue length, drop packet util I frame. * when exceed the queue length, drop packet util I frame.
* @remark, default 10. * @remark, default 10s.
*/ */
virtual double get_queue_length(std::string vhost); virtual srs_utime_t get_queue_length(std::string vhost);
/** /**
* whether the refer hotlink-denial enabled. * whether the refer hotlink-denial enabled.
*/ */

@ -433,7 +433,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
srs_freep(queue); srs_freep(queue);
} }
void SrsEdgeForwarder::set_queue_size(double queue_size) void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
{ {
return queue->set_queue_size(queue_size); return queue->set_queue_size(queue_size);
} }
@ -710,7 +710,7 @@ SrsPublishEdge::~SrsPublishEdge()
srs_freep(forwarder); srs_freep(forwarder);
} }
void SrsPublishEdge::set_queue_size(double queue_size) void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
{ {
return forwarder->set_queue_size(queue_size); return forwarder->set_queue_size(queue_size);
} }

@ -170,7 +170,7 @@ public:
SrsEdgeForwarder(); SrsEdgeForwarder();
virtual ~SrsEdgeForwarder(); virtual ~SrsEdgeForwarder();
public: public:
virtual void set_queue_size(double queue_size); virtual void set_queue_size(srs_utime_t queue_size);
public: public:
virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t start(); virtual srs_error_t start();
@ -232,7 +232,7 @@ public:
SrsPublishEdge(); SrsPublishEdge();
virtual ~SrsPublishEdge(); virtual ~SrsPublishEdge();
public: public:
virtual void set_queue_size(double queue_size); virtual void set_queue_size(srs_utime_t queue_size);
public: public:
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual bool can_publish(); virtual bool can_publish();

@ -85,7 +85,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
return err; return err;
} }
void SrsForwarder::set_queue_size(double queue_size) void SrsForwarder::set_queue_size(srs_utime_t queue_size)
{ {
queue->set_queue_size(queue_size); queue->set_queue_size(queue_size);
} }

@ -71,7 +71,7 @@ public:
virtual ~SrsForwarder(); virtual ~SrsForwarder();
public: public:
virtual srs_error_t initialize(SrsRequest* r, std::string ep); virtual srs_error_t initialize(SrsRequest* r, std::string ep);
virtual void set_queue_size(double queue_size); virtual void set_queue_size(srs_utime_t queue_size);
public: public:
virtual srs_error_t on_publish(); virtual srs_error_t on_publish();
virtual void on_unpublish(); virtual void on_unpublish();

@ -64,7 +64,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
trd = new SrsSTCoroutine("http-stream", this); trd = new SrsSTCoroutine("http-stream", this);
// TODO: FIXME: support reload. // TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); fast_cache = srs_utime_t(_srs_config->get_vhost_http_remux_fast_cache(req->vhost) * SRS_UTIME_SECONDS);
} }
SrsBufferCache::~SrsBufferCache() SrsBufferCache::~SrsBufferCache()
@ -108,7 +108,8 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor
return srs_error_wrap(err, "dump packets"); return srs_error_wrap(err, "dump packets");
} }
srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), srsu2msi(queue->duration()), fast_cache); srs_trace("http: dump cache %d msgs, duration=%dms, cache=%dms",
queue->size(), srsu2msi(queue->duration()), srsu2msi(fast_cache));
return err; return err;
} }

@ -39,7 +39,7 @@ class SrsTsTransmuxer;
class SrsBufferCache : public ISrsCoroutineHandler class SrsBufferCache : public ISrsCoroutineHandler
{ {
private: private:
double fast_cache; srs_utime_t fast_cache;
private: private:
SrsMessageQueue* queue; SrsMessageQueue* queue;
SrsSource* source; SrsSource* source;

@ -260,9 +260,9 @@ srs_utime_t SrsMessageQueue::duration()
return (av_end_time - av_start_time); return (av_end_time - av_start_time);
} }
void SrsMessageQueue::set_queue_size(double queue_size) void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
{ {
max_queue_size = srs_utime_t(queue_size * SRS_UTIME_SECONDS); max_queue_size = queue_size;
} }
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
@ -442,7 +442,7 @@ SrsConsumer::~SrsConsumer()
#endif #endif
} }
void SrsConsumer::set_queue_size(double queue_size) void SrsConsumer::set_queue_size(srs_utime_t queue_size)
{ {
queue->set_queue_size(queue_size); queue->set_queue_size(queue_size);
} }
@ -1468,9 +1468,8 @@ srs_error_t SrsOriginHub::create_forwarders()
if ((err = forwarder->initialize(req, forward_server)) != srs_success) { if ((err = forwarder->initialize(req, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder"); return srs_error_wrap(err, "init forwarder");
} }
// TODO: FIXME: support queue size. srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
double queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size); forwarder->set_queue_size(queue_size);
if ((err = forwarder->on_publish()) != srs_success) { if ((err = forwarder->on_publish()) != srs_success) {
@ -1864,7 +1863,7 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
return srs_error_wrap(err, "edge(publish)"); return srs_error_wrap(err, "edge(publish)");
} }
double queue_size = _srs_config->get_queue_length(req->vhost); srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
publish_edge->set_queue_size(queue_size); publish_edge->set_queue_size(queue_size);
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
@ -1919,7 +1918,7 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost)
// queue length // queue length
if (true) { if (true) {
double v = _srs_config->get_queue_length(req->vhost); srs_utime_t v = _srs_config->get_queue_length(req->vhost);
if (true) { if (true) {
std::vector<SrsConsumer*>::iterator it; std::vector<SrsConsumer*>::iterator it;
@ -2443,7 +2442,7 @@ srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consum
consumer = new SrsConsumer(this, conn); consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer); consumers.push_back(consumer);
double queue_size = _srs_config->get_queue_length(req->vhost); srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
consumer->set_queue_size(queue_size); consumer->set_queue_size(queue_size);
// if atc, update the sequence header to gop cache time. // if atc, update the sequence header to gop cache time.

@ -161,9 +161,9 @@ public:
virtual srs_utime_t duration(); virtual srs_utime_t duration();
/** /**
* set the queue size * set the queue size
* @param queue_size the queue size in seconds. * @param queue_size the queue size in srs_utime_t.
*/ */
virtual void set_queue_size(double queue_size); virtual void set_queue_size(srs_utime_t queue_size);
public: public:
/** /**
* enqueue the message, the timestamp always monotonically. * enqueue the message, the timestamp always monotonically.
@ -243,7 +243,7 @@ public:
/** /**
* set the size of queue. * set the size of queue.
*/ */
virtual void set_queue_size(double queue_size); virtual void set_queue_size(srs_utime_t queue_size);
/** /**
* when source id changed, notice client to print. * when source id changed, notice client to print.
*/ */

@ -148,8 +148,8 @@
*/ */
// whether gop cache is on. // whether gop cache is on.
#define SRS_PERF_GOP_CACHE true #define SRS_PERF_GOP_CACHE true
// in seconds, the live queue length. // in srs_utime_t, the live queue length.
#define SRS_PERF_PLAY_QUEUE 30 #define SRS_PERF_PLAY_QUEUE (30 * SRS_UTIME_SECONDS)
/** /**
* whether always use complex send algorithm. * whether always use complex send algorithm.

@ -306,7 +306,7 @@ void show_macro_features()
// gc(gop-cache) // gc(gop-cache)
ss << "gc:" << srs_bool2switch(SRS_PERF_GOP_CACHE); ss << "gc:" << srs_bool2switch(SRS_PERF_GOP_CACHE);
// pq(play-queue) // pq(play-queue)
ss << ", pq:" << SRS_PERF_PLAY_QUEUE << "s"; ss << ", pq:" << srsu2msi(SRS_PERF_PLAY_QUEUE) << "ms";
// cscc(chunk stream cache cid) // cscc(chunk stream cache cid)
ss << ", cscc:[0," << SRS_PERF_CHUNK_STREAM_CACHE << ")"; ss << ", cscc:[0," << SRS_PERF_CHUNK_STREAM_CACHE << ")";
// csa(complex send algorithm) // csa(complex send algorithm)
@ -342,7 +342,7 @@ void show_macro_features()
possible_mr_latency = srsu2msi(SRS_PERF_MR_SLEEP); possible_mr_latency = srsu2msi(SRS_PERF_MR_SLEEP);
#endif #endif
srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)", srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)",
srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000); srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, srsu2msi(SRS_PERF_PLAY_QUEUE));
#ifdef SRS_AUTO_MEM_WATCH #ifdef SRS_AUTO_MEM_WATCH
#warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script." #warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script."

@ -1806,7 +1806,7 @@ VOID TEST(ConfigMainTest, CheckConf_vhost_ingest_id)
EXPECT_TRUE(ERROR_SUCCESS != conf.parse(_MIN_OK_CONF"vhost v{ingest{} ingest{}}")); EXPECT_TRUE(ERROR_SUCCESS != conf.parse(_MIN_OK_CONF"vhost v{ingest{} ingest{}}"));
} }
VOID TEST(ConfigUnitTest, CheckDefaultValues) VOID TEST(ConfigUnitTest, CheckDefaultValuesVhost)
{ {
MockSrsConfig conf; MockSrsConfig conf;
@ -1875,6 +1875,17 @@ VOID TEST(ConfigUnitTest, CheckDefaultValues)
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hls_dispose("v")); EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hls_dispose("v"));
} }
if (true) {
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_queue_length(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{play{queue_length 100;}}"));
EXPECT_EQ(100 * SRS_UTIME_SECONDS, conf.get_queue_length("v"));
}
}
VOID TEST(ConfigUnitTest, CheckDefaultValuesGlobal)
{
if (true) { if (true) {
srs_utime_t t0 = srs_update_system_time(); srs_utime_t t0 = srs_update_system_time();
srs_usleep(10 * SRS_UTIME_MILLISECONDS); srs_usleep(10 * SRS_UTIME_MILLISECONDS);

Loading…
Cancel
Save