From 347fafec6e6f4ef0ba4e0d30e4a67427c00464fc Mon Sep 17 00:00:00 2001 From: "jinxue.cgh" Date: Sun, 17 May 2020 11:16:00 +0800 Subject: [PATCH] tenfold: refine publish nack send --- trunk/src/app/srs_app_rtc_conn.cpp | 34 ++++++++++++++--------------- trunk/src/app/srs_app_rtc_conn.hpp | 2 -- trunk/src/app/srs_app_rtc_queue.cpp | 34 ++++++++++++++++++++++++++--- trunk/src/app/srs_app_rtc_queue.hpp | 4 ++++ 4 files changed, 52 insertions(+), 22 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c40ca544c..28840513e 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1376,7 +1376,7 @@ srs_error_t SrsRtcPlayer::on_rtcp_rr(char* data, int nb_data) SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) { - report_timer = new SrsHourGlass(this, 10 * SRS_UTIME_MILLISECONDS); + report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); session_ = session; request_keyframe_ = false; @@ -1422,14 +1422,10 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque srs_trace("RTC player video(ssrc=%u), audio(ssrc=%u), nack=%d", video_ssrc, audio_ssrc, nack_enabled_); - if ((err = report_timer->tick(EVENT_REPORT, 200 * SRS_UTIME_MILLISECONDS)) != srs_success) { + if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) { return srs_error_wrap(err, "hourglass tick"); } - if ((err = report_timer->tick(EVENT_NACK, 10*SRS_UTIME_MILLISECONDS)) != srs_success) { - return srs_error_wrap(err, "NACK tick"); - } - if ((err = report_timer->start()) != srs_success) { return srs_error_wrap(err, "start report_timer"); } @@ -1760,6 +1756,7 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) SrsRtpNackForReceiver* nack_receiver = audio_nack_; SrsRtpRingBuffer* ring_queue = audio_queue_; + // TODO: FIXME: use is_audio() to jugdement uint32_t ssrc = pkt->header.get_ssrc(); uint16_t seq = pkt->header.get_sequence(); bool video = (ssrc == video_ssrc) ? true : false; @@ -1768,8 +1765,13 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) ring_queue = video_queue_; } + // TODO: check whether is necessary? + nack_receiver->remove_timeout_packets(); + SrsRtpNackInfo* nack_info = nack_receiver->find(seq); if (nack_info) { + // seq had been received. + nack_receiver->remove(seq); return err; } @@ -1783,6 +1785,11 @@ srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) nack_receiver->insert(nack_first, nack_last); nack_receiver->check_queue_size(); } + + // insert into video_queue and audio_queue + ring_queue->set(seq, pkt->copy()); + // send_nack + check_send_nacks(nack_receiver, ssrc); return err; } @@ -2142,18 +2149,11 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t { srs_error_t err = srs_success; // TODO: FIXME: Check error. - if (type == EVENT_REPORT) { - send_rtcp_rr(video_ssrc, video_queue_); - send_rtcp_rr(audio_ssrc, audio_queue_); - send_rtcp_xr_rrtr(video_ssrc); - send_rtcp_xr_rrtr(audio_ssrc); - } + send_rtcp_rr(video_ssrc, video_queue_); + send_rtcp_rr(audio_ssrc, audio_queue_); + send_rtcp_xr_rrtr(video_ssrc); + send_rtcp_xr_rrtr(audio_ssrc); - if (type == EVENT_NACK) { - check_send_nacks(video_nack_, video_ssrc); - check_send_nacks(audio_nack_, audio_ssrc); - } - return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 1fb3a3195..fa9985bff 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -275,8 +275,6 @@ private: SrsRtpNackForReceiver* video_nack_; SrsRtpRingBuffer* audio_queue_; SrsRtpNackForReceiver* audio_nack_; -private: - enum {EVENT_REPORT=1, EVENT_NACK = 2}; private: SrsRequest* req; SrsRtcSource* source; diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index cda039141..3a37af870 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -99,6 +99,9 @@ bool SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack // Normal sequence, seq follows high_. if (srs_rtp_seq_distance(end, seq) >= 0) { + //TODO: FIXME: if diff_upper > limit_max_size clear? + // int16_t diff_upper = srs_rtp_seq_distance(end, seq) + // notify_nack_list_full() nack_first = end; nack_last = seq; @@ -109,16 +112,24 @@ bool SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack ++nn_seq_flip_backs; } end = seq + 1; + // TODO: FIXME: check whether is neccessary? + // srs_rtp_seq_distance(begin, end) > max_size + // advance_to(), srs_rtp_seq_distance(begin, end) < max_size; return true; } // Out-of-order sequence, seq before low_. if (srs_rtp_seq_distance(seq, begin) > 0) { + nack_first = seq; + nack_last = begin; + begin = seq; + return true; + // When startup, we may receive packets in chaos order. // Because we don't know the ISN(initiazlie sequence number), the first packet // we received maybe no the first packet client sent. // @remark We only log a warning, because it seems ok for publisher. - return false; + //return false; } return true; @@ -174,6 +185,7 @@ SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue max_queue_size_ = queue_size; rtp_ = rtp; pre_check_time_ = 0; + last_remove_packet_time_ = -1; srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64, max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); @@ -216,8 +228,7 @@ void SrsRtpNackForReceiver::check_queue_size() void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) { - //TODO: use get_system_time to udpate - srs_utime_t now = srs_update_system_time(); + srs_utime_t now = srs_get_system_time(); srs_utime_t interval = now - pre_check_time_; if (interval < opts_.nack_interval / 2) { return; @@ -260,3 +271,20 @@ void SrsRtpNackForReceiver::update_rtt(int rtt) opts_.nack_interval = rtt_; } +#define PACKET_CLEAR_TIMEOUT (3000 * SRS_UTIME_MILLISECONDS) +void SrsRtpNackForReceiver::remove_timeout_packets(void) +{ + srs_utime_t now = srs_get_system_time(); + if (last_remove_packet_time_ == -1) { + last_remove_packet_time_ = now; + return; + } + + srs_utime_t elapsed_time = now - last_remove_packet_time_; + last_remove_packet_time_ = now; + + if (elapsed_time > PACKET_CLEAR_TIMEOUT) { + rtp_->notify_nack_list_full(); + queue_.clear(); + } +} \ No newline at end of file diff --git a/trunk/src/app/srs_app_rtc_queue.hpp b/trunk/src/app/srs_app_rtc_queue.hpp index f3a2a25a6..9bbada3ce 100644 --- a/trunk/src/app/srs_app_rtc_queue.hpp +++ b/trunk/src/app/srs_app_rtc_queue.hpp @@ -138,6 +138,10 @@ public: void get_nack_seqs(std::vector& seqs); public: void update_rtt(int rtt); +private: + srs_utime_t last_remove_packet_time_; +public: + void remove_timeout_packets(void); }; #endif