From 3a3d908a63f318c53a304ba1288ec7243d955821 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 21 Jul 2020 11:38:41 +0800 Subject: [PATCH] RTC: Refine twcc to connection --- trunk/src/app/srs_app_rtc_conn.cpp | 208 +++++++++++++-------------- trunk/src/app/srs_app_rtc_conn.hpp | 15 +- trunk/src/app/srs_app_rtc_source.cpp | 34 +++-- trunk/src/app/srs_app_rtc_source.hpp | 7 +- 4 files changed, 140 insertions(+), 124 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 52b4cd984..2da90e327 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -225,7 +225,6 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid) mw_msgs = 0; realtime = true; - nn_simulate_nack_drop = 0; nack_enabled_ = false; _srs_config->subscribe(this); @@ -432,9 +431,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector send_pkts; // Covert kernel messages to RTP packets. for (int i = 0; i < (int)pkts.size(); i++) { @@ -444,17 +440,12 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { continue; } - - // Update stats. - info.nn_bytes += pkt->nb_bytes(); - + // For audio, we transcoded AAC to opus in extra payloads. if (pkt->is_audio()) { - info.nn_audios++; - SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()]; // TODO: FIXME: Any simple solution? - if ((err = audio_track->on_rtp(send_pkts, pkt)) != srs_success) { + if ((err = audio_track->on_rtp(pkt, info)) != srs_success) { return srs_error_wrap(err, "audio_track on rtp"); } // TODO: FIXME: Padding audio to the max payload in RTP packets. @@ -463,7 +454,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_ssrc()]; // TODO: FIXME: Any simple solution? - if ((err = video_track->on_rtp(send_pkts, pkt)) != srs_success) { + if ((err = video_track->on_rtp(pkt, info)) != srs_success) { return srs_error_wrap(err, "audio_track on rtp"); } } @@ -473,80 +464,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_timestamp(), pkt->nb_bytes()); } - // By default, we send packets by sendmmsg. - if ((err = do_send_packets(send_pkts, info)) != srs_success) { - return srs_error_wrap(err, "raw send"); - } - - return err; -} - -srs_error_t SrsRtcPlayStream::do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info) -{ - srs_error_t err = srs_success; - - // Cache the encrypt flag and sender. - bool encrypt = session_->encrypt; - - for (int i = 0; i < (int)pkts.size(); i++) { - SrsRtpPacket2* pkt = pkts.at(i); - - // For this message, select the first iovec. - iovec* iov = new iovec(); - SrsAutoFree(iovec, iov); - - char* iov_base = new char[kRtpPacketSize]; - SrsAutoFreeA(char, iov_base); - - iov->iov_base = iov_base; - iov->iov_len = kRtpPacketSize; - - // Marshal packet to bytes in iovec. - if (true) { -#ifdef SRS_CXX14 - // should set twcc sn before packet encode. - if(twcc_id_) { - twcc_sn = twcc_controller.allocate_twcc_sn(); - pkt->header.set_twcc_sequence_number(twcc_id_, twcc_sn); - } -#endif - - SrsBuffer stream((char*)iov->iov_base, iov->iov_len); - if ((err = pkt->encode(&stream)) != srs_success) { - return srs_error_wrap(err, "encode packet"); - } - iov->iov_len = stream.pos(); - } - - // Whether encrypt the RTP bytes. - if (encrypt) { - int nn_encrypt = (int)iov->iov_len; - if ((err = session_->transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { - return srs_error_wrap(err, "srtp protect"); - } - iov->iov_len = (size_t)nn_encrypt; - } - - info.nn_rtp_bytes += (int)iov->iov_len; - - // When we send out a packet, increase the stat counter. - info.nn_rtp_pkts++; - - // For NACK simulator, drop packet. - if (nn_simulate_nack_drop) { - simulate_drop_packet(&pkt->header, (int)iov->iov_len); - iov->iov_len = 0; - continue; - } - - // TODO: FIXME: Handle error. - session_->sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); - - // Detail log, should disable it in release version. - srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(), - pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len); - } - return err; } @@ -579,20 +496,6 @@ void SrsRtcPlayStream::nack_fetch(vector& pkts, uint32_t ssrc, u } } -void SrsRtcPlayStream::simulate_nack_drop(int nn) -{ - nn_simulate_nack_drop = nn; -} - -void SrsRtcPlayStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) -{ - srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u, ts=%u, %d bytes", nn_simulate_nack_drop, - h->get_sequence(), h->get_ssrc(), h->get_timestamp(), - nn_bytes); - - nn_simulate_nack_drop--; -} - srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -697,10 +600,13 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) : Feedback Control Information (FCI) : : : */ - /*uint8_t first = */stream->read_1bytes(); + uint8_t first = stream->read_1bytes(); //uint8_t version = first & 0xC0; //uint8_t padding = first & 0x20; - //uint8_t fmt = first & 0x1F; + uint8_t fmt = first & 0x1F; + if(15 == fmt) { + return session_->on_rtcp_feedback(buf, nb_buf); + } /*uint8_t payload_type = */stream->read_1bytes(); /*uint16_t length = */stream->read_2bytes(); @@ -746,7 +652,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) } // By default, we send packets by sendmmsg. - if ((err = do_send_packets(resend_pkts, info)) != srs_success) { + if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) { return srs_error_wrap(err, "raw send"); } @@ -1577,6 +1483,8 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s) blackhole = false; blackhole_addr = NULL; blackhole_stfd = NULL; + twcc_id_ = 0; + nn_simulate_player_nack_drop = 0; } SrsRtcConnection::~SrsRtcConnection() @@ -1877,6 +1785,11 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) return err; } +srs_error_t SrsRtcConnection::on_rtcp_feedback(char* data, int nb_data) +{ + return srs_success; +} + srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) { if (publisher_ == NULL) { @@ -2166,13 +2079,78 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc) void SrsRtcConnection::simulate_nack_drop(int nn) { - if (player_) { - player_->simulate_nack_drop(nn); - } - if (publisher_) { publisher_->simulate_nack_drop(nn); } + + nn_simulate_player_nack_drop = nn; +} + +void SrsRtcConnection::simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes) +{ + srs_warn("RTC NACK simulator #%d player drop seq=%u, ssrc=%u, ts=%u, %d bytes", nn_simulate_player_nack_drop, + h->get_sequence(), h->get_ssrc(), h->get_timestamp(), + nn_bytes); + + nn_simulate_player_nack_drop--; +} + +srs_error_t SrsRtcConnection::do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info) +{ + srs_error_t err = srs_success; + + for (int i = 0; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts.at(i); + + // For this message, select the first iovec. + iovec* iov = new iovec(); + SrsAutoFree(iovec, iov); + + char* iov_base = new char[kRtpPacketSize]; + SrsAutoFreeA(char, iov_base); + + iov->iov_base = iov_base; + iov->iov_len = kRtpPacketSize; + + // Marshal packet to bytes in iovec. + if (true) { + SrsBuffer stream((char*)iov->iov_base, iov->iov_len); + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + iov->iov_len = stream.pos(); + } + + // Whether encrypt the RTP bytes. + if (encrypt) { + int nn_encrypt = (int)iov->iov_len; + if ((err = transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + iov->iov_len = (size_t)nn_encrypt; + } + + info.nn_rtp_bytes += (int)iov->iov_len; + + // When we send out a packet, increase the stat counter. + info.nn_rtp_pkts++; + + // For NACK simulator, drop packet. + if (nn_simulate_player_nack_drop) { + simulate_player_drop_packet(&pkt->header, (int)iov->iov_len); + iov->iov_len = 0; + continue; + } + + // TODO: FIXME: Handle error. + sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); + + // Detail log, should disable it in release version. + srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(), + pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len); + } + + return err; } #ifdef SRS_OSX @@ -2834,6 +2812,22 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map::iterator it = sub_relations.begin(); + while (it != sub_relations.end()) { + if (it->second->type_ == "video") { + SrsRtcTrackDescription* track = it->second; + twcc_id = track->get_rtp_extension_id(kTWCCExt); + } + ++it; + } + } + srs_trace("RTC connection player gcc=%d", twcc_id); + return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 6827c499b..26c9838d8 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -175,14 +175,13 @@ private: // key: publish_ssrc, value: send track to process rtp/rtcp std::map audio_tracks_; std::map video_tracks_; - // Simulators. - int nn_simulate_nack_drop; private: // For merged-write messages. int mw_msgs; bool realtime; // Whether enabled nack. bool nack_enabled_; +private: // Whether palyer started. bool is_started; // statistic send packets. @@ -206,12 +205,8 @@ public: virtual srs_error_t cycle(); private: srs_error_t send_packets(SrsRtcStream* source, const std::vector& pkts, SrsRtcOutgoingInfo& info); - srs_error_t do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info); public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); - void simulate_nack_drop(int nn); -private: - void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes); public: srs_error_t on_rtcp(char* data, int nb_data); private: @@ -336,6 +331,11 @@ private: bool blackhole; sockaddr_in* blackhole_addr; srs_netfd_t blackhole_stfd; +private: + // twcc handler + int twcc_id_; + // Simulators. + int nn_simulate_player_nack_drop; public: SrsRtcConnection(SrsRtcServer* s); virtual ~SrsRtcConnection(); @@ -366,6 +366,7 @@ public: srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtcp_feedback(char* buf, int nb_buf); public: srs_error_t on_connection_established(); srs_error_t start_play(); @@ -381,6 +382,8 @@ public: public: // Simulate the NACK to drop nn packets. void simulate_nack_drop(int nn); + void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes); + srs_error_t do_send_packets(const std::vector& pkts, SrsRtcOutgoingInfo& info); private: srs_error_t on_binding_request(SrsStunPacket* r); // publish media capabilitiy negotiate diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index bc99a691f..a0fb8527d 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -1723,7 +1723,7 @@ std::string SrsRtcSendTrack::get_track_id() return track_desc_->id_; } -srs_error_t SrsRtcSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +srs_error_t SrsRtcSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info) { return srs_success; } @@ -1742,7 +1742,7 @@ SrsRtcAudioSendTrack::~SrsRtcAudioSendTrack() { } -srs_error_t SrsRtcAudioSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; @@ -1750,16 +1750,25 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(std::vector& send_packe return err; } + std::vector pkts; pkt->header.set_ssrc(track_desc_->ssrc_); - send_packets.push_back(pkt); - // Put rtp packet to NACK/ARQ queue if (true) { SrsRtpPacket2* nack = pkt->copy(); rtp_queue_->set(nack->header.get_sequence(), nack); } + pkts.push_back(pkt); + + // Update stats. + info.nn_bytes += pkt->nb_bytes(); + info.nn_audios++; + + if ((err = session_->do_send_packets(pkts, info)) != srs_success) { + return srs_error_wrap(err, "raw send"); + } + return err; } @@ -1779,23 +1788,32 @@ SrsRtcVideoSendTrack::~SrsRtcVideoSendTrack() { } -srs_error_t SrsRtcVideoSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; if (!track_desc_->is_active_) { return err; } - + + std::vector pkts; + pkt->header.set_ssrc(track_desc_->ssrc_); - - send_packets.push_back(pkt); // Put rtp packet to NACK/ARQ queue if (true) { SrsRtpPacket2* nack = pkt->copy(); rtp_queue_->set(nack->header.get_sequence(), nack); } + pkts.push_back(pkt); + // Update stats. + info.nn_bytes += pkt->nb_bytes(); + info.nn_videos++; + + if ((err = session_->do_send_packets(pkts, info)) != srs_success) { + return srs_error_wrap(err, "raw send"); + } + return err; } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index aaec5fa36..297a7042a 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -54,6 +54,7 @@ class SrsRtcConnection; class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; +class SrsRtcOutgoingInfo; class SrsNtp { @@ -470,7 +471,7 @@ public: void set_track_status(bool active); std::string get_track_id(); public: - virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); }; @@ -480,7 +481,7 @@ public: SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); virtual ~SrsRtcAudioSendTrack(); public: - virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); }; @@ -490,7 +491,7 @@ public: SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); virtual ~SrsRtcVideoSendTrack(); public: - virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); };