From e80c8603d4e611c7ca88ceb12aa91e1ac7f7fc20 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 5 Dec 2014 20:55:19 +0800 Subject: [PATCH] fix #251, revert changes, for the cond wait and fast cache queue is no use. 2.0.59 --- trunk/src/app/srs_app_rtmp_conn.cpp | 18 +-- trunk/src/app/srs_app_source.cpp | 165 +----------------------- trunk/src/app/srs_app_source.hpp | 36 +----- trunk/src/core/srs_core.hpp | 2 +- trunk/src/core/srs_core_performance.hpp | 23 +--- trunk/src/rtmp/srs_protocol_stack.cpp | 14 +- 6 files changed, 24 insertions(+), 234 deletions(-) diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 0122c35a1..263e1a3f2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -598,29 +598,19 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // collect elapse for pithy print. pithy_print.elapse(); -#ifdef SRS_PERF_QUEUE_COND_WAIT - // wait for message to incoming. - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 - consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); -#endif - // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; - if ((ret = consumer->dump_packets(&msgs, &count)) != ERROR_SUCCESS) { + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { srs_error("get messages from consumer failed. ret=%d", ret); return ret; } - -#ifdef SRS_PERF_QUEUE_COND_WAIT - // we use wait to get messages, so the count must be positive. - srs_assert(count > 0); -#else + + // no messages, sleep for a while. if (count <= 0) { st_usleep(mw_sleep * 1000); } -#endif - srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep); + srs_info("got %d msgs, mw=%d", count, mw_sleep); // reportable if (pithy_print.can_print()) { diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 77a107fda..d2a3dcce0 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -306,38 +306,13 @@ SrsConsumer::SrsConsumer(SrsSource* _source) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; - -#ifdef SRS_PERF_QUEUE_COND_WAIT - mw_wait = st_cond_new(); - mw_min_msgs = 0; - mw_duration = 0; - mw_waiting = false; -#endif - -#ifdef SRS_PERF_QUEUE_FAST_CACHE - mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS); - mw_count = 0; - mw_first_pkt = mw_last_pkt = 0; -#endif } SrsConsumer::~SrsConsumer() { -#ifdef SRS_PERF_QUEUE_FAST_CACHE - if (mw_cache) { - mw_cache->free(mw_count); - mw_count = 0; - } - srs_freep(mw_cache); -#endif - source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); - -#ifdef SRS_PERF_QUEUE_COND_WAIT - st_cond_destroy(mw_wait); -#endif } void SrsConsumer::set_queue_size(double queue_size) @@ -365,72 +340,15 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S return ret; } } - -#ifdef SRS_PERF_QUEUE_FAST_CACHE - // use fast cache if available - if (mw_count < mw_cache->max) { - // update fast cache timestamps - if (mw_count == 0) { - mw_first_pkt = msg->header.timestamp; - } - mw_last_pkt = msg->header.timestamp; - - mw_cache->msgs[mw_count++] = msg; - } else{ - // fast cache is full, use queue. - bool is_overflow = false; - if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) { - return ret; - } - // when overflow, clear cache and refresh the fast cache. - if (is_overflow) { - mw_cache->free(mw_count); - if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) { - return ret; - } - } - } - - #ifdef SRS_PERF_QUEUE_COND_WAIT - // fire the mw when msgs is enough. - if (mw_waiting) { - // when fast cache not overflow, always flush. - // so we donot care about the queue. - bool fast_cache_overflow = mw_count >= mw_cache->max; - int duration_ms = (int)(mw_last_pkt - mw_first_pkt); - bool match_min_msgs = mw_count > mw_min_msgs; - - // when fast cache overflow, or duration ok, signal to flush. - if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) { - st_cond_signal(mw_wait); - mw_waiting = false; - } - } - #endif -#else + if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { return ret; } - #ifdef SRS_PERF_QUEUE_COND_WAIT - // fire the mw when msgs is enough. - if (mw_waiting) { - int duration_ms = queue->duration(); - bool match_min_msgs = queue->size() > mw_min_msgs; - - // when duration ok, signal to flush. - if (match_min_msgs && duration_ms > mw_duration) { - st_cond_signal(mw_wait); - mw_waiting = false; - } - } - #endif -#endif - return ret; } -int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) +int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) { int ret =ERROR_SUCCESS; @@ -446,68 +364,13 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) return ret; } -#ifdef SRS_PERF_QUEUE_FAST_CACHE - // only dumps an whole array to msgs. - for (int i = 0; i < mw_count; i++) { - msgs->msgs[i] = mw_cache->msgs[i]; - } - *count = mw_count; - - // when fast cache is not filled, - // we donot check the queue, direclty zero fast cache. - if (mw_count < mw_cache->max) { - mw_count = 0; - mw_first_pkt = mw_last_pkt = 0; - return ret; - } - - return dumps_queue_to_fast_cache(); -#else - // pump msgs from queue. - int nb_msgs = 0; - if ((ret = queue->dump_packets(msgs->max, msgs->msgs, nb_msgs)) != ERROR_SUCCESS) { + if ((ret = queue->dump_packets(msgs->max, msgs->msgs, count)) != ERROR_SUCCESS) { return ret; } - *count = nb_msgs; return ret; -#endif -} - -#ifdef SRS_PERF_QUEUE_COND_WAIT -void SrsConsumer::wait(int nb_msgs, int duration) -{ - mw_min_msgs = nb_msgs; - mw_duration = duration; - -#ifdef SRS_PERF_QUEUE_FAST_CACHE - // when fast cache not overflow, always flush. - // so we donot care about the queue. - bool fast_cache_overflow = mw_count >= mw_cache->max; - int duration_ms = (int)(mw_last_pkt - mw_first_pkt); - bool match_min_msgs = mw_count > mw_min_msgs; - - // when fast cache overflow, or duration ok, signal to flush. - if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) { - return; - } -#else - int duration_ms = queue->duration(); - bool match_min_msgs = queue->size() > mw_min_msgs; - - // when duration ok, signal to flush. - if (match_min_msgs && duration_ms > mw_duration) { - return; - } -#endif - - // the enqueue will notify this cond. - mw_waiting = true; - // wait for msgs to incoming. - st_cond_wait(mw_wait); } -#endif int SrsConsumer::on_play_client_pause(bool is_pause) { @@ -519,28 +382,6 @@ int SrsConsumer::on_play_client_pause(bool is_pause) return ret; } -#ifdef SRS_PERF_QUEUE_FAST_CACHE -int SrsConsumer::dumps_queue_to_fast_cache() -{ - int ret =ERROR_SUCCESS; - - // fill fast cache with queue. - if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) { - return ret; - } - // set the timestamp when got message. - if (mw_count > 0) { - SrsMessage* first_msg = mw_cache->msgs[0]; - mw_first_pkt = first_msg->header.timestamp; - - SrsMessage* last_msg = mw_cache->msgs[mw_count - 1]; - mw_last_pkt = last_msg->header.timestamp; - } - - return ret; -} -#endif - SrsGopCache::SrsGopCache() { cached_video_count = 0; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index fac678ed7..023f6d388 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -165,24 +165,6 @@ private: bool paused; // when source id changed, notice all consumers bool should_update_source_id; -#ifdef SRS_PERF_QUEUE_COND_WAIT - // the cond wait for mw. - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 - st_cond_t mw_wait; - bool mw_waiting; - int mw_min_msgs; - int mw_duration; -#endif -#ifdef SRS_PERF_QUEUE_FAST_CACHE - // use fast cache for msgs - // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 - SrsMessageArray* mw_cache; - // the count of msg in fast cache. - int mw_count; - // the packet time in fast cache. - int64_t mw_first_pkt; - int64_t mw_last_pkt; -#endif public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -216,27 +198,11 @@ public: * @param count the count in array, output param. * @max_count the max count to dequeue, must be positive. */ - virtual int dump_packets(SrsMessageArray* msgs, int* count); -#ifdef SRS_PERF_QUEUE_COND_WAIT - /** - * wait for messages incomming, atleast nb_msgs and in duration. - * @param nb_msgs the messages count to wait. - * @param duration the messgae duration to wait. - */ - virtual void wait(int nb_msgs, int duration); -#endif + virtual int dump_packets(SrsMessageArray* msgs, int& count); /** * when client send the pause message. */ virtual int on_play_client_pause(bool is_pause); -private: -#ifdef SRS_PERF_QUEUE_FAST_CACHE - /** - * dumps the queue to fast cache, - * when fast cache is clear or queue is overflow. - */ - virtual int dumps_queue_to_fast_cache(); -#endif }; /** diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index aaa822fd6..14036f94a 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 58 +#define VERSION_REVISION 59 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 2cfcab65a..cf08a9d72 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -100,28 +100,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * how many msgs can be send entirely. * for play clients to get msgs then totally send out. * for the mw sleep set to 1800, the msgs is about 133. -* @remark, recomment to 256. +* @remark, recomment to 128. */ -#define SRS_PERF_MW_MSGS 256 -/** -* how many msgs atleast to send. -* @remark, recomment to 32. -*/ -#define SRS_PERF_MW_MIN_MSGS 32 - -/** -* whether enable the fast cache. -* @remark this improve performance for large connectios. -* @remark this also introduce complex, default to disable it. -* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 -*/ -#undef SRS_PERF_QUEUE_FAST_CACHE -/** -* whether use cond wait to send messages. -* @remark this improve performance for large connectios. -* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 -*/ -#undef SRS_PERF_QUEUE_COND_WAIT +#define SRS_PERF_MW_MSGS 128 /** * how many chunk stream to cache, [0, N]. diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index bc84046fe..70ed47f43 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -736,8 +736,20 @@ int SrsProtocol::do_send_messages(SrsMessage** msgs, int nb_msgs) if (iov_index <= 0) { return ret; } - srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d", + + // calc the bytes of iovs, for debug. +#if 0 + int nb_bytes = 0; + for (int i = 0; i < iov_index; i++) { + iovec* iov = out_iovs + i; + nb_bytes += iov->iov_len; + } + srs_warn("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d", + nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); +#else + srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d", nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); +#endif // send by writev // sendout header and payload by writev.