From 2c259bd95b69f985234f3083891d52147b7a17dd Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 30 Aug 2022 13:41:27 +0800 Subject: [PATCH] Refactor: Extract SrsNetworkKbps from SrsKbps. v5.0.53 --- trunk/configure | 2 +- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_edge.cpp | 4 +- trunk/src/app/srs_app_edge.hpp | 6 +- trunk/src/app/srs_app_rtmp_conn.cpp | 17 +- trunk/src/app/srs_app_rtmp_conn.hpp | 1 + trunk/src/app/srs_app_srt_conn.cpp | 18 +- trunk/src/app/srs_app_srt_conn.hpp | 1 + trunk/src/app/srs_app_statistic.cpp | 26 +- trunk/src/app/srs_app_statistic.hpp | 4 - trunk/src/core/srs_core_version5.hpp | 2 +- .../src/protocol/srs_protocol_http_client.cpp | 18 +- .../src/protocol/srs_protocol_http_client.hpp | 7 +- trunk/src/protocol/srs_protocol_kbps.cpp | 223 +++--- trunk/src/protocol/srs_protocol_kbps.hpp | 144 ++-- trunk/src/protocol/srs_protocol_rtmp_conn.cpp | 24 +- trunk/src/protocol/srs_protocol_rtmp_conn.hpp | 9 +- trunk/src/utest/srs_utest_protocol.cpp | 381 ---------- trunk/src/utest/srs_utest_protocol2.cpp | 707 ++++++++++++++++++ trunk/src/utest/srs_utest_protocol2.hpp | 16 + 20 files changed, 949 insertions(+), 662 deletions(-) create mode 100644 trunk/src/utest/srs_utest_protocol2.cpp create mode 100644 trunk/src/utest/srs_utest_protocol2.hpp diff --git a/trunk/configure b/trunk/configure index 9c4149f2e..af078f077 100755 --- a/trunk/configure +++ b/trunk/configure @@ -389,7 +389,7 @@ fi if [ $SRS_UTEST = YES ]; then MODULE_FILES=("srs_utest" "srs_utest_amf0" "srs_utest_protocol" "srs_utest_kernel" "srs_utest_core" "srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload" - "srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc") + "srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc" "srs_utest_protocol2") if [[ $SRS_SRT == YES ]]; then MODULE_FILES+=("srs_utest_srt") fi diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 5d71d7e94..2809f0d24 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-08-30, Refactor: Extract SrsNetworkKbps from SrsKbps. v5.0.53 * v5.0, 2022-08-30, Remove bandwidth check because falsh is disabled. v5.0.52 * v5.0, 2022-08-30, Refactor: Use compositor for ISrsKbpsDelta. v5.0.51 * v5.0, 2022-08-29, RTC: Stat the WebRTC clients bandwidth. v5.0.50 diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 49b3ae349..46dfcd3be 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -150,7 +150,7 @@ void SrsEdgeRtmpUpstream::set_recv_timeout(srs_utime_t tm) sdk->set_recv_timeout(tm); } -void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age) +void SrsEdgeRtmpUpstream::kbps_sample(const char* label, srs_utime_t age) { sdk->kbps_sample(label, age); } @@ -377,7 +377,7 @@ void SrsEdgeFlvUpstream::set_recv_timeout(srs_utime_t tm) sdk_->set_recv_timeout(tm); } -void SrsEdgeFlvUpstream::kbps_sample(const char* label, int64_t age) +void SrsEdgeFlvUpstream::kbps_sample(const char* label, srs_utime_t age) { sdk_->kbps_sample(label, age); } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 971da42e2..1c5425874 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -71,7 +71,7 @@ public: public: virtual void selected(std::string& server, int& port) = 0; virtual void set_recv_timeout(srs_utime_t tm) = 0; - virtual void kbps_sample(const char* label, int64_t age) = 0; + virtual void kbps_sample(const char* label, srs_utime_t age) = 0; }; class SrsEdgeRtmpUpstream : public SrsEdgeUpstream @@ -97,7 +97,7 @@ public: public: virtual void selected(std::string& server, int& port); virtual void set_recv_timeout(srs_utime_t tm); - virtual void kbps_sample(const char* label, int64_t age); + virtual void kbps_sample(const char* label, srs_utime_t age); }; class SrsEdgeFlvUpstream : public SrsEdgeUpstream @@ -129,7 +129,7 @@ public: public: virtual void selected(std::string& server, int& port); virtual void set_recv_timeout(srs_utime_t tm); - virtual void kbps_sample(const char* label, int64_t age); + virtual void kbps_sample(const char* label, srs_utime_t age); }; // The edge used to ingest stream from origin. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 4ce43003e..65bd81482 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -100,9 +100,12 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport) ip = cip; port = cport; create_time = srsu2ms(srs_get_system_time()); + trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id()); + + kbps = new SrsNetworkKbps(); + kbps->set_io(skt, skt); delta_ = new SrsNetworkDelta(); delta_->set_io(skt, skt); - trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id()); rtmp = new SrsRtmpServer(skt); refer = new SrsRefer(); @@ -134,6 +137,7 @@ SrsRtmpConn::~SrsRtmpConn() } srs_freep(trd); + srs_freep(kbps); srs_freep(delta_); srs_freep(skt); @@ -752,7 +756,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons // reportable if (pprint->can_print()) { - srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, mw=%d/%d", (int)pprint->age(), count, srsu2msi(mw_sleep), mw_msgs); + kbps->sample(); + srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d", + (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs); } if (count <= 0) { @@ -918,9 +925,13 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre // reportable if (pprint->can_print()) { + kbps->sample(); bool mr = _srs_config->get_mr_enabled(req->vhost); srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost); - srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, mr=%d/%d, p1stpt=%d, pnt=%d", (int)pprint->age(), mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout)); + srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", + (int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep), + srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout)); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 2b1ff3c4d..2f234b1c8 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -113,6 +113,7 @@ private: int port; // The delta for statistic. SrsNetworkDelta* delta_; + SrsNetworkKbps* kbps; // The create time in milliseconds. // for current connection to log self create time and calculate the living time. int64_t create_time; diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index b9c84e8d1..be484c8d5 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -160,11 +160,14 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s srt_fd_ = srt_fd; srt_conn_ = new SrsSrtConnection(srt_fd_); - delta_ = new SrsNetworkDelta(); - delta_->set_io(srt_conn_, srt_conn_); ip_ = ip; port_ = port; + kbps_ = new SrsNetworkKbps(); + kbps_->set_io(srt_conn_, srt_conn_); + delta_ = new SrsNetworkDelta(); + delta_->set_io(srt_conn_, srt_conn_); + trd_ = new SrsSTCoroutine("ts-srt", this, _srs_context->get_id()); srt_source_ = NULL; @@ -176,6 +179,7 @@ SrsMpegtsSrtConn::~SrsMpegtsSrtConn() { srs_freep(trd_); + srs_freep(kbps_); srs_freep(delta_); srs_freep(srt_conn_); srs_freep(req_); @@ -408,7 +412,10 @@ srs_error_t SrsMpegtsSrtConn::do_publishing() s.pktRecv(), s.pktRcvLoss(), s.pktRcvRetrans(), s.pktRcvDrop()); } - srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%d, packets=%d", (int)pprint->age(), nb_packets); + kbps_->sample(); + srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), + kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m()); nb_packets = 0; } @@ -485,7 +492,10 @@ srs_error_t SrsMpegtsSrtConn::do_playing() s.pktSent(), s.pktSndLoss(), s.pktRetrans(), s.pktSndDrop()); } - srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%d, packets=%d", (int)pprint->age(), nb_packets); + kbps_->sample(); + srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), + kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m()); nb_packets = 0; } diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 709aa98c5..a5e6b128f 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -116,6 +116,7 @@ private: srs_srt_t srt_fd_; SrsSrtConnection* srt_conn_; SrsNetworkDelta* delta_; + SrsNetworkKbps* kbps_; std::string ip_; int port_; SrsCoroutine* trd_; diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index a8369837f..dd37df89a 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -31,10 +31,8 @@ SrsStatisticVhost::SrsStatisticVhost() { id = srs_generate_stat_vid(); - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - kbps->set_io(NULL, NULL); - + kbps = new SrsKbps(); + nb_clients = 0; nb_streams = 0; } @@ -42,7 +40,6 @@ SrsStatisticVhost::SrsStatisticVhost() SrsStatisticVhost::~SrsStatisticVhost() { srs_freep(kbps); - srs_freep(clk); } srs_error_t SrsStatisticVhost::dumps(SrsJsonObject* obj) @@ -97,10 +94,8 @@ SrsStatisticStream::SrsStatisticStream() width = 0; height = 0; - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - kbps->set_io(NULL, NULL); - + kbps = new SrsKbps(); + nb_clients = 0; frames = new SrsPps(); } @@ -108,7 +103,6 @@ SrsStatisticStream::SrsStatisticStream() SrsStatisticStream::~SrsStatisticStream() { srs_freep(kbps); - srs_freep(clk); srs_freep(frames); } @@ -203,15 +197,12 @@ SrsStatisticClient::SrsStatisticClient() type = SrsRtmpConnUnknown; create = srs_get_system_time(); - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - kbps->set_io(NULL, NULL); + kbps = new SrsKbps(); } SrsStatisticClient::~SrsStatisticClient() { srs_freep(kbps); - srs_freep(clk); srs_freep(req); } @@ -246,16 +237,13 @@ SrsStatistic* SrsStatistic::_instance = NULL; SrsStatistic::SrsStatistic() { - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - kbps->set_io(NULL, NULL); + kbps = new SrsKbps(); } SrsStatistic::~SrsStatistic() { srs_freep(kbps); - srs_freep(clk); - + if (true) { std::map::iterator it; for (it = vhosts.begin(); it != vhosts.end(); it++) { diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 88ac4fc6a..07d9a918a 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -38,7 +38,6 @@ public: public: // The vhost total kbps. SrsKbps* kbps; - SrsWallClock* clk; public: SrsStatisticVhost(); virtual ~SrsStatisticVhost(); @@ -61,7 +60,6 @@ public: public: // The stream total kbps. SrsKbps* kbps; - SrsWallClock* clk; // The fps of stream. SrsPps* frames; public: @@ -110,7 +108,6 @@ public: public: // The stream total kbps. SrsKbps* kbps; - SrsWallClock* clk; public: SrsStatisticClient(); virtual ~SrsStatisticClient(); @@ -141,7 +138,6 @@ private: std::map clients; // The server total kbps. SrsKbps* kbps; - SrsWallClock* clk; private: SrsStatistic(); virtual ~SrsStatistic(); diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index d29de7d87..1a5e273a8 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 52 +#define VERSION_REVISION 53 #endif diff --git a/trunk/src/protocol/srs_protocol_http_client.cpp b/trunk/src/protocol/srs_protocol_http_client.cpp index d55b7bec8..4e696c4dc 100644 --- a/trunk/src/protocol/srs_protocol_http_client.cpp +++ b/trunk/src/protocol/srs_protocol_http_client.cpp @@ -253,8 +253,7 @@ SrsHttpClient::SrsHttpClient() { transport = NULL; ssl_transport = NULL; - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); + kbps = new SrsNetworkKbps(); parser = NULL; recv_timeout = timeout = SRS_UTIME_NO_TIMEOUT; port = 0; @@ -263,9 +262,8 @@ SrsHttpClient::SrsHttpClient() SrsHttpClient::~SrsHttpClient() { disconnect(); - + srs_freep(kbps); - srs_freep(clk); srs_freep(parser); } @@ -410,18 +408,18 @@ void SrsHttpClient::set_recv_timeout(srs_utime_t tm) recv_timeout = tm; } -void SrsHttpClient::kbps_sample(const char* label, int64_t age) +void SrsHttpClient::kbps_sample(const char* label, srs_utime_t age) { kbps->sample(); - + int sr = kbps->get_send_kbps(); int sr30s = kbps->get_send_kbps_30s(); int sr5m = kbps->get_send_kbps_5m(); int rr = kbps->get_recv_kbps(); int rr30s = kbps->get_recv_kbps_30s(); int rr5m = kbps->get_recv_kbps_5m(); - - srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m); + + srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, srsu2ms(age), sr, sr30s, sr5m, rr, rr30s, rr5m); } void SrsHttpClient::disconnect() @@ -450,9 +448,9 @@ srs_error_t SrsHttpClient::connect() // Set the recv/send timeout in srs_utime_t. transport->set_recv_timeout(recv_timeout); transport->set_send_timeout(timeout); - - kbps->set_io(transport, transport); + kbps->set_io(transport, transport); + if (schema_ != "https") { return err; } diff --git a/trunk/src/protocol/srs_protocol_http_client.hpp b/trunk/src/protocol/srs_protocol_http_client.hpp index 0fa1481db..26774973c 100644 --- a/trunk/src/protocol/srs_protocol_http_client.hpp +++ b/trunk/src/protocol/srs_protocol_http_client.hpp @@ -21,7 +21,7 @@ class SrsHttpUri; class SrsHttpParser; class ISrsHttpMessage; class SrsStSocket; -class SrsKbps; +class SrsNetworkKbps; class SrsWallClock; class SrsTcpClient; @@ -63,8 +63,7 @@ private: SrsTcpClient* transport; SrsHttpParser* parser; std::map headers; - SrsKbps* kbps; - SrsWallClock* clk; + SrsNetworkKbps* kbps; private: // The timeout in srs_utime_t. srs_utime_t timeout; @@ -103,7 +102,7 @@ public: public: virtual void set_recv_timeout(srs_utime_t tm); public: - virtual void kbps_sample(const char* label, int64_t age); + virtual void kbps_sample(const char* label, srs_utime_t age); private: virtual void disconnect(); virtual srs_error_t connect(); diff --git a/trunk/src/protocol/srs_protocol_kbps.cpp b/trunk/src/protocol/srs_protocol_kbps.cpp index 3e2929d8b..8f10abaa7 100644 --- a/trunk/src/protocol/srs_protocol_kbps.cpp +++ b/trunk/src/protocol/srs_protocol_kbps.cpp @@ -11,52 +11,46 @@ SrsKbpsSlice::SrsKbpsSlice(SrsWallClock* c) { clk = c; - io = NULL; - last_bytes = io_bytes_base = starttime = bytes = delta_bytes = 0; + starttime = 0; + bytes = 0; } SrsKbpsSlice::~SrsKbpsSlice() { } -int64_t SrsKbpsSlice::get_total_bytes() -{ - return bytes + last_bytes - io_bytes_base; -} - void SrsKbpsSlice::sample() { srs_utime_t now = clk->now(); - int64_t total_bytes = get_total_bytes(); - + if (sample_30s.time < 0) { - sample_30s.update(total_bytes, now, 0); + sample_30s.update(bytes, now, 0); } if (sample_1m.time < 0) { - sample_1m.update(total_bytes, now, 0); + sample_1m.update(bytes, now, 0); } if (sample_5m.time < 0) { - sample_5m.update(total_bytes, now, 0); + sample_5m.update(bytes, now, 0); } if (sample_60m.time < 0) { - sample_60m.update(total_bytes, now, 0); + sample_60m.update(bytes, now, 0); } if (now - sample_30s.time >= 30 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_30s.total) * 8 / srsu2ms(now - sample_30s.time)); - sample_30s.update(total_bytes, now, kbps); + int kbps = (int)((bytes - sample_30s.total) * 8 / srsu2ms(now - sample_30s.time)); + sample_30s.update(bytes, now, kbps); } if (now - sample_1m.time >= 60 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_1m.total) * 8 / srsu2ms(now - sample_1m.time)); - sample_1m.update(total_bytes, now, kbps); + int kbps = (int)((bytes - sample_1m.total) * 8 / srsu2ms(now - sample_1m.time)); + sample_1m.update(bytes, now, kbps); } if (now - sample_5m.time >= 300 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_5m.total) * 8 / srsu2ms(now - sample_5m.time)); - sample_5m.update(total_bytes, now, kbps); + int kbps = (int)((bytes - sample_5m.total) * 8 / srsu2ms(now - sample_5m.time)); + sample_5m.update(bytes, now, kbps); } if (now - sample_60m.time >= 3600 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_60m.total) * 8 / srsu2ms(now - sample_60m.time)); - sample_60m.update(total_bytes, now, kbps); + int kbps = (int)((bytes - sample_60m.total) * 8 / srsu2ms(now - sample_60m.time)); + sample_60m.update(bytes, now, kbps); } } @@ -138,57 +132,22 @@ void SrsNetworkDelta::remark(int64_t* in, int64_t* out) in_delta_ = out_delta_ = 0; } -SrsKbps::SrsKbps(SrsWallClock* c) : is(c), os(c) +SrsKbps::SrsKbps(SrsWallClock* c) { - clk = c; + clk = c ? c : _srs_clock; + is = new SrsKbpsSlice(clk); + os = new SrsKbpsSlice(clk); } SrsKbps::~SrsKbps() { -} - -void SrsKbps::set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out) -{ - // set input stream - // now, set start time. - if (is.starttime == 0) { - is.starttime = clk->now(); - } - // save the old in bytes. - if (is.io) { - is.bytes += is.io->get_recv_bytes() - is.io_bytes_base; - } - // use new io. - is.io = in; - is.last_bytes = is.io_bytes_base = 0; - if (in) { - is.last_bytes = is.io_bytes_base = in->get_recv_bytes(); - } - // resample - is.sample(); - - // set output stream - // now, set start time. - if (os.starttime == 0) { - os.starttime = clk->now(); - } - // save the old in bytes. - if (os.io) { - os.bytes += os.io->get_send_bytes() - os.io_bytes_base; - } - // use new io. - os.io = out; - os.last_bytes = os.io_bytes_base = 0; - if (out) { - os.last_bytes = os.io_bytes_base = out->get_send_bytes(); - } - // resample - os.sample(); + srs_freep(is); + srs_freep(os); } int SrsKbps::get_send_kbps() { - int duration = srsu2ms(clk->now() - is.starttime); + int duration = srsu2ms(clk->now() - is->starttime); if (duration <= 0) { return 0; } @@ -199,7 +158,7 @@ int SrsKbps::get_send_kbps() int SrsKbps::get_recv_kbps() { - int duration = srsu2ms(clk->now() - os.starttime); + int duration = srsu2ms(clk->now() - os->starttime); if (duration <= 0) { return 0; } @@ -210,90 +169,118 @@ int SrsKbps::get_recv_kbps() int SrsKbps::get_send_kbps_30s() { - return os.sample_30s.rate; + return os->sample_30s.rate; } int SrsKbps::get_recv_kbps_30s() { - return is.sample_30s.rate; + return is->sample_30s.rate; } int SrsKbps::get_send_kbps_5m() { - return os.sample_5m.rate; + return os->sample_5m.rate; } int SrsKbps::get_recv_kbps_5m() { - return is.sample_5m.rate; + return is->sample_5m.rate; +} + +void SrsKbps::add_delta(ISrsKbpsDelta* delta) +{ + if (!delta) return; + + int64_t in, out; + delta->remark(&in, &out); + add_delta(in, out); } void SrsKbps::add_delta(int64_t in, int64_t out) { // update the total bytes - is.last_bytes += in; - os.last_bytes += out; + is->bytes += in; + os->bytes += out; // we donot sample, please use sample() to do resample. } void SrsKbps::sample() { - // update the total bytes - if (os.io) { - os.last_bytes = os.io->get_send_bytes(); - } - - if (is.io) { - is.last_bytes = is.io->get_recv_bytes(); - } - - // resample - is.sample(); - os.sample(); + is->sample(); + os->sample(); } int64_t SrsKbps::get_send_bytes() { - // we must calc the send bytes dynamically, - // to not depends on the sample(which used to calc the kbps). - // @read https://github.com/ossrs/srs/issues/588 - - // session start bytes. - int64_t bytes = os.bytes; - - // When exists active session, use it to get the last bytes. - if (os.io) { - bytes += os.io->get_send_bytes() - os.io_bytes_base; - return bytes; - } - - // When no active session, the last_bytes record the last valid bytes. - // TODO: Maybe the bellow bytes is zero, because the ios.io.out is NULL. - bytes += os.last_bytes - os.io_bytes_base; - - return bytes; + return os->bytes; } int64_t SrsKbps::get_recv_bytes() { - // we must calc the send bytes dynamically, - // to not depends on the sample(which used to calc the kbps). - // @read https://github.com/ossrs/srs/issues/588 - - // session start bytes. - int64_t bytes = is.bytes; - - // When exists active session, use it to get the last bytes. - if (is.io) { - bytes += is.io->get_recv_bytes() - is.io_bytes_base; - return bytes; - } - - // When no active session, the last_bytes record the last valid bytes. - // TODO: Maybe the bellow bytes is zero, because the ios.io.out is NULL. - bytes += is.last_bytes - is.io_bytes_base; - - return bytes; + return is->bytes; +} + +SrsNetworkKbps::SrsNetworkKbps(SrsWallClock* clock) +{ + delta_ = new SrsNetworkDelta(); + kbps_ = new SrsKbps(clock); +} + +SrsNetworkKbps::~SrsNetworkKbps() +{ + srs_freep(kbps_); + srs_freep(delta_); +} + +void SrsNetworkKbps::set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out) +{ + delta_->set_io(in, out); +} + +void SrsNetworkKbps::sample() +{ + kbps_->add_delta(delta_); + kbps_->sample(); +} + +int SrsNetworkKbps::get_send_kbps() +{ + return kbps_->get_send_kbps(); +} + +int SrsNetworkKbps::get_recv_kbps() +{ + return kbps_->get_recv_kbps(); +} + +int SrsNetworkKbps::get_send_kbps_30s() +{ + return kbps_->get_send_kbps_30s(); +} + +int SrsNetworkKbps::get_recv_kbps_30s() +{ + return kbps_->get_recv_kbps_30s(); +} + +int SrsNetworkKbps::get_send_kbps_5m() +{ + return kbps_->get_send_kbps_5m(); +} + +int SrsNetworkKbps::get_recv_kbps_5m() +{ + return kbps_->get_recv_kbps_5m(); +} + +int64_t SrsNetworkKbps::get_send_bytes() +{ + return kbps_->get_send_bytes(); +} + +int64_t SrsNetworkKbps::get_recv_bytes() +{ + return kbps_->get_recv_bytes(); } diff --git a/trunk/src/protocol/srs_protocol_kbps.hpp b/trunk/src/protocol/srs_protocol_kbps.hpp index 2eb901888..d3ece15fe 100644 --- a/trunk/src/protocol/srs_protocol_kbps.hpp +++ b/trunk/src/protocol/srs_protocol_kbps.hpp @@ -13,64 +13,42 @@ #include /** - * a slice of kbps statistic, for input or output. - * a slice contains a set of sessions, which has a base offset of bytes, - * where a slice is: - * starttime(oldest session startup time) - * bytes(total bytes of previous sessions) - * io_bytes_base(bytes offset of current session) - * last_bytes(bytes of current session) - * so, the total send bytes now is: - * send_bytes = bytes + last_bytes - io_bytes_base - * so, the bytes sent duration current session is: - * send_bytes = last_bytes - io_bytes_base - * @remark use set_io to start new session. - * @remakr the slice is a data collection object driven by SrsKbps. + * The slice of kbps statistic, for input or output. */ class SrsKbpsSlice { private: SrsWallClock* clk; public: - // the slice io used for SrsKbps to invoke, - // the SrsKbpsSlice itself never use it. - ISrsProtocolStatistic* io; // session startup bytes // @remark, use total_bytes() to get the total bytes of slice. int64_t bytes; // slice starttime, the first time to record bytes. srs_utime_t starttime; - // session startup bytes number for io when set it, - // the base offset of bytes for io. - int64_t io_bytes_base; - // last updated bytes number, - // cache for io maybe freed. - int64_t last_bytes; // samples SrsRateSample sample_30s; SrsRateSample sample_1m; SrsRateSample sample_5m; SrsRateSample sample_60m; -public: - // for the delta bytes. - int64_t delta_bytes; public: SrsKbpsSlice(SrsWallClock* clk); virtual ~SrsKbpsSlice(); public: - // Get current total bytes, it doesn't depend on sample(). - virtual int64_t get_total_bytes(); // Resample the slice to calculate the kbps. virtual void sample(); }; /** - * the interface which provices delta of bytes. - * for a delta, for example, a live stream connection, we can got the delta by: + * The interface which provices delta of bytes. For example, we got a delta from a TCP client: * ISrsKbpsDelta* delta = ...; + * Now, we can add delta simple to a kbps: + * kbps->add_delta(delta); + * Or by multiple kbps: * int64_t in, out; * delta->remark(&in, &out); - * kbps->add_delta(in, out); + * kbps1->add_delta(in, out); + * kbpsN->add_delta(in, out); + * Then you're able to use the kbps object. */ class ISrsKbpsDelta { @@ -78,9 +56,8 @@ public: ISrsKbpsDelta(); virtual ~ISrsKbpsDelta(); public: - /** - * resample to generate the value of delta bytes. - */ + // Resample to get the value of delta bytes. + // @remark If no delta bytes, both in and out will be set to 0. virtual void remark(int64_t* in, int64_t* out) = 0; }; @@ -115,6 +92,7 @@ public: SrsNetworkDelta(); virtual ~SrsNetworkDelta(); public: + // Switch the under-layer network io, we use the bytes as a fresh delta. virtual void set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out); // Interface ISrsKbpsDelta. public: @@ -122,86 +100,64 @@ public: }; /** - * to statistic the kbps of io. - * itself can be a statistic source, for example, used for SRS bytes stat. - * there are some usage scenarios: - * 1. connections to calc kbps by sample(): - * SrsKbps* kbps = ...; - * kbps->set_io(in, out) - * kbps->sample() - * kbps->get_xxx_kbps(). - * the connections know how many bytes already send/recv. - * 2. server to calc kbps by add_delta(): + * To statistic the kbps. For example, we got a set of connections and add the total delta: * SrsKbps* kbps = ...; - * kbps->set_io(NULL, NULL) - * for each connection in connections: - * ISrsKbpsDelta* delta = connection; // where connection implements ISrsKbpsDelta - * int64_t in, out; - * delta->remark(&in, &out) - * kbps->add_delta(in, out) + * for conn in connections: + * kbps->add_delta(conn->delta()) // Which return an ISrsKbpsDelta object. + * Then we sample and got the total kbps: * kbps->sample() * kbps->get_xxx_kbps(). - * 3. kbps used as ISrsProtocolStatistic, to provides raw bytes: - * SrsKbps* kbps = ...; - * kbps->set_io(in, out); - * // both kbps->get_recv_bytes() and kbps->get_send_bytes() are available. - * // we can use the kbps as the data source of another kbps: - * SrsKbps* user = ...; - * user->set_io(kbps, kbps); - * the server never know how many bytes already send/recv, for the connection maybe closed. */ -class SrsKbps : public ISrsProtocolStatistic +class SrsKbps { private: - SrsKbpsSlice is; - SrsKbpsSlice os; + SrsKbpsSlice* is; + SrsKbpsSlice* os; SrsWallClock* clk; public: - // We won't free the clock c. - SrsKbps(SrsWallClock* c); + // Note that we won't free the clock c. + SrsKbps(SrsWallClock* c = NULL); virtual ~SrsKbps(); public: - /** - * set io to start new session. - * set the underlayer reader/writer, - * if the io destroied, for instance, the forwarder reconnect, - * user must set the io of SrsKbps to NULL to continue to use the kbps object. - * @param in the input stream statistic. can be NULL. - * @param out the output stream statistic. can be NULL. - * @remark if in/out is NULL, use the cached data for kbps. - * @remark User must set_io(NULL, NULL) then free the in and out. - */ - virtual void set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out); -public: - /** - * get total kbps, duration is from the startup of io. - * @remark, use sample() to update data. - */ + // Get total average kbps. virtual int get_send_kbps(); virtual int get_recv_kbps(); - // 30s + // Get the average kbps in 30s. virtual int get_send_kbps_30s(); virtual int get_recv_kbps_30s(); - // 5m + // Get the average kbps in 5m or 300s. virtual int get_send_kbps_5m(); virtual int get_recv_kbps_5m(); public: - /** - * add delta to kbps clac mechenism. - * we donot know the total bytes, but know the delta, for instance, - * for rtmp server to calc total bytes and kbps. - * @remark user must invoke sample() to calc result after invoke this method. - * @param delta, assert should never be NULL. - */ + // Add delta to kbps. Please call sample() after all deltas are added to kbps. virtual void add_delta(int64_t in, int64_t out); - /** - * resample all samples, ignore if in/out is NULL. - * used for user to calc the kbps, to sample new kbps value. - * @remark if user, for instance, the rtmp server to calc the total bytes, - * use the add_delta() is better solutions. - */ + virtual void add_delta(ISrsKbpsDelta* delta); + // Sample the kbps to get the kbps in N seconds. + virtual void sample(); +public: + virtual int64_t get_send_bytes(); + virtual int64_t get_recv_bytes(); +}; + +// A sugar to use SrsNetworkDelta and SrsKbps. +class SrsNetworkKbps +{ +private: + SrsNetworkDelta* delta_; + SrsKbps* kbps_; +public: + SrsNetworkKbps(SrsWallClock* c = NULL); + virtual ~SrsNetworkKbps(); +public: + virtual void set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out); virtual void sample(); -// Interface ISrsProtocolStatistic +public: + virtual int get_send_kbps(); + virtual int get_recv_kbps(); + virtual int get_send_kbps_30s(); + virtual int get_recv_kbps_30s(); + virtual int get_send_kbps_5m(); + virtual int get_recv_kbps_5m(); public: virtual int64_t get_send_bytes(); virtual int64_t get_recv_bytes(); diff --git a/trunk/src/protocol/srs_protocol_rtmp_conn.cpp b/trunk/src/protocol/srs_protocol_rtmp_conn.cpp index 35c95ecf5..bcffe5e58 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_conn.cpp +++ b/trunk/src/protocol/srs_protocol_rtmp_conn.cpp @@ -18,9 +18,8 @@ using namespace std; SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm) { - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - + kbps = new SrsNetworkKbps(); + url = r; connect_timeout = ctm; stream_timeout = stm; @@ -39,7 +38,6 @@ SrsBasicRtmpClient::~SrsBasicRtmpClient() { close(); srs_freep(kbps); - srs_freep(clk); srs_freep(req); } @@ -52,7 +50,7 @@ srs_error_t SrsBasicRtmpClient::connect() transport = new SrsTcpClient(req->host, req->port, srs_utime_t(connect_timeout)); client = new SrsRtmpClient(transport); kbps->set_io(transport, transport); - + if ((err = transport->connect()) != srs_success) { close(); return srs_error_wrap(err, "connect"); @@ -168,32 +166,32 @@ srs_error_t SrsBasicRtmpClient::play(int chunk_size, bool with_vhost, std::strin return err; } -void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age) +void SrsBasicRtmpClient::kbps_sample(const char* label, srs_utime_t age) { kbps->sample(); - + int sr = kbps->get_send_kbps(); int sr30s = kbps->get_send_kbps_30s(); int sr5m = kbps->get_send_kbps_5m(); int rr = kbps->get_recv_kbps(); int rr30s = kbps->get_recv_kbps_30s(); int rr5m = kbps->get_recv_kbps_5m(); - - srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m); + + srs_trace("<- %s time=%" PRId64 ", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, srsu2ms(age), sr, sr30s, sr5m, rr, rr30s, rr5m); } -void SrsBasicRtmpClient::kbps_sample(const char* label, int64_t age, int msgs) +void SrsBasicRtmpClient::kbps_sample(const char* label, srs_utime_t age, int msgs) { kbps->sample(); - + int sr = kbps->get_send_kbps(); int sr30s = kbps->get_send_kbps_30s(); int sr5m = kbps->get_send_kbps_5m(); int rr = kbps->get_recv_kbps(); int rr30s = kbps->get_recv_kbps_30s(); int rr5m = kbps->get_recv_kbps_5m(); - - srs_trace("<- %s time=%" PRId64 ", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m); + + srs_trace("<- %s time=%" PRId64 ", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", label, srsu2ms(age), msgs, sr, sr30s, sr5m, rr, rr30s, rr5m); } int SrsBasicRtmpClient::sid() diff --git a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp index ec39d13eb..a2a1cd386 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp @@ -17,7 +17,7 @@ class SrsRtmpClient; class SrsCommonMessage; class SrsSharedPtrMessage; class SrsPacket; -class SrsKbps; +class SrsNetworkKbps; class SrsWallClock; // The simple RTMP client, provides friendly APIs. @@ -38,8 +38,7 @@ protected: private: SrsTcpClient* transport; SrsRtmpClient* client; - SrsKbps* kbps; - SrsWallClock* clk; + SrsNetworkKbps* kbps; int stream_id; public: // Constructor. @@ -59,8 +58,8 @@ protected: public: virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string* pstream = NULL); virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string* pstream = NULL); - virtual void kbps_sample(const char* label, int64_t age); - virtual void kbps_sample(const char* label, int64_t age, int msgs); + virtual void kbps_sample(const char* label, srs_utime_t age); + virtual void kbps_sample(const char* label, srs_utime_t age, int msgs); virtual int sid(); public: virtual srs_error_t recv_message(SrsCommonMessage** pmsg); diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index ca3c124b3..8bc6a34da 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -6042,387 +6042,6 @@ VOID TEST(ProtocolHTTPTest, ParseHTTPMessage) } } -VOID TEST(ProtocolKbpsTest, Connections) -{ - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - kbps->set_io(io, io); - - // No data, 0kbps. - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 30s. - clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_in(30 * 100 * 1000)->set_out(30 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(800, kbps->get_recv_kbps()); - EXPECT_EQ(800, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 300s. - clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_in(330 * 100 * 1000)->set_out(330 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(800, kbps->get_recv_kbps()); - EXPECT_EQ(800, kbps->get_recv_kbps_30s()); - EXPECT_EQ(800, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(800, kbps->get_send_kbps_5m()); - } - - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - kbps->set_io(io, io); - - // No data, 0kbps. - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 30s. - clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_in(30 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(800, kbps->get_recv_kbps()); - EXPECT_EQ(800, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 300s. - clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_in(330 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(800, kbps->get_recv_kbps()); - EXPECT_EQ(800, kbps->get_recv_kbps_30s()); - EXPECT_EQ(800, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - } - - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - kbps->set_io(io, io); - - // No data, 0kbps. - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 30s. - clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_out(30 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 300s. - clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_out(330 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(800, kbps->get_send_kbps_5m()); - } -} - -VOID TEST(ProtocolKbpsTest, Delta) -{ - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsNetworkDelta* conn = new SrsNetworkDelta(); - SrsAutoFree(SrsNetworkDelta, conn); - conn->set_io(io, io); - - // No data. - ISrsKbpsDelta* delta = dynamic_cast(conn); - int64_t in, out; - delta->remark(&in, &out); - EXPECT_EQ(0, in); - EXPECT_EQ(0, out); - - // 800kb. - io->set_in(100 * 1000)->set_out(100 * 1000); - delta->remark(&in, &out); - EXPECT_EQ(100 * 1000, in); - EXPECT_EQ(100 * 1000, out); - - // No data. - delta->remark(&in, &out); - EXPECT_EQ(0, in); - EXPECT_EQ(0, out); - } - - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsNetworkDelta* conn = new SrsNetworkDelta(); - SrsAutoFree(SrsNetworkDelta, conn); - conn->set_io(io, io); - - // No data. - ISrsKbpsDelta* delta = dynamic_cast(conn); - int64_t in, out; - delta->remark(&in, &out); - EXPECT_EQ(0, in); - EXPECT_EQ(0, out); - - // 800kb. - io->set_in(100 * 1000)->set_out(100 * 1000); - delta->remark(&in, &out); - EXPECT_EQ(100 * 1000, in); - EXPECT_EQ(100 * 1000, out); - - // Kbps without io, gather delta. - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - kbps->set_io(NULL, NULL); - - // No data, 0kbps. - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 30s. - clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); - kbps->add_delta(30 * in, 30 * out); - kbps->sample(); - - EXPECT_EQ(800, kbps->get_recv_kbps()); - EXPECT_EQ(800, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - } -} - -VOID TEST(ProtocolKbpsTest, RAWStatistic) -{ - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - SrsKbps* conn = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, conn); - conn->set_io(io, io); - - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - kbps->set_io(conn, conn); - - // No data, 0kbps. - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(0, kbps->get_send_kbps()); - EXPECT_EQ(0, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - - // 800kbps in 30s. - clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); - io->set_out(30 * 100 * 1000); - kbps->sample(); - - EXPECT_EQ(0, kbps->get_recv_kbps()); - EXPECT_EQ(0, kbps->get_recv_kbps_30s()); - EXPECT_EQ(0, kbps->get_recv_kbps_5m()); - - EXPECT_EQ(800, kbps->get_send_kbps()); - EXPECT_EQ(800, kbps->get_send_kbps_30s()); - EXPECT_EQ(0, kbps->get_send_kbps_5m()); - } - - if (true) { - MockWallClock* clock = new MockWallClock(); - SrsAutoFree(MockWallClock, clock); - - SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); - SrsAutoFree(SrsKbps, kbps); - - // No io, no data. - kbps->set_io(NULL, NULL); - EXPECT_EQ(0, kbps->get_recv_bytes()); - EXPECT_EQ(0, kbps->get_send_bytes()); - - // With io, zero data. - MockStatistic* io = new MockStatistic(); - SrsAutoFree(MockStatistic, io); - - kbps->set_io(io, io); - EXPECT_EQ(0, kbps->get_recv_bytes()); - EXPECT_EQ(0, kbps->get_send_bytes()); - - // With io with data. - io->set_in(100 * 1000)->set_out(100 * 1000); - EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); - EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); - - // No io, cached data. - kbps->set_io(NULL, NULL); - EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); - EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); - - // Use the same IO, but as a fresh io. - kbps->set_io(io, io); - EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); - EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); - - io->set_in(150 * 1000)->set_out(150 * 1000); - EXPECT_EQ(150 * 1000, kbps->get_recv_bytes()); - EXPECT_EQ(150 * 1000, kbps->get_send_bytes()); - - // No io, cached data. - kbps->set_io(NULL, NULL); - EXPECT_EQ(150 * 1000, kbps->get_recv_bytes()); - EXPECT_EQ(150 * 1000, kbps->get_send_bytes()); - } -} - -VOID TEST(ProtocolKbpsTest, WriteLargeIOVs) -{ - srs_error_t err; - - if (true) { - iovec iovs[1]; - iovs[0].iov_base = (char*)"Hello"; - iovs[0].iov_len = 5; - - MockBufferIO io; - ssize_t nn = 0; - HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, 1, &nn)); - EXPECT_EQ(5, nn); - EXPECT_EQ(5, io.sbytes); - } - - if (true) { - iovec iovs[1024]; - int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); - for (int i = 0; i < nn_iovs; i++) { - iovs[i].iov_base = (char*)"Hello"; - iovs[i].iov_len = 5; - } - - MockBufferIO io; - ssize_t nn = 0; - HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); - EXPECT_EQ(5 * nn_iovs, nn); - EXPECT_EQ(5 * nn_iovs, io.sbytes); - } - - if (true) { - iovec iovs[1025]; - int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); - for (int i = 0; i < nn_iovs; i++) { - iovs[i].iov_base = (char*)"Hello"; - iovs[i].iov_len = 5; - } - - MockBufferIO io; - ssize_t nn = 0; - HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); - EXPECT_EQ(5 * nn_iovs, nn); - EXPECT_EQ(5 * nn_iovs, io.sbytes); - } - - if (true) { - iovec iovs[4096]; - int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); - for (int i = 0; i < nn_iovs; i++) { - iovs[i].iov_base = (char*)"Hello"; - iovs[i].iov_len = 5; - } - - MockBufferIO io; - ssize_t nn = 0; - HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); - EXPECT_EQ(5 * nn_iovs, nn); - EXPECT_EQ(5 * nn_iovs, io.sbytes); - } -} - VOID TEST(ProtocolProtobufTest, VarintsSize) { EXPECT_EQ(1, SrsProtobufVarints::sizeof_varint( 0x00)); diff --git a/trunk/src/utest/srs_utest_protocol2.cpp b/trunk/src/utest/srs_utest_protocol2.cpp new file mode 100644 index 000000000..c1bca05ed --- /dev/null +++ b/trunk/src/utest/srs_utest_protocol2.cpp @@ -0,0 +1,707 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +VOID TEST(ProtocolKbpsTest, Connections) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + // No data, 0kbps. + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(30 * 100 * 1000)->set_out(30 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(330 * 100 * 1000)->set_out(330 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(800, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(800, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + // No data, 0kbps. + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(30 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(330 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(800, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + // No data, 0kbps. + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(30 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(330 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(800, kbps->get_send_kbps_5m()); + } +} + +VOID TEST(ProtocolKbpsTest, Delta) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + // No data. + int64_t in, out; + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + + // 800kb. + io->set_in(100 * 1000)->set_out(100 * 1000); + delta->remark(&in, &out); + EXPECT_EQ(100 * 1000, in); + EXPECT_EQ(100 * 1000, out); + + // No data. + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + // No data. + int64_t in, out; + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + + // 800kb. + io->set_in(100 * 1000)->set_out(100 * 1000); + delta->remark(&in, &out); + EXPECT_EQ(100 * 1000, in); + EXPECT_EQ(100 * 1000, out); + + // Kbps without io, gather delta. + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + kbps->add_delta(30 * in, 30 * out); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } +} + +VOID TEST(ProtocolKbpsTest, RAWStatistic) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + // No data, 0kbps. + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(30 * 100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + + // No io, no data. + EXPECT_EQ(0, kbps->get_recv_bytes()); + EXPECT_EQ(0, kbps->get_send_bytes()); + + // With io, zero data. + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkDelta* delta = new SrsNetworkDelta(); + SrsAutoFree(SrsNetworkDelta, delta); + delta->set_io(io, io); + + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(0, kbps->get_recv_bytes()); + EXPECT_EQ(0, kbps->get_send_bytes()); + + // With io with data. + io->set_in(100 * 1000)->set_out(100 * 1000); + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); + + // No io, cached data. + delta->set_io(NULL, NULL); + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); + + // Use the same IO, but as a fresh io. + delta->set_io(io, io); + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(200 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(200 * 1000, kbps->get_send_bytes()); + + io->set_in(150 * 1000)->set_out(150 * 1000); + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(250 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(250 * 1000, kbps->get_send_bytes()); + + // No io, cached data. + delta->set_io(NULL, NULL); + kbps->add_delta(delta); + kbps->sample(); + EXPECT_EQ(250 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(250 * 1000, kbps->get_send_bytes()); + } +} + +VOID TEST(ProtocolKbpsTest, WriteLargeIOVs) +{ + srs_error_t err; + + if (true) { + iovec iovs[1]; + iovs[0].iov_base = (char*)"Hello"; + iovs[0].iov_len = 5; + + MockBufferIO io; + ssize_t nn = 0; + HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, 1, &nn)); + EXPECT_EQ(5, nn); + EXPECT_EQ(5, io.sbytes); + } + + if (true) { + iovec iovs[1024]; + int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); + for (int i = 0; i < nn_iovs; i++) { + iovs[i].iov_base = (char*)"Hello"; + iovs[i].iov_len = 5; + } + + MockBufferIO io; + ssize_t nn = 0; + HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); + EXPECT_EQ(5 * nn_iovs, nn); + EXPECT_EQ(5 * nn_iovs, io.sbytes); + } + + if (true) { + iovec iovs[1025]; + int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); + for (int i = 0; i < nn_iovs; i++) { + iovs[i].iov_base = (char*)"Hello"; + iovs[i].iov_len = 5; + } + + MockBufferIO io; + ssize_t nn = 0; + HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); + EXPECT_EQ(5 * nn_iovs, nn); + EXPECT_EQ(5 * nn_iovs, io.sbytes); + } + + if (true) { + iovec iovs[4096]; + int nn_iovs = (int)(sizeof(iovs)/sizeof(iovec)); + for (int i = 0; i < nn_iovs; i++) { + iovs[i].iov_base = (char*)"Hello"; + iovs[i].iov_len = 5; + } + + MockBufferIO io; + ssize_t nn = 0; + HELPER_EXPECT_SUCCESS(srs_write_large_iovs(&io, iovs, nn_iovs, &nn)); + EXPECT_EQ(5 * nn_iovs, nn); + EXPECT_EQ(5 * nn_iovs, io.sbytes); + } +} + +VOID TEST(ProtocolKbpsTest, ConnectionsSugar) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + kbps->set_io(io, io); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(30 * 100 * 1000)->set_out(30 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(330 * 100 * 1000)->set_out(330 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(800, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(800, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + kbps->set_io(io, io); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(30 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(330 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(800, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + kbps->set_io(io, io); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(30 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 300s. + clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(330 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(800, kbps->get_send_kbps_5m()); + } +} + +VOID TEST(ProtocolKbpsTest, DeltaSugar) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + // Kbps without io, gather delta. + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + kbps->set_io(io, io); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_in(30 * 100 * 1000)->set_out(30 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } +} + +VOID TEST(ProtocolKbpsTest, RAWStatisticSugar) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + kbps->set_io(io, io); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS); + io->set_out(30 * 100 * 1000); + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + + SrsNetworkKbps* kbps = new SrsNetworkKbps(clock->set_clock(0)); + SrsAutoFree(SrsNetworkKbps, kbps); + + // No io, no data. + EXPECT_EQ(0, kbps->get_recv_bytes()); + EXPECT_EQ(0, kbps->get_send_bytes()); + + // With io, zero data. + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + kbps->set_io(io, io); + + kbps->sample(); + EXPECT_EQ(0, kbps->get_recv_bytes()); + EXPECT_EQ(0, kbps->get_send_bytes()); + + // With io with data. + io->set_in(100 * 1000)->set_out(100 * 1000); + kbps->sample(); + EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); + + // No io, cached data. + kbps->set_io(NULL, NULL); + kbps->sample(); + EXPECT_EQ(100 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(100 * 1000, kbps->get_send_bytes()); + + // Use the same IO, but as a fresh io. + kbps->set_io(io, io); + kbps->sample(); + EXPECT_EQ(200 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(200 * 1000, kbps->get_send_bytes()); + + io->set_in(150 * 1000)->set_out(150 * 1000); + kbps->sample(); + EXPECT_EQ(250 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(250 * 1000, kbps->get_send_bytes()); + + // No io, cached data. + kbps->set_io(NULL, NULL); + kbps->sample(); + EXPECT_EQ(250 * 1000, kbps->get_recv_bytes()); + EXPECT_EQ(250 * 1000, kbps->get_send_bytes()); + } +} + diff --git a/trunk/src/utest/srs_utest_protocol2.hpp b/trunk/src/utest/srs_utest_protocol2.hpp new file mode 100644 index 000000000..29289306e --- /dev/null +++ b/trunk/src/utest/srs_utest_protocol2.hpp @@ -0,0 +1,16 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#ifndef SRS_UTEST_PROTOCOL2_HPP +#define SRS_UTEST_PROTOCOL2_HPP + +/* +#include +*/ +#include + +#endif +