From 365e6bb45a125dcca696064d76faa5b6c2473296 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 3 May 2020 19:09:48 +0800 Subject: [PATCH] Refine RTC publisher, no cache. 4.0.25 --- trunk/src/app/srs_app_rtp_queue.cpp | 168 +++++++++++++-------------- trunk/src/app/srs_app_rtp_queue.hpp | 46 ++++---- trunk/src/core/srs_core_version4.hpp | 2 +- 3 files changed, 103 insertions(+), 113 deletions(-) diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 618065c1a..3702e775f 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -131,7 +131,7 @@ void SrsRtpNackForReceiver::update_rtt(int rtt) SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) { nn_seq_flip_backs = 0; - high_ = low_ = 0; + begin = end = 0; capacity_ = (uint16_t)capacity; initialized_ = false; @@ -144,19 +144,21 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer() srs_freepa(queue_); } -uint16_t SrsRtpRingBuffer::low() +bool SrsRtpRingBuffer::empty() { - return low_; + return begin == end; } -uint16_t SrsRtpRingBuffer::high() +int SrsRtpRingBuffer::size() { - return high_; + int size = srs_rtp_seq_distance(begin, end); + srs_assert(size >= 0); + return size; } void SrsRtpRingBuffer::advance_to(uint16_t seq) { - low_ = seq; + begin = seq; } void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) @@ -175,52 +177,54 @@ void SrsRtpRingBuffer::remove(uint16_t at) set(at, NULL); } -void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) +void SrsRtpRingBuffer::reset(uint16_t first, uint16_t last) { - for (uint16_t s = low; s != high; ++s) { + for (uint16_t s = first; s != last; ++s) { queue_[s % capacity_] = NULL; } } bool SrsRtpRingBuffer::overflow() { - return high_ - low_ >= capacity_; + return srs_rtp_seq_distance(begin, end) >= capacity_; } uint32_t SrsRtpRingBuffer::get_extended_highest_sequence() { - return nn_seq_flip_backs * 65536 + high_; + return nn_seq_flip_backs * 65536 + end - 1; } -void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_low, uint16_t& nack_high) +void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) { if (!initialized_) { initialized_ = true; - low_ = high_ = seq; + begin = seq; + end = seq + 1; return; } // Normal sequence, seq follows high_. - if (srs_rtp_seq_distance(high_, seq)) { - nack_low = high_ + 1; - nack_high = seq; + if (srs_rtp_seq_distance(end, seq) >= 0) { + nack_first = end + 1; + nack_last = seq + 1; // When distance(seq,high_)>0 and seq0 and 1<65535. - if (seq < high_) { + // TODO: FIXME: The first flip may be dropped. + if (seq < end) { ++nn_seq_flip_backs; } - high_ = seq; + end = seq + 1; return; } // Out-of-order sequence, seq before low_. - if (srs_rtp_seq_distance(seq, low_)) { + if (srs_rtp_seq_distance(seq, begin) > 0) { // When startup, we may receive packets in chaos order. // Because we don't know the ISN(initiazlie sequence number), the first packet // we received maybe no the first packet client sent. // @remark We only log a warning, because it seems ok for publisher. - srs_warn("too old seq %u, range [%u, %u]", seq, low_, high_); + srs_warn("too old seq %u, range [%u, %u]", seq, begin, end); } } @@ -283,11 +287,11 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt // OK, we got one new RTP packet, which is not in NACK. if (!nack_info) { ++num_of_packet_received_; - uint16_t nack_low = 0, nack_high = 0; - queue_->update(seq, nack_low, nack_high); - if (srs_rtp_seq_distance(nack_low, nack_high)) { - srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_low, nack_high); - insert_into_nack_list(nack, nack_low, nack_high); + uint16_t nack_first = 0, nack_last = 0; + queue_->update(seq, nack_first, nack_last); + if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { + srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); + insert_into_nack_list(nack, nack_first, nack_last); } } @@ -326,9 +330,9 @@ uint32_t SrsRtpQueue::get_interarrival_jitter() return static_cast(jitter_); } -void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end) +void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) { - for (uint16_t s = seq_start; s != seq_end; ++s) { + for (uint16_t s = first; s != last; ++s) { nack->insert(s); ++number_of_packet_lossed_; } @@ -346,22 +350,26 @@ SrsRtpAudioQueue::~SrsRtpAudioQueue() void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq) { - // TODO: FIXME: The seq may be greater than high. - queue_->advance_to(seq + 1); + uint16_t next = seq + 1; + if (srs_rtp_seq_distance(queue_->end, seq) > 0) { + seq = queue_->end; + } + srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end); + + queue_->advance_to(next); } void SrsRtpAudioQueue::notify_nack_list_full() { // TODO: FIXME: Maybe we should not drop all packets. - queue_->advance_to(queue_->high()); + queue_->advance_to(queue_->end); } void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector& frames) { - // When done, s point to the next available packet. - uint16_t next = queue_->low(); - - for (; next != queue_->high(); ++next) { + // When done, next point to the next available packet. + uint16_t next = queue_->begin; + for (; next != queue_->end; ++next) { SrsRtpPacket2* pkt = queue_->at(next); // Not found or in NACK, stop collecting frame. @@ -373,17 +381,17 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vectorlow() != next) { + if (next != queue_->begin) { // Reset the range of packets to NULL in buffer. - queue_->reset(queue_->low(), next); + queue_->reset(queue_->begin, next); - srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); + srs_verbose("RTC collect audio [%u, %u, %u]", queue_->begin, next, queue_->end); queue_->advance_to(next); } // For audio, if overflow, clear all packets. if (queue_->overflow()) { - queue_->advance_to(queue_->high()); + queue_->advance_to(queue_->end); } } @@ -398,30 +406,20 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue() void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq) { - uint16_t next = next_start_of_frame(); + // If not found start frame, return the end, and we will clear queue. + uint16_t next = next_start_of_frame(seq); + srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end); - // Note that low_ mean not found, clear queue util one packet. - if (next == queue_->low()) { - next = queue_->high() - 1; - } - - // When NACK is timeout, move to the next start of frame. - srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1); - queue_->advance_to(next + 1); + queue_->advance_to(next); } void SrsRtpVideoQueue::notify_nack_list_full() { + // If not found start frame, return the end, and we will clear queue. uint16_t next = next_keyframe(); + srs_trace("nack overflow, drop range [%u, %u, %u]", queue_->begin, next, queue_->end); - // Note that low_ mean not found, clear queue util one packet. - if (next == queue_->low()) { - next = queue_->high() - 1; - } - - // When NACK is overflow, move to the next keyframe. - srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1); - queue_->advance_to(next + 1); + queue_->advance_to(next); } srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) @@ -459,13 +457,13 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2 void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames) { - while (queue_->low() != queue_->high()) { + while (true) { SrsRtpPacket2* pkt = NULL; - collect_packet(nack, &pkt); + collect_frame(nack, &pkt); if (!pkt) { - return; + break; } frames.push_back(pkt); @@ -493,33 +491,27 @@ void SrsRtpVideoQueue::request_keyframe() void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) { - uint16_t next = next_start_of_frame(); - - // Note that low_ mean not found, clear queue util one packet. - if (next == queue_->low()) { - next = queue_->high() - 1; - } - srs_trace("seq out of range [%u, %u]", queue_->low(), next); + // If not found start frame, return the end, and we will clear queue. + uint16_t next = next_start_of_frame(queue_->begin); + srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end); - for (uint16_t s = queue_->low(); s != next; ++s) { + for (uint16_t s = queue_->begin; s != next; ++s) { nack->remove(s); queue_->remove(s); } - srs_trace("force update seq %u to %u", queue_->low(), next + 1); - queue_->advance_to(next + 1); + queue_->advance_to(next); } // TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet. -void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) +void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) { - // When done, s point to the next available packet. - uint16_t next = queue_->low(); - bool found = false; vector frame; - for (; next != queue_->high(); ++next) { + // When done, next point to the next available packet. + uint16_t next = queue_->begin; + for (; next != queue_->end; ++next) { SrsRtpPacket2* pkt = queue_->at(next); // Not found or in NACK, stop collecting frame. @@ -529,7 +521,7 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2 } // Ignore when the first packet not the start. - if (next == queue_->low() && !pkt->video_is_first_packet) { + if (next == queue_->begin && !pkt->video_is_first_packet) { return; } @@ -549,21 +541,20 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2 return; } - uint16_t cur = next - 1; - if (cur != queue_->high()) { + if (next != queue_->begin) { // Reset the range of packets to NULL in buffer. - queue_->reset(queue_->low(), next); + queue_->reset(queue_->begin, next); - srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); + srs_verbose("RTC collect video [%u, %u, %u]", queue_->begin, next, queue_->end); queue_->advance_to(next); } // Merge packets to one packet. - covert_packet(frame, ppkt); + covert_frame(frame, ppkt); return; } -void SrsRtpVideoQueue::covert_packet(std::vector& frame, SrsRtpPacket2** ppkt) +void SrsRtpVideoQueue::covert_frame(std::vector& frame, SrsRtpPacket2** ppkt) { if (frame.size() == 1) { *ppkt = frame[0]; @@ -620,35 +611,34 @@ void SrsRtpVideoQueue::covert_packet(std::vector& frame, SrsRtpP *ppkt = pkt; } -uint16_t SrsRtpVideoQueue::next_start_of_frame() +uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq) { - if (queue_->low() == queue_->high()) { - return queue_->low(); + uint16_t s = seq; + if (srs_rtp_seq_distance(seq, queue_->begin) >= 0) { + s = queue_->begin + 1; } - for (uint16_t s = queue_->low() + 1 ; s != queue_->high(); ++s) { + for (; s != queue_->end; ++s) { SrsRtpPacket2* pkt = queue_->at(s); if (pkt && pkt->video_is_first_packet) { return s; } } - return queue_->low(); + return queue_->end; } uint16_t SrsRtpVideoQueue::next_keyframe() { - if (queue_->low() == queue_->high()) { - return queue_->low(); - } + uint16_t s = queue_->begin + 1; - for (uint16_t s = queue_->low() + 1 ; s != queue_->high(); ++s) { + for (; s != queue_->end; ++s) { SrsRtpPacket2* pkt = queue_->at(s); if (pkt && pkt->video_is_idr && pkt->video_is_first_packet) { return s; } } - return queue_->low(); + return queue_->end; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 60ee7a162..7b3d496e9 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -66,9 +66,9 @@ struct SrsRtpNackInfo // distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5 // distance(low=65532, high=65534) === (int16_t)(uint16_t)((uint16_t)65532-(uint16_t)65534) === -2 // For RTP sequence, it's only uint16 and may flip back, so 3 maybe 3+0xffff. -inline bool srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) +inline int16_t srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) { - return ((int16_t)(high - low)) > 0; + return (int16_t)(high - low); } class SrsRtpNackForReceiver @@ -76,7 +76,7 @@ class SrsRtpNackForReceiver private: struct SeqComp { bool operator()(const uint16_t& low, const uint16_t& high) const { - return srs_rtp_seq_distance(low, high); + return srs_rtp_seq_distance(low, high) > 0; } }; private: @@ -124,29 +124,34 @@ private: uint64_t nn_seq_flip_backs; // Whether initialized, because we use uint16 so we can't use -1. bool initialized_; -private: - // Current position we are working at. - uint16_t low_; - uint16_t high_; +public: + // The begin iterator for ring buffer. + // For example, when got 1 elems, the begin is 0. + uint16_t begin; + // The end iterator for ring buffer. + // For example, when got 1 elems, the end is 1. + uint16_t end; public: SrsRtpRingBuffer(int capacity); virtual ~SrsRtpRingBuffer(); public: - // Move the position of buffer. - uint16_t low(); - uint16_t high(); + // Whether the ring buffer is empty. + bool empty(); + // Get the count of elems in ring buffer. + int size(); + // Move the low position of buffer to seq. void advance_to(uint16_t seq); // Free the packet at position. void set(uint16_t at, SrsRtpPacket2* pkt); void remove(uint16_t at); - // Directly reset range [low, high] to NULL. - void reset(uint16_t low, uint16_t high); + // Directly reset range [first, last) to NULL. + void reset(uint16_t first, uint16_t last); // Whether queue overflow or heavy(too many packets and need clear). bool overflow(); // The highest sequence number, calculate the flip back base. uint32_t get_extended_highest_sequence(); - // Update the sequence, got the nack range by [low, high]. - void update(uint16_t seq, uint16_t& nack_low, uint16_t& nack_high); + // Update the sequence, got the nack range by [first, last). + void update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last); // Get the packet by seq. SrsRtpPacket2* at(uint16_t seq); }; @@ -176,7 +181,7 @@ public: uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_interarrival_jitter(); private: - void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end); + void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last); }; class SrsRtpAudioQueue : public SrsRtpQueue @@ -206,14 +211,9 @@ public: void request_keyframe(); private: virtual void on_overflow(SrsRtpNackForReceiver* nack); - virtual void collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); - virtual void covert_packet(std::vector& frame, SrsRtpPacket2** ppkt); - // For video, get the next start packet of frame. - // @remark If not found, return the low_, which should never be the "next" one, - // because it MAY or NOT current start packet of frame but never be the next. - uint16_t next_start_of_frame(); - // For video, get the next seq of keyframe. - // @remark Return low_ if not found. + virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); + virtual void covert_frame(std::vector& frame, SrsRtpPacket2** ppkt); + uint16_t next_start_of_frame(uint16_t seq); uint16_t next_keyframe(); }; diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index abaee8484..a5eb4d404 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 24 +#define SRS_VERSION4_REVISION 25 #endif