From 80d45e5982a72f9f1b9ae3eecc783995b78fc749 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 5 May 2020 07:42:27 +0800 Subject: [PATCH] Refine RTP ring buffer, change to template --- trunk/src/app/srs_app_rtc_conn.cpp | 4 +- trunk/src/app/srs_app_rtc_conn.hpp | 6 +- trunk/src/app/srs_app_rtp_queue.cpp | 113 ++-------------------------- trunk/src/app/srs_app_rtp_queue.hpp | 101 +++++++++++++++++++++---- 4 files changed, 98 insertions(+), 126 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e384be27f..cc6e69298 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -616,8 +616,8 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) realtime = true; // TODO: FIXME: Config the capacity? - audio_queue_ = new SrsRtpRingBuffer(100); - video_queue_ = new SrsRtpRingBuffer(1000); + audio_queue_ = new SrsRtpRingBuffer(100); + video_queue_ = new SrsRtpRingBuffer(1000); nn_simulate_nack_drop = 0; nack_enabled_ = false; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 959f6990c..2b09be28f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -59,7 +60,6 @@ class SrsRtpPacket2; class ISrsCodec; class SrsRtpNackForReceiver; class SrsRtpIncommingVideoFrame; -class SrsRtpRingBuffer; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -214,8 +214,8 @@ private: uint16_t video_payload_type; uint32_t video_ssrc; // NACK ARQ ring buffer. - SrsRtpRingBuffer* audio_queue_; - SrsRtpRingBuffer* video_queue_; + SrsRtpRingBuffer* audio_queue_; + SrsRtpRingBuffer* video_queue_; // Simulators. int nn_simulate_nack_drop; private: diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index b2c085b1e..9bcd7a1e9 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -127,114 +127,9 @@ void SrsRtpNackForReceiver::update_rtt(int rtt) opts_.nack_interval = rtt_; } -SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) -{ - nn_seq_flip_backs = 0; - begin = end = 0; - capacity_ = (uint16_t)capacity; - initialized_ = false; - - queue_ = new SrsRtpPacket2*[capacity_]; - memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity); -} - -SrsRtpRingBuffer::~SrsRtpRingBuffer() -{ - srs_freepa(queue_); -} - -bool SrsRtpRingBuffer::empty() -{ - return begin == end; -} - -int SrsRtpRingBuffer::size() -{ - int size = srs_rtp_seq_distance(begin, end); - srs_assert(size >= 0); - return size; -} - -void SrsRtpRingBuffer::advance_to(uint16_t seq) -{ - begin = seq; -} - -void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) -{ - SrsRtpPacket2* p = queue_[at % capacity_]; - - if (p) { - srs_freep(p); - } - - queue_[at % capacity_] = pkt; -} - -void SrsRtpRingBuffer::remove(uint16_t at) -{ - set(at, NULL); -} - -void SrsRtpRingBuffer::reset(uint16_t first, uint16_t last) -{ - for (uint16_t s = first; s != last; ++s) { - queue_[s % capacity_] = NULL; - } -} - -bool SrsRtpRingBuffer::overflow() -{ - return srs_rtp_seq_distance(begin, end) >= capacity_; -} - -uint32_t SrsRtpRingBuffer::get_extended_highest_sequence() -{ - return nn_seq_flip_backs * 65536 + end - 1; -} - -void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) -{ - if (!initialized_) { - initialized_ = true; - begin = seq; - end = seq + 1; - return; - } - - // Normal sequence, seq follows high_. - if (srs_rtp_seq_distance(end, seq) >= 0) { - nack_first = end; - nack_last = seq; - - // When distance(seq,high_)>0 and seq0 and 1<65535. - // TODO: FIXME: The first flip may be dropped. - if (seq < end) { - ++nn_seq_flip_backs; - } - end = seq + 1; - return; - } - - // Out-of-order sequence, seq before low_. - if (srs_rtp_seq_distance(seq, begin) > 0) { - // When startup, we may receive packets in chaos order. - // Because we don't know the ISN(initiazlie sequence number), the first packet - // we received maybe no the first packet client sent. - // @remark We only log a warning, because it seems ok for publisher. - srs_warn("too old seq %u, range [%u, %u]", seq, begin, end); - } -} - -SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) -{ - return queue_[seq % capacity_]; -} - SrsRtpQueue::SrsRtpQueue(int capacity) { - queue_ = new SrsRtpRingBuffer(capacity); + queue_ = new SrsRtpRingBuffer(capacity); jitter_ = 0; last_trans_time_ = -1; @@ -293,8 +188,12 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt // OK, we got one new RTP packet, which is not in NACK. if (!nack_info) { ++num_of_packet_received_; + uint16_t nack_first = 0, nack_last = 0; - queue_->update(seq, nack_first, nack_last); + if (!queue_->update(seq, nack_first, nack_last)) { + srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); + } + if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); insert_into_nack_list(nack, nack_first, nack_last); diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 46fbbdb6a..bc2706e87 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -113,13 +113,14 @@ public: // but not an entire video frame right now. // * seq10: This packet is lost or not received, we put it in the nack list. // We store the received packets in ring buffer. +template class SrsRtpRingBuffer { private: // Capacity of the ring-buffer. uint16_t capacity_; // Ring bufer. - SrsRtpPacket2** queue_; + T* queue_; // Increase one when uint16 flip back, for get_extended_highest_sequence. uint64_t nn_seq_flip_backs; // Whether initialized, because we use uint16 so we can't use -1. @@ -132,28 +133,100 @@ public: // For example, when got 1 elems, the end is 1. uint16_t end; public: - SrsRtpRingBuffer(int capacity); - virtual ~SrsRtpRingBuffer(); + SrsRtpRingBuffer(int capacity) { + nn_seq_flip_backs = 0; + begin = end = 0; + capacity_ = (uint16_t)capacity; + initialized_ = false; + + queue_ = new T[capacity_]; + memset(queue_, 0, sizeof(T) * capacity); + } + virtual ~SrsRtpRingBuffer() { + srs_freepa(queue_); + } public: // Whether the ring buffer is empty. - bool empty(); + bool empty() { + return begin == end; + } // Get the count of elems in ring buffer. - int size(); + int size() { + int size = srs_rtp_seq_distance(begin, end); + srs_assert(size >= 0); + return size; + } // Move the low position of buffer to seq. - void advance_to(uint16_t seq); + void advance_to(uint16_t seq) { + begin = seq; + } // Free the packet at position. - void set(uint16_t at, SrsRtpPacket2* pkt); - void remove(uint16_t at); + void set(uint16_t at, T pkt) { + T p = queue_[at % capacity_]; + + if (p) { + srs_freep(p); + } + + queue_[at % capacity_] = pkt; + } + void remove(uint16_t at) { + set(at, NULL); + } // Directly reset range [first, last) to NULL. - void reset(uint16_t first, uint16_t last); + void reset(uint16_t first, uint16_t last) { + for (uint16_t s = first; s != last; ++s) { + queue_[s % capacity_] = NULL; + } + } // Whether queue overflow or heavy(too many packets and need clear). - bool overflow(); + bool overflow() { + return srs_rtp_seq_distance(begin, end) >= capacity_; + } // The highest sequence number, calculate the flip back base. - uint32_t get_extended_highest_sequence(); + uint32_t get_extended_highest_sequence() { + return nn_seq_flip_backs * 65536 + end - 1; + } // Update the sequence, got the nack range by [first, last). - void update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last); + // @return If false, the seq is too old. + bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) { + if (!initialized_) { + initialized_ = true; + begin = seq; + end = seq + 1; + return true; + } + + // Normal sequence, seq follows high_. + if (srs_rtp_seq_distance(end, seq) >= 0) { + nack_first = end; + nack_last = seq; + + // When distance(seq,high_)>0 and seq0 and 1<65535. + // TODO: FIXME: The first flip may be dropped. + if (seq < end) { + ++nn_seq_flip_backs; + } + end = seq + 1; + return true; + } + + // Out-of-order sequence, seq before low_. + if (srs_rtp_seq_distance(seq, begin) > 0) { + // When startup, we may receive packets in chaos order. + // Because we don't know the ISN(initiazlie sequence number), the first packet + // we received maybe no the first packet client sent. + // @remark We only log a warning, because it seems ok for publisher. + return false; + } + + return true; + } // Get the packet by seq. - SrsRtpPacket2* at(uint16_t seq); + T at(uint16_t seq) { + return queue_[seq % capacity_]; + } }; class SrsRtpQueue @@ -167,7 +240,7 @@ private: uint64_t num_of_packet_received_; uint64_t number_of_packet_lossed_; protected: - SrsRtpRingBuffer* queue_; + SrsRtpRingBuffer* queue_; public: SrsRtpQueue(int capacity); virtual ~SrsRtpQueue();