From d6828a3e58b6542fd01e2bd00b44477067dfe59a Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 9 Apr 2019 09:39:16 +0800 Subject: [PATCH] Refine get_mw_sleep in time unit --- trunk/src/app/srs_app_config.cpp | 12 +++++++----- trunk/src/app/srs_app_config.hpp | 4 ++-- trunk/src/app/srs_app_http_stream.cpp | 11 ++++++----- trunk/src/app/srs_app_recv_thread.hpp | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 20 ++++++++++---------- trunk/src/app/srs_app_rtmp_conn.hpp | 4 ++-- trunk/src/core/srs_core_performance.hpp | 2 +- trunk/src/main/srs_main_server.cpp | 4 ++-- trunk/src/utest/srs_utest_config.cpp | 4 +++- 9 files changed, 34 insertions(+), 28 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index c344cc60a..113b857d9 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4676,24 +4676,26 @@ srs_utime_t SrsConfig::get_mr_sleep(string vhost) return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); } -int SrsConfig::get_mw_sleep_ms(string vhost) +srs_utime_t SrsConfig::get_mw_sleep(string vhost) { + static srs_utime_t DEFAULT = SRS_PERF_MW_SLEEP; + SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return SRS_PERF_MW_SLEEP; + return DEFAULT; } conf = conf->get("play"); if (!conf) { - return SRS_PERF_MW_SLEEP; + return DEFAULT; } conf = conf->get("mw_latency"); if (!conf || conf->arg0().empty()) { - return SRS_PERF_MW_SLEEP; + return DEFAULT; } - return ::atoi(conf->arg0().c_str()); + return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); } bool SrsConfig::get_realtime_enabled(string vhost) diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 9e9572499..7d22e4f2a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -781,11 +781,11 @@ public: // TODO: FIXME: add utest for mr config. virtual srs_utime_t get_mr_sleep(std::string vhost); /** - * get the mw sleep time in ms for vhost. + * get the mw sleep time in srs_utime_t for vhost. * @param vhost, the vhost to get the mw sleep time. */ // TODO: FIXME: add utest for mw config. - virtual int get_mw_sleep_ms(std::string vhost); + virtual srs_utime_t get_mw_sleep(std::string vhost); /** * whether min latency mode enabled. * @param vhost, the vhost to get the min_latency. diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 883e14e63..afa96601e 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -589,9 +589,9 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } } - int mw_sleep = _srs_config->get_mw_sleep_ms(req->vhost) * SRS_UTIME_MILLISECONDS; + srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) { - return srs_error_wrap(err, "set mw_sleep"); + return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); } SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); @@ -602,7 +602,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d", - entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, mw_sleep, enc->has_cache(), msgs.max); + entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, int(mw_sleep / SRS_UTIME_MILLISECONDS), + enc->has_cache(), msgs.max); // TODO: free and erase the disabled entry after all related connections is closed. while (entry->enabled) { @@ -622,14 +623,14 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess if (count <= 0) { // Directly use sleep, donot use consumer wait, because we couldn't awake consumer. - srs_usleep(mw_sleep * SRS_UTIME_MILLISECONDS); + srs_usleep(mw_sleep); // ignore when nothing got. continue; } if (pprint->can_print()) { srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d", - count, pprint->age(), SRS_PERF_MW_MIN_MSGS, mw_sleep); + count, pprint->age(), SRS_PERF_MW_MIN_MSGS, int(mw_sleep / SRS_UTIME_MILLISECONDS)); } // sendout all messages. diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index b092a5ed2..785f779ab 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -132,6 +132,7 @@ private: srs_error_t recv_error; SrsConsumer* _consumer; public: + // TODO: FIXME: Refine timeout in time unit. SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsQueueRecvThread(); public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 6cc9c7dfe..f40bf8222 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) // Use receiving thread to receive packets from peer. // @see: https://github.com/ossrs/srs/issues/217 - SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); + SrsQueueRecvThread trd(consumer, rtmp, int(SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS)); if ((err = trd.start()) != srs_success) { return srs_error_wrap(err, "rtmp: start receive thread"); @@ -688,12 +688,12 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // setup the mw config. // when mw_sleep changed, resize the socket send buffer. mw_enabled = true; - change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); + change_mw_sleep(_srs_config->get_mw_sleep(req->vhost)); // initialize the send_min_interval send_min_interval = _srs_config->get_send_min_interval(req->vhost); srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d", - send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay); + send_min_interval, int(mw_sleep / SRS_UTIME_MILLISECONDS), mw_enabled, realtime, tcp_nodelay); while (true) { // collect elapse for pithy print. @@ -725,10 +725,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // @see https://github.com/ossrs/srs/issues/257 if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(0, mw_sleep); + consumer->wait(0, int(mw_sleep / SRS_UTIME_MILLISECONDS)); } else { // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); + consumer->wait(SRS_PERF_MW_MIN_MSGS, int(mw_sleep / SRS_UTIME_MILLISECONDS)); } #endif @@ -745,12 +745,12 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr kbps->sample(); srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d", (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mw_sleep); + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), int(mw_sleep / SRS_UTIME_MILLISECONDS)); } if (count <= 0) { #ifndef SRS_PERF_QUEUE_COND_WAIT - srs_usleep(mw_sleep * 1000); + srs_usleep(mw_sleep); #endif // ignore when nothing got. continue; @@ -1111,14 +1111,14 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsComm return err; } -void SrsRtmpConn::change_mw_sleep(int sleep_ms) +void SrsRtmpConn::change_mw_sleep(srs_utime_t sleep_v) { if (!mw_enabled) { return; } - set_socket_buffer(sleep_ms * SRS_UTIME_MILLISECONDS); - mw_sleep = sleep_ms; + set_socket_buffer(sleep_v); + mw_sleep = sleep_v; } void SrsRtmpConn::set_sock_options() diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index e1639d517..b52a77768 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -110,7 +110,7 @@ private: // @see https://github.com/ossrs/srs/issues/47 int64_t duration; // the MR(merged-write) sleep time in ms. - int mw_sleep; + srs_utime_t mw_sleep; // the MR(merged-write) only enabled for play. int mw_enabled; // for realtime @@ -158,7 +158,7 @@ private: virtual srs_error_t handle_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); - virtual void change_mw_sleep(int sleep_ms); + virtual void change_mw_sleep(srs_utime_t sleep_v); virtual void set_sock_options(); private: virtual srs_error_t check_edge_token_traverse_auth(); diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 6e71008d9..efc743c9a 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -92,7 +92,7 @@ * 2000 150 300 */ // the default config of mw. -#define SRS_PERF_MW_SLEEP 350 +#define SRS_PERF_MW_SLEEP (350 * SRS_UTIME_MILLISECONDS) /** * how many msgs can be send entirely. * for play clients to get msgs then totally send out. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 8f6941390..451174e63 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -287,7 +287,7 @@ void show_macro_features() stringstream ss; // mw(merged-write) - ss << "mw sleep:" << SRS_PERF_MW_SLEEP << "ms"; + ss << "mw sleep:" << SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS << "ms"; // mr(merged-read) ss << ". mr "; @@ -343,7 +343,7 @@ void show_macro_features() possible_mr_latency = SRS_PERF_MR_SLEEP / SRS_UTIME_MILLISECONDS; #endif srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)", - SRS_PERF_MW_SLEEP, possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000); + SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS, possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000); #ifdef SRS_AUTO_MEM_WATCH #warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script." diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 0f0781aad..e688584cf 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -1847,9 +1847,11 @@ VOID TEST(ConfigUnitTest, CheckDefaultValues) if (true) { EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF)); EXPECT_EQ(350 * SRS_UTIME_MILLISECONDS, conf.get_mr_sleep("")); + EXPECT_EQ(350 * SRS_UTIME_MILLISECONDS, conf.get_mw_sleep("")); - EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{publish{mr_latency 1000;}}")); + EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{publish{mr_latency 1000;} play{mw_latency 1000;}}")); EXPECT_EQ(1000 * SRS_UTIME_MILLISECONDS, conf.get_mr_sleep("v")); + EXPECT_EQ(1000 * SRS_UTIME_MILLISECONDS, conf.get_mw_sleep("v")); } }