From 6cdb08cc87a98333ea971da4c7a815a18ab8749e Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 18 Feb 2021 12:03:17 +0800 Subject: [PATCH] Copy 4.0.71 at 2021-02-18 --- trunk/src/app/srs_app_config.cpp | 4 +- trunk/src/app/srs_app_conn.cpp | 83 ++++- trunk/src/app/srs_app_conn.hpp | 30 +- trunk/src/app/srs_app_hourglass.cpp | 14 +- trunk/src/app/srs_app_hourglass.hpp | 16 +- trunk/src/app/srs_app_hybrid.cpp | 70 +++- trunk/src/app/srs_app_hybrid.hpp | 9 +- trunk/src/app/srs_app_listener.cpp | 131 ++++++-- trunk/src/app/srs_app_listener.hpp | 15 + trunk/src/app/srs_app_rtc_conn.cpp | 390 +++++++++++------------ trunk/src/app/srs_app_rtc_conn.hpp | 65 ++-- trunk/src/app/srs_app_rtc_dtls.cpp | 41 +-- trunk/src/app/srs_app_rtc_dtls.hpp | 17 +- trunk/src/app/srs_app_rtc_queue.cpp | 33 +- trunk/src/app/srs_app_rtc_queue.hpp | 3 + trunk/src/app/srs_app_rtc_server.cpp | 147 ++++++--- trunk/src/app/srs_app_rtc_server.hpp | 2 - trunk/src/app/srs_app_rtc_source.cpp | 80 ++++- trunk/src/app/srs_app_rtc_source.hpp | 17 +- trunk/src/app/srs_app_server.cpp | 288 ++++++++--------- trunk/src/app/srs_app_server.hpp | 15 +- trunk/src/app/srs_app_source.cpp | 37 ++- trunk/src/app/srs_app_source.hpp | 10 +- trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/kernel/srs_kernel_rtc_rtcp.cpp | 5 + trunk/src/kernel/srs_kernel_rtc_rtcp.hpp | 1 + trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 186 +++++++++++ trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 48 ++- trunk/src/protocol/srs_protocol_kbps.cpp | 91 +++++- trunk/src/protocol/srs_protocol_kbps.hpp | 56 +++- trunk/src/protocol/srs_service_st.cpp | 5 + trunk/src/protocol/srs_service_st.hpp | 1 + trunk/src/utest/srs_utest_app.cpp | 113 +++++++ trunk/src/utest/srs_utest_config.cpp | 2 +- 34 files changed, 1432 insertions(+), 595 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index bbd0ab46c..0d2575f3f 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -8282,7 +8282,7 @@ bool SrsConfig::get_heartbeat_enabled() srs_utime_t SrsConfig::get_heartbeat_interval() { - static srs_utime_t DEFAULT = (srs_utime_t)(9.9 * SRS_UTIME_SECONDS); + static srs_utime_t DEFAULT = (srs_utime_t)(10 * SRS_UTIME_SECONDS); SrsConfDirective* conf = get_heartbeart(); if (!conf) { @@ -8294,7 +8294,7 @@ srs_utime_t SrsConfig::get_heartbeat_interval() return DEFAULT; } - return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); + return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } string SrsConfig::get_heartbeat_url() diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index c7a4d3a84..fffa224f2 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -36,6 +36,12 @@ using namespace std; #include #include +#include + +SrsPps* _srs_pps_ids = new SrsPps(_srs_clock); +SrsPps* _srs_pps_fids = new SrsPps(_srs_clock); +SrsPps* _srs_pps_fids_level0 = new SrsPps(_srs_clock); + ISrsDisposingHandler::ISrsDisposingHandler() { } @@ -52,6 +58,9 @@ SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose) trd = NULL; p_disposing_ = NULL; removing_ = false; + + nn_level0_cache_ = 100000; + conns_level0_cache_ = new SrsResourceFastIdItem[nn_level0_cache_]; } SrsResourceManager::~SrsResourceManager() @@ -65,6 +74,8 @@ SrsResourceManager::~SrsResourceManager() } clear(); + + srs_freepa(conns_level0_cache_); } srs_error_t SrsResourceManager::start() @@ -114,10 +125,14 @@ srs_error_t SrsResourceManager::cycle() return err; } -void SrsResourceManager::add(ISrsResource* conn) +void SrsResourceManager::add(ISrsResource* conn, bool* exists) { if (std::find(conns_.begin(), conns_.end(), conn) == conns_.end()) { conns_.push_back(conn); + } else { + if (exists) { + *exists = false; + } } } @@ -127,6 +142,39 @@ void SrsResourceManager::add_with_id(const std::string& id, ISrsResource* conn) conns_id_[id] = conn; } +void SrsResourceManager::add_with_fast_id(uint64_t id, ISrsResource* conn) +{ + bool exists = false; + add(conn, &exists); + conns_fast_id_[id] = conn; + + if (exists) { + return; + } + + // For new resource, build the level-0 cache for fast-id. + SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_]; + + // Ignore if exits item. + if (item->fast_id && item->fast_id == id) { + return; + } + + // Fresh one, create the item. + if (!item->fast_id) { + item->fast_id = id; + item->impl = conn; + item->nn_collisions = 1; + item->available = true; + } + + // Collision, increase the collisions. + if (item->fast_id != id) { + item->nn_collisions++; + item->available = false; + } +} + void SrsResourceManager::add_with_name(const std::string& name, ISrsResource* conn) { add(conn); @@ -140,12 +188,27 @@ ISrsResource* SrsResourceManager::at(int index) ISrsResource* SrsResourceManager::find_by_id(std::string id) { + ++_srs_pps_ids->sugar; map::iterator it = conns_id_.find(id); return (it != conns_id_.end())? it->second : NULL; } +ISrsResource* SrsResourceManager::find_by_fast_id(uint64_t id) +{ + SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_]; + if (item->available && item->fast_id == id) { + ++_srs_pps_fids_level0->sugar; + return item->impl; + } + + ++_srs_pps_fids->sugar; + map::iterator it = conns_fast_id_.find(id); + return (it != conns_fast_id_.end())? it->second : NULL; +} + ISrsResource* SrsResourceManager::find_by_name(std::string name) { + ++_srs_pps_ids->sugar; map::iterator it = conns_name_.find(name); return (it != conns_name_.end())? it->second : NULL; } @@ -316,6 +379,24 @@ void SrsResourceManager::dispose(ISrsResource* c) } } + for (map::iterator it = conns_fast_id_.begin(); it != conns_fast_id_.end();) { + if (c != it->second) { + ++it; + } else { + // Update the level-0 cache for fast-id. + uint64_t id = it->first; + SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_]; + item->nn_collisions--; + if (!item->nn_collisions) { + item->fast_id = 0; + item->available = false; + } + + // Use C++98 style: https://stackoverflow.com/a/4636230 + conns_fast_id_.erase(it++); + } + } + vector::iterator it = std::find(conns_.begin(), conns_.end(), c); if (it != conns_.end()) { conns_.erase(it); diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index c61bbefbe..db10ee423 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -52,6 +52,27 @@ public: virtual void on_disposing(ISrsResource* c) = 0; }; +// The item to identify the fast id object. +class SrsResourceFastIdItem +{ +public: + // If available, use the resource in item. + bool available; + // How many resource have the same fast-id, which contribute a collision. + int nn_collisions; + // The first fast-id of resources. + uint64_t fast_id; + // The first resource object. + ISrsResource* impl; +public: + SrsResourceFastIdItem() { + available = false; + nn_collisions = 0; + fast_id = 0; + impl = NULL; + } +}; + // The resource manager remove resource and delete it asynchronously. class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public ISrsResourceManager { @@ -76,6 +97,11 @@ private: std::vector conns_; // The connections with resource id. std::map conns_id_; + // The connections with resource fast(int) id. + std::map conns_fast_id_; + // The level-0 fast cache for fast id. + int nn_level0_cache_; + SrsResourceFastIdItem* conns_level0_cache_; // The connections with resource name. std::map conns_name_; public: @@ -89,11 +115,13 @@ public: public: virtual srs_error_t cycle(); public: - void add(ISrsResource* conn); + void add(ISrsResource* conn, bool* exists = NULL); void add_with_id(const std::string& id, ISrsResource* conn); + void add_with_fast_id(uint64_t id, ISrsResource* conn); void add_with_name(const std::string& name, ISrsResource* conn); ISrsResource* at(int index); ISrsResource* find_by_id(std::string id); + ISrsResource* find_by_fast_id(uint64_t id); ISrsResource* find_by_name(std::string name); public: void subscribe(ISrsDisposingHandler* h); diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 24cec52e4..f899a8b6b 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -28,6 +28,10 @@ using namespace std; #include #include +#include + +SrsPps* _srs_pps_timer = new SrsPps(_srs_clock); + ISrsHourGlass::ISrsHourGlass() { } @@ -36,8 +40,9 @@ ISrsHourGlass::~ISrsHourGlass() { } -SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution) +SrsHourGlass::SrsHourGlass(string label, ISrsHourGlass* h, srs_utime_t resolution) { + label_ = label; handler = h; _resolution = resolution; total_elapse = 0; @@ -60,6 +65,11 @@ srs_error_t SrsHourGlass::start() return err; } +void SrsHourGlass::stop() +{ + trd->stop(); +} + srs_error_t SrsHourGlass::tick(srs_utime_t interval) { return tick(0, interval); @@ -94,6 +104,8 @@ srs_error_t SrsHourGlass::cycle() srs_utime_t interval = it->second; if (interval == 0 || (total_elapse % interval) == 0) { + ++_srs_pps_timer->sugar; + if ((err = handler->notify(event, interval, total_elapse)) != srs_success) { return srs_error_wrap(err, "notify"); } diff --git a/trunk/src/app/srs_app_hourglass.hpp b/trunk/src/app/srs_app_hourglass.hpp index aeb6893b5..a9f781268 100644 --- a/trunk/src/app/srs_app_hourglass.hpp +++ b/trunk/src/app/srs_app_hourglass.hpp @@ -29,6 +29,7 @@ #include #include +#include class SrsCoroutine; @@ -59,17 +60,18 @@ public: // It's a complex and high-performance timer. // // Usage: -// SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS); +// SrsHourGlass* hg = new SrsHourGlass("nack", handler, 100 * SRS_UTIME_MILLISECONDS); // -// hg->tick(1, 3 * SRS_UTIME_MILLISECONDS); -// hg->tick(2, 5 * SRS_UTIME_MILLISECONDS); -// hg->tick(3, 7 * SRS_UTIME_MILLISECONDS); +// hg->tick(1, 300 * SRS_UTIME_MILLISECONDS); +// hg->tick(2, 500 * SRS_UTIME_MILLISECONDS); +// hg->tick(3, 700 * SRS_UTIME_MILLISECONDS); // // // The hg will create a thread for timer. // hg->start(); class SrsHourGlass : virtual public ISrsCoroutineHandler { private: + std::string label_; SrsCoroutine* trd; ISrsHourGlass* handler; srs_utime_t _resolution; @@ -81,11 +83,13 @@ private: // for each cycle, we increase it with a resolution. srs_utime_t total_elapse; public: - SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution); + // TODO: FIMXE: Refine to SrsHourGlass(std::string label); + SrsHourGlass(std::string label, ISrsHourGlass* h, srs_utime_t resolution); virtual ~SrsHourGlass(); public: - // Start the hourglass. + // Start or stop the hourglass. virtual srs_error_t start(); + virtual void stop(); public: // Add a pair of tick(event, interval). // @param event the event of tick, default is 0. diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index a43e08800..8f7300215 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -27,9 +27,18 @@ #include #include #include +#include using namespace std; +extern SrsPps* _srs_pps_cids_get; +extern SrsPps* _srs_pps_cids_set; + +extern SrsPps* _srs_pps_timer; +extern SrsPps* _srs_pps_pub; +extern SrsPps* _srs_pps_conn; +extern SrsPps* _srs_pps_dispose; + ISrsHybridServer::ISrsHybridServer() { } @@ -91,8 +100,8 @@ srs_error_t SrsServerAdapter::run() return srs_error_wrap(err, "ingest"); } - if ((err = srs->cycle()) != srs_success) { - return srs_error_wrap(err, "main cycle"); + if ((err = srs->start()) != srs_success) { + return srs_error_wrap(err, "start"); } return err; @@ -109,6 +118,7 @@ SrsServer* SrsServerAdapter::instance() SrsHybridServer::SrsHybridServer() { + timer_ = NULL; } SrsHybridServer::~SrsHybridServer() @@ -135,6 +145,10 @@ srs_error_t SrsHybridServer::initialize() return srs_error_wrap(err, "initialize st failed"); } + if ((err = setup_ticks()) != srs_success) { + return srs_error_wrap(err, "tick"); + } + vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; @@ -151,29 +165,17 @@ srs_error_t SrsHybridServer::run() { srs_error_t err = srs_success; - // TODO: FIXME: Identify master server directly. - // Run master server in this main thread. - SrsServerAdapter* master_server = NULL; - vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; - if (!master_server) { - master_server = dynamic_cast(server); - if (master_server) { - continue; - } - } - if ((err = server->run()) != srs_success) { return srs_error_wrap(err, "run server"); } } - if (master_server) { - return master_server->run(); - } + // Wait for all server to quit. + srs_thread_exit(NULL); return err; } @@ -197,5 +199,41 @@ SrsServerAdapter* SrsHybridServer::srs() return NULL; } +srs_error_t SrsHybridServer::setup_ticks() +{ + srs_error_t err = srs_success; + + timer_ = new SrsHourGlass("hybrid", this, 1 * SRS_UTIME_SECONDS); + + if ((err = timer_->tick(1, 5 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "start"); + } + + return err; +} + +srs_error_t SrsHybridServer::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + // Show statistics for RTC server. + SrsProcSelfStat* u = srs_get_self_proc_stat(); + // Resident Set Size: number of pages the process has in real memory. + int memory = (int)(u->rss * 4 / 1024); + + _srs_pps_timer->update(); _srs_pps_pub->update(); _srs_pps_conn->update(); + + srs_trace("Hybrid cpu=%.2f%%,%dMB, timer=%d,%d,%d", + u->percent * 100, memory, + _srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s() + ); + + return err; +} + SrsHybridServer* _srs_hybrid = new SrsHybridServer(); diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index 0f995afe4..7e988fe26 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -28,6 +28,8 @@ #include +#include + class SrsServer; // The hibrid server interfaces, we could register many servers. @@ -62,10 +64,11 @@ public: }; // The hybrid server manager. -class SrsHybridServer +class SrsHybridServer : public ISrsHourGlass { private: std::vector servers; + SrsHourGlass* timer_; public: SrsHybridServer(); virtual ~SrsHybridServer(); @@ -77,6 +80,10 @@ public: virtual void stop(); public: virtual SrsServerAdapter* srs(); +// interface ISrsHourGlass +private: + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); }; extern SrsHybridServer* _srs_hybrid; diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index f2525057b..a07ddc7cb 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -41,6 +41,13 @@ using namespace std; #include #include #include +#include + +#include + +SrsPps* _srs_pps_pkts = new SrsPps(_srs_clock); +SrsPps* _srs_pps_addrs = new SrsPps(_srs_clock); +SrsPps* _srs_pps_fast_addrs = new SrsPps(_srs_clock); // set the max packet size. #define SRS_UDP_MAX_PACKET_SIZE 65535 @@ -293,17 +300,29 @@ SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) fromlen = 0; peer_port = 0; + + fast_id_ = 0; + address_changed_ = false; + cache_buffer_ = new SrsBuffer(buf, nb_buf); } SrsUdpMuxSocket::~SrsUdpMuxSocket() { srs_freepa(buf); + srs_freep(cache_buffer_); } int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) { fromlen = sizeof(from); nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout); + if (nread <= 0) { + return nread; + } + + // Reset the fast cache buffer size. + cache_buffer_->set_size(nread); + cache_buffer_->skip(-1 * cache_buffer_->pos()); // Drop UDP health check packet of Aliyun SLB. // Healthcheck udp check @@ -313,21 +332,18 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) return 0; } - if (nread > 0) { - // TODO: FIXME: Maybe we should not covert to string for each packet. - char address_string[64]; - char port_string[16]; - if (getnameinfo((sockaddr*)&from, fromlen, - (char*)&address_string, sizeof(address_string), - (char*)&port_string, sizeof(port_string), - NI_NUMERICHOST|NI_NUMERICSERV)) { - return -1; - } - - peer_ip = std::string(address_string); - peer_port = atoi(port_string); + // Parse address from cache. + if (from.ss_family == AF_INET) { + sockaddr_in* addr = (sockaddr_in*)&from; + fast_id_ = uint64_t(addr->sin_port)<<48 | uint64_t(addr->sin_addr.s_addr); } + // We will regenerate the peer_ip, peer_port and peer_id. + address_changed_ = true; + + // Update the stat. + ++_srs_pps_pkts->sugar; + return nread; } @@ -385,10 +401,63 @@ int SrsUdpMuxSocket::get_peer_port() const std::string SrsUdpMuxSocket::peer_id() { - char id_buf[1024]; - int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port); + if (address_changed_) { + address_changed_ = false; + + // Parse address from cache. + bool parsed = false; + if (from.ss_family == AF_INET) { + sockaddr_in* addr = (sockaddr_in*)&from; + + // Load from fast cache, previous ip. + std::map::iterator it = cache_.find(addr->sin_addr.s_addr); + if (it == cache_.end()) { + peer_ip = inet_ntoa(addr->sin_addr); + cache_[addr->sin_addr.s_addr] = peer_ip; + } else { + peer_ip = it->second; + } + + peer_port = ntohs(addr->sin_port); + parsed = true; + } + + if (!parsed) { + // TODO: FIXME: Maybe we should not covert to string for each packet. + char address_string[64]; + char port_string[16]; + if (getnameinfo((sockaddr*)&from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)) { + return ""; + } + + peer_ip = std::string(address_string); + peer_port = atoi(port_string); + } + + // Build the peer id. + static char id_buf[128]; + int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port); + peer_id_ = string(id_buf, len); + + // Update the stat. + ++_srs_pps_addrs->sugar; + } + + return peer_id_; +} + +uint64_t SrsUdpMuxSocket::fast_id() +{ + ++_srs_pps_fast_addrs->sugar; + return fast_id_; +} - return string(id_buf, len); +SrsBuffer* SrsUdpMuxSocket::buffer() +{ + return cache_buffer_; } SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() @@ -405,6 +474,11 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() sendonly->peer_ip = peer_ip; sendonly->peer_port = peer_port; + // Copy the fast id. + sendonly->peer_id_ = peer_id_; + sendonly->fast_id_ = fast_id_; + sendonly->address_changed_ = address_changed_; + return sendonly; } @@ -514,6 +588,11 @@ srs_error_t SrsUdpMuxListener::cycle() SrsAutoFree(SrsErrorPithyPrint, pp_pkt_handler_err); set_socket_buffer(); + + // Because we have to decrypt the cipher of received packet payload, + // and the size is not determined, so we think there is at least one copy, + // and we can reuse the plaintext h264/opus with players when got plaintext. + SrsUdpMuxSocket skt(lfd); while (true) { if ((err = trd->pull()) != srs_success) { @@ -522,12 +601,6 @@ srs_error_t SrsUdpMuxListener::cycle() nn_loop++; - // TODO: FIXME: Refactor the memory cache for receiver. - // Because we have to decrypt the cipher of received packet payload, - // and the size is not determined, so we think there is at least one copy, - // and we can reuse the plaintext h264/opus with players when got plaintext. - SrsUdpMuxSocket skt(lfd); - int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); if (nread <= 0) { if (nread < 0) { @@ -540,15 +613,16 @@ srs_error_t SrsUdpMuxListener::cycle() nn_msgs++; nn_msgs_stage++; - // Restore context when packets processed. - if (true) { - SrsContextRestore(cid); - err = handler->on_udp_packet(&skt); - } + // Handle the UDP packet. + err = handler->on_udp_packet(&skt); + // Use pithy print to show more smart information. if (err != srs_success) { uint32_t nn = 0; if (pp_pkt_handler_err->can_print(err, &nn)) { + // For performance, only restore context when output log. + _srs_context->set_id(cid); + // Append more information. err = srs_error_wrap(err, "size=%u, data=[%s]", skt.size(), srs_string_dumps_hex(skt.data(), skt.size(), 8).c_str()); srs_warn("handle udp pkt, count=%u/%u, err: %s", pp_pkt_handler_err->nn_count, nn, srs_error_desc(err).c_str()); @@ -558,6 +632,9 @@ srs_error_t SrsUdpMuxListener::cycle() pprint->elapse(); if (pprint->can_print()) { + // For performance, only restore context when output log. + _srs_context->set_id(cid); + int pps_average = 0; int pps_last = 0; if (true) { if (srs_get_system_time() > srs_get_system_startup_time()) { diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 758c09e73..53d0f92c0 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -29,12 +29,14 @@ #include #include +#include #include #include struct sockaddr; +class SrsBuffer; class SrsUdpMuxSocket; // The udp packet handler. @@ -135,6 +137,9 @@ public: // TODO: FIXME: Rename it. Refine it for performance issue. class SrsUdpMuxSocket { +private: + std::map cache_; + SrsBuffer* cache_buffer_; private: char* buf; int nb_buf; @@ -142,8 +147,16 @@ private: srs_netfd_t lfd; sockaddr_storage from; int fromlen; +private: std::string peer_ip; int peer_port; +private: + // Cache for peer id. + std::string peer_id_; + // If the address changed, we should generate the peer_id. + bool address_changed_; + // For IPv4 client, we use 8 bytes int id to find it fastly. + uint64_t fast_id_; public: SrsUdpMuxSocket(srs_netfd_t fd); virtual ~SrsUdpMuxSocket(); @@ -158,6 +171,8 @@ public: std::string get_peer_ip() const; int get_peer_port() const; std::string peer_id(); + uint64_t fast_id(); + SrsBuffer* buffer(); SrsUdpMuxSocket* copy_sendonly(); }; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 0131ae978..4a7558b72 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -58,8 +58,23 @@ using namespace std; #include #include +#include + +SrsPps* _srs_pps_pli = new SrsPps(_srs_clock); +SrsPps* _srs_pps_twcc = new SrsPps(_srs_clock); +SrsPps* _srs_pps_rr = new SrsPps(_srs_clock); +SrsPps* _srs_pps_pub = new SrsPps(_srs_clock); +SrsPps* _srs_pps_conn = new SrsPps(_srs_clock); + +extern SrsPps* _srs_pps_snack; +extern SrsPps* _srs_pps_snack2; + +extern SrsPps* _srs_pps_rnack; +extern SrsPps* _srs_pps_rnack2; + #define SRS_TICKID_RTCP 0 -#define SRS_TICKID_TWCC 2 +#define SRS_TICKID_TWCC 1 +#define SRS_TICKID_SEND_NACKS 2 ISrsRtcTransport::ISrsRtcTransport() { @@ -170,30 +185,24 @@ srs_error_t SrsSecurityTransport::srtp_initialize() return err; } -srs_error_t SrsSecurityTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsSecurityTransport::protect_rtp(void* packet, int* nb_cipher) { - return srtp_->protect_rtp(plaintext, cipher, nb_cipher); + return srtp_->protect_rtp(packet, nb_cipher); } -srs_error_t SrsSecurityTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsSecurityTransport::protect_rtcp(void* packet, int* nb_cipher) { - return srtp_->protect_rtcp(plaintext, cipher, nb_cipher); + return srtp_->protect_rtcp(packet, nb_cipher); } -// TODO: FIXME: Merge with protect_rtp. -srs_error_t SrsSecurityTransport::protect_rtp2(void* rtp_hdr, int* len_ptr) +srs_error_t SrsSecurityTransport::unprotect_rtp(void* packet, int* nb_plaintext) { - return srtp_->protect_rtp2(rtp_hdr, len_ptr); + return srtp_->unprotect_rtp(packet, nb_plaintext); } -srs_error_t SrsSecurityTransport::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext) +srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext) { - return srtp_->unprotect_rtp(cipher, plaintext, nb_plaintext); -} - -srs_error_t SrsSecurityTransport::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext) -{ - return srtp_->unprotect_rtcp(cipher, plaintext, nb_plaintext); + return srtp_->unprotect_rtcp(packet, nb_plaintext); } SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s) @@ -204,17 +213,12 @@ SrsSemiSecurityTransport::~SrsSemiSecurityTransport() { } -srs_error_t SrsSemiSecurityTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsSemiSecurityTransport::protect_rtp(void* packet, int* nb_cipher) { return srs_success; } -srs_error_t SrsSemiSecurityTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) -{ - return srs_success; -} - -srs_error_t SrsSemiSecurityTransport::protect_rtp2(void* rtp_hdr, int* len_ptr) +srs_error_t SrsSemiSecurityTransport::protect_rtcp(void* packet, int* nb_cipher) { return srs_success; } @@ -264,32 +268,23 @@ srs_error_t SrsPlaintextTransport::write_dtls_data(void* data, int size) return srs_success; } -srs_error_t SrsPlaintextTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) -{ - memcpy(cipher, plaintext, nb_cipher); - return srs_success; -} - -srs_error_t SrsPlaintextTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsPlaintextTransport::protect_rtp(void* packet, int* nb_cipher) { - memcpy(cipher, plaintext, nb_cipher); return srs_success; } -srs_error_t SrsPlaintextTransport::protect_rtp2(void* rtp_hdr, int* len_ptr) +srs_error_t SrsPlaintextTransport::protect_rtcp(void* packet, int* nb_cipher) { return srs_success; } -srs_error_t SrsPlaintextTransport::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext) +srs_error_t SrsPlaintextTransport::unprotect_rtp(void* packet, int* nb_plaintext) { - memcpy(plaintext, cipher, nb_plaintext); return srs_success; } -srs_error_t SrsPlaintextTransport::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext) +srs_error_t SrsPlaintextTransport::unprotect_rtcp(void* packet, int* nb_plaintext) { - memcpy(plaintext, cipher, nb_plaintext); return srs_success; } @@ -351,6 +346,8 @@ srs_error_t SrsRtcPLIWorker::cycle() uint32_t ssrc = it->first; SrsContextId cid = it->second; + ++_srs_pps_pli->sugar; + if ((err = handler_->do_request_keyframe(ssrc, cid)) != srs_success) { srs_warn("PLI error, %s", srs_error_desc(err).c_str()); srs_error_reset(err); @@ -393,13 +390,18 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) nack_enabled_ = false; _srs_config->subscribe(this); - timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); + timer_ = new SrsHourGlass("play", this, 1000 * SRS_UTIME_MILLISECONDS); nack_epp = new SrsErrorPithyPrint(); pli_worker_ = new SrsRtcPLIWorker(this); } SrsRtcPlayStream::~SrsRtcPlayStream() { + // TODO: FIXME: Should not do callback in de-constructor? + if (_srs_rtc_hijacker) { + _srs_rtc_hijacker->on_stop_play(session_, this, req_); + } + _srs_config->unsubscribe(this); srs_freep(nack_epp); @@ -670,53 +672,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector& pkts, uint32_t ssrc, uint16_t seq) -{ - for (map::iterator it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) { - SrsRtcAudioSendTrack* track = it->second; - - // If track is inactive, not process nack request. - if (!track->get_track_status()){ - continue; - } - - if (!track->has_ssrc(ssrc)) { - continue; - } - - // update recv nack statistic - track->on_recv_nack(); - - SrsRtpPacket2* pkt = track->fetch_rtp_packet(seq); - if (pkt != NULL) { - pkts.push_back(pkt); - } - return; - } - - for (map::iterator it = video_tracks_.begin(); it != video_tracks_.end(); ++it) { - SrsRtcVideoSendTrack* track = it->second; - - // If track is inactive, not process nack request. - if (!track->get_track_status()){ - continue; - } - - if (!track->has_ssrc(ssrc)) { - continue; - } - - // update recv nack statistic - track->on_recv_nack(); - - SrsRtpPacket2* pkt = track->fetch_rtp_packet(seq); - if (pkt != NULL) { - pkts.push_back(pkt); - } - return; - } -} - void SrsRtcPlayStream::set_all_tracks_status(bool status) { std::ostringstream merged_log; @@ -806,36 +761,46 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp) { srs_error_t err = srs_success; + ++_srs_pps_rnack->sugar; + + uint32_t ssrc = rtcp->get_media_ssrc(); + // If NACK disabled, print a log. if (!nack_enabled_) { vector sns = rtcp->get_lost_sns(); - srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", rtcp->get_media_ssrc(), srs_join_vector_string(sns, ",").c_str()); + srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", ssrc, srs_join_vector_string(sns, ",").c_str()); return err; } - // TODO: FIXME: Support ARQ. - vector resend_pkts; + SrsRtcSendTrack* target = NULL; + // Try audio track first. + for (map::iterator it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) { + SrsRtcAudioSendTrack* track = it->second; + if (!track->get_track_status() || !track->has_ssrc(ssrc)) { + continue; + } - vector sns = rtcp->get_lost_sns(); - for(int i = 0; i < (int)sns.size(); ++i) { - uint16_t seq = sns.at(i); - nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq); + target = track; + break; } - - for (int i = 0; i < (int)resend_pkts.size(); ++i) { - SrsRtpPacket2* pkt = resend_pkts[i]; - info.nn_bytes += pkt->nb_bytes(); - - uint32_t nn = 0; - if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { - srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(), - pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); + // If not found, try video track. + for (map::iterator it = video_tracks_.begin(); !target && it != video_tracks_.end(); ++it) { + SrsRtcVideoSendTrack* track = it->second; + if (!track->get_track_status() || !track->has_ssrc(ssrc)) { + continue; } + + target = track; + break; + } + // Error if no track. + if (!target) { + return srs_error_new(ERROR_RTC_NO_TRACK, "no track for %u ssrc", ssrc); } - // By default, we send packets by sendmmsg. - if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) { - return srs_error_wrap(err, "raw send"); + vector seqs = rtcp->get_lost_sns(); + if((err = target->on_recv_nack(seqs, info)) != srs_success) { + return srs_error_wrap(err, "track response nack. id:%s, ssrc=%u", target->get_track_id().c_str(), ssrc); } session_->stat_->nn_nack++; @@ -909,13 +874,14 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) { - timer_ = new SrsHourGlass(this, 10 * SRS_UTIME_MILLISECONDS); + timer_ = new SrsHourGlass("publish", this, 20 * SRS_UTIME_MILLISECONDS); cid_ = cid; is_started = false; session_ = session; request_keyframe_ = false; pli_epp = new SrsErrorPithyPrint(); + twcc_epp_ = new SrsErrorPithyPrint(3.0); req = NULL; source = NULL; @@ -933,16 +899,20 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon SrsRtcPublishStream::~SrsRtcPublishStream() { - if (_srs_rtc_hijacker) { - _srs_rtc_hijacker->on_stop_publish(session_, this, req); - } - // TODO: FIXME: Should remove and delete source. if (source) { source->set_publish_stream(NULL); source->on_unpublish(); } + // TODO: FIXME: Should not do callback in de-constructor? + // NOTE: on_stop_publish lead to switch io, + // it must be called after source stream unpublish (set source stream is_created=false). + // if not, it lead to republish failed. + if (_srs_rtc_hijacker) { + _srs_rtc_hijacker->on_stop_publish(session_, this, req); + } + for (int i = 0; i < (int)video_tracks_.size(); ++i) { SrsRtcVideoRecvTrack* track = video_tracks_.at(i); srs_freep(track); @@ -957,6 +927,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream() srs_freep(timer_); srs_freep(pli_worker_); + srs_freep(twcc_epp_); srs_freep(pli_epp); srs_freep(req); } @@ -991,9 +962,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost); - bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost); + twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req->vhost); - srs_trace("RTC publisher nack=%d, pt-drop=%u, twcc=%u/%d", nack_enabled_, pt_to_drop_, twcc_enabled, twcc_id); + srs_trace("RTC publisher nack=%d, pt-drop=%u, twcc=%u/%d", nack_enabled_, pt_to_drop_, twcc_enabled_, twcc_id); session_->stat_->nn_publishers++; @@ -1014,7 +985,7 @@ srs_error_t SrsRtcPublishStream::start() return err; } - if ((err = timer_->tick(SRS_TICKID_TWCC, 50 * SRS_UTIME_MILLISECONDS)) != srs_success) { + if ((err = timer_->tick(SRS_TICKID_TWCC, 40 * SRS_UTIME_MILLISECONDS)) != srs_success) { return srs_error_wrap(err, "twcc tick"); } @@ -1150,23 +1121,22 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) return err; } - // Decode the header first. - SrsRtpHeader h; - if (pt_to_drop_ && twcc_id_) { - SrsBuffer b(data, nb_data); - h.ignore_padding(true); h.set_extensions(&extension_types_); - if ((err = h.decode(&b)) != srs_success) { - return srs_error_wrap(err, "twcc decode header"); + // If payload type is configed to drop, ignore this packet. + if (pt_to_drop_) { + uint8_t pt = srs_rtp_fast_parse_pt(data, nb_data); + if (pt_to_drop_ == pt) { + return err; } } - // We must parse the TWCC from RTP header before SRTP unprotect, because: - // 1. Client may send some padding packets with invalid SequenceNumber, which causes the SRTP fail. - // 2. Server may send multiple duplicated NACK to client, and got more than one ARQ packet, which also fail SRTP. - // so, we must parse the header before SRTP unprotect(which may fail and drop packet). + // Decode the header first. if (twcc_id_) { + // We must parse the TWCC from RTP header before SRTP unprotect, because: + // 1. Client may send some padding packets with invalid SequenceNumber, which causes the SRTP fail. + // 2. Server may send multiple duplicated NACK to client, and got more than one ARQ packet, which also fail SRTP. + // so, we must parse the header before SRTP unprotect(which may fail and drop packet). uint16_t twcc_sn = 0; - if ((err = h.get_twcc_sequence_number(twcc_sn)) == srs_success) { + if ((err = srs_rtp_fast_parse_twcc(data, nb_data, &extension_types_, twcc_sn)) == srs_success) { if((err = on_twcc(twcc_sn)) != srs_success) { return srs_error_wrap(err, "on twcc"); } @@ -1175,34 +1145,36 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) } } - // If payload type is configed to drop, ignore this packet. - if (pt_to_drop_ && pt_to_drop_ == h.get_payload_type()) { - return err; - } - // Decrypt the cipher to plaintext RTP data. int nb_unprotected_buf = nb_data; - char* unprotected_buf = new char[kRtpPacketSize]; - if ((err = session_->transport_->unprotect_rtp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) { + if ((err = session_->transport_->unprotect_rtp(data, &nb_unprotected_buf)) != srs_success) { // We try to decode the RTP header for more detail error informations. SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. + err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(), h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos()); - srs_freepa(unprotected_buf); return err; } + srs_assert(nb_unprotected_buf > 0); + char* unprotected_buf = new char[nb_unprotected_buf]; + memcpy(unprotected_buf, data, nb_unprotected_buf); + if (_srs_blackhole->blackhole) { _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); } // Handle the plaintext RTP packet. if ((err = do_on_rtp(unprotected_buf, nb_unprotected_buf)) != srs_success) { + // We try to decode the RTP header for more detail error informations. + SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); + srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. + int nb_header = h.nb_bytes(); - const char* body = unprotected_buf + nb_header; - int nb_body = nb_unprotected_buf - nb_header; + const char* body = data + nb_header; + int nb_body = nb_data - nb_header; return srs_error_wrap(err, "cipher=%u, plaintext=%u, body=[%s]", nb_data, nb_unprotected_buf, srs_string_dumps_hex(body, nb_body, 8).c_str()); } @@ -1239,28 +1211,20 @@ srs_error_t SrsRtcPublishStream::do_on_rtp(char* plaintext, int nb_plaintext) SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); if (audio_track) { pkt->frame_type = SrsFrameTypeAudio; - if ((err = audio_track->on_rtp(source, pkt)) != srs_success) { + if ((err = audio_track->on_rtp(source, pkt, nack_enabled_)) != srs_success) { return srs_error_wrap(err, "on audio"); } } else if (video_track) { pkt->frame_type = SrsFrameTypeVideo; - if ((err = video_track->on_rtp(source, pkt)) != srs_success) { + if ((err = video_track->on_rtp(source, pkt, nack_enabled_)) != srs_success) { return srs_error_wrap(err, "on video"); } } else { return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } - // Check then send NACK every each RTP packet, to make it more efficient. - // For example, NACK of video track maybe triggered by audio RTP packets. - if ((err = check_send_nacks()) != srs_success) { - srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - if (_srs_rtc_hijacker) { - // TODO: FIXME: copy pkt by hijacker itself - if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt->copy())) != srs_success) { + if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt)) != srs_success) { return srs_error_wrap(err, "on rtp packet"); } } @@ -1272,6 +1236,10 @@ srs_error_t SrsRtcPublishStream::check_send_nacks() { srs_error_t err = srs_success; + if (!nack_enabled_) { + return err; + } + for (int i = 0; i < (int)video_tracks_.size(); ++i) { SrsRtcVideoRecvTrack* track = video_tracks_.at(i); if ((err = track->check_send_nacks()) != srs_success) { @@ -1297,17 +1265,13 @@ void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer } uint32_t ssrc = pkt->header.get_ssrc(); - if (get_audio_track(ssrc)) { - *ppayload = new SrsRtpRawPayload(); - } else if (get_video_track(ssrc)) { - uint8_t v = (uint8_t)pkt->nalu_type; - if (v == kStapA) { - *ppayload = new SrsRtpSTAPPayload(); - } else if (v == kFuA) { - *ppayload = new SrsRtpFUAPayload2(); - } else { - *ppayload = new SrsRtpRawPayload(); - } + SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc); + SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); + + if (audio_track) { + audio_track->on_before_decode_payload(pkt, buf, ppayload); + } else if (video_track) { + video_track->on_before_decode_payload(pkt, buf, ppayload); } } @@ -1316,9 +1280,11 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc() srs_error_t err = srs_success; if (last_time_send_twcc_) { + uint32_t nn = 0; srs_utime_t duration = srs_duration(last_time_send_twcc_, srs_get_system_time()); - if (duration > 80 * SRS_UTIME_MILLISECONDS) { - srs_warn2(TAG_LARGE_TIMER, "send_twcc interval exceeded %dms > 100ms", srsu2msi(duration)); + if (duration > 80 * SRS_UTIME_MILLISECONDS && twcc_epp_->can_print(0, &nn)) { + srs_warn2(TAG_LARGE_TIMER, "send_twcc interval exceeded %dms > 80ms, count=%u/%u", + srsu2msi(duration), nn, twcc_epp_->nn_count); } } last_time_send_twcc_ = srs_get_system_time(); @@ -1339,12 +1305,11 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc() } int nb_protected_buf = buffer->pos(); - char protected_buf[kRtpPacketSize]; - if ((err = session_->transport_->protect_rtcp(pkt, protected_buf, nb_protected_buf)) != srs_success) { + if ((err = session_->transport_->protect_rtcp(pkt, &nb_protected_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp, size=%u", nb_protected_buf); } - return session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + return session_->sendonly_skt->sendto(pkt, nb_protected_buf, 0); } srs_error_t SrsRtcPublishStream::on_rtcp(SrsRtcpCommon* rtcp) @@ -1485,7 +1450,7 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim } } - if (type == SRS_TICKID_TWCC) { + if (twcc_enabled_ && type == SRS_TICKID_TWCC) { // We should not depends on the received packet, // instead we should send feedback every Nms. if ((err = send_periodic_twcc()) != srs_success) { @@ -1619,7 +1584,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) req = NULL; cid_ = cid; stat_ = new SrsRtcConnectionStatistic(); - timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); + timer_ = new SrsHourGlass("conn", this, 20 * SRS_UTIME_MILLISECONDS); hijacker_ = NULL; sendonly_skt = NULL; @@ -1776,6 +1741,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); SrsAutoFree(SrsRtcStreamDescription, stream_desc); + // TODO: FIXME: Change to api of stream desc. if ((err = negotiate_publish_capability(req, remote_sdp, stream_desc)) != srs_success) { return srs_error_wrap(err, "publish negotiate"); } @@ -1922,6 +1888,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st return srs_error_wrap(err, "init"); } + if ((err = timer_->tick(SRS_TICKID_SEND_NACKS, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) { + return srs_error_wrap(err, "tick nack"); + } + if ((err = timer_->start()) != srs_success) { return srs_error_wrap(err, "start timer"); } @@ -1969,12 +1939,12 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; - char unprotected_buf[kRtpPacketSize]; int nb_unprotected_buf = nb_data; - if ((err = transport_->unprotect_rtcp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) { + if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtcp unprotect"); } + char* unprotected_buf = data; if (_srs_blackhole->blackhole) { _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); } @@ -2107,27 +2077,36 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) { srs_error_t err = srs_success; + SrsRtcPublishStream* publisher = NULL; + if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) { + return srs_error_wrap(err, "find"); + } + srs_assert(publisher); + + return publisher->on_rtp(data, nb_data); +} + +srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher) +{ + srs_error_t err = srs_success; + if (publishers_.size() == 0) { return srs_error_new(ERROR_RTC_RTCP, "no publisher"); } - SrsRtpHeader header; - if (true) { - SrsBuffer* buffer = new SrsBuffer(data, nb_data); - SrsAutoFree(SrsBuffer, buffer); - header.ignore_padding(true); - if(srs_success != (err = header.decode(buffer))) { - return srs_error_wrap(err, "decode rtp header"); - } + uint32_t ssrc = srs_rtp_fast_parse_ssrc(buf, size); + if (ssrc == 0) { + return srs_error_new(ERROR_RTC_NO_PUBLISHER, "invalid ssrc"); } - map::iterator it = publishers_ssrc_map_.find(header.get_ssrc()); + map::iterator it = publishers_ssrc_map_.find(ssrc); if(it == publishers_ssrc_map_.end()) { - return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", header.get_ssrc()); + return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", ssrc); } - SrsRtcPublishStream* publisher = it->second; - return publisher->on_rtp(data, nb_data); + *ppublisher = it->second; + + return err; } srs_error_t SrsRtcConnection::on_connection_established() @@ -2228,7 +2207,7 @@ srs_error_t SrsRtcConnection::start_publish(std::string stream_uri) bool SrsRtcConnection::is_alive() { - return last_stun_time + session_timeout < srs_get_system_time(); + return last_stun_time + session_timeout > srs_get_system_time(); } void SrsRtcConnection::alive() @@ -2272,7 +2251,12 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) // If no cache, build cache and setup the relations in connection. if (!addr_cache) { peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly(); - server_->insert_into_id_sessions(peer_id, this); + _srs_rtc_manager->add_with_id(peer_id, this); + + uint64_t fast_id = skt->fast_id(); + if (fast_id) { + _srs_rtc_manager->add_with_fast_id(fast_id, this); + } } // Update the transport. @@ -2282,6 +2266,24 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; + + ++_srs_pps_conn->sugar; + + // For publisher to send NACK. + if (type == SRS_TICKID_SEND_NACKS) { + srs_update_system_time(); + + std::map::iterator it; + for (it = publishers_.begin(); it != publishers_.end(); it++) { + SrsRtcPublishStream* publisher = it->second; + + if ((err = publisher->check_send_nacks()) != srs_success) { + srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + } + } + return err; } @@ -2290,12 +2292,11 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) srs_error_t err = srs_success; int nb_buf = nb_data; - char protected_buf[kRtpPacketSize]; - if ((err = transport_->protect_rtcp(data, protected_buf, nb_buf)) != srs_success) { + if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp"); } - if ((err = sendonly_skt->sendto(protected_buf, nb_buf, 0)) != srs_success) { + if ((err = sendonly_skt->sendto(data, nb_buf, 0)) != srs_success) { return srs_error_wrap(err, "send"); } @@ -2304,16 +2305,19 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc, uint32_t& sent_nacks, uint32_t& timeout_nacks) { + ++_srs_pps_snack->sugar; + SrsRtcpNack rtcpNack(ssrc); rtcpNack.set_media_ssrc(ssrc); nack->get_nack_seqs(rtcpNack, timeout_nacks); - sent_nacks = rtcpNack.get_lost_sns().size(); - if(!sent_nacks){ + if(rtcpNack.empty()){ return; } + ++_srs_pps_snack2->sugar; + char buf[kRtcpPacketSize]; SrsBuffer stream(buf, sizeof(buf)); @@ -2321,12 +2325,11 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss rtcpNack.encode(&stream); // TODO: FIXME: Check error. - char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); - transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf); + transport_->protect_rtcp(stream.data(), &nb_protected_buf); // TODO: FIXME: Check error. - sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + sendonly_skt->sendto(stream.data(), nb_protected_buf, 0); } srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue, const uint64_t& last_send_systime, const SrsNtp& last_send_ntp) @@ -2366,13 +2369,12 @@ srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_ srs_info("RR ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u", ssrc, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter); - char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); - if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp rr"); } - return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0); } srs_error_t SrsRtcConnection::send_rtcp_xr_rrtr(uint32_t ssrc) @@ -2419,13 +2421,12 @@ srs_error_t SrsRtcConnection::send_rtcp_xr_rrtr(uint32_t ssrc) stream.write_4bytes(cur_ntp.ntp_second_); stream.write_4bytes(cur_ntp.ntp_fractions_); - char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); - if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp xr"); } - return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0); } srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId& cid_of_subscriber) @@ -2450,13 +2451,12 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId _srs_blackhole->sendto(stream.data(), stream.pos()); } - char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); - if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp psfb pli"); } - return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0); } void SrsRtcConnection::simulate_nack_drop(int nn) @@ -2507,7 +2507,7 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector& // Cipher RTP to SRTP packet. if (true) { int nn_encrypt = (int)iov->iov_len; - if ((err = transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { + if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); } iov->iov_len = (size_t)nn_encrypt; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e04c9671e..6b84e108a 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -97,11 +97,14 @@ public: virtual srs_error_t on_dtls(char* data, int nb_data) = 0; virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; public: - virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) = 0; - virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) = 0; - virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr) = 0; - virtual srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext) = 0; - virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext) = 0; + // Encrypt the packet(paintext) to cipher, which is aso the packet ptr. + // The nb_cipher should be initialized to the size of cipher, with some paddings. + virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0; + virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0; + // Decrypt the packet(cipher) to plaintext, which is also the packet ptr. + // The nb_plaintext should be initialized to the size of cipher. + virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0; + virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0; }; // The security transport, use DTLS/SRTP to protect the data. @@ -122,19 +125,14 @@ public: srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_dtls_alert(std::string type, std::string desc); public: - // Encrypt the input plaintext to output cipher with nb_cipher bytes. - // @remark Note that the nb_cipher is the size of input plaintext, and - // it also is the length of output cipher when return. - srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher); - srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher); - // Encrypt the input rtp_hdr with *len_ptr bytes. - // @remark the input plaintext and out cipher reuse rtp_hdr. - srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr); - // Decrypt the input cipher to output cipher with nb_cipher bytes. - // @remark Note that the nb_plaintext is the size of input cipher, and - // it also is the length of output plaintext when return. - srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext); - srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext); + // Encrypt the packet(paintext) to cipher, which is aso the packet ptr. + // The nb_cipher should be initialized to the size of cipher, with some paddings. + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); + // Decrypt the packet(cipher) to plaintext, which is also the packet ptr. + // The nb_plaintext should be initialized to the size of cipher. + srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); + srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); // implement ISrsDtlsCallback public: virtual srs_error_t on_dtls_handshake_done(); @@ -151,9 +149,8 @@ public: SrsSemiSecurityTransport(SrsRtcConnection* s); virtual ~SrsSemiSecurityTransport(); public: - virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher); - virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher); - virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr); + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); }; // Plaintext transport, without DTLS or SRTP. @@ -173,11 +170,10 @@ public: virtual srs_error_t on_dtls_application_data(const char* data, const int len); virtual srs_error_t write_dtls_data(void* data, int size); public: - virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher); - virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher); - virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr); - virtual srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext); - virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext); + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); + srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); + srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); }; // The handler for PLI worker coroutine. @@ -289,7 +285,6 @@ public: virtual srs_error_t cycle(); private: srs_error_t send_packets(SrsRtcStream* source, const std::vector& pkts, SrsRtcPlayStreamStatistic& info); - void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); @@ -318,11 +313,13 @@ private: SrsHourGlass* timer_; uint64_t nn_audio_frames; SrsRtcPLIWorker* pli_worker_; + SrsErrorPithyPrint* twcc_epp_; private: SrsRtcConnection* session_; uint16_t pt_to_drop_; // Whether enabled nack. bool nack_enabled_; + bool twcc_enabled_; private: bool request_keyframe_; SrsErrorPithyPrint* pli_epp; @@ -358,6 +355,7 @@ public: srs_error_t on_rtp(char* buf, int nb_buf); private: srs_error_t do_on_rtp(char* plaintext, int nb_plaintext); +public: srs_error_t check_send_nacks(); public: virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); @@ -415,8 +413,11 @@ public: }; // A RTC Peer Connection, SDP level object. -class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsResource - , virtual public ISrsDisposingHandler +// +// For performance, we use non-virtual public from resource, +// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a +class SrsRtcConnection : public ISrsResource + , virtual public ISrsHourGlass, virtual public ISrsDisposingHandler { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; @@ -507,6 +508,10 @@ public: srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); +private: + // Decode the RTP header from buf, find the publisher by SSRC. + srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher); +public: srs_error_t on_rtcp(char* data, int nb_data); private: srs_error_t dispatch_rtcp(SrsRtcpCommon* rtcp); @@ -575,6 +580,8 @@ public: virtual srs_error_t on_before_play(SrsRtcConnection* session, SrsRequest* req) = 0; // When start player by RTC. virtual srs_error_t on_start_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0; + // When stop player by RTC. + virtual void on_stop_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0; // When start consuming for player for RTC. virtual srs_error_t on_start_consume(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0; }; diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index cd877b360..d2d6bc364 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -955,7 +955,7 @@ srs_error_t SrsSRTP::initialize(string recv_key, std::string send_key) return err; } -srs_error_t SrsSRTP::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsSRTP::protect_rtp(void* packet, int* nb_cipher) { srs_error_t err = srs_success; @@ -964,17 +964,15 @@ srs_error_t SrsSRTP::protect_rtp(const char* plaintext, char* cipher, int& nb_ci return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready"); } - memcpy(cipher, plaintext, nb_cipher); - srtp_err_status_t r0 = srtp_err_status_ok; - if ((r0 = srtp_protect(send_ctx_, cipher, &nb_cipher)) != srtp_err_status_ok) { + if ((r0 = srtp_protect(send_ctx_, packet, nb_cipher)) != srtp_err_status_ok) { return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect r0=%u", r0); } return err; } -srs_error_t SrsSRTP::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) +srs_error_t SrsSRTP::protect_rtcp(void* packet, int* nb_cipher) { srs_error_t err = srs_success; @@ -983,34 +981,15 @@ srs_error_t SrsSRTP::protect_rtcp(const char* plaintext, char* cipher, int& nb_c return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready"); } - memcpy(cipher, plaintext, nb_cipher); - srtp_err_status_t r0 = srtp_err_status_ok; - if ((r0 = srtp_protect_rtcp(send_ctx_, cipher, &nb_cipher)) != srtp_err_status_ok) { + if ((r0 = srtp_protect_rtcp(send_ctx_, packet, nb_cipher)) != srtp_err_status_ok) { return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect r0=%u", r0); } return err; } -srs_error_t SrsSRTP::protect_rtp2(void* rtp_hdr, int* len_ptr) -{ - srs_error_t err = srs_success; - - // If DTLS/SRTP is not ready, fail. - if (!send_ctx_) { - return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready"); - } - - srtp_err_status_t r0 = srtp_err_status_ok; - if ((r0 = srtp_protect(send_ctx_, rtp_hdr, len_ptr)) != srtp_err_status_ok) { - return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect r0=%u", r0); - } - - return err; -} - -srs_error_t SrsSRTP::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext) +srs_error_t SrsSRTP::unprotect_rtp(void* packet, int* nb_plaintext) { srs_error_t err = srs_success; @@ -1019,17 +998,15 @@ srs_error_t SrsSRTP::unprotect_rtp(const char* cipher, char* plaintext, int& nb_ return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); } - memcpy(plaintext, cipher, nb_plaintext); - srtp_err_status_t r0 = srtp_err_status_ok; - if ((r0 = srtp_unprotect(recv_ctx_, plaintext, &nb_plaintext)) != srtp_err_status_ok) { + if ((r0 = srtp_unprotect(recv_ctx_, packet, nb_plaintext)) != srtp_err_status_ok) { return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtp unprotect r0=%u", r0); } return err; } -srs_error_t SrsSRTP::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext) +srs_error_t SrsSRTP::unprotect_rtcp(void* packet, int* nb_plaintext) { srs_error_t err = srs_success; @@ -1038,10 +1015,8 @@ srs_error_t SrsSRTP::unprotect_rtcp(const char* cipher, char* plaintext, int& nb return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); } - memcpy(plaintext, cipher, nb_plaintext); - srtp_err_status_t r0 = srtp_err_status_ok; - if ((r0 = srtp_unprotect_rtcp(recv_ctx_, plaintext, &nb_plaintext)) != srtp_err_status_ok) { + if ((r0 = srtp_unprotect_rtcp(recv_ctx_, packet, nb_plaintext)) != srtp_err_status_ok) { return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect r0=%u", r0); } diff --git a/trunk/src/app/srs_app_rtc_dtls.hpp b/trunk/src/app/srs_app_rtc_dtls.hpp index e6ddfaf3e..75e099f83 100644 --- a/trunk/src/app/srs_app_rtc_dtls.hpp +++ b/trunk/src/app/srs_app_rtc_dtls.hpp @@ -224,19 +224,10 @@ public: // Intialize srtp context with recv_key and send_key. srs_error_t initialize(std::string recv_key, std::string send_key); public: - // Encrypt the input plaintext to output cipher with nb_cipher bytes. - // @remark Note that the nb_cipher is the size of input plaintext, and - // it also is the length of output cipher when return. - srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher); - srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher); - // Encrypt the input rtp_hdr with *len_ptr bytes. - // @remark the input plaintext and out cipher reuse rtp_hdr. - srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr); - // Decrypt the input cipher to output cipher with nb_cipher bytes. - // @remark Note that the nb_plaintext is the size of input cipher, and - // it also is the length of output plaintext when return. - srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext); - srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext); + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); + srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); + srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); }; #endif diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index efa3a6b75..2bba97b57 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -146,10 +146,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) { void SrsRtpRingBuffer::notify_nack_list_full() { - while(begin <= end) { - remove(begin); - ++begin; - } + clear_all_histroy(); begin = end = 0; initialized_ = false; @@ -161,6 +158,29 @@ void SrsRtpRingBuffer::notify_drop_seq(uint16_t seq) advance_to(seq+1); } +void SrsRtpRingBuffer::clear_histroy(uint16_t seq) +{ + // TODO FIXME Did not consider loopback + for (uint16_t i = 0; i < capacity_; i++) { + SrsRtpPacket2* p = queue_[i]; + if (p && p->header.get_sequence() < seq) { + srs_freep(p); + queue_[i] = NULL; + } + } +} + +void SrsRtpRingBuffer::clear_all_histroy() +{ + for (uint16_t i = 0; i < capacity_; i++) { + SrsRtpPacket2* p = queue_[i]; + if (p) { + srs_freep(p); + queue_[i] = NULL; + } + } +} + SrsNackOption::SrsNackOption() { max_count = 15; @@ -170,7 +190,7 @@ SrsNackOption::SrsNackOption() max_nack_interval = 500 * SRS_UTIME_MILLISECONDS; min_nack_interval = 20 * SRS_UTIME_MILLISECONDS; - nack_check_interval = 3 * SRS_UTIME_MILLISECONDS; + nack_check_interval = 20 * SRS_UTIME_MILLISECONDS; //TODO: FIXME: audio and video using diff nack strategy // video: @@ -239,8 +259,7 @@ void SrsRtpNackForReceiver::check_queue_size() void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks) { - // TODO: FIXME: Use packet as tick count, not clock. - srs_utime_t now = srs_update_system_time(); + srs_utime_t now = srs_get_system_time(); srs_utime_t interval = now - pre_check_time_; if (interval < opts_.nack_check_interval) { diff --git a/trunk/src/app/srs_app_rtc_queue.hpp b/trunk/src/app/srs_app_rtc_queue.hpp index 630f4c654..b1d2b2cac 100644 --- a/trunk/src/app/srs_app_rtc_queue.hpp +++ b/trunk/src/app/srs_app_rtc_queue.hpp @@ -87,6 +87,9 @@ public: // TODO: FIXME: Refine it? void notify_nack_list_full(); void notify_drop_seq(uint16_t seq); +public: + void clear_histroy(uint16_t seq); + void clear_all_histroy(); }; struct SrsNackOption diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 0d9b747d0..1573c73ff 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -45,6 +45,30 @@ using namespace std; #include #include +extern SrsPps* _srs_pps_pkts; +extern SrsPps* _srs_pps_addrs; +extern SrsPps* _srs_pps_fast_addrs; + +extern SrsPps* _srs_pps_ids; +extern SrsPps* _srs_pps_fids; +extern SrsPps* _srs_pps_fids_level0; + +extern SrsPps* _srs_pps_pli; +extern SrsPps* _srs_pps_twcc; +extern SrsPps* _srs_pps_rr; + +extern SrsPps* _srs_pps_timer; +extern SrsPps* _srs_pps_pub; +extern SrsPps* _srs_pps_conn; + +extern SrsPps* _srs_pps_snack; +extern SrsPps* _srs_pps_snack2; +extern SrsPps* _srs_pps_sanack; +extern SrsPps* _srs_pps_svnack; + +extern SrsPps* _srs_pps_rnack; +extern SrsPps* _srs_pps_rnack2; + SrsRtcBlackhole::SrsRtcBlackhole() { blackhole = false; @@ -217,7 +241,7 @@ SrsRtcServer::SrsRtcServer() { handler = NULL; hijacker = NULL; - timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); + timer = new SrsHourGlass("server", this, 1 * SRS_UTIME_SECONDS); } SrsRtcServer::~SrsRtcServer() @@ -300,27 +324,32 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; - string peer_id = skt->peer_id(); + SrsRtcConnection* session = NULL; char* data = skt->data(); int size = skt->size(); + bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)data, size); + bool is_rtcp = srs_is_rtcp((uint8_t*)data, size); - SrsRtcConnection* session = NULL; - if (true) { - ISrsResource* conn = _srs_rtc_manager->find_by_id(peer_id); - if (conn) { - // Switch to the session to write logs to the context. - session = dynamic_cast(conn); - session->switch_to_context(); - } + uint64_t fast_id = skt->fast_id(); + // Try fast id first, if not found, search by long peer id. + if (fast_id) { + session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id); + } + if (!session) { + string peer_id = skt->peer_id(); + session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id); } - // When got any packet, the session is alive now. if (session) { + // When got any packet, the session is alive now. session->alive(); } // Notify hijack to handle the UDP packet. - if (hijacker) { + if (hijacker && is_rtp_or_rtcp && is_rtcp) { bool consumed = false; + if (session) { + session->switch_to_context(); + } if ((err = hijacker->on_udp_packet(skt, session, &consumed)) != srs_success) { return srs_error_wrap(err, "hijack consumed=%u", consumed); } @@ -331,27 +360,27 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) } // For STUN, the peer address may change. - if (srs_is_stun((uint8_t*)data, size)) { + if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) { + string peer_id = skt->peer_id(); + SrsStunPacket ping; if ((err = ping.decode(data, size)) != srs_success) { return srs_error_wrap(err, "decode stun packet failed"); } - srs_info("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", - peer_id.c_str(), ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); - if (!session) { session = find_session_by_username(ping.get_username()); - - // Switch to the session to write logs to the context. - if (session) { - session->switch_to_context(); - } } + if (session) { + session->switch_to_context(); + } + + srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d", + peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. if (!session) { - return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s", - ping.get_username().c_str(), peer_id.c_str()); + return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64, + ping.get_username().c_str(), peer_id.c_str(), fast_id); } return session->on_stun(skt, &ping); @@ -359,18 +388,26 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) // For DTLS, RTCP or RTP, which does not support peer address changing. if (!session) { - return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s", peer_id.c_str()); + string peer_id = skt->peer_id(); + return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id); } - if (srs_is_dtls((uint8_t*)data, size)) { - return session->on_dtls(data, size); - } else if (srs_is_rtp_or_rtcp((uint8_t*)data, size)) { - if (srs_is_rtcp((uint8_t*)data, size)) { - return session->on_rtcp(data, size); + // Note that we don't(except error) switch to the context of session, for performance issue. + if (is_rtp_or_rtcp && !is_rtcp) { + err = session->on_rtp(data, size); + if (err != srs_success) { + session->switch_to_context(); } - return session->on_rtp(data, size); + return err; } + session->switch_to_context(); + if (is_rtp_or_rtcp && is_rtcp) { + return session->on_rtcp(data, size); + } + if (srs_is_dtls((uint8_t*)data, size)) { + return session->on_dtls(data, size); + } return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } @@ -469,8 +506,12 @@ srs_error_t SrsRtcServer::do_create_session( // We allows to mock the eip of server. if (!mock_eip.empty()) { - local_sdp.add_candidate(mock_eip, _srs_config->get_rtc_server_listen(), "host"); - srs_trace("RTC: Use candidate mock_eip %s", mock_eip.c_str()); + string host; + int port = _srs_config->get_rtc_server_listen(); + srs_parse_hostport(mock_eip, host, port); + + local_sdp.add_candidate(host, port, "host"); + srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port); } else { std::vector candidate_ips = get_candidate_ips(); for (int i = 0; i < (int)candidate_ips.size(); ++i) { @@ -534,8 +575,12 @@ srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, co // We allows to mock the eip of server. if (!mock_eip.empty()) { - local_sdp.add_candidate(mock_eip, _srs_config->get_rtc_server_listen(), "host"); - srs_trace("RTC: Use candidate mock_eip %s", mock_eip.c_str()); + string host; + int port = _srs_config->get_rtc_server_listen(); + srs_parse_hostport(mock_eip, host, port); + + local_sdp.add_candidate(host, port, "host"); + srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port); } else { std::vector candidate_ips = get_candidate_ips(); for (int i = 0; i < (int)candidate_ips.size(); ++i) { @@ -574,11 +619,6 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* return err; } -void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) -{ - _srs_rtc_manager->add_with_id(peer_id, session); -} - SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username) { ISrsResource* conn = _srs_rtc_manager->find_by_name(username); @@ -595,7 +635,13 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic // Check all sessions and dispose the dead sessions. for (int i = 0; i < (int)_srs_rtc_manager->size(); i++) { SrsRtcConnection* session = dynamic_cast(_srs_rtc_manager->at(i)); - if (!session || !session->is_alive() || session->disposing_) { + // Ignore not session, or already disposing. + if (!session || session->disposing_) { + continue; + } + + // Update stat if session is alive. + if (session->is_alive()) { nn_rtc_conns++; continue; } @@ -616,12 +662,23 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return err; } - // Show statistics for RTC server. - SrsProcSelfStat* u = srs_get_self_proc_stat(); - // Resident Set Size: number of pages the process has in real memory. - int memory = (int)(u->rss * 4 / 1024); + // Update the pps stat for UDP socket and adddresses. + _srs_pps_pkts->update(); _srs_pps_addrs->update(); _srs_pps_fast_addrs->update(); + _srs_pps_ids->update(); _srs_pps_fids->update(); _srs_pps_fids_level0->update(); + _srs_pps_pli->update(); _srs_pps_twcc->update(); _srs_pps_rr->update(); + _srs_pps_timer->update(); _srs_pps_pub->update(); _srs_pps_conn->update(); + _srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update(); + _srs_pps_rnack->update(); _srs_pps_rnack2->update(); + // TODO: FIXME: Show more data for RTC server. - srs_trace("RTC: Server conns=%u, cpu=%.2f%%, rss=%dMB", nn_rtc_conns, u->percent * 100, memory); + srs_trace("RTC: Server conns=%u, pkts=%d, addrs=%d,%d, fid=%d,%d,%d, rtcp=%d,%d,%d, snk=%d,%d,%d,%d, rnk=%d,%d", + nn_rtc_conns, + _srs_pps_pkts->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s(), + _srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), + _srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s(), + _srs_pps_snack->r10s(), _srs_pps_snack2->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), + _srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s() + ); return err; } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index e1f7d7397..8dd4b5530 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -122,8 +122,6 @@ public: // We start offering, create_session2 to generate offer, setup_session2 to handle answer. srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession); srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp); -public: - void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); public: SrsRtcConnection* find_session_by_username(const std::string& ufrag); // interface ISrsHourGlass diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 979078b15..24898f3fb 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -48,6 +48,17 @@ #include #endif +#include + +// The NACK sent by us(SFU). +SrsPps* _srs_pps_snack = new SrsPps(_srs_clock); +SrsPps* _srs_pps_snack2 = new SrsPps(_srs_clock); +SrsPps* _srs_pps_sanack = new SrsPps(_srs_clock); +SrsPps* _srs_pps_svnack = new SrsPps(_srs_clock); + +SrsPps* _srs_pps_rnack = new SrsPps(_srs_clock); +SrsPps* _srs_pps_rnack2 = new SrsPps(_srs_clock); + // Firefox defaults as 109, Chrome is 111. const int kAudioPayloadType = 111; const int kAudioChannel = 2; @@ -1831,7 +1842,17 @@ SrsRtcAudioRecvTrack::~SrsRtcAudioRecvTrack() { } -srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) +void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) +{ + // No payload, ignore. + if (buf->empty()) { + return; + } + + *ppayload = new SrsRtpRawPayload(); +} + +srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled) { srs_error_t err = srs_success; @@ -1847,7 +1868,7 @@ srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk } // For NACK to handle packet. - if ((err = on_nack(pkt)) != srs_success) { + if (nack_enabled && (err = on_nack(pkt)) != srs_success) { return srs_error_wrap(err, "on nack"); } @@ -1858,6 +1879,8 @@ srs_error_t SrsRtcAudioRecvTrack::check_send_nacks() { srs_error_t err = srs_success; + ++_srs_pps_sanack->sugar; + uint32_t timeout_nacks = 0; if ((err = do_check_send_nacks(timeout_nacks)) != srs_success) { return srs_error_wrap(err, "audio"); @@ -1875,7 +1898,24 @@ SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack() { } -srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) +void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) +{ + // No payload, ignore. + if (buf->empty()) { + return; + } + + uint8_t v = (uint8_t)pkt->nalu_type; + if (v == kStapA) { + *ppayload = new SrsRtpSTAPPayload(); + } else if (v == kFuA) { + *ppayload = new SrsRtpFUAPayload2(); + } else { + *ppayload = new SrsRtpRawPayload(); + } +} + +srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled) { srs_error_t err = srs_success; @@ -1893,7 +1933,7 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk } // For NACK to handle packet. - if ((err = on_nack(pkt)) != srs_success) { + if (nack_enabled && (err = on_nack(pkt)) != srs_success) { return srs_error_wrap(err, "on nack"); } @@ -1904,6 +1944,8 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks() { srs_error_t err = srs_success; + ++_srs_pps_svnack->sugar; + uint32_t timeout_nacks = 0; if ((err = do_check_send_nacks(timeout_nacks)) != srs_success) { return srs_error_wrap(err, "video"); @@ -1989,11 +2031,39 @@ std::string SrsRtcSendTrack::get_track_id() return track_desc_->id_; } -void SrsRtcSendTrack::on_recv_nack() +srs_error_t SrsRtcSendTrack::on_recv_nack(const vector& lost_seqs, SrsRtcPlayStreamStatistic& info) { + srs_error_t err = srs_success; + + ++_srs_pps_rnack2->sugar; + SrsRtcTrackStatistic* statistic = statistic_; statistic->nacks++; + + vector resend_pkts; + for(int i = 0; i < (int)lost_seqs.size(); ++i) { + uint16_t seq = lost_seqs.at(i); + SrsRtpPacket2* pkt = fetch_rtp_packet(seq); + if (pkt == NULL) { + continue; + } + resend_pkts.push_back(pkt); + + info.nn_bytes += pkt->nb_bytes(); + uint32_t nn = 0; + if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { + srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(), + pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); + } + } + + // By default, we send packets by sendmmsg. + if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) { + return srs_error_wrap(err, "raw send"); + } + + return err; } SrsRtcAudioSendTrack::SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index bd6328aa7..6d2334083 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -36,6 +36,7 @@ #include #include #include +#include class SrsRequest; class SrsMetaCache; @@ -525,29 +526,33 @@ public: protected: srs_error_t on_nack(SrsRtpPacket2* pkt); public: - virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) = 0; + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled) = 0; virtual srs_error_t check_send_nacks() = 0; protected: virtual srs_error_t do_check_send_nacks(uint32_t& timeout_nacks); }; -class SrsRtcAudioRecvTrack : public SrsRtcRecvTrack +class SrsRtcAudioRecvTrack : virtual public SrsRtcRecvTrack, virtual public ISrsRtpPacketDecodeHandler { public: SrsRtcAudioRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); virtual ~SrsRtcAudioRecvTrack(); public: - virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); +public: + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled); virtual srs_error_t check_send_nacks(); }; -class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack +class SrsRtcVideoRecvTrack : virtual public SrsRtcRecvTrack, virtual public ISrsRtpPacketDecodeHandler { public: SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs); virtual ~SrsRtcVideoRecvTrack(); public: - virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); +public: + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled); virtual srs_error_t check_send_nacks(); }; @@ -577,7 +582,7 @@ public: public: virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) = 0; virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt) = 0; - virtual void on_recv_nack(); + virtual srs_error_t on_recv_nack(const std::vector& lost_seqs, SrsRtcPlayStreamStatistic& info); }; class SrsRtcAudioSendTrack : public SrsRtcSendTrack diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index f2c2c314f..8d069161c 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -56,47 +56,6 @@ using namespace std; #include #include -// system interval in srs_utime_t, -// all resolution times should be times togother, -// for example, system-interval is x=1s(1000ms), -// then rusage can be 3*x, for instance, 3*1=3s, -// the meminfo canbe 6*x, for instance, 6*1=6s, -// for performance refine, @see: https://github.com/ossrs/srs/issues/194 -// @remark, recomment to 1000ms. -#define SRS_SYS_CYCLE_INTERVAL (1000 * SRS_UTIME_MILLISECONDS) - -// update time interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES -#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES -#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3 - -// update network devices info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES -#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES -#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3 - -// update the disk iops interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES -#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6 - -// update rusage interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES -#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6 - -// update platform info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES -#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9 - -// update network devices info interval: -// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES -#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9 - std::string srs_listener_type2string(SrsListenerType type) { switch (type) { @@ -728,6 +687,8 @@ SrsServer::SrsServer() http_server = new SrsHttpServer(this); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); + trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id()); + timer_ = NULL; } SrsServer::~SrsServer() @@ -738,7 +699,10 @@ SrsServer::~SrsServer() void SrsServer::destroy() { srs_warn("start destroy server"); - + + srs_freep(trd_); + srs_freep(timer_); + dispose(); srs_freep(http_api_mux); @@ -1101,6 +1065,25 @@ srs_error_t SrsServer::ingest() return err; } +srs_error_t SrsServer::start() +{ + srs_error_t err = srs_success; + + if ((err = _srs_sources->initialize()) != srs_success) { + return srs_error_wrap(err, "sources"); + } + + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "start"); + } + + if ((err = setup_ticks()) != srs_success) { + return srs_error_wrap(err, "tick"); + } + + return err; +} + srs_error_t SrsServer::cycle() { srs_error_t err = srs_success; @@ -1151,7 +1134,6 @@ srs_error_t SrsServer::cycle() return err; } - void SrsServer::on_signal(int signo) { if (signo == SRS_SIGNAL_RELOAD) { @@ -1221,134 +1203,132 @@ srs_error_t SrsServer::do_cycle() { srs_error_t err = srs_success; - // find the max loop - int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES); - - max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES); - max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES); - // for asprocess. bool asprocess = _srs_config->get_asprocess(); - - // the daemon thread, update the time cache - // TODO: FIXME: use SrsHourGlass. + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "pull"); + } + if (handler && (err = handler->on_cycle()) != srs_success) { return srs_error_wrap(err, "handle callback"); } - - // the interval in config. - int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL); - - // dynamic fetch the max. - int dynamic_max = srs_max(max, heartbeat_max_resolution); - - for (int i = 0; i < dynamic_max; i++) { - srs_usleep(SRS_SYS_CYCLE_INTERVAL); - // asprocess check. - if (asprocess && ::getppid() != ppid) { - return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid()); - } - - // gracefully quit for SIGINT or SIGTERM or SIGQUIT. - if (signal_fast_quit || signal_gracefully_quit) { - srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit); - return err; - } - - // for gperf heap checker, - // @see: research/gperftools/heap-checker/heap_checker.cc - // if user interrupt the program, exit to check mem leak. - // but, if gperf, use reload to ensure main return normally, - // because directly exit will cause core-dump. + // asprocess check. + if (asprocess && ::getppid() != ppid) { + return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid()); + } + + // gracefully quit for SIGINT or SIGTERM or SIGQUIT. + if (signal_fast_quit || signal_gracefully_quit) { + srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit); + return err; + } + + // for gperf heap checker, + // @see: research/gperftools/heap-checker/heap_checker.cc + // if user interrupt the program, exit to check mem leak. + // but, if gperf, use reload to ensure main return normally, + // because directly exit will cause core-dump. #ifdef SRS_GPERF_MC - if (signal_gmc_stop) { - srs_warn("gmc got singal to stop server."); - return err; - } + if (signal_gmc_stop) { + srs_warn("gmc got singal to stop server."); + return err; + } #endif - - // do persistence config to file. - if (signal_persistence_config) { - signal_persistence_config = false; - srs_info("get signal to persistence config to file."); - - if ((err = _srs_config->persistence()) != srs_success) { - return srs_error_wrap(err, "config persistence to file"); - } - srs_trace("persistence config to file success."); - } - - // do reload the config. - if (signal_reload) { - signal_reload = false; - srs_info("get signal to reload the config."); - - if ((err = _srs_config->reload()) != srs_success) { - return srs_error_wrap(err, "config reload"); - } - srs_trace("reload config success."); - } - - // notice the stream sources to cycle. - if ((err = _srs_sources->cycle()) != srs_success) { - return srs_error_wrap(err, "source cycle"); - } - - // update the cache time - if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) { - srs_info("update current time cache."); - srs_update_system_time(); - } - - if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) { - srs_info("update resource info, rss."); - srs_update_system_rusage(); - } - if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) { - srs_info("update cpu info, cpu usage."); - srs_update_proc_stat(); - } - if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) { - srs_info("update disk info, disk iops."); - srs_update_disk_stat(); - } - if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) { - srs_info("update memory info, usage/free."); - srs_update_meminfo(); - } - if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) { - srs_info("update platform info, uptime/load."); - srs_update_platform_info(); - } - if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) { - srs_info("update network devices info."); - srs_update_network_devices(); - } - if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) { - srs_info("update network server kbps info."); - resample_kbps(); + + // do persistence config to file. + if (signal_persistence_config) { + signal_persistence_config = false; + srs_info("get signal to persistence config to file."); + + if ((err = _srs_config->persistence()) != srs_success) { + return srs_error_wrap(err, "config persistence to file"); } - if (_srs_config->get_heartbeat_enabled()) { - if ((i % heartbeat_max_resolution) == 0) { - srs_info("do http heartbeat, for internal server to report."); - http_heartbeat->heartbeat(); - } + srs_trace("persistence config to file success."); + } + + // do reload the config. + if (signal_reload) { + signal_reload = false; + srs_info("get signal to reload the config."); + + if ((err = _srs_config->reload()) != srs_success) { + return srs_error_wrap(err, "config reload"); } - - srs_info("server main thread loop"); + srs_trace("reload config success."); } + + srs_usleep(1 * SRS_UTIME_SECONDS); } return err; } +srs_error_t SrsServer::setup_ticks() +{ + srs_error_t err = srs_success; + + srs_freep(timer_); + timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS); + + if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(3, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(4, 6 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(5, 6 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(6, 9 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(7, 9 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + if (_srs_config->get_heartbeat_enabled()) { + if ((err = timer_->tick(9, _srs_config->get_heartbeat_interval())) != srs_success) { + return srs_error_wrap(err, "tick"); + } + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + + return err; +} + +srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + switch (event) { + case 1: srs_update_system_time(); break; + case 2: srs_update_system_rusage(); break; + case 3: srs_update_proc_stat(); break; + case 4: srs_update_disk_stat(); break; + case 5: srs_update_meminfo(); break; + case 6: srs_update_platform_info(); break; + case 7: srs_update_network_devices(); break; + case 8: resample_kbps(); break; + case 9: http_heartbeat->heartbeat(); break; + } + + return err; +} + srs_error_t SrsServer::listen_rtmp() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 60a69d517..e7f562ee4 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -38,6 +38,7 @@ #include #include #include +#include class SrsServer; class SrsHttpServeMux; @@ -260,7 +261,9 @@ public: // TODO: FIXME: Rename to SrsLiveServer. // SRS RTMP server, initialize and listen, start connection service thread, destroy client. -class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public ISrsResourceManager +class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler + , virtual public ISrsResourceManager, virtual public ISrsCoroutineHandler + , virtual public ISrsHourGlass { private: // TODO: FIXME: Extract an HttpApiServer. @@ -269,6 +272,8 @@ private: SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; SrsResourceManager* conn_manager; + SrsCoroutine* trd_; + SrsHourGlass* timer_; private: // The pid file fd, lock the file write when server is running. // @remark the init.d script should cleanup the pid file, when stop service, @@ -315,6 +320,9 @@ public: virtual srs_error_t register_signal(); virtual srs_error_t http_handle(); virtual srs_error_t ingest(); + virtual srs_error_t start(); +// interface ISrsCoroutineHandler +public: virtual srs_error_t cycle(); // server utilities. public: @@ -337,6 +345,11 @@ private: // update the global static data, for instance, the current time, // the cpu/mem/network statistic. virtual srs_error_t do_cycle(); +// interface ISrsHourGlass +private: + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); +private: // listen at specified protocol. virtual srs_error_t listen_rtmp(); virtual srs_error_t listen_http_api(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 3d317eb2d..82b2177a1 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1689,6 +1689,7 @@ SrsSourceManager* _srs_sources = new SrsSourceManager(); SrsSourceManager::SrsSourceManager() { lock = NULL; + timer_ = NULL; } SrsSourceManager::~SrsSourceManager() @@ -1696,6 +1697,11 @@ SrsSourceManager::~SrsSourceManager() srs_mutex_destroy(lock); } +srs_error_t SrsSourceManager::initialize() +{ + return setup_ticks(); +} + srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) { srs_error_t err = srs_success; @@ -1789,28 +1795,37 @@ void SrsSourceManager::dispose() return; } -srs_error_t SrsSourceManager::cycle() +srs_error_t SrsSourceManager::setup_ticks() { - SrsContextId cid = _srs_context->get_id(); - srs_error_t err = do_cycle(); - _srs_context->set_id(cid); - + srs_error_t err = srs_success; + + srs_freep(timer_); + timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS); + + if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "timer"); + } + return err; } -srs_error_t SrsSourceManager::do_cycle() +srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; - + std::map::iterator it; for (it = pool.begin(); it != pool.end();) { SrsSource* source = it->second; - + // Do cycle source to cleanup components, such as hls dispose. if ((err = source->cycle()) != srs_success) { return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str()); } - + // TODO: FIXME: support source cleanup. // @see https://github.com/ossrs/srs/issues/713 // @see https://github.com/ossrs/srs/issues/714 @@ -1825,7 +1840,7 @@ srs_error_t SrsSourceManager::do_cycle() _srs_context->set_id(cid); } srs_trace("cleanup die source, total=%d", (int)pool.size()); - + srs_freep(source); pool.erase(it++); } else { @@ -1835,7 +1850,7 @@ srs_error_t SrsSourceManager::do_cycle() ++it; #endif } - + return err; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4a58eb954..9f7740437 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -34,6 +34,7 @@ #include #include #include +#include class SrsFormat; class SrsRtmpFormat; @@ -450,15 +451,17 @@ public: }; // The source manager to create and refresh all stream sources. -class SrsSourceManager +class SrsSourceManager : public ISrsHourGlass { private: srs_mutex_t lock; std::map pool; + SrsHourGlass* timer_; public: SrsSourceManager(); virtual ~SrsSourceManager(); public: + virtual srs_error_t initialize(); // create source when fetch from cache failed. // @param r the client request. // @param h the event handler for source. @@ -471,9 +474,10 @@ private: public: // dispose and cycle all sources. virtual void dispose(); - virtual srs_error_t cycle(); +// interface ISrsHourGlass private: - virtual srs_error_t do_cycle(); + virtual srs_error_t setup_ticks(); + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); public: // when system exit, destroy th`e sources, // For gmc to analysis mem leaks. diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 1c994c88a..f5ff1fd88 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 62 +#define SRS_VERSION4_REVISION 71 #endif diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp index 4e0f98eee..fb91a5bc7 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.cpp @@ -1212,6 +1212,11 @@ vector SrsRtcpNack::get_lost_sns() const return sn; } +bool SrsRtcpNack::empty() +{ + return lost_sns_.empty(); +} + void SrsRtcpNack::set_media_ssrc(uint32_t ssrc) { media_ssrc_ = ssrc; diff --git a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp index 56c30912a..279a08e27 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtcp.hpp @@ -345,6 +345,7 @@ public: uint32_t get_media_ssrc() const; std::vector get_lost_sns() const; + bool empty(); void set_media_ssrc(uint32_t ssrc); void add_lost_sn(uint16_t sn); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index 9396d867c..a542d59b4 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -23,6 +23,7 @@ #include +#include #include #include using namespace std; @@ -33,6 +34,106 @@ using namespace std; #include #include +/* @see https://tools.ietf.org/html/rfc1889#section-5.1 + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +uint32_t srs_rtp_fast_parse_ssrc(char* buf, int size) +{ + if (size < 12) { + return 0; + } + + uint32_t value = 0; + char* pp = (char*)&value; + + char* p = buf + 8; + pp[3] = *p++; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + return value; +} +uint8_t srs_rtp_fast_parse_pt(char* buf, int size) +{ + if (size < 12) { + return 0; + } + return buf[1] & 0x7f; +} +srs_error_t srs_rtp_fast_parse_twcc(char* buf, int size, SrsRtpExtensionTypes* ext_types, uint16_t& twcc_sn) +{ + srs_error_t err = srs_success; + + int need_size = 12 /*rtp head fix len*/ + 4 /* extension header len*/ + 3 /* twcc extension len*/; + if(size < (need_size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size); + } + + uint8_t first = buf[0]; + bool extension = (first & 0x10); + uint8_t cc = (first & 0x0F); + + if(!extension) { + return srs_error_new(ERROR_RTC_RTP, "no extension in rtp"); + } + + need_size += cc * 4; // csrc size + if(size < (need_size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size); + } + buf += 12 + 4*cc; + + uint16_t value = *((uint16_t*)buf); + value = ntohs(value); + if(0xBEDE != value) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "no support this type(0x%02x) extension", value); + } + buf += 2; + + uint16_t extension_length = ntohs(*((uint16_t*)buf)); + buf += 2; + extension_length *= 4; + need_size += extension_length; // entension size + if(size < (need_size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size); + } + + while(extension_length > 0) { + uint8_t v = buf[0]; + buf++; + extension_length--; + if(0 == v) { + continue; + } + + uint8_t id = (v & 0xF0) >>4; + uint8_t len = (v & 0x0F) + 1; + + SrsRtpExtensionType xtype = ext_types->get_type(id); + if(xtype == kRtpExtensionTransportSequenceNumber) { + twcc_sn = ntohs(*((uint16_t*)buf)); + return err; + } else { + buf += len; + extension_length -= len; + } + } + + + return err; +} + // If value is newer than pre_value,return true; otherwise false bool srs_seq_is_newer(uint16_t value, uint16_t pre_value) { @@ -179,6 +280,58 @@ void SrsRtpExtensionTwcc::set_sn(uint16_t sn) has_twcc_ = true; } +SrsRtpExtensionOneByte::SrsRtpExtensionOneByte() : has_ext_(false), id_(0), value_(0) +{ +} + +void SrsRtpExtensionOneByte::set_id(int id) +{ + id_ = id; + has_ext_ = true; +} + +void SrsRtpExtensionOneByte::set_value(uint8_t value) +{ + value_ = value; + has_ext_ = true; +} + +srs_error_t SrsRtpExtensionOneByte::decode(SrsBuffer* buf) +{ + srs_error_t err = srs_success; + + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2); + } + uint8_t v = buf->read_1bytes(); + + id_ = (v & 0xF0) >> 4; + uint8_t len = (v & 0x0F); + if(!id_ || len != 0) { + return srs_error_new(ERROR_RTC_RTP, "invalid rtp extension id=%d, len=%d", id_, len); + } + + value_ = buf->read_1bytes(); + + has_ext_ = true; + return err; +} + +srs_error_t SrsRtpExtensionOneByte::encode(SrsBuffer* buf) +{ + srs_error_t err = srs_success; + + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2); + } + + uint8_t id_len = (id_ & 0x0F)<< 4 | 0x00; + buf->write_1bytes(id_len); + buf->write_1bytes(value_); + + return err; +} + SrsRtpExtensions::SrsRtpExtensions() : has_ext_(false) { } @@ -252,6 +405,11 @@ srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf) return srs_error_wrap(err, "decode twcc extension"); } has_ext_ = true; + } else if (xtype == kRtpExtensionAudioLevel) { + if((err = audio_level_.decode(buf)) != srs_success) { + return srs_error_wrap(err, "decode audio level extension"); + } + has_ext_ = true; } else { buf->skip(1 + (len + 1)); } @@ -263,6 +421,7 @@ srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf) uint64_t SrsRtpExtensions::nb_bytes() { int size = 4 + (twcc_.has_twcc_ext() ? twcc_.nb_bytes() : 0); + size += (audio_level_.exists() ? audio_level_.nb_bytes() : 0); // add padding size += (size % 4 == 0) ? 0 : (4 - size % 4); return size; @@ -281,6 +440,10 @@ srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf) len += twcc_.nb_bytes(); } + if (audio_level_.exists()) { + len += audio_level_.nb_bytes(); + } + int padding_count = (len % 4 == 0) ? 0 : (4 - len % 4); len += padding_count; @@ -293,6 +456,12 @@ srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf) } } + if (audio_level_.exists()) { + if (srs_success != (err = audio_level_.encode(buf))) { + return srs_error_wrap(err, "encode audio level extension"); + } + } + // add padding while(padding_count > 0) { buf->write_1bytes(0); @@ -331,6 +500,23 @@ srs_error_t SrsRtpExtensions::set_twcc_sequence_number(uint8_t id, uint16_t sn) return srs_success; } +srs_error_t SrsRtpExtensions::get_audio_level(uint8_t& level) +{ + if(audio_level_.exists()) { + level = audio_level_.get_value(); + return srs_success; + } + return srs_error_new(ERROR_RTC_RTP_MUXER, "not find rtp extension audio level"); +} + +srs_error_t SrsRtpExtensions::set_audio_level(int id, uint8_t level) +{ + has_ext_ = true; + audio_level_.set_id(id); + audio_level_.set_value(level); + return srs_success; +} + SrsRtpHeader::SrsRtpHeader() { padding_length = 0; diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index 76a39fbc5..741a4fafb 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -57,6 +57,12 @@ class SrsBuffer; class SrsRtpRawPayload; class SrsRtpFUAPayload2; class SrsSharedPtrMessage; +class SrsRtpExtensionTypes; + +// Fast parse the SSRC from RTP packet. Return 0 if invalid. +uint32_t srs_rtp_fast_parse_ssrc(char* buf, int size); +uint8_t srs_rtp_fast_parse_pt(char* buf, int size); +srs_error_t srs_rtp_fast_parse_twcc(char* buf, int size, SrsRtpExtensionTypes* types, uint16_t& twcc_sn); // The "distance" between two uint16 number, for example: // distance(prev_value=3, value=5) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)5) === -2 @@ -86,9 +92,12 @@ enum SrsRtpExtensionType { kRtpExtensionNone, kRtpExtensionTransportSequenceNumber, + kRtpExtensionAudioLevel, kRtpExtensionNumberOfExtensions // Must be the last entity in the enum. }; +const std::string kAudioLevelUri = "urn:ietf:params:rtp-hdrext:ssrc-audio-level"; + struct SrsExtensionInfo { SrsRtpExtensionType type; @@ -96,7 +105,8 @@ struct SrsExtensionInfo }; const SrsExtensionInfo kExtensions[] = { - {kRtpExtensionTransportSequenceNumber, std::string("http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01")} + {kRtpExtensionTransportSequenceNumber, std::string("http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01")}, + {kRtpExtensionAudioLevel, kAudioLevelUri}, }; class SrsRtpExtensionTypes @@ -116,7 +126,8 @@ private: uint8_t ids_[kRtpExtensionNumberOfExtensions]; }; -class SrsRtpExtensionTwcc : public ISrsCodec +// Note that the extensions should never extends from any class, for performance. +class SrsRtpExtensionTwcc// : public ISrsCodec { bool has_twcc_; uint8_t id_; @@ -138,12 +149,36 @@ public: virtual uint64_t nb_bytes(); }; -class SrsRtpExtensions : public ISrsCodec +// Note that the extensions should never extends from any class, for performance. +class SrsRtpExtensionOneByte// : public ISrsCodec +{ + bool has_ext_; + int id_; + uint8_t value_; +public: + SrsRtpExtensionOneByte(); + virtual ~SrsRtpExtensionOneByte() {} + + bool exists() { return has_ext_; } + int get_id() { return id_; } + uint8_t get_value() { return value_; } + void set_id(int id); + void set_value(uint8_t value); +public: + // ISrsCodec + virtual srs_error_t decode(SrsBuffer* buf); + virtual srs_error_t encode(SrsBuffer* buf); + virtual uint64_t nb_bytes() { return 2; }; +}; + +// Note that the extensions should never extends from any class, for performance. +class SrsRtpExtensions// : public ISrsCodec { private: bool has_ext_; SrsRtpExtensionTypes types_; SrsRtpExtensionTwcc twcc_; + SrsRtpExtensionOneByte audio_level_; public: SrsRtpExtensions(); virtual ~SrsRtpExtensions(); @@ -152,6 +187,8 @@ public: void set_types_(const SrsRtpExtensionTypes* types); srs_error_t get_twcc_sequence_number(uint16_t& twcc_sn); srs_error_t set_twcc_sequence_number(uint8_t id, uint16_t sn); + srs_error_t get_audio_level(uint8_t& level); + srs_error_t set_audio_level(int id, uint8_t level); // ISrsCodec public: @@ -163,7 +200,8 @@ public: virtual uint64_t nb_bytes(); }; -class SrsRtpHeader : public ISrsCodec +// Note that the header should never extends from any class, for performance. +class SrsRtpHeader// : public ISrsCodec { private: uint8_t padding_length; @@ -257,7 +295,7 @@ public: // Whether the packet is Audio packet. bool is_audio(); // Copy the RTP packet. - SrsRtpPacket2* copy(); + virtual SrsRtpPacket2* copy(); // Set RTP header extensions for encoding or decoding header extension void set_extension_types(const SrsRtpExtensionTypes* v); // interface ISrsEncoder diff --git a/trunk/src/protocol/srs_protocol_kbps.cpp b/trunk/src/protocol/srs_protocol_kbps.cpp index 11f119a53..712e684ad 100644 --- a/trunk/src/protocol/srs_protocol_kbps.cpp +++ b/trunk/src/protocol/srs_protocol_kbps.cpp @@ -25,24 +25,83 @@ #include -SrsKbpsSample::SrsKbpsSample() +SrsRateSample::SrsRateSample() { - bytes = time = -1; - kbps = 0; + total = time = -1; + rate = 0; } -SrsKbpsSample::~SrsKbpsSample() +SrsRateSample::~SrsRateSample() { } -SrsKbpsSample* SrsKbpsSample::update(int64_t b, srs_utime_t t, int k) +SrsRateSample* SrsRateSample::update(int64_t nn, srs_utime_t t, int k) { - bytes = b; + total = nn; time = t; - kbps = k; + rate = k; return this; } +SrsPps::SrsPps(SrsWallClock* c) +{ + clk_ = c; + sugar = 0; +} + +SrsPps::~SrsPps() +{ +} + +void SrsPps::update() +{ + update(sugar); +} + +void SrsPps::update(int64_t nn) +{ + srs_utime_t now = clk_->now(); + + if (sample_30s_.time < 0) { + sample_30s_.update(nn, now, 0); + } + if (sample_1m_.time < 0) { + sample_1m_.update(nn, now, 0); + } + if (sample_5m_.time < 0) { + sample_5m_.update(nn, now, 0); + } + if (sample_60m_.time < 0) { + sample_60m_.update(nn, now, 0); + } + + if (now - sample_10s_.time >= 10 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_10s_.total) * 1000 / srsu2ms(now - sample_10s_.time)); + sample_10s_.update(nn, now, kps); + } + if (now - sample_30s_.time >= 30 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_30s_.total) * 1000 / srsu2ms(now - sample_30s_.time)); + sample_30s_.update(nn, now, kps); + } + if (now - sample_1m_.time >= 60 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_1m_.total) * 1000 / srsu2ms(now - sample_1m_.time)); + sample_1m_.update(nn, now, kps); + } + if (now - sample_5m_.time >= 300 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_5m_.total) * 1000 / srsu2ms(now - sample_5m_.time)); + sample_5m_.update(nn, now, kps); + } + if (now - sample_60m_.time >= 3600 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_60m_.total) * 1000 / srsu2ms(now - sample_60m_.time)); + sample_60m_.update(nn, now, kps); + } +} + +int SrsPps::r10s() +{ + return sample_10s_.rate; +} + SrsKbpsSlice::SrsKbpsSlice(SrsWallClock* c) { clk = c; @@ -78,19 +137,19 @@ void SrsKbpsSlice::sample() } if (now - sample_30s.time >= 30 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_30s.bytes) * 8 / srsu2ms(now - sample_30s.time)); + int kbps = (int)((total_bytes - sample_30s.total) * 8 / srsu2ms(now - sample_30s.time)); sample_30s.update(total_bytes, now, kbps); } if (now - sample_1m.time >= 60 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_1m.bytes) * 8 / srsu2ms(now - sample_1m.time)); + int kbps = (int)((total_bytes - sample_1m.total) * 8 / srsu2ms(now - sample_1m.time)); sample_1m.update(total_bytes, now, kbps); } if (now - sample_5m.time >= 300 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_5m.bytes) * 8 / srsu2ms(now - sample_5m.time)); + int kbps = (int)((total_bytes - sample_5m.total) * 8 / srsu2ms(now - sample_5m.time)); sample_5m.update(total_bytes, now, kbps); } if (now - sample_60m.time >= 3600 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_60m.bytes) * 8 / srsu2ms(now - sample_60m.time)); + int kbps = (int)((total_bytes - sample_60m.total) * 8 / srsu2ms(now - sample_60m.time)); sample_60m.update(total_bytes, now, kbps); } } @@ -188,22 +247,22 @@ int SrsKbps::get_recv_kbps() int SrsKbps::get_send_kbps_30s() { - return os.sample_30s.kbps; + return os.sample_30s.rate; } int SrsKbps::get_recv_kbps_30s() { - return is.sample_30s.kbps; + return is.sample_30s.rate; } int SrsKbps::get_send_kbps_5m() { - return os.sample_5m.kbps; + return os.sample_5m.rate; } int SrsKbps::get_recv_kbps_5m() { - return is.sample_5m.kbps; + return is.sample_5m.rate; } void SrsKbps::add_delta(int64_t in, int64_t out) @@ -297,3 +356,5 @@ int SrsKbps::size_memory() return sizeof(SrsKbps); } +SrsWallClock* _srs_clock = new SrsWallClock(); + diff --git a/trunk/src/protocol/srs_protocol_kbps.hpp b/trunk/src/protocol/srs_protocol_kbps.hpp index 22c3d032b..014038e85 100644 --- a/trunk/src/protocol/srs_protocol_kbps.hpp +++ b/trunk/src/protocol/srs_protocol_kbps.hpp @@ -30,21 +30,46 @@ class SrsWallClock; -/** - * a kbps sample, for example, the kbps at time, - * 10minute kbps sample. - */ -class SrsKbpsSample +// A sample for rate-based stat, such as kbps or kps. +class SrsRateSample { public: - int64_t bytes; + int64_t total; srs_utime_t time; - int kbps; + // kbps or kps + int rate; public: - SrsKbpsSample(); - virtual ~SrsKbpsSample(); + SrsRateSample(); + virtual ~SrsRateSample(); public: - virtual SrsKbpsSample* update(int64_t b, srs_utime_t t, int k); + virtual SrsRateSample* update(int64_t nn, srs_utime_t t, int k); +}; + +// A pps manager every some duration. +class SrsPps +{ +private: + SrsWallClock* clk_; +private: + // samples + SrsRateSample sample_10s_; + SrsRateSample sample_30s_; + SrsRateSample sample_1m_; + SrsRateSample sample_5m_; + SrsRateSample sample_60m_; +public: + // Sugar for target to stat. + int64_t sugar; +public: + SrsPps(SrsWallClock* clk); + virtual ~SrsPps(); +public: + // Update with the nn which is target. + void update(); + // Update with the nn. + void update(int64_t nn); + // Get the 10s average stat. + int r10s(); }; /** @@ -82,10 +107,10 @@ public: // cache for io maybe freed. int64_t last_bytes; // samples - SrsKbpsSample sample_30s; - SrsKbpsSample sample_1m; - SrsKbpsSample sample_5m; - SrsKbpsSample sample_60m; + SrsRateSample sample_30s; + SrsRateSample sample_1m; + SrsRateSample sample_5m; + SrsRateSample sample_60m; public: // for the delta bytes. int64_t delta_bytes; @@ -233,4 +258,7 @@ public: virtual int size_memory(); }; +// The global clock. +extern SrsWallClock* _srs_clock; + #endif diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index 0e2c4f586..99cf856af 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -151,6 +151,11 @@ srs_thread_t srs_thread_self() return (srs_thread_t)st_thread_self(); } +void srs_thread_exit(void* retval) +{ + st_thread_exit(retval); +} + srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) { st_utime_t timeout = ST_UTIME_NO_TIMEOUT; diff --git a/trunk/src/protocol/srs_service_st.hpp b/trunk/src/protocol/srs_service_st.hpp index ed86737d6..263bb5104 100644 --- a/trunk/src/protocol/srs_service_st.hpp +++ b/trunk/src/protocol/srs_service_st.hpp @@ -57,6 +57,7 @@ extern srs_error_t srs_fd_keepalive(int fd); // Get current coroutine/thread. extern srs_thread_t srs_thread_self(); +extern void srs_thread_exit(void* retval); // For client, to open socket and connect to server. // @param tm The timeout in srs_utime_t. diff --git a/trunk/src/utest/srs_utest_app.cpp b/trunk/src/utest/srs_utest_app.cpp index 010ed4272..a0ad704f7 100644 --- a/trunk/src/utest/srs_utest_app.cpp +++ b/trunk/src/utest/srs_utest_app.cpp @@ -30,6 +30,119 @@ using namespace std; #include #include +#include +#include + +class MockIDResource : public ISrsResource +{ +public: + int id; + MockIDResource(int v) { + id = v; + } + virtual ~MockIDResource() { + } + virtual const SrsContextId& get_id() { + return _srs_context->get_id(); + } + virtual std::string desc() { + return ""; + } +}; + +VOID TEST(AppResourceManagerTest, FindByFastID) +{ + srs_error_t err = srs_success; + + if (true) { + SrsResourceManager m("test"); + HELPER_EXPECT_SUCCESS(m.start()); + + m.add_with_fast_id(101, new MockIDResource(1)); + m.add_with_fast_id(102, new MockIDResource(2)); + m.add_with_fast_id(103, new MockIDResource(3)); + EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id); + EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(102))->id); + EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(103))->id); + } + + if (true) { + SrsResourceManager m("test"); + HELPER_EXPECT_SUCCESS(m.start()); + + MockIDResource* r1 = new MockIDResource(1); + MockIDResource* r2 = new MockIDResource(2); + MockIDResource* r3 = new MockIDResource(3); + m.add_with_fast_id(101, r1); + m.add_with_fast_id(102, r2); + m.add_with_fast_id(103, r3); + EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id); + EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(102))->id); + EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(103))->id); + + m.remove(r2); srs_usleep(0); + EXPECT_TRUE(m.find_by_fast_id(102) == NULL); + } + + if (true) { + SrsResourceManager m("test"); + HELPER_EXPECT_SUCCESS(m.start()); + + MockIDResource* r1 = new MockIDResource(1); + MockIDResource* r2 = new MockIDResource(2); + MockIDResource* r3 = new MockIDResource(3); + m.add_with_fast_id(1, r1); + m.add_with_fast_id(100001, r2); + m.add_with_fast_id(1000001, r3); + EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(1))->id); + EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(100001))->id); + EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(1000001))->id); + + m.remove(r2); srs_usleep(0); + EXPECT_TRUE(m.find_by_fast_id(100001) == NULL); + + m.remove(r3); srs_usleep(0); + EXPECT_TRUE(m.find_by_fast_id(1000001) == NULL); + + m.remove(r1); srs_usleep(0); + EXPECT_TRUE(m.find_by_fast_id(1) == NULL); + } + + if (true) { + SrsResourceManager m("test"); + HELPER_EXPECT_SUCCESS(m.start()); + + m.add_with_fast_id(101, new MockIDResource(1)); + m.add_with_fast_id(10101, new MockIDResource(2)); + m.add_with_fast_id(1010101, new MockIDResource(3)); + m.add_with_fast_id(101010101, new MockIDResource(4)); + m.add_with_fast_id(10101010101LL, new MockIDResource(5)); + m.add_with_fast_id(1010101010101LL, new MockIDResource(6)); + m.add_with_fast_id(101010101010101LL, new MockIDResource(7)); + m.add_with_fast_id(10101010101010101LL, new MockIDResource(8)); + m.add_with_fast_id(1010101010101010101ULL, new MockIDResource(9)); + m.add_with_fast_id(11010101010101010101ULL, new MockIDResource(10)); + EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id); + EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(10101))->id); + EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(1010101))->id); + EXPECT_EQ(4, ((MockIDResource*)m.find_by_fast_id(101010101))->id); + EXPECT_EQ(5, ((MockIDResource*)m.find_by_fast_id(10101010101LL))->id); + EXPECT_EQ(6, ((MockIDResource*)m.find_by_fast_id(1010101010101LL))->id); + EXPECT_EQ(7, ((MockIDResource*)m.find_by_fast_id(101010101010101LL))->id); + EXPECT_EQ(8, ((MockIDResource*)m.find_by_fast_id(10101010101010101LL))->id); + EXPECT_EQ(9, ((MockIDResource*)m.find_by_fast_id(1010101010101010101ULL))->id); + EXPECT_EQ(10, ((MockIDResource*)m.find_by_fast_id(11010101010101010101ULL))->id); + } + + if (true) { + SrsResourceManager m("test"); + HELPER_EXPECT_SUCCESS(m.start()); + + m.add_with_fast_id(101, new MockIDResource(1)); + m.add_with_fast_id(101, new MockIDResource(4)); + EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id); + } +} VOID TEST(AppCoroutineTest, Dummy) { diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index ec6bef58f..6f75d5eb9 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -1973,7 +1973,7 @@ VOID TEST(ConfigUnitTest, CheckDefaultValuesVhost) if (true) { HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF)); - EXPECT_EQ(srs_utime_t(9.9 * SRS_UTIME_SECONDS), conf.get_heartbeat_interval()); + EXPECT_EQ(srs_utime_t(10 * SRS_UTIME_SECONDS), conf.get_heartbeat_interval()); HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "heartbeat{interval 10;}")); EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_heartbeat_interval());