diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 88f93c11f..fc23b3b4c 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1820,14 +1820,11 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) nn_simulate_player_nack_drop = 0; pp_address_change = new SrsErrorPithyPrint(); pli_epp = new SrsErrorPithyPrint(); + delta_ = new SrsEphemeralDelta(); nack_enabled_ = false; timer_nack_ = new SrsRtcConnectionNackTimer(this); - clock_ = new SrsWallClock(); - kbps_ = new SrsKbps(clock_); - kbps_->set_io(NULL, NULL); - _srs_rtc_manager->subscribe(this); } @@ -1872,9 +1869,7 @@ SrsRtcConnection::~SrsRtcConnection() srs_freep(req_); srs_freep(pp_address_change); srs_freep(pli_epp); - - srs_freep(kbps_); - srs_freep(clock_); + srs_freep(delta_); } void SrsRtcConnection::on_before_dispose(ISrsResource* c) @@ -1952,7 +1947,7 @@ vector SrsRtcConnection::peer_addresses() void SrsRtcConnection::remark(int64_t* in, int64_t* out) { - kbps_->remark(in, out); + delta_->remark(in, out); } const SrsContextId& SrsRtcConnection::get_id() @@ -2110,7 +2105,7 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) srs_error_t err = srs_success; // Update stat when we received data. - kbps_->add_delta(skt->size(), 0); + delta_->add_delta(skt->size(), 0); if (!r->is_binding_request()) { return err; @@ -2135,7 +2130,7 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data) { // Update stat when we received data. - kbps_->add_delta(nb_data, 0); + delta_->add_delta(nb_data, 0); return transport_->on_dtls(data, nb_data); } @@ -2145,7 +2140,7 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) srs_error_t err = srs_success; // Update stat when we received data. - kbps_->add_delta(nb_data, 0); + delta_->add_delta(nb_data, 0); int nb_unprotected_buf = nb_data; if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { @@ -2286,7 +2281,7 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) srs_error_t err = srs_success; // Update stat when we received data. - kbps_->add_delta(nb_data, 0); + delta_->add_delta(nb_data, 0); SrsRtcPublishStream* publisher = NULL; if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) { @@ -2485,7 +2480,7 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) ++_srs_pps_srtcps->sugar; // Update stat when we sending data. - kbps_->add_delta(0, nb_data); + delta_->add_delta(0, nb_data); int nb_buf = nb_data; if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) { @@ -2692,7 +2687,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt) ++_srs_pps_srtps->sugar; // Update stat when we sending data. - kbps_->add_delta(0, iov->iov_len); + delta_->add_delta(0, iov->iov_len); // TODO: FIXME: Handle error. sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); @@ -2766,7 +2761,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) } // Update stat when we sending data. - kbps_->add_delta(0, stream->pos()); + delta_->add_delta(0, stream->pos()); if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { return srs_error_wrap(err, "stun binding response send failed"); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 069a731b4..baf3b1c7e 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -51,8 +51,7 @@ class SrsStatistic; class SrsRtcUserConfig; class SrsRtcSendTrack; class SrsRtcPublishStream; -class SrsKbps; -class SrsWallClock; +class SrsEphemeralDelta; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -493,8 +492,7 @@ private: private: bool nack_enabled_; private: - SrsKbps* kbps_; - SrsWallClock* clock_; + SrsEphemeralDelta* delta_; public: SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid); virtual ~SrsRtcConnection(); diff --git a/trunk/src/protocol/srs_protocol_kbps.cpp b/trunk/src/protocol/srs_protocol_kbps.cpp index 4d46ae6d1..f667d41e7 100644 --- a/trunk/src/protocol/srs_protocol_kbps.cpp +++ b/trunk/src/protocol/srs_protocol_kbps.cpp @@ -68,6 +68,28 @@ ISrsKbpsDelta::~ISrsKbpsDelta() { } +SrsEphemeralDelta::SrsEphemeralDelta() +{ + in_ = out_ = 0; +} + +SrsEphemeralDelta::~SrsEphemeralDelta() +{ +} + +void SrsEphemeralDelta::add_delta(int64_t in, int64_t out) +{ + in_ += in; + out_ += out; +} + +void SrsEphemeralDelta::remark(int64_t* in, int64_t* out) +{ + if (in) *in = in_; + if (out) *out = out_; + in_ = out_ = 0; +} + SrsKbps::SrsKbps(SrsWallClock* c) : is(c), os(c) { clk = c; diff --git a/trunk/src/protocol/srs_protocol_kbps.hpp b/trunk/src/protocol/srs_protocol_kbps.hpp index 3fcc6b1fe..336bc54f8 100644 --- a/trunk/src/protocol/srs_protocol_kbps.hpp +++ b/trunk/src/protocol/srs_protocol_kbps.hpp @@ -84,6 +84,23 @@ public: virtual void remark(int64_t* in, int64_t* out) = 0; }; +// A delta data source for SrsKbps, used in ephemeral case, for example, UDP server to increase stat when received or +// sent out each UDP packet. +class SrsEphemeralDelta : public ISrsKbpsDelta +{ +private: + uint64_t in_; + uint64_t out_; +public: + SrsEphemeralDelta(); + virtual ~SrsEphemeralDelta(); +public: + virtual void add_delta(int64_t in, int64_t out); +// Interface ISrsKbpsDelta. +public: + virtual void remark(int64_t* in, int64_t* out); +}; + /** * to statistic the kbps of io. * itself can be a statistic source, for example, used for SRS bytes stat. diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index be24a46ee..bd2797cd5 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -6225,6 +6225,25 @@ VOID TEST(ProtocolKbpsTest, Connections) VOID TEST(ProtocolKbpsTest, Delta) { + if (true) { + SrsEphemeralDelta ed; + + ISrsKbpsDelta* delta = (ISrsKbpsDelta*)&ed; + int64_t in, out; + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + + ed.add_delta(100 * 1000, 100 * 1000); + delta->remark(&in, &out); + EXPECT_EQ(100 * 1000, in); + EXPECT_EQ(100 * 1000, out); + + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + } + if (true) { MockWallClock* clock = new MockWallClock(); SrsAutoFree(MockWallClock, clock);