From be951b17f19cfe587b4147f89ba5f953a7e05cb2 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 22 Jul 2020 18:20:21 +0800 Subject: [PATCH] RTC: Refine code --- trunk/src/app/srs_app_listener.hpp | 1 + trunk/src/app/srs_app_rtc_conn.cpp | 99 +++++++--------------------- trunk/src/app/srs_app_rtc_conn.hpp | 11 ++-- trunk/src/app/srs_app_rtc_server.cpp | 66 ++++++++++++++++++- trunk/src/app/srs_app_rtc_server.hpp | 24 +++++++ 5 files changed, 120 insertions(+), 81 deletions(-) diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index f6c1777c1..19f7ef2af 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -133,6 +133,7 @@ public: virtual srs_error_t cycle(); }; +// TODO: FIXME: Rename it. Refine it for performance issue. class SrsUdpMuxSocket { private: diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 14a6ce7a4..2aff617d9 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -93,10 +93,8 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size) } } - if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { - // Ignore any error for black-hole. - void* p = data; int len = size; SrsRtcConnection* s = session_; - srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(data, size); } return err; @@ -929,10 +927,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) return err; } - if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { - // Ignore any error for black-hole. - void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcConnection* s = session_; - srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); } char* buf = unprotected_buf; @@ -1481,9 +1477,6 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id) sessionStunTimeout = 0; disposing_ = false; - blackhole = false; - blackhole_addr = NULL; - blackhole_stfd = NULL; twcc_id_ = 0; nn_simulate_player_nack_drop = 0; } @@ -1494,8 +1487,6 @@ SrsRtcConnection::~SrsRtcConnection() srs_freep(publisher_); srs_freep(transport_); srs_freep(req); - srs_close_stfd(blackhole_stfd); - srs_freep(blackhole_addr); srs_freep(sendonly_skt); } @@ -1540,11 +1531,6 @@ string SrsRtcConnection::peer_id() return peer_id_; } -void SrsRtcConnection::set_peer_id(string v) -{ - peer_id_ = v; -} - string SrsRtcConnection::username() { return username_; @@ -1572,7 +1558,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); SrsAutoFree(SrsRtcStreamDescription, stream_desc); if ((err = negotiate_publish_capability(req, remote_sdp, stream_desc)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + return srs_error_wrap(err, "publish negotiate"); } if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc)) != srs_success) { @@ -1593,16 +1579,17 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot return err; } +// TODO: FIXME: Error when play before publishing. srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) { srs_error_t err = srs_success; std::map play_sub_relations; if ((err = negotiate_play_capability(req, remote_sdp, play_sub_relations)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + return srs_error_wrap(err, "play negotiate"); } if (!play_sub_relations.size()) { - return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "cannot negotiate sub relations"); + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations"); } SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); @@ -1638,11 +1625,11 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp) std::map play_sub_relations; if ((err = fetch_source_capability(req, play_sub_relations)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + return srs_error_wrap(err, "play negotiate"); } if (!play_sub_relations.size()) { - return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "cannot negotiate sub relations"); + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations"); } SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); @@ -1691,30 +1678,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRtcStream* source, SrsRequest* r, bo sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost); last_stun_time = srs_get_system_time(); - blackhole = _srs_config->get_rtc_server_black_hole(); - - srs_trace("RTC init session, DTLS(role=%s, version=%s), timeout=%dms, blackhole=%d", - cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(sessionStunTimeout), blackhole); - - if (blackhole) { - string blackhole_ep = _srs_config->get_rtc_server_black_hole_addr(); - if (!blackhole_ep.empty()) { - string host; int port; - srs_parse_hostport(blackhole_ep, host, port); - - srs_freep(blackhole_addr); - blackhole_addr = new sockaddr_in(); - blackhole_addr->sin_family = AF_INET; - blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str()); - blackhole_addr->sin_port = htons(port); - - int fd = socket(AF_INET, SOCK_DGRAM, 0); - blackhole_stfd = srs_netfd_open_socket(fd); - srs_assert(blackhole_stfd); - - srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd); - } - } + srs_trace("RTC init session, DTLS(role=%s, version=%s), timeout=%dms", + cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(sessionStunTimeout)); return err; } @@ -1736,10 +1701,8 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) } // Write STUN messages to blackhole. - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = skt->data(); int len = skt->size(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(skt->data(), skt->size()); } if ((err = on_binding_request(r)) != srs_success) { @@ -1768,10 +1731,8 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) return srs_error_wrap(err, "rtcp unprotect failed"); } - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = unprotected_buf; int len = nb_unprotected_buf; - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); } if (player_) { @@ -1866,7 +1827,8 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_skt = skt->copy_sendonly(); // Update the sessions to handle packets from the new address. - server_->insert_into_id_sessions(sendonly_skt->peer_id().c_str(), this); + peer_id_ = sendonly_skt->peer_id(); + server_->insert_into_id_sessions(peer_id_, this); // Remove the old address. if (!old_peer_id.empty()) { @@ -1905,10 +1867,8 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss stream.write_2bytes(pid); stream.write_2bytes(blp); - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(stream.data(), stream.pos()); } char protected_buf[kRtpPacketSize]; @@ -2059,10 +2019,8 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc) srs_trace("RTC PLI ssrc=%u", ssrc); - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(stream.data(), stream.pos()); } char protected_buf[kRtpPacketSize]; @@ -2194,22 +2152,15 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) if (state_ == WAITING_STUN) { state_ = DOING_DTLS_HANDSHAKE; - - peer_id_ = sendonly_skt->peer_id(); - server_->insert_into_id_sessions(peer_id_, this); - - state_ = DOING_DTLS_HANDSHAKE; - srs_trace("RTC session=%s, STUN done, waitting DTLS handshake.", id().c_str()); + srs_trace("RTC session=%s, STUN done, waiting DTLS handshake.", id().c_str()); if((err = transport_->start_active_handshake()) != srs_success) { return srs_error_wrap(err, "fail to dtls handshake"); } } - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream->data(); int len = stream->pos(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(stream->data(), stream->pos()); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 261af68ad..7da0001d0 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -327,10 +327,6 @@ public: std::string sequence_startup; std::string sequence_delta; std::string sequence_keep; -private: - bool blackhole; - sockaddr_in* blackhole_addr; - srs_netfd_t blackhole_stfd; private: // twcc handler int twcc_id_; @@ -345,15 +341,20 @@ public: void set_local_sdp(const SrsSdp& sdp); SrsSdp* get_remote_sdp(); void set_remote_sdp(const SrsSdp& sdp); + // Connection level state machine, for ARQ of UDP packets. SrsRtcConnectionStateType state(); void set_state(SrsRtcConnectionStateType state); + // TODO: FIXME: Rename it. std::string id(); + // TODO: FIXME: Rename it. std::string peer_id(); - void set_peer_id(std::string v); + // TODO: FIXME: Rename it. std::string username(); +public: void set_encrypt(bool v); void switch_to_context(); SrsContextId context_id(); +public: srs_error_t add_publisher(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t add_player(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); // server send offer sdp to client, local sdp derivate from source stream desc. diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 9b2278445..218231af9 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -23,6 +23,8 @@ #include +using namespace std; + #include #include #include @@ -42,11 +44,67 @@ #include #include +SrsRtcBlackhole::SrsRtcBlackhole() +{ + blackhole = false; + blackhole_addr = NULL; + blackhole_stfd = NULL; +} + +SrsRtcBlackhole::~SrsRtcBlackhole() +{ + srs_close_stfd(blackhole_stfd); + srs_freep(blackhole_addr); +} + +srs_error_t SrsRtcBlackhole::initialize() +{ + srs_error_t err = srs_success; + + blackhole = _srs_config->get_rtc_server_black_hole(); + if (!blackhole) { + return err; + } + + string blackhole_ep = _srs_config->get_rtc_server_black_hole_addr(); + if (blackhole_ep.empty()) { + blackhole = false; + srs_warn("disable black hole for no endpoint"); + return err; + } + + string host; int port; + srs_parse_hostport(blackhole_ep, host, port); + + srs_freep(blackhole_addr); + blackhole_addr = new sockaddr_in(); + blackhole_addr->sin_family = AF_INET; + blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str()); + blackhole_addr->sin_port = htons(port); + + int fd = socket(AF_INET, SOCK_DGRAM, 0); + blackhole_stfd = srs_netfd_open_socket(fd); + srs_assert(blackhole_stfd); + + srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd); + + return err; +} + +void SrsRtcBlackhole::sendto(void* data, int len) +{ + if (!blackhole) { + return; + } + + srs_sendto(blackhole_stfd, data, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); +} + +SrsRtcBlackhole* _srs_blackhole = new SrsRtcBlackhole(); + // @global dtls certficate for rtc module. SrsDtlsCertificate* _srs_rtc_dtls_certificate = new SrsDtlsCertificate(); -using namespace std; - static bool is_stun(const uint8_t* data, const int size) { return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); @@ -176,6 +234,10 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "start timer"); } + if ((err = _srs_blackhole->initialize()) != srs_success) { + return srs_error_wrap(err, "black hole"); + } + srs_trace("RTC server init ok"); return err; diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 79c445d88..3a878aa7e 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -41,6 +41,27 @@ class SrsRequest; class SrsSdp; class SrsRtcStream; +// The UDP black hole, for developer to use wireshark to catch plaintext packets. +// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, +// we can use wireshark to capture the plaintext. +class SrsRtcBlackhole +{ +public: + bool blackhole; +private: + sockaddr_in* blackhole_addr; + srs_netfd_t blackhole_stfd; +public: + SrsRtcBlackhole(); + virtual ~SrsRtcBlackhole(); +public: + srs_error_t initialize(); + void sendto(void* data, int len); +}; + +extern SrsRtcBlackhole* _srs_blackhole; + +// The handler for RTC server to call. class ISrsRtcServerHandler { public: @@ -51,6 +72,7 @@ public: virtual void on_timeout(SrsRtcConnection* session) = 0; }; +// The RTC server instance, listen UDP port, handle UDP packet, manage RTC connections. class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass { private: @@ -58,7 +80,9 @@ private: std::vector listeners; ISrsRtcServerHandler* handler; private: + // TODO: FIXME: Rename it. std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) + // TODO: FIXME: Rename it. std::map map_id_session; // key: peerip(ip + ":" + port) // The zombie sessions, we will free them. std::vector zombies_;