From b4093bfbe4093d9abd7e8c9f4feeaaca21d92f00 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 15 Dec 2013 18:25:55 +0800 Subject: [PATCH] support set live queue length --- README.md | 1 + trunk/conf/srs.conf | 9 +- trunk/src/core/srs_core_config.cpp | 28 ++++ trunk/src/core/srs_core_config.hpp | 5 + trunk/src/core/srs_core_forward.cpp | 8 + trunk/src/core/srs_core_forward.hpp | 4 + trunk/src/core/srs_core_reload.cpp | 5 + trunk/src/core/srs_core_reload.hpp | 1 + trunk/src/core/srs_core_source.cpp | 232 +++++++++++++++++++--------- trunk/src/core/srs_core_source.hpp | 50 +++++- 10 files changed, 262 insertions(+), 81 deletions(-) diff --git a/README.md b/README.md index 67e7b3c9b..a62d389e3 100755 --- a/README.md +++ b/README.md @@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
### History +* v0.9, 2013-12-15, drop the old whole gop when live message queue full. * v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header. * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. * v0.9, 2013-12-14, refine the thread model for the retry threads. diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index dc94b75ef..2e1826be2 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -21,6 +21,7 @@ max_connections 2000; vhost __defaultVhost__ { enabled on; gop_cache on; + queue_length 30; forward 127.0.0.1:19350; hls { enabled on; @@ -92,7 +93,8 @@ vhost __defaultVhost__ { vhost dev { enabled on; gop_cache on; - forward 127.0.0.1:19350; + queue_length 30; + #forward 127.0.0.1:19350; hls { enabled off; hls_path ./objs/nginx/html; @@ -685,6 +687,11 @@ vhost min.delay.com { # set to on if requires client fast startup. # default: on gop_cache off; + # the max live queue length in seconds. + # if the messages in the queue exceed the max length, + # drop the old whole gop. + # default: 30 + queue_length 10; } # the vhost for antisuck. vhost refer.anti_suck.com { diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index 904dc0fed..fea6d9dd2 100644 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -548,6 +548,17 @@ int SrsConfig::reload() } srs_trace("vhost %s reload gop_cache success.", vhost.c_str()); } + // queue_length + if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_queue_length(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes queue_length failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload queue_length success.", vhost.c_str()); + } // forward if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1275,6 +1286,7 @@ bool SrsConfig::get_gop_cache(string vhost) return true; } + conf = conf->get("gop_cache"); if (conf && conf->arg0() == "off") { return false; } @@ -1282,6 +1294,22 @@ bool SrsConfig::get_gop_cache(string vhost) return true; } +double SrsConfig::get_queue_length(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_QUEUE_LENGTH; + } + + conf = conf->get("queue_length"); + if (conf || conf->arg0().empty()) { + return SRS_CONF_DEFAULT_QUEUE_LENGTH; + } + + return ::atoi(conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_forward(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp index d173d6ec2..b7ccf8d89 100644 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -48,6 +48,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_AAC_SYNC 100 // in ms, for HLS aac flush the audio #define SRS_CONF_DEFAULT_AAC_DELAY 300 +// in seconds, the live queue length. +#define SRS_CONF_DEFAULT_QUEUE_LENGTH 30 +// in seconds, the paused queue length. +#define SRS_CONF_DEFAULT_PAUSED_LENGTH 10 #define SRS_CONF_DEFAULT_CHUNK_SIZE 4096 @@ -145,6 +149,7 @@ public: virtual std::string get_log_dir(); virtual int get_max_connections(); virtual bool get_gop_cache(std::string vhost); + virtual double get_queue_length(std::string vhost); virtual SrsConfDirective* get_forward(std::string vhost); private: virtual SrsConfDirective* get_hls(std::string vhost); diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index c6bca4fbb..bb2de325e 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -51,12 +51,14 @@ SrsForwarder::SrsForwarder(SrsSource* _source) stream_id = 0; pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS); + queue = new SrsMessageQueue(); } SrsForwarder::~SrsForwarder() { on_unpublish(); + // TODO: FIXME: remove it. std::vector::iterator it; for (it = msgs.begin(); it != msgs.end(); ++it) { SrsSharedPtrMessage* msg = *it; @@ -65,6 +67,12 @@ SrsForwarder::~SrsForwarder() msgs.clear(); srs_freep(pthread); + srs_freep(queue); +} + +void SrsForwarder::set_queue_size(double queue_size) +{ + queue->set_queue_size(queue_size); } int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index d2d38242e..37cb71269 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsSharedPtrMessage; class SrsOnMetaDataPacket; +class SrsMessageQueue; class SrsRtmpClient; class SrsRequest; class SrsSource; @@ -58,10 +59,13 @@ private: private: SrsSource* source; SrsRtmpClient* client; + SrsMessageQueue* queue; std::vector msgs; public: SrsForwarder(SrsSource* _source); virtual ~SrsForwarder(); +public: + virtual void set_queue_size(double queue_size); public: virtual int on_publish(SrsRequest* req, std::string forward_server); virtual void on_unpublish(); diff --git a/trunk/src/core/srs_core_reload.cpp b/trunk/src/core/srs_core_reload.cpp index 71cd5a3c1..f6feee15e 100644 --- a/trunk/src/core/srs_core_reload.cpp +++ b/trunk/src/core/srs_core_reload.cpp @@ -55,6 +55,11 @@ int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_queue_length(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_forward(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_reload.hpp b/trunk/src/core/srs_core_reload.hpp index a819bd7dd..3d8f3e8b9 100644 --- a/trunk/src/core/srs_core_reload.hpp +++ b/trunk/src/core/srs_core_reload.hpp @@ -44,6 +44,7 @@ public: virtual int on_reload_pithy_print(); virtual int on_reload_vhost_removed(std::string vhost); virtual int on_reload_gop_cache(std::string vhost); + virtual int on_reload_queue_length(std::string vhost); virtual int on_reload_forward(std::string vhost); virtual int on_reload_hls(std::string vhost); virtual int on_reload_transcode(std::string vhost); diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index fab9361bc..332f164f0 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -39,7 +39,6 @@ using namespace std; #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 10 -#define PAUSED_SHRINK_SIZE 250 SrsRtmpJitter::SrsRtmpJitter() { @@ -50,9 +49,21 @@ SrsRtmpJitter::~SrsRtmpJitter() { } +// TODO: FIXME: remove the 64bits time, change the timestamp in heaer to 64bits. int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time) { int ret = ERROR_SUCCESS; + + // set to 0 for metadata. + if (!msg->header.is_video() && !msg->header.is_audio()) { + if (corrected_time) { + *corrected_time = 0; + } + + msg->header.timestamp = 0; + + return ret; + } int sample_rate = tba; int frame_rate = tbv; @@ -110,55 +121,50 @@ int SrsRtmpJitter::get_time() return (int)last_pkt_correct_time; } -SrsConsumer::SrsConsumer(SrsSource* _source) +SrsMessageQueue::SrsMessageQueue() { - source = _source; - paused = false; - jitter = new SrsRtmpJitter(); + queue_size_ms = 0; + av_start_time = av_end_time = -1; } -SrsConsumer::~SrsConsumer() +SrsMessageQueue::~SrsMessageQueue() { clear(); - - source->on_consumer_destroy(this); - srs_freep(jitter); } -int SrsConsumer::get_time() +void SrsMessageQueue::set_queue_size(double queue_size) { - return jitter->get_time(); + queue_size_ms = (int)(queue_size * 1000); } -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { - srs_freep(msg); - return ret; + if (msg->header.is_video() || msg->header.is_audio()) { + if (av_start_time == -1) { + av_start_time = msg->header.timestamp; + } + + av_end_time = msg->header.timestamp; } - // TODO: check the queue size and drop packets if overflow. msgs.push_back(msg); + + while (av_end_time - av_start_time > queue_size_ms) { + shrink(); + } return ret; } -int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) { int ret = ERROR_SUCCESS; if (msgs.empty()) { return ret; } - - if (paused) { - if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) { - shrink(); - } - return ret; - } if (max_count == 0) { count = (int)msgs.size(); @@ -181,76 +187,120 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c return ret; } -int SrsConsumer::on_play_client_pause(bool is_pause) -{ - int ret = ERROR_SUCCESS; - - srs_trace("stream consumer change pause state %d=>%d", paused, is_pause); - paused = is_pause; - - return ret; -} - -void SrsConsumer::shrink() +void SrsMessageQueue::shrink() { - int i = 0; - std::vector::iterator it; + int iframe_index = -1; - // issue the last video iframe. - bool has_video = false; - int frame_to_remove = 0; - std::vector::iterator iframe = msgs.end(); - for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) { - SrsSharedPtrMessage* msg = *it; + // issue the first iframe. + // skip the first frame, whatever the type of it, + // for when we shrinked, the first is the iframe, + // we will directly remove the gop next time. + for (int i = 1; i < (int)msgs.size(); i++) { + SrsSharedPtrMessage* msg = msgs[i]; + if (msg->header.is_video()) { - has_video = true; if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) { - iframe = it; - frame_to_remove = i + 1; + // the max frame index to remove. + iframe_index = i; + + // set the start time, we will remove until this frame. + av_start_time = msg->header.timestamp; + + break; } } } - // last iframe is the first elem, ignore it. - if (iframe == msgs.begin()) { - return; - } - - // recalc the frame to remove - if (iframe == msgs.end()) { - frame_to_remove = 0; - } - if (!has_video) { - frame_to_remove = (int)msgs.size(); - } - - srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d", - has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove); - - // if no video, remove all audio. - if (!has_video) { + // no iframe, clear the queue. + if (iframe_index < 0) { clear(); return; } - // if exists video Iframe, remove the frames before it. - if (iframe != msgs.end()) { - for (it = msgs.begin(); it != iframe; ++it) { - SrsSharedPtrMessage* msg = *it; - srs_freep(msg); - } - msgs.erase(msgs.begin(), iframe); + // remove the first gop from the front + for (int i = 0; i < iframe_index; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + srs_freep(msg); } + msgs.erase(msgs.begin(), msgs.begin() + iframe_index); + + srs_trace("shrink the cache queue, " + "size=%d, removed=%d", (int)msgs.size(), iframe_index); } -void SrsConsumer::clear() +void SrsMessageQueue::clear() { std::vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { SrsSharedPtrMessage* msg = *it; srs_freep(msg); } + msgs.clear(); + + av_start_time = av_end_time = -1; +} + +SrsConsumer::SrsConsumer(SrsSource* _source) +{ + source = _source; + paused = false; + jitter = new SrsRtmpJitter(); + queue = new SrsMessageQueue(); +} + +SrsConsumer::~SrsConsumer() +{ + source->on_consumer_destroy(this); + srs_freep(jitter); + srs_freep(queue); +} + +void SrsConsumer::set_queue_size(double queue_size) +{ + queue->set_queue_size(queue_size); +} + +int SrsConsumer::get_time() +{ + return jitter->get_time(); +} + +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) +{ + int ret = ERROR_SUCCESS; + + if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } + + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +{ + // paused, return nothing. + if (paused) { + return ERROR_SUCCESS; + } + + return queue->get_packets(max_count, pmsgs, count); +} + +int SrsConsumer::on_play_client_pause(bool is_pause) +{ + int ret = ERROR_SUCCESS; + + srs_trace("stream consumer change pause state %d=>%d", paused, is_pause); + paused = is_pause; + + return ret; } SrsGopCache::SrsGopCache() @@ -436,6 +486,41 @@ int SrsSource::on_reload_gop_cache(string vhost) return ret; } +int SrsSource::on_reload_queue_length(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + double queue_size = config->get_queue_length(req->vhost); + + if (true) { + std::vector::iterator it; + + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + consumer->set_queue_size(queue_size); + } + + srs_trace("consumers reload queue size success."); + } + + if (true) { + std::vector::iterator it; + + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + forwarder->set_queue_size(queue_size); + } + + srs_trace("forwarders reload queue size success."); + } + + return ret; +} + int SrsSource::on_reload_forward(string vhost) { int ret = ERROR_SUCCESS; @@ -735,7 +820,7 @@ int SrsSource::on_video(SrsCommonMessage* video) // cache the last gop packets if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { - srs_error("shrink gop cache failed. ret=%d", ret); + srs_error("gop cache msg failed. ret=%d", ret); return ret; } srs_verbose("cache gop success."); @@ -809,6 +894,7 @@ void SrsSource::on_unpublish() int ret = ERROR_SUCCESS; consumer = new SrsConsumer(this); + consumer->set_queue_size(config->get_queue_length(req->vhost)); consumers.push_back(consumer); if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 897918c7e..a4567b2b9 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -74,6 +74,45 @@ public: virtual int get_time(); }; +/** +* the message queue for the consumer(client), forwarder. +* we limit the size in seconds, drop old messages(the whole gop) if full. +*/ +class SrsMessageQueue +{ +private: + int64_t av_start_time; + int64_t av_end_time; + int queue_size_ms; + std::vector msgs; +public: + SrsMessageQueue(); + virtual ~SrsMessageQueue(); +public: + /** + * set the queue size + * @param queue_size the queue size in seconds. + */ + virtual void set_queue_size(double queue_size); +public: + /** + * enqueue the message, the timestamp always monotonically. + * @param msg, the msg to enqueue, user never free it whatever the return code. + */ + virtual int enqueue(SrsSharedPtrMessage* msg); + /** + * get messages from the queue. + */ + virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); +private: + /** + * remove a gop from the front. + * if no iframe found, clear it. + */ + virtual void shrink(); + virtual void clear(); +}; + /** * the consumer for SrsSource, that is a play client. */ @@ -82,11 +121,14 @@ class SrsConsumer private: SrsRtmpJitter* jitter; SrsSource* source; + SrsMessageQueue* queue; std::vector msgs; bool paused; public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); +public: + virtual void set_queue_size(double queue_size); public: /** * get current client time, the last packet time. @@ -111,13 +153,6 @@ public: * when client send the pause message. */ virtual int on_play_client_pause(bool is_pause); -private: - /** - * when paused, shrink the cache queue, - * remove to cache only one gop. - */ - virtual void shrink(); - virtual void clear(); }; /** @@ -218,6 +253,7 @@ public: // interface ISrsReloadHandler public: virtual int on_reload_gop_cache(std::string vhost); + virtual int on_reload_queue_length(std::string vhost); virtual int on_reload_forward(std::string vhost); virtual int on_reload_hls(std::string vhost); virtual int on_reload_transcode(std::string vhost);