From 899dddb6241fdefd2b6da3bc2eccf661cff9b2db Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 5 May 2020 08:08:03 +0800 Subject: [PATCH] Refactor RTC publisher, audio/video use its special ring buffer --- trunk/src/app/srs_app_rtp_queue.cpp | 153 ++++++++++++++++++---------- trunk/src/app/srs_app_rtp_queue.hpp | 17 ++-- 2 files changed, 108 insertions(+), 62 deletions(-) diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 9bcd7a1e9..88a72fed9 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -127,10 +127,8 @@ void SrsRtpNackForReceiver::update_rtt(int rtt) opts_.nack_interval = rtt_; } -SrsRtpQueue::SrsRtpQueue(int capacity) +SrsRtpQueue::SrsRtpQueue() { - queue_ = new SrsRtpRingBuffer(capacity); - jitter_ = 0; last_trans_time_ = -1; @@ -143,10 +141,33 @@ SrsRtpQueue::SrsRtpQueue(int capacity) SrsRtpQueue::~SrsRtpQueue() { - srs_freep(queue_); } -srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +uint8_t SrsRtpQueue::get_fraction_lost() +{ + int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_); + uint8_t loss = 0; + if (total > 0) { + loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total; + } + + pre_number_of_packet_lossed_ = number_of_packet_lossed_; + pre_number_of_packet_received_ = num_of_packet_received_; + + return loss; +} + +uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost() +{ + return number_of_packet_lossed_; +} + +uint32_t SrsRtpQueue::get_interarrival_jitter() +{ + return static_cast(jitter_); +} + +srs_error_t SrsRtpQueue::on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -156,8 +177,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt uint16_t seq = pkt->rtp_header.get_sequence(); SrsRtpNackInfo* nack_info = NULL; - - // If no NACK, disable nack. if (nack) { nack_info = nack->find(seq); } @@ -185,56 +204,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); } - // OK, we got one new RTP packet, which is not in NACK. + // OK, got new RTP packet. if (!nack_info) { ++num_of_packet_received_; - - uint16_t nack_first = 0, nack_last = 0; - if (!queue_->update(seq, nack_first, nack_last)) { - srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); - } - - if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { - srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); - insert_into_nack_list(nack, nack_first, nack_last); - } } - // Save packet at the position seq. - queue_->set(seq, pkt); - return err; } -uint32_t SrsRtpQueue::get_extended_highest_sequence() -{ - return queue_->get_extended_highest_sequence(); -} - -uint8_t SrsRtpQueue::get_fraction_lost() -{ - int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_); - uint8_t loss = 0; - if (total > 0) { - loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total; - } - - pre_number_of_packet_lossed_ = number_of_packet_lossed_; - pre_number_of_packet_received_ = num_of_packet_received_; - - return loss; -} - -uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost() -{ - return number_of_packet_lossed_; -} - -uint32_t SrsRtpQueue::get_interarrival_jitter() -{ - return static_cast(jitter_); -} - void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) { if (!nack) { @@ -249,12 +226,14 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t fi nack->check_queue_size(); } -SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue(capacity) +SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) { + queue_ = new SrsRtpRingBuffer(capacity); } SrsRtpAudioQueue::~SrsRtpAudioQueue() { + srs_freep(queue_); } void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq) @@ -274,9 +253,43 @@ void SrsRtpAudioQueue::notify_nack_list_full() queue_->advance_to(queue_->end); } +uint32_t SrsRtpAudioQueue::get_extended_highest_sequence() +{ + return queue_->get_extended_highest_sequence(); +} + srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) { - return SrsRtpQueue::consume(nack, pkt); + srs_error_t err = srs_success; + + uint16_t seq = pkt->rtp_header.get_sequence(); + + SrsRtpNackInfo* nack_info = NULL; + if (nack) { + nack_info = nack->find(seq); + } + + if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { + return err; + } + + // OK, we got one new RTP packet, which is not in NACK. + if (!nack_info) { + uint16_t nack_first = 0, nack_last = 0; + if (!queue_->update(seq, nack_first, nack_last)) { + srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); + } + + if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { + srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); + insert_into_nack_list(nack, nack_first, nack_last); + } + } + + // Save packet at the position seq. + queue_->set(seq, pkt); + + return err; } void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector& frames) @@ -323,13 +336,15 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector(capacity); } SrsRtpVideoQueue::~SrsRtpVideoQueue() { + srs_freep(queue_); } void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq) @@ -350,6 +365,11 @@ void SrsRtpVideoQueue::notify_nack_list_full() queue_->advance_to(next); } +uint32_t SrsRtpVideoQueue::get_extended_highest_sequence() +{ + return queue_->get_extended_highest_sequence(); +} + srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -376,10 +396,33 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2 } } - if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { - return srs_error_wrap(err, "video consume"); + uint16_t seq = pkt->rtp_header.get_sequence(); + + SrsRtpNackInfo* nack_info = NULL; + if (nack) { + nack_info = nack->find(seq); + } + + if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { + return err; } + // OK, we got one new RTP packet, which is not in NACK. + if (!nack_info) { + uint16_t nack_first = 0, nack_last = 0; + if (!queue_->update(seq, nack_first, nack_last)) { + srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); + } + + if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { + srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); + insert_into_nack_list(nack, nack_first, nack_last); + } + } + + // Save packet at the position seq. + queue_->set(seq, pkt); + return err; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index bc2706e87..55628ebf9 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -239,33 +239,34 @@ private: uint64_t pre_number_of_packet_lossed_; uint64_t num_of_packet_received_; uint64_t number_of_packet_lossed_; -protected: - SrsRtpRingBuffer* queue_; public: - SrsRtpQueue(int capacity); + SrsRtpQueue(); virtual ~SrsRtpQueue(); public: - virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); virtual void notify_drop_seq(uint16_t seq) = 0; virtual void notify_nack_list_full() = 0; -public: - uint32_t get_extended_highest_sequence(); + virtual uint32_t get_extended_highest_sequence() = 0; uint8_t get_fraction_lost(); uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_interarrival_jitter(); -private: +protected: + srs_error_t on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last); }; class SrsRtpAudioQueue : public SrsRtpQueue { +private: + SrsRtpRingBuffer* queue_; public: SrsRtpAudioQueue(int capacity); virtual ~SrsRtpAudioQueue(); public: virtual void notify_drop_seq(uint16_t seq); virtual void notify_nack_list_full(); + virtual uint32_t get_extended_highest_sequence(); virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); +public: virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames); }; @@ -273,12 +274,14 @@ class SrsRtpVideoQueue : public SrsRtpQueue { private: bool request_key_frame_; + SrsRtpRingBuffer* queue_; public: SrsRtpVideoQueue(int capacity); virtual ~SrsRtpVideoQueue(); public: virtual void notify_drop_seq(uint16_t seq); virtual void notify_nack_list_full(); + virtual uint32_t get_extended_highest_sequence(); virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frame); bool should_request_key_frame();