diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index b2f5199a6..0122c35a1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -598,9 +598,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // collect elapse for pithy print. pithy_print.elapse(); +#ifdef SRS_PERF_QUEUE_COND_WAIT // wait for message to incoming. // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); +#endif // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. @@ -610,8 +612,14 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) return ret; } +#ifdef SRS_PERF_QUEUE_COND_WAIT // we use wait to get messages, so the count must be positive. srs_assert(count > 0); +#else + if (count <= 0) { + st_usleep(mw_sleep * 1000); + } +#endif srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep); // reportable diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 228cb0d32..77a107fda 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -167,6 +167,16 @@ SrsMessageQueue::~SrsMessageQueue() clear(); } +int SrsMessageQueue::size() +{ + return (int)msgs.size(); +} + +int SrsMessageQueue::duration() +{ + return (int)(av_end_time - av_start_time); +} + void SrsMessageQueue::set_queue_size(double queue_size) { queue_size_ms = (int)(queue_size * 1000); @@ -297,28 +307,37 @@ SrsConsumer::SrsConsumer(SrsSource* _source) queue = new SrsMessageQueue(); should_update_source_id = false; +#ifdef SRS_PERF_QUEUE_COND_WAIT mw_wait = st_cond_new(); mw_min_msgs = 0; mw_duration = 0; mw_waiting = false; +#endif +#ifdef SRS_PERF_QUEUE_FAST_CACHE mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS); mw_count = 0; mw_first_pkt = mw_last_pkt = 0; +#endif } SrsConsumer::~SrsConsumer() { +#ifdef SRS_PERF_QUEUE_FAST_CACHE if (mw_cache) { mw_cache->free(mw_count); mw_count = 0; } srs_freep(mw_cache); +#endif source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); + +#ifdef SRS_PERF_QUEUE_COND_WAIT st_cond_destroy(mw_wait); +#endif } void SrsConsumer::set_queue_size(double queue_size) @@ -347,6 +366,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S } } +#ifdef SRS_PERF_QUEUE_FAST_CACHE // use fast cache if available if (mw_count < mw_cache->max) { // update fast cache timestamps @@ -371,6 +391,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S } } + #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { // when fast cache not overflow, always flush. @@ -385,6 +406,26 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S mw_waiting = false; } } + #endif +#else + if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { + return ret; + } + + #ifdef SRS_PERF_QUEUE_COND_WAIT + // fire the mw when msgs is enough. + if (mw_waiting) { + int duration_ms = queue->duration(); + bool match_min_msgs = queue->size() > mw_min_msgs; + + // when duration ok, signal to flush. + if (match_min_msgs && duration_ms > mw_duration) { + st_cond_signal(mw_wait); + mw_waiting = false; + } + } + #endif +#endif return ret; } @@ -405,6 +446,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) return ret; } +#ifdef SRS_PERF_QUEUE_FAST_CACHE // only dumps an whole array to msgs. for (int i = 0; i < mw_count; i++) { msgs->msgs[i] = mw_cache->msgs[i]; @@ -420,13 +462,26 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count) } return dumps_queue_to_fast_cache(); +#else + + // pump msgs from queue. + int nb_msgs = 0; + if ((ret = queue->dump_packets(msgs->max, msgs->msgs, nb_msgs)) != ERROR_SUCCESS) { + return ret; + } + *count = nb_msgs; + + return ret; +#endif } +#ifdef SRS_PERF_QUEUE_COND_WAIT void SrsConsumer::wait(int nb_msgs, int duration) { mw_min_msgs = nb_msgs; mw_duration = duration; +#ifdef SRS_PERF_QUEUE_FAST_CACHE // when fast cache not overflow, always flush. // so we donot care about the queue. bool fast_cache_overflow = mw_count >= mw_cache->max; @@ -437,12 +492,22 @@ void SrsConsumer::wait(int nb_msgs, int duration) if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) { return; } +#else + int duration_ms = queue->duration(); + bool match_min_msgs = queue->size() > mw_min_msgs; + + // when duration ok, signal to flush. + if (match_min_msgs && duration_ms > mw_duration) { + return; + } +#endif // the enqueue will notify this cond. mw_waiting = true; // wait for msgs to incoming. st_cond_wait(mw_wait); } +#endif int SrsConsumer::on_play_client_pause(bool is_pause) { @@ -454,6 +519,7 @@ int SrsConsumer::on_play_client_pause(bool is_pause) return ret; } +#ifdef SRS_PERF_QUEUE_FAST_CACHE int SrsConsumer::dumps_queue_to_fast_cache() { int ret =ERROR_SUCCESS; @@ -473,6 +539,7 @@ int SrsConsumer::dumps_queue_to_fast_cache() return ret; } +#endif SrsGopCache::SrsGopCache() { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index b2f93eeec..fac678ed7 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include class SrsPlayEdge; class SrsPublishEdge; @@ -116,6 +117,14 @@ public: SrsMessageQueue(); virtual ~SrsMessageQueue(); public: + /** + * get the size of queue. + */ + virtual int size(); + /** + * get the duration of queue. + */ + virtual int duration(); /** * set the queue size * @param queue_size the queue size in seconds. @@ -156,12 +165,15 @@ private: bool paused; // when source id changed, notice all consumers bool should_update_source_id; +#ifdef SRS_PERF_QUEUE_COND_WAIT // the cond wait for mw. // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 st_cond_t mw_wait; bool mw_waiting; int mw_min_msgs; int mw_duration; +#endif +#ifdef SRS_PERF_QUEUE_FAST_CACHE // use fast cache for msgs // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 SrsMessageArray* mw_cache; @@ -170,6 +182,7 @@ private: // the packet time in fast cache. int64_t mw_first_pkt; int64_t mw_last_pkt; +#endif public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -204,22 +217,26 @@ public: * @max_count the max count to dequeue, must be positive. */ virtual int dump_packets(SrsMessageArray* msgs, int* count); +#ifdef SRS_PERF_QUEUE_COND_WAIT /** * wait for messages incomming, atleast nb_msgs and in duration. * @param nb_msgs the messages count to wait. * @param duration the messgae duration to wait. */ virtual void wait(int nb_msgs, int duration); +#endif /** * when client send the pause message. */ virtual int on_play_client_pause(bool is_pause); private: +#ifdef SRS_PERF_QUEUE_FAST_CACHE /** * dumps the queue to fast cache, * when fast cache is clear or queue is overflow. */ virtual int dumps_queue_to_fast_cache(); +#endif }; /** diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 3b039b76c..aaa822fd6 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 57 +#define VERSION_REVISION 58 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index d7ba665f3..2cfcab65a 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -109,6 +109,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #define SRS_PERF_MW_MIN_MSGS 32 +/** +* whether enable the fast cache. +* @remark this improve performance for large connectios. +* @remark this also introduce complex, default to disable it. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_QUEUE_FAST_CACHE +/** +* whether use cond wait to send messages. +* @remark this improve performance for large connectios. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_QUEUE_COND_WAIT + /** * how many chunk stream to cache, [0, N]. * to imporove about 10% performance when chunk size small, and 5% for large chunk.