From 5836ffd2d4ed965a50f37fa4d8e15b8ae7b4270c Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 15 Dec 2013 19:24:29 +0800 Subject: [PATCH] refine the forwarder, use srs message queue --- trunk/src/core/srs_core_forward.cpp | 81 +++++++++++++++-------------- trunk/src/core/srs_core_forward.hpp | 4 +- trunk/src/core/srs_core_source.hpp | 6 ++- 3 files changed, 48 insertions(+), 43 deletions(-) diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index bb2de325e..e4ae5c130 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -52,22 +52,16 @@ SrsForwarder::SrsForwarder(SrsSource* _source) pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS); queue = new SrsMessageQueue(); + jitter = new SrsRtmpJitter(); } SrsForwarder::~SrsForwarder() { on_unpublish(); - // TODO: FIXME: remove it. - std::vector::iterator it; - for (it = msgs.begin(); it != msgs.end(); ++it) { - SrsSharedPtrMessage* msg = *it; - srs_freep(msg); - } - msgs.clear(); - srs_freep(pthread); srs_freep(queue); + srs_freep(jitter); } void SrsForwarder::set_queue_size(double queue_size) @@ -143,7 +137,14 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) { int ret = ERROR_SUCCESS; - msgs.push_back(metadata); + if ((ret = jitter->correct(metadata, 0, 0)) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } + + if ((ret = queue->enqueue(metadata)) != ERROR_SUCCESS) { + return ret; + } return ret; } @@ -152,8 +153,14 @@ int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - // TODO: FIXME: must drop the msgs when server failed. - msgs.push_back(msg); + if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } + + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { + return ret; + } return ret; } @@ -162,8 +169,14 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - // TODO: FIXME: must drop the msgs when server failed. - msgs.push_back(msg); + if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } + + if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { + return ret; + } return ret; } @@ -292,11 +305,20 @@ int SrsForwarder::forward() } } + // forward all messages. + int count = 0; + SrsSharedPtrMessage** msgs = NULL; + if ((ret = queue->get_packets(0, &msgs, count)) != ERROR_SUCCESS) { + srs_error("get message to forward failed. ret=%d", ret); + return ret; + } + // ignore when no messages. - int count = (int)msgs.size(); - if (msgs.empty()) { + if (count <= 0) { + srs_verbose("no packets to forward."); continue; } + SrsAutoFree(SrsSharedPtrMessage*, msgs, true); // pithy print pithy_print.elapse(SRS_PULSE_TIMEOUT_MS); @@ -306,36 +328,17 @@ int SrsForwarder::forward() } // all msgs to forward. - int i = 0; - for (i = 0; i < count; i++) { + for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs[i]; - msgs[i] = NULL; - - // we erased the sendout messages, the msg must not be NULL. + srs_assert(msg); + msgs[i] = NULL; - ret = client->send_message(msg); - if (ret != ERROR_SUCCESS) { + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { srs_error("forwarder send message to server failed. ret=%d", ret); - - // convert the index to count when error. - i++; - - break; + return ret; } } - - // clear sendout mesages. - if (i < count) { - srs_warn("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret); - } else { - srs_info("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret); - } - msgs.erase(msgs.begin(), msgs.begin() + i); - - if (ret != ERROR_SUCCESS) { - break; - } } return ret; diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index 37cb71269..a2e832cb5 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -30,13 +30,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include #include class SrsSharedPtrMessage; class SrsOnMetaDataPacket; class SrsMessageQueue; +class SrsRtmpJitter; class SrsRtmpClient; class SrsRequest; class SrsSource; @@ -59,8 +59,8 @@ private: private: SrsSource* source; SrsRtmpClient* client; + SrsRtmpJitter* jitter; SrsMessageQueue* queue; - std::vector msgs; public: SrsForwarder(SrsSource* _source); virtual ~SrsForwarder(); diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index a4567b2b9..298b1e65f 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -101,7 +101,10 @@ public: */ virtual int enqueue(SrsSharedPtrMessage* msg); /** - * get messages from the queue. + * get packets in consumer queue. + * @pmsgs SrsMessages*[], output the prt array. + * @count the count in array. + * @max_count the max count to dequeue, 0 to dequeue all. */ virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); private: @@ -122,7 +125,6 @@ private: SrsRtmpJitter* jitter; SrsSource* source; SrsMessageQueue* queue; - std::vector msgs; bool paused; public: SrsConsumer(SrsSource* _source);