diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index bfed41778..28637e738 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -134,20 +134,123 @@ void SrsRtpNackList::update_rtt(int rtt) opts_.nack_interval = rtt_; } -SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) - : nack_(this, capacity * 2 / 3) +SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity) { + nn_seq_flip_backs = 0; + high_ = low_ = 0; capacity_ = capacity; - head_sequence_ = 0; - highest_sequence_ = 0; initialized_ = false; - start_collected_ = false; + queue_ = new SrsRtpSharedPacket*[capacity_]; memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity); +} + +SrsRtpRingBuffer::~SrsRtpRingBuffer() +{ + srs_freepa(queue_); +} + +void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt) +{ + SrsRtpSharedPacket* p = queue_[at % capacity_]; + + if (p) { + srs_freep(p); + } + + queue_[at % capacity_] = pkt; +} + +void SrsRtpRingBuffer::remove(uint16_t at) +{ + set(at, NULL); +} + +uint16_t SrsRtpRingBuffer::next_start_of_frame() +{ + if (low_ == high_) { + return low_; + } + + for (uint16_t s = low_ + 1 ; s != high_; ++s) { + SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; + if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { + return s; + } + } + + return low_; +} + +uint16_t SrsRtpRingBuffer::next_keyframe() +{ + if (low_ == high_) { + return low_; + } + + for (uint16_t s = low_ + 1 ; s != high_; ++s) { + SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; + if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) { + return s; + } + } + + return low_; +} + +uint32_t SrsRtpRingBuffer::get_extended_highest_sequence() +{ + return nn_seq_flip_backs * 65536 + high_; +} + +void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high) +{ + if (!initialized_) { + initialized_ = true; + low_ = high_ = seq; + return; + } + + // Normal sequence, seq follows high_. + if (srs_rtp_seq_distance(high_, seq)) { + nack_low = high_ + 1; + nack_high = seq; + + // When distance(seq,high_)>0 and seq0 and 1<65535. + if (seq < high_) { + srs_verbose("warp around, flip_back=%" PRId64, nn_seq_flip_backs); + ++nn_seq_flip_backs; + } + high_ = seq; + return; + } + + // Out-of-order sequence, seq before low_. + if (srs_rtp_seq_distance(seq, low_)) { + // When startup, we may receive packets in chaos order. + // Because we don't know the ISN(initiazlie sequence number), the first packet + // we received maybe no the first packet client sent. + if (startup) { + nack_low = seq + 1; + nack_high = low_; + + srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", low_, seq); + low_ = seq; + } else { + srs_verbose("seq=%u, rtx success, too old", seq); + } + } +} + +SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) + : nack_(this, capacity * 2 / 3) +{ + nn_collected_frames = 0; + queue_ = new SrsRtpRingBuffer(capacity); - cycle_ = 0; jitter_ = 0; - last_trans_time_ = 0; + last_trans_time_ = -1; pre_number_of_packet_received_ = 0; pre_number_of_packet_lossed_ = 0; @@ -162,137 +265,91 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) SrsRtpQueue::~SrsRtpQueue() { - srs_freepa(queue_); + srs_freep(queue_); } srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) { srs_error_t err = srs_success; - uint16_t seq = rtp_pkt->rtp_header.get_sequence(); - // TODO: FIXME: Update time for each packet, may hurt performance. srs_utime_t now = srs_update_system_time(); - // First packet recv, init head_sequence and highest_sequence. - if (!initialized_) { - initialized_ = true; - head_sequence_ = seq; - highest_sequence_ = seq; + uint16_t seq = rtp_pkt->rtp_header.get_sequence(); + SrsRtpNackInfo* nack_info = nack_.find(seq); + if (nack_info) { + int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; + (void)nack_rtt; + srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", + seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); + nack_.remove(seq); + } - ++num_of_packet_received_; + // Calc jitter time, ignore nack packets. + // TODO: FIXME: Covert time to srs_utime_t. + if (last_trans_time_ == -1) { + last_trans_time_ = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90; + } else if (!nack_info) { + int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90; - // TODO: FIXME: Covert time to srs_utime_t. - last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90; - } else { - SrsRtpNackInfo* nack_info = NULL; - if ((nack_info = nack_.find(seq)) != NULL) { - int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; - (void)nack_rtt; - srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", - seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); - nack_.remove(seq); - } else { - // Calc jitter. - { - int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90; - - int cur_jitter = trans_time - last_trans_time_; - if (cur_jitter < 0) { - cur_jitter = -cur_jitter; - } - - last_trans_time_ = trans_time; - - jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); - srs_verbose("jitter=%.2f", jitter_); - } - - ++num_of_packet_received_; - // seq > highest_sequence_ - if (seq_cmp(highest_sequence_, seq)) { - insert_into_nack_list(highest_sequence_ + 1, seq); - - if (seq < highest_sequence_) { - srs_verbose("warp around, cycle=%lu", cycle_); - ++cycle_; - } - highest_sequence_ = seq; - } else { - // Because we don't know the ISN(initiazlie sequence number), the first packet - // we received maybe no the first packet client sented. - if (!start_collected_) { - if (seq_cmp(seq, head_sequence_)) { - srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq); - head_sequence_ = seq; - } - insert_into_nack_list(seq + 1, highest_sequence_); - } else { - srs_verbose("seq=%u, rtx success, too old", seq); - } - } + int cur_jitter = trans_time - last_trans_time_; + if (cur_jitter < 0) { + cur_jitter = -cur_jitter; } + + last_trans_time_ = trans_time; + + jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); + srs_verbose("jitter=%.2f", jitter_); } - // Check seqs out of range. - if (head_sequence_ + capacity_ < highest_sequence_) { + // OK, we got one new RTP packet, which is not in NACK. + if (!nack_info) { + ++num_of_packet_received_; + uint16_t nack_low = 0, nack_high = 0; + queue_->update(seq, !nn_collected_frames, nack_low, nack_high); + if (srs_rtp_seq_distance(nack_low, nack_high)) { + srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high); + insert_into_nack_list(nack_low, nack_high); + } + } + + // When packets overflow, collect frame and move head to next frame start. + if (queue_->overflow()) { srs_verbose("try collect packet becuase seq out of range"); collect_packet(); - } - while (head_sequence_ + capacity_ < highest_sequence_) { - srs_trace("seqs out of range, head seq=%u, hightest seq=%u", head_sequence_, highest_sequence_); - remove(head_sequence_); - uint16_t s = head_sequence_ + 1; - for ( ; s != highest_sequence_; ++s) { - SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; - // Choose the new head sequence. Must be the first packet of frame. - if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { - srs_trace("find except, update head seq from %u to %u when seqs out of range", head_sequence_, s); - head_sequence_ = s; - break; - } - - // Drop the seq. + + uint16_t next = queue_->next_start_of_frame(); + + // Note that low_ mean not found, clear queue util one packet. + if (next == queue_->low()) { + next = queue_->high() - 1; + } + srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next); + + for (uint16_t s = queue_->low(); s != next; ++s) { nack_.remove(s); - srs_verbose("seqs out of range, drop seq=%u", s); - if (pkt && pkt->rtp_header.get_sequence() == s) { - delete pkt; - pkt = NULL; - } + queue_->remove(s); } - srs_trace("force update, update head seq from %u to %u when seqs out of range", head_sequence_, s); - head_sequence_ = s; - } - SrsRtpSharedPacket*& old_pkt = queue_[seq % capacity_]; - if (old_pkt) { - delete old_pkt; + srs_trace("force update, update head seq from %u to %u when seqs out of range", queue_->low(), next + 1); + queue_->advance_to(next + 1); } // TODO: FIXME: Change to ptr of ptr. - old_pkt = rtp_pkt->copy(); + queue_->set(seq, rtp_pkt->copy()); - // Marker bit means the last packet of frame received. - if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) { + // Collect packets to frame when: + // 1. Marker bit means the last packet of frame received. + // 2. Queue has lots of packets, the load is heavy. + // 3. The frame contains only one packet for each frame. + if (rtp_pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) { collect_packet(); } return err; } -srs_error_t SrsRtpQueue::remove(uint16_t seq) -{ - srs_error_t err = srs_success; - - SrsRtpSharedPacket*& pkt = queue_[seq % capacity_]; - if (pkt && pkt->rtp_header.get_sequence() == seq) { - delete pkt; - pkt = NULL; - } - - return err; -} - void SrsRtpQueue::get_and_clean_collected_frames(std::vector >& frames) { frames.swap(frames_); @@ -309,44 +366,35 @@ bool SrsRtpQueue::get_and_clean_if_needed_request_key_frame() void SrsRtpQueue::notify_drop_seq(uint16_t seq) { - uint16_t s = seq + 1; - for ( ; s != highest_sequence_; ++s) { - SrsRtpSharedPacket* pkt = queue_[s % capacity_]; - if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { - break; - } + uint16_t next = queue_->next_start_of_frame(); + + // Note that low_ mean not found, clear queue util one packet. + if (next == queue_->low()) { + next = queue_->high() - 1; } - srs_verbose("drop seq=%u, highest seq=%u, update head seq %u to %u", seq, highest_sequence_, head_sequence_, s); - head_sequence_ = s; + // When NACK is timeout, move to the next start of frame. + srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1); + queue_->advance_to(next + 1); } void SrsRtpQueue::notify_nack_list_full() { - bool found_key_frame = false; - while (head_sequence_ <= highest_sequence_) { - SrsRtpSharedPacket* pkt = queue_[head_sequence_ % capacity_]; - if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) { - found_key_frame = true; - srs_verbose("found firsr packet of key frame, seq=%u", head_sequence_); - break; - } + uint16_t next = queue_->next_keyframe(); - nack_.remove(head_sequence_); - remove(head_sequence_); - ++head_sequence_; + // Note that low_ mean not found, clear queue util one packet. + if (next == queue_->low()) { + next = queue_->high() - 1; } - if (!found_key_frame) { - srs_verbose("no found first packet of key frame, request key frame"); - request_key_frame_ = true; - head_sequence_ = highest_sequence_; - } + // When NACK is overflow, move to the next keyframe. + srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1); + queue_->advance_to(next + 1); } uint32_t SrsRtpQueue::get_extended_highest_sequence() { - return cycle_ * 65536 + highest_sequence_; + return queue_->get_extended_highest_sequence(); } uint8_t SrsRtpQueue::get_fraction_lost() @@ -392,8 +440,8 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) void SrsRtpQueue::collect_packet() { vector frame; - for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) { - SrsRtpSharedPacket* pkt = queue_[s % capacity_]; + for (uint16_t s = queue_->low(); s != queue_->high(); ++s) { + SrsRtpSharedPacket* pkt = queue_->at(s); if (nack_.find(s) != NULL) { srs_verbose("seq=%u, found in nack list when collect frame", s); @@ -401,20 +449,18 @@ void SrsRtpQueue::collect_packet() } // We must collect frame from first packet to last packet. - if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) { + if (s == queue_->low() && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) { break; } frame.push_back(pkt->copy()); if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { - if (!start_collected_) { - start_collected_ = true; - } + nn_collected_frames++; frames_.push_back(frame); frame.clear(); - srs_verbose("head seq=%u, update to %u because collect one full farme", head_sequence_, s + 1); - head_sequence_ = s + 1; + srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1); + queue_->advance_to(s + 1); } } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 4b2fb8929..70631e801 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -61,21 +61,24 @@ struct SrsRtpNackInfo int req_nack_count_; }; -inline bool seq_cmp(const uint16_t& l, const uint16_t& r) +// The "distance" between two uint16 number, for example: +// distance(low=3, high=5) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)5) === -2 +// distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5 +// distance(low=65532, high=65534) === (int16_t)(uint16_t)((uint16_t)65532-(uint16_t)65534) === -2 +// For RTP sequence, it's only uint16 and may flip back, so 3 maybe 3+0xffff. +inline bool srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) { - return ((int16_t)(r - l)) > 0; + return ((int16_t)(high - low)) > 0; } -struct SeqComp -{ - bool operator()(const uint16_t& l, const uint16_t& r) const - { - return seq_cmp(l, r); - } -}; - class SrsRtpNackList { +private: + struct SeqComp { + bool operator()(const uint16_t& low, const uint16_t& high) const { + return srs_rtp_seq_distance(low, high); + } + }; private: // Nack queue, seq order, oldest to newest. std::map queue_; @@ -101,27 +104,64 @@ public: void update_rtt(int rtt); }; -class SrsRtpQueue +// For UDP, the packets sequence may present as bellow: +// [seq1(done)|seq2|seq3 ... seq10|seq11(lost)|seq12|seq13] +// \___(head_sequence_) \ \___(highest_sequence_) +// \___(no received, in nack list) +// * seq1: The packet is done, we already got the entire frame and processed it. +// * seq2,seq3,...,seq10,seq12,seq13: We are processing theses packets, for example, some FU-A or NALUs, +// but not an entire video frame right now. +// * seq10: This packet is lost or not received, we put it in the nack list. +// We store the received packets in ring buffer. +class SrsRtpRingBuffer { private: - /* - *[seq1|seq2|seq3|seq4|seq5 ... seq10|seq11(loss)|seq12(loss)|seq13] - * \___(head_sequence_) \ \___(highest_sequence_) - * \___(no received, in nack list) - */ // Capacity of the ring-buffer. uint16_t capacity_; - // Thei highest sequence we have receive. - uint16_t highest_sequence_; - // The sequence waitting to read. - uint16_t head_sequence_; - bool initialized_; - bool start_collected_; // Ring bufer. SrsRtpSharedPacket** queue_; + // Increase one when uint16 flip back, for get_extended_highest_sequence. + uint64_t nn_seq_flip_backs; + // Whether initialized, because we use uint16 so we can't use -1. + bool initialized_; +private: + // Current position we are working at. + uint16_t low_; + uint16_t high_; +public: + SrsRtpRingBuffer(size_t capacity); + virtual ~SrsRtpRingBuffer(); +public: + uint16_t low() { return low_; } + uint16_t high() { return high_; } + void advance_to(uint16_t seq) { low_ = seq; } + void set(uint16_t at, SrsRtpSharedPacket* pkt); + void remove(uint16_t at); + bool overflow() { return low_ + capacity_ < high_; } + bool is_heavy() { return high_ - low_ >= capacity_ / 2; } + // Get the next start packet of frame. + // @remark If not found, return the low_, which should never be the "next" one, + // because it MAY or NOT current start packet of frame but never be the next. + uint16_t next_start_of_frame(); + // Get the next seq of keyframe. + // @remark Return low_ if not found. + uint16_t next_keyframe(); + // The highest sequence number, calculate the flip back base. + uint32_t get_extended_highest_sequence(); + // Update the sequence, got the nack range by [low, high]. + void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high); + // Get the packet by seq. + SrsRtpSharedPacket* at(uint16_t seq) { return queue_[seq % capacity_]; } +}; + +class SrsRtpQueue +{ +private: + uint64_t nn_collected_frames; + SrsRtpRingBuffer* queue_; private: - uint64_t cycle_; double jitter_; + // TODO: FIXME: Covert time to srs_utime_t. int64_t last_trans_time_; uint64_t pre_number_of_packet_received_; uint64_t pre_number_of_packet_lossed_; @@ -139,7 +179,6 @@ public: virtual ~SrsRtpQueue(); public: srs_error_t insert(SrsRtpSharedPacket* rtp_pkt); - srs_error_t remove(uint16_t seq); public: void get_and_clean_collected_frames(std::vector >& frames); bool get_and_clean_if_needed_request_key_frame();