diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 5ccac9172..13f0d4301 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -507,8 +507,8 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) realtime = true; // TODO: FIXME: Config the capacity? - audio_queue_ = new SrsRtpRingBuffer(100); - video_queue_ = new SrsRtpRingBuffer(1000); + audio_queue_ = new SrsRtpRingBuffer(100); + video_queue_ = new SrsRtpRingBuffer(1000); nn_simulate_nack_drop = 0; nack_enabled_ = false; @@ -1378,9 +1378,10 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); session_ = session; - video_queue_ = new SrsRtpVideoQueue(1000); + request_keyframe_ = false; + video_queue_ = new SrsRtpRingBuffer(1000); video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3); - audio_queue_ = new SrsRtpAudioQueue(100); + audio_queue_ = new SrsRtpRingBuffer(100); audio_nack_ = new SrsRtpNackForReceiver(audio_queue_, 100 * 2 / 3); source = NULL; @@ -1490,7 +1491,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr } } -srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue) +srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue) { srs_error_t err = srs_success; @@ -1507,10 +1508,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue) stream.write_2bytes(7); stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1? - uint8_t fraction_lost = rtp_queue->get_fraction_lost(); - uint32_t cumulative_number_of_packets_lost = rtp_queue->get_cumulative_number_of_packets_lost() & 0x7FFFFF; + // TODO: FIXME: Implements it. + // TODO: FIXME: See https://github.com/ossrs/srs/blob/f81d35d20f04ebec01915cb78a882e45b7ee8800/trunk/src/app/srs_app_rtc_queue.cpp + uint8_t fraction_lost = 0; + uint32_t cumulative_number_of_packets_lost = 0 & 0x7FFFFF; uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence(); - uint32_t interarrival_jitter = rtp_queue->get_interarrival_jitter(); + uint32_t interarrival_jitter = 0; uint32_t rr_lsr = 0; uint32_t rr_dlsr = 0; @@ -1648,15 +1651,17 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) { srs_error_t err = srs_success; + // Decode the RTP packet from buffer. SrsRtpPacket2* pkt = new SrsRtpPacket2(); + if (true) { + pkt->set_decode_handler(this); + pkt->shared_msg = new SrsSharedPtrMessage(); + pkt->shared_msg->wrap(buf, nb_buf); - pkt->set_decode_handler(this); - pkt->shared_msg = new SrsSharedPtrMessage(); - pkt->shared_msg->wrap(buf, nb_buf); - - SrsBuffer b(buf, nb_buf); - if ((err = pkt->decode(&b)) != srs_success) { - return srs_error_wrap(err, "decode rtp packet"); + SrsBuffer b(buf, nb_buf); + if ((err = pkt->decode(&b)) != srs_success) { + return srs_error_wrap(err, "decode rtp packet"); + } } // For NACK simulator, drop packet. @@ -1666,15 +1671,27 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) return err; } + // For source to consume packet. uint32_t ssrc = pkt->rtp_header.get_ssrc(); if (ssrc == audio_ssrc) { - return on_audio(pkt); + if ((err = on_audio(pkt)) != srs_success) { + return srs_error_wrap(err, "on audio"); + } } else if (ssrc == video_ssrc) { - return on_video(pkt); + if ((err = on_video(pkt)) != srs_success) { + return srs_error_wrap(err, "on video"); + } } else { srs_freep(pkt); return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } + + // For NACK to handle packet. + if (nack_enabled_ && (err = on_nack(pkt)) != srs_success) { + return srs_error_wrap(err, "on nack"); + } + + return err; } void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) @@ -1709,40 +1726,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) source->on_rtp(pkt); return err; - - // TODO: FIXME: Directly dispatch to consumer for performance? - std::vector frames; - - if (nack_enabled_) { - // TODO: FIXME: Error check. - audio_queue_->consume(audio_nack_, pkt); - - check_send_nacks(audio_nack_, audio_ssrc); - - // Collect all audio frames. - audio_queue_->collect_frames(audio_nack_, frames); - } else { - // TODO: FIXME: Error check. - audio_queue_->consume(NULL, pkt); - - // Collect all audio frames. - audio_queue_->collect_frames(NULL, frames); - } - - for (size_t i = 0; i < frames.size(); ++i) { - SrsRtpPacket2* frame = frames[i]; - - // TODO: FIXME: Check error. - source->on_rtp(frame); - - if (nn_audio_frames++ == 0) { - SrsRtpHeader* h = &frame->rtp_header; - SrsRtpRawPayload* payload = dynamic_cast(frame->payload); - srs_trace("RTC got Opus seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), payload->nn_payload); - } - } - - return err; } srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) @@ -1754,152 +1737,38 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) // TODO: FIXME: Error check. source->on_rtp(pkt); - if (video_queue_->should_request_key_frame()) { - // TODO: FIXME: Check error. - send_rtcp_fb_pli(video_ssrc); - } - - return err; - - std::vector frames; - - if (nack_enabled_) { - // TODO: FIXME: Error check. - video_queue_->consume(video_nack_, pkt); - - check_send_nacks(video_nack_, video_ssrc); - - // Collect video frames. - video_queue_->collect_frames(video_nack_, frames); - } else { - // TODO: FIXME: Error check. - video_queue_->consume(NULL, pkt); - - // Collect video frames. - video_queue_->collect_frames(NULL, frames); - } - - for (size_t i = 0; i < frames.size(); ++i) { - SrsRtpPacket2* frame = frames[i]; + if (request_keyframe_) { + request_keyframe_ = false; - // TODO: FIXME: Check error. - on_video_frame(frame); - - srs_freep(frame); - } - - if (video_queue_->should_request_key_frame()) { // TODO: FIXME: Check error. send_rtcp_fb_pli(video_ssrc); } - return srs_success; + return err; } -srs_error_t SrsRtcPublisher::on_video_frame(SrsRtpPacket2* frame) +srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; - int64_t timestamp = frame->rtp_header.get_timestamp(); - - // No FU-A, because we convert it to RAW RTP packet. - if (frame->nalu_type == (SrsAvcNaluType)kFuA) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid FU-A"); - } - - // For STAP-A, it must be SPS/PPS, and only one packet. - if (frame->nalu_type == (SrsAvcNaluType)kStapA) { - SrsRtpSTAPPayload* payload = dynamic_cast(frame->payload); - if (!payload) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload"); - } - - SrsSample* sps = payload->get_sps(); - SrsSample* pps = payload->get_pps(); - if (!sps || !sps->size) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no sps"); - } - if (!pps || !pps->size) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no pps"); - } - - // TODO: FIXME: Directly covert to sample for performance. - // 5 bytes flv tag header. - // 5 bytes sps/pps sequence header. - // 6 bytes size for sps/pps, each is 3 bytes. - int nn_payload = sps->size + pps->size + 5 + 5 + 6; - char* data = new char[nn_payload]; - SrsBuffer buf(data, nn_payload); - - buf.write_1bytes(0x17); // Keyframe. - buf.write_1bytes(0x00); // Sequence header. - buf.write_3bytes(0x00); // CTS. - - // FIXME: Replace magic number for avc_demux_sps_pps. - buf.write_1bytes(0x01); // configurationVersion - buf.write_1bytes(0x42); // AVCProfileIndication, 0x42 = Baseline - buf.write_1bytes(0xC0); // profile_compatibility - buf.write_1bytes(0x1f); // AVCLevelIndication, 0x1f = Level3.1 - buf.write_1bytes(0x03); // lengthSizeMinusOne, size of length for NALU. - - buf.write_1bytes(0x01); // numOfSequenceParameterSets - buf.write_2bytes(sps->size); // sequenceParameterSetLength - buf.write_bytes(sps->bytes, sps->size); // sps - - buf.write_1bytes(0x01); // numOfPictureParameterSets - buf.write_2bytes(pps->size); // pictureParameterSetLength - buf.write_bytes(pps->bytes, pps->size); // pps - - SrsMessageHeader header; - header.message_type = RTMP_MSG_VideoMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = (timestamp / 90) & 0x3fffffff; - SrsCommonMessage* shared_video = new SrsCommonMessage(); - SrsAutoFree(SrsCommonMessage, shared_video); - // TODO: FIXME: Check error. - shared_video->create(&header, data, nn_payload); - return source->on_video(shared_video); - } - - // For RAW NALU, should be one RAW packet. - SrsRtpRawPayload* payload = dynamic_cast(frame->payload); - if (!payload) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "RAW-NALU payload"); - } - if (!payload->nn_payload) { + uint16_t seq = pkt->rtp_header.get_sequence(); + SrsRtpNackInfo* nack_info = audio_nack_->find(seq); + if (nack_info) { return err; } - // TODO: FIXME: Directly covert to sample for performance. - // 5 bytes FLV tag header. - // 4 bytes NALU IBMF header, define by sequence header. - int nn_payload = payload->nn_payload + 5 + 4; - char* data = new char[nn_payload]; - SrsBuffer buf(data, nn_payload); - - if (frame->nalu_type == SrsAvcNaluTypeIDR) { - buf.write_1bytes(0x17); // Keyframe. - - SrsRtpHeader* h = &frame->rtp_header; - srs_trace("RTC got IDR seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), nn_payload); - } else { - buf.write_1bytes(0x27); // Not-Keyframe. + uint16_t nack_first = 0, nack_last = 0; + if (!audio_queue_->update(seq, nack_first, nack_last)) { + srs_warn("too old seq %u, range [%u, %u]", seq, audio_queue_->begin, audio_queue_->end); } - buf.write_1bytes(0x01); // Not-SequenceHeader. - buf.write_3bytes(0x00); // CTS. - buf.write_4bytes(payload->nn_payload); // Size of NALU. - buf.write_bytes(payload->payload, payload->nn_payload); // NALU. + if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { + srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); + audio_nack_->insert(nack_first, nack_last); + audio_nack_->check_queue_size(); + } - SrsMessageHeader header; - header.message_type = RTMP_MSG_VideoMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = (timestamp / 90) & 0x3fffffff; - SrsCommonMessage* shared_video = new SrsCommonMessage(); - SrsAutoFree(SrsCommonMessage, shared_video); - // TODO: FIXME: Check error. - shared_video->create(&header, data, nn_payload); - return source->on_video(shared_video); + return err; } srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data) @@ -2250,7 +2119,7 @@ void SrsRtcPublisher::request_keyframe() int pcid = session_->context_id(); srs_trace("RTC play=[%d][%d] request keyframe from publish=[%d][%d]", ::getpid(), scid, ::getpid(), pcid); - video_queue_->request_keyframe(); + request_keyframe_ = true; } srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index f598a5e14..d3e71b7d7 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -53,13 +53,11 @@ class SrsSharedPtrMessage; class SrsRtcSource; class SrsRtpPacket2; class ISrsUdpSender; -class SrsRtpQueue; -class SrsRtpAudioQueue; -class SrsRtpVideoQueue; class SrsRtpPacket2; class ISrsCodec; class SrsRtpNackForReceiver; class SrsRtpIncommingVideoFrame; +class SrsRtpRingBuffer; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -208,8 +206,8 @@ private: uint16_t video_payload_type; uint32_t video_ssrc; // NACK ARQ ring buffer. - SrsRtpRingBuffer* audio_queue_; - SrsRtpRingBuffer* video_queue_; + SrsRtpRingBuffer* audio_queue_; + SrsRtpRingBuffer* video_queue_; // Simulators. int nn_simulate_nack_drop; private: @@ -271,9 +269,10 @@ private: uint32_t video_ssrc; uint32_t audio_ssrc; private: - SrsRtpVideoQueue* video_queue_; + bool request_keyframe_; + SrsRtpRingBuffer* video_queue_; SrsRtpNackForReceiver* video_nack_; - SrsRtpAudioQueue* audio_queue_; + SrsRtpRingBuffer* audio_queue_; SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; @@ -292,7 +291,7 @@ public: srs_error_t initialize(uint32_t vssrc, uint32_t assrc, SrsRequest* req); private: void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc); - srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue); + srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue); srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: @@ -301,7 +300,7 @@ public: private: srs_error_t on_audio(SrsRtpPacket2* pkt); srs_error_t on_video(SrsRtpPacket2* pkt); - srs_error_t on_video_frame(SrsRtpPacket2* frame); + srs_error_t on_nack(SrsRtpPacket2* pkt); public: srs_error_t on_rtcp(char* data, int nb_data); private: diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index a610e31b1..9f1f86cb4 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -41,13 +41,13 @@ SrsRtpNackInfo::SrsRtpNackInfo() req_nack_count_ = 0; } -SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size) +SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size) { max_queue_size_ = queue_size; - rtp_queue_ = rtp_queue; + rtp_ = rtp; pre_check_time_ = 0; - srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld" + srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64, max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); } @@ -55,10 +55,11 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver() { } -void SrsRtpNackForReceiver::insert(uint16_t seq) +void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last) { - // FIXME: full, drop packet, and request key frame. - queue_[seq] = SrsRtpNackInfo(); + for (uint16_t s = first; s != last; ++s) { + queue_[s] = SrsRtpNackInfo(); + } } void SrsRtpNackForReceiver::remove(uint16_t seq) @@ -80,7 +81,7 @@ SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq) void SrsRtpNackForReceiver::check_queue_size() { if (queue_.size() >= max_queue_size_) { - rtp_queue_->notify_nack_list_full(); + rtp_->notify_nack_list_full(); } } @@ -100,7 +101,7 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) int alive_time = now - nack_info.generate_time_; if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) { - rtp_queue_->notify_drop_seq(seq); + rtp_->notify_drop_seq(seq); queue_.erase(iter++); continue; } @@ -127,550 +128,110 @@ void SrsRtpNackForReceiver::update_rtt(int rtt) opts_.nack_interval = rtt_; } -SrsRtpQueue::SrsRtpQueue() +SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) { - jitter_ = 0; - last_trans_time_ = -1; - - pre_number_of_packet_received_ = 0; - pre_number_of_packet_lossed_ = 0; + nn_seq_flip_backs = 0; + begin = end = 0; + capacity_ = (uint16_t)capacity; + initialized_ = false; - num_of_packet_received_ = 0; - number_of_packet_lossed_ = 0; + queue_ = new SrsRtpPacket2*[capacity_]; + memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity); } -SrsRtpQueue::~SrsRtpQueue() +SrsRtpRingBuffer::~SrsRtpRingBuffer() { + srs_freepa(queue_); } -uint8_t SrsRtpQueue::get_fraction_lost() +bool SrsRtpRingBuffer::empty() { - int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_); - uint8_t loss = 0; - if (total > 0) { - loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total; - } - - pre_number_of_packet_lossed_ = number_of_packet_lossed_; - pre_number_of_packet_received_ = num_of_packet_received_; - - return loss; + return begin == end; } -uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost() +int SrsRtpRingBuffer::size() { - return number_of_packet_lossed_; + int size = srs_rtp_seq_distance(begin, end); + srs_assert(size >= 0); + return size; } -uint32_t SrsRtpQueue::get_interarrival_jitter() +void SrsRtpRingBuffer::advance_to(uint16_t seq) { - return static_cast(jitter_); -} - -srs_error_t SrsRtpQueue::on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Update time for each packet, may hurt performance. - srs_utime_t now = srs_update_system_time(); - - uint16_t seq = pkt->rtp_header.get_sequence(); - - SrsRtpNackInfo* nack_info = NULL; - if (nack) { - nack_info = nack->find(seq); - } - - if (nack_info) { - int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; - (void)nack_rtt; - nack->remove(seq); - } - - // Calc jitter time, ignore nack packets. - // TODO: FIXME: Covert time to srs_utime_t. - if (last_trans_time_ == -1) { - last_trans_time_ = now / 1000 - pkt->rtp_header.get_timestamp() / 90; - } else if (!nack_info) { - int trans_time = now / 1000 - pkt->rtp_header.get_timestamp() / 90; - - int cur_jitter = trans_time - last_trans_time_; - if (cur_jitter < 0) { - cur_jitter = -cur_jitter; - } - - last_trans_time_ = trans_time; - - jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); - } - - // OK, got new RTP packet. - if (!nack_info) { - ++num_of_packet_received_; - } - - return err; + begin = seq; } -void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) +void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) { - if (!nack) { - return; - } + SrsRtpPacket2* p = queue_[at % capacity_]; - for (uint16_t s = first; s != last; ++s) { - nack->insert(s); - ++number_of_packet_lossed_; + if (p) { + srs_freep(p); } - nack->check_queue_size(); -} - -SrsRtpAudioPacket::SrsRtpAudioPacket() -{ - pkt = NULL; -} - -SrsRtpAudioPacket::~SrsRtpAudioPacket() -{ - srs_freep(pkt); -} - -SrsRtpPacket2* SrsRtpAudioPacket::detach() -{ - SrsRtpPacket2* p = pkt; - pkt = NULL; - return p; + queue_[at % capacity_] = pkt; } -SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) +void SrsRtpRingBuffer::remove(uint16_t at) { - queue_ = new SrsRtpRingBuffer(capacity); + set(at, NULL); } -SrsRtpAudioQueue::~SrsRtpAudioQueue() +bool SrsRtpRingBuffer::overflow() { - srs_freep(queue_); + return srs_rtp_seq_distance(begin, end) >= capacity_; } -void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq) +uint32_t SrsRtpRingBuffer::get_extended_highest_sequence() { - uint16_t next = seq + 1; - if (srs_rtp_seq_distance(queue_->end, seq) > 0) { - seq = queue_->end; - } - srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end); - - queue_->advance_to(next); + return nn_seq_flip_backs * 65536 + end - 1; } -void SrsRtpAudioQueue::notify_nack_list_full() +bool SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) { - // TODO: FIXME: Maybe we should not drop all packets. - queue_->advance_to(queue_->end); -} - -uint32_t SrsRtpAudioQueue::get_extended_highest_sequence() -{ - return queue_->get_extended_highest_sequence(); -} - -srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - uint16_t seq = pkt->rtp_header.get_sequence(); - - SrsRtpNackInfo* nack_info = NULL; - if (nack) { - nack_info = nack->find(seq); - } - - if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { - return srs_error_wrap(err, "consume audio"); - } - - // OK, we got one new RTP packet, which is not in NACK. - if (!nack_info) { - uint16_t nack_first = 0, nack_last = 0; - if (!queue_->update(seq, nack_first, nack_last)) { - srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); - } - - if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { - srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); - insert_into_nack_list(nack, nack_first, nack_last); - } - } - - // Save packet at the position seq. - SrsRtpAudioPacket* apkt = new SrsRtpAudioPacket(); - apkt->pkt = pkt; - queue_->set(seq, apkt); - - return err; -} - -void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector& frames) -{ - // When done, next point to the next available packet. - uint16_t next = queue_->begin; - - // If nack disabled, we ignore any empty packet. - if (!nack) { - for (; next != queue_->end; ++next) { - SrsRtpAudioPacket* pkt = queue_->at(next); - if (pkt) { - frames.push_back(pkt->detach()); - } - } - } else { - for (; next != queue_->end; ++next) { - SrsRtpAudioPacket* pkt = queue_->at(next); - - // TODO: FIXME: Should not wait for NACK packets. - // Not found or in NACK, stop collecting frame. - if (!pkt || nack->find(next) != NULL) { - srs_trace("wait for nack seq=%u", next); - break; - } - - frames.push_back(pkt->detach()); - } - } - - // Reap packets from begin to next. - if (next != queue_->begin) { - srs_verbose("RTC collect audio [%u, %u, %u]", queue_->begin, next, queue_->end); - queue_->advance_to(next); - } - - // For audio, if overflow, clear all packets. - // TODO: FIXME: Should notify nack? - if (queue_->overflow()) { - queue_->advance_to(queue_->end); - } -} - -SrsRtpVideoPacket::SrsRtpVideoPacket() -{ - video_is_first_packet = false; - video_is_last_packet = false; - video_is_idr = false; - - pkt = NULL; -} - -SrsRtpVideoPacket::~SrsRtpVideoPacket() -{ - srs_freep(pkt); -} - -SrsRtpPacket2* SrsRtpVideoPacket::detach() -{ - SrsRtpPacket2* p = pkt; - pkt = NULL; - return p; -} - -SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) -{ - request_key_frame_ = false; - queue_ = new SrsRtpRingBuffer(capacity); -} - -SrsRtpVideoQueue::~SrsRtpVideoQueue() -{ - srs_freep(queue_); -} - -void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq) -{ - // If not found start frame, return the end, and we will clear queue. - uint16_t next = next_start_of_frame(seq); - srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end); - - queue_->advance_to(next); -} - -void SrsRtpVideoQueue::notify_nack_list_full() -{ - // If not found start frame, return the end, and we will clear queue. - uint16_t next = next_keyframe(); - srs_trace("nack overflow, drop range [%u, %u, %u]", queue_->begin, next, queue_->end); - - queue_->advance_to(next); -} - -uint32_t SrsRtpVideoQueue::get_extended_highest_sequence() -{ - return queue_->get_extended_highest_sequence(); -} - -srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - SrsRtpVideoPacket* vpkt = new SrsRtpVideoPacket(); - vpkt->pkt = pkt; - - uint8_t v = (uint8_t)pkt->nalu_type; - if (v == kFuA) { - SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); - if (!payload) { - srs_freep(pkt); srs_freep(vpkt); - return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); - } - - vpkt->video_is_first_packet = payload->start; - vpkt->video_is_last_packet = payload->end; - vpkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR); - } else { - vpkt->video_is_first_packet = true; - vpkt->video_is_last_packet = true; - - if (v == kStapA) { - vpkt->video_is_idr = true; - } else { - vpkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR); - } - } - - uint16_t seq = pkt->rtp_header.get_sequence(); - - SrsRtpNackInfo* nack_info = NULL; - if (nack) { - nack_info = nack->find(seq); - } - - if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { - srs_freep(pkt); srs_freep(vpkt); - return srs_error_wrap(err, "consume video"); - } - - // OK, we got one new RTP packet, which is not in NACK. - if (!nack_info) { - uint16_t nack_first = 0, nack_last = 0; - if (!queue_->update(seq, nack_first, nack_last)) { - srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end); - } - - if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { - srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); - insert_into_nack_list(nack, nack_first, nack_last); - } - } - - // Save packet at the position seq. - queue_->set(seq, vpkt); - - return err; -} - -void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames) -{ - while (true) { - SrsRtpPacket2* pkt = NULL; - collect_frame(nack, &pkt); - if (!pkt) { - break; - } - - frames.push_back(pkt); - } - - if (queue_->overflow()) { - on_overflow(nack); - } -} - -bool SrsRtpVideoQueue::should_request_key_frame() -{ - if (request_key_frame_) { - request_key_frame_ = false; + if (!initialized_) { + initialized_ = true; + begin = seq; + end = seq + 1; return true; } - return request_key_frame_; -} - -void SrsRtpVideoQueue::request_keyframe() -{ - request_key_frame_ = true; -} - -void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) -{ - // If not found start frame, return the end, and we will clear queue. - uint16_t next = next_start_of_frame(queue_->begin); - srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end); - - for (uint16_t s = queue_->begin; s != next; ++s) { - if (nack) { - nack->remove(s); - } - queue_->remove(s); - } - - queue_->advance_to(next); -} + // Normal sequence, seq follows high_. + if (srs_rtp_seq_distance(end, seq) >= 0) { + nack_first = end; + nack_last = seq; -// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet. -void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) -{ - bool found = false; - vector frame; - - // When done, next point to the next available packet. - uint16_t next = queue_->begin; - - // If nack disabled, we ignore any empty packet. - if (!nack) { - for (; next != queue_->end; ++next) { - SrsRtpVideoPacket* vpkt = queue_->at(next); - if (!vpkt) { - continue; - } - - if (frame.empty() && !vpkt->video_is_first_packet) { - continue; - } - - frame.push_back(vpkt); - - if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) { - found = true; - next++; - break; - } + // When distance(seq,high_)>0 and seq0 and 1<65535. + // TODO: FIXME: The first flip may be dropped. + if (seq < end) { + ++nn_seq_flip_backs; } - } else { - for (; next != queue_->end; ++next) { - SrsRtpVideoPacket* vpkt = queue_->at(next); - - // TODO: FIXME: Should not wait for NACK packets. - // Not found or in NACK, stop collecting frame. - if (!vpkt || nack->find(next) != NULL) { - srs_trace("wait for nack seq=%u", next); - return; - } - - // Ignore when the first packet not the start. - if (frame.empty() && !vpkt->video_is_first_packet) { - return; - } - - // OK, collect packet to frame. - frame.push_back(vpkt); - - // Done, we got the last packet of frame. - // @remark Note that the STAP-A is marker false and it's the last packet. - if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) { - found = true; - next++; - break; - } - } - } - - if (!found || frame.empty()) { - return; + end = seq + 1; + return true; } - if (next != queue_->begin) { - srs_verbose("RTC collect video [%u, %u, %u]", queue_->begin, next, queue_->end); - queue_->advance_to(next); + // Out-of-order sequence, seq before low_. + if (srs_rtp_seq_distance(seq, begin) > 0) { + // When startup, we may receive packets in chaos order. + // Because we don't know the ISN(initiazlie sequence number), the first packet + // we received maybe no the first packet client sent. + // @remark We only log a warning, because it seems ok for publisher. + return false; } - // Merge packets to one packet. - covert_frame(frame, ppkt); - - return; + return true; } -void SrsRtpVideoQueue::covert_frame(std::vector& frame, SrsRtpPacket2** ppkt) -{ - if (frame.size() == 1) { - *ppkt = frame[0]->detach(); - return; - } - - // If more than one packet in a frame, it must be FU-A. - SrsRtpPacket2* head = frame.at(0)->pkt; - SrsAvcNaluType nalu_type = head->nalu_type; - - // Covert FU-A to one RAW RTP packet. - int nn_nalus = 0; - for (size_t i = 0; i < frame.size(); ++i) { - SrsRtpVideoPacket* vpkt = frame[i]; - SrsRtpFUAPayload2* payload = dynamic_cast(vpkt->pkt->payload); - if (!payload) { - nn_nalus = 0; break; - } - - nn_nalus += payload->size; - } - - // Invalid packets, ignore. - if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) { - return; - } - - // Merge to one RAW RTP packet. - // TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying. - SrsRtpPacket2* pkt = new SrsRtpPacket2(); - pkt->rtp_header = head->rtp_header; - - SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); - pkt->nalu_type = head_payload->nalu_type; - - SrsRtpRawPayload* payload = new SrsRtpRawPayload(); - pkt->payload = payload; - - payload->nn_payload = nn_nalus + 1; - payload->payload = new char[payload->nn_payload]; - - SrsBuffer buf(payload->payload, payload->nn_payload); - - buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header. - - for (size_t i = 0; i < frame.size(); ++i) { - SrsRtpVideoPacket* vpkt = frame[i]; - SrsRtpFUAPayload2* payload = dynamic_cast(vpkt->pkt->payload); - buf.write_bytes(payload->payload, payload->size); - } - - *ppkt = pkt; +SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) { + return queue_[seq % capacity_]; } -uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq) +void SrsRtpRingBuffer::notify_nack_list_full() { - uint16_t s = seq; - if (srs_rtp_seq_distance(seq, queue_->begin) >= 0) { - s = queue_->begin + 1; - } - - for (; s != queue_->end; ++s) { - SrsRtpVideoPacket* vpkt = queue_->at(s); - if (vpkt && vpkt->video_is_first_packet) { - return s; - } - } - - return queue_->end; } -uint16_t SrsRtpVideoQueue::next_keyframe() +void SrsRtpRingBuffer::notify_drop_seq(uint16_t seq) { - uint16_t s = queue_->begin + 1; - - for (; s != queue_->end; ++s) { - SrsRtpVideoPacket* vpkt = queue_->at(s); - if (vpkt && vpkt->video_is_idr && vpkt->video_is_first_packet) { - return s; - } - } - - return queue_->end; } diff --git a/trunk/src/app/srs_app_rtc_queue.hpp b/trunk/src/app/srs_app_rtc_queue.hpp index b71af99f6..436a1998b 100644 --- a/trunk/src/app/srs_app_rtc_queue.hpp +++ b/trunk/src/app/srs_app_rtc_queue.hpp @@ -32,6 +32,7 @@ class SrsRtpPacket2; class SrsRtpQueue; +class SrsRtpRingBuffer; struct SrsNackOption { @@ -84,17 +85,17 @@ private: std::map queue_; // Max nack count. size_t max_queue_size_; - SrsRtpQueue* rtp_queue_; + SrsRtpRingBuffer* rtp_; SrsNackOption opts_; private: srs_utime_t pre_check_time_; private: int rtt_; public: - SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size); + SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size); virtual ~SrsRtpNackForReceiver(); public: - void insert(uint16_t seq); + void insert(uint16_t first, uint16_t last); void remove(uint16_t seq); SrsRtpNackInfo* find(uint16_t seq); void check_queue_size(); @@ -113,14 +114,13 @@ public: // but not an entire video frame right now. // * seq10: This packet is lost or not received, we put it in the nack list. // We store the received packets in ring buffer. -template class SrsRtpRingBuffer { private: // Capacity of the ring-buffer. uint16_t capacity_; // Ring bufer. - T* queue_; + SrsRtpPacket2** queue_; // Increase one when uint16 flip back, for get_extended_highest_sequence. uint64_t nn_seq_flip_backs; // Whether initialized, because we use uint16 so we can't use -1. @@ -133,186 +133,31 @@ public: // For example, when got 1 elems, the end is 1. uint16_t end; public: - SrsRtpRingBuffer(int capacity) { - nn_seq_flip_backs = 0; - begin = end = 0; - capacity_ = (uint16_t)capacity; - initialized_ = false; - - queue_ = new T[capacity_]; - memset(queue_, 0, sizeof(T) * capacity); - } - virtual ~SrsRtpRingBuffer() { - srs_freepa(queue_); - } + SrsRtpRingBuffer(int capacity); + virtual ~SrsRtpRingBuffer(); public: // Whether the ring buffer is empty. - bool empty() { - return begin == end; - } + bool empty(); // Get the count of elems in ring buffer. - int size() { - int size = srs_rtp_seq_distance(begin, end); - srs_assert(size >= 0); - return size; - } + int size(); // Move the low position of buffer to seq. - void advance_to(uint16_t seq) { - begin = seq; - } + void advance_to(uint16_t seq); // Free the packet at position. - void set(uint16_t at, T pkt) { - T p = queue_[at % capacity_]; - - if (p) { - srs_freep(p); - } - - queue_[at % capacity_] = pkt; - } - void remove(uint16_t at) { - set(at, NULL); - } + void set(uint16_t at, SrsRtpPacket2* pkt); + void remove(uint16_t at); // Whether queue overflow or heavy(too many packets and need clear). - bool overflow() { - return srs_rtp_seq_distance(begin, end) >= capacity_; - } + bool overflow(); // The highest sequence number, calculate the flip back base. - uint32_t get_extended_highest_sequence() { - return nn_seq_flip_backs * 65536 + end - 1; - } + uint32_t get_extended_highest_sequence(); // Update the sequence, got the nack range by [first, last). // @return If false, the seq is too old. - bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) { - if (!initialized_) { - initialized_ = true; - begin = seq; - end = seq + 1; - return true; - } - - // Normal sequence, seq follows high_. - if (srs_rtp_seq_distance(end, seq) >= 0) { - nack_first = end; - nack_last = seq; - - // When distance(seq,high_)>0 and seq0 and 1<65535. - // TODO: FIXME: The first flip may be dropped. - if (seq < end) { - ++nn_seq_flip_backs; - } - end = seq + 1; - return true; - } - - // Out-of-order sequence, seq before low_. - if (srs_rtp_seq_distance(seq, begin) > 0) { - // When startup, we may receive packets in chaos order. - // Because we don't know the ISN(initiazlie sequence number), the first packet - // we received maybe no the first packet client sent. - // @remark We only log a warning, because it seems ok for publisher. - return false; - } - - return true; - } + bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last); // Get the packet by seq. - T at(uint16_t seq) { - return queue_[seq % capacity_]; - } -}; - -class SrsRtpQueue -{ -private: - double jitter_; - // TODO: FIXME: Covert time to srs_utime_t. - int64_t last_trans_time_; - uint64_t pre_number_of_packet_received_; - uint64_t pre_number_of_packet_lossed_; - uint64_t num_of_packet_received_; - uint64_t number_of_packet_lossed_; -public: - SrsRtpQueue(); - virtual ~SrsRtpQueue(); + SrsRtpPacket2* at(uint16_t seq); public: - virtual void notify_drop_seq(uint16_t seq) = 0; - virtual void notify_nack_list_full() = 0; - virtual uint32_t get_extended_highest_sequence() = 0; - uint8_t get_fraction_lost(); - uint32_t get_cumulative_number_of_packets_lost(); - uint32_t get_interarrival_jitter(); -protected: - srs_error_t on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); - void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last); -}; - -class SrsRtpAudioPacket -{ -public: - SrsRtpPacket2* pkt; -public: - SrsRtpAudioPacket(); - virtual ~SrsRtpAudioPacket(); -public: - SrsRtpPacket2* detach(); -}; - -class SrsRtpAudioQueue : public SrsRtpQueue -{ -private: - SrsRtpRingBuffer* queue_; -public: - SrsRtpAudioQueue(int capacity); - virtual ~SrsRtpAudioQueue(); -public: - virtual void notify_drop_seq(uint16_t seq); - virtual void notify_nack_list_full(); - virtual uint32_t get_extended_highest_sequence(); - virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); -public: - virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames); -}; - -class SrsRtpVideoPacket -{ -public: - // Helper information for video decoder only. - bool video_is_first_packet; - bool video_is_last_packet; - bool video_is_idr; -public: - SrsRtpPacket2* pkt; -public: - SrsRtpVideoPacket(); - virtual ~SrsRtpVideoPacket(); -public: - SrsRtpPacket2* detach(); -}; - -class SrsRtpVideoQueue : public SrsRtpQueue -{ -private: - bool request_key_frame_; - SrsRtpRingBuffer* queue_; -public: - SrsRtpVideoQueue(int capacity); - virtual ~SrsRtpVideoQueue(); -public: - virtual void notify_drop_seq(uint16_t seq); - virtual void notify_nack_list_full(); - virtual uint32_t get_extended_highest_sequence(); - virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); - virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frame); - bool should_request_key_frame(); - void request_keyframe(); -private: - virtual void on_overflow(SrsRtpNackForReceiver* nack); - virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); - virtual void covert_frame(std::vector& frame, SrsRtpPacket2** ppkt); - uint16_t next_start_of_frame(uint16_t seq); - uint16_t next_keyframe(); + // TODO: FIXME: Move it? + void notify_nack_list_full(); + void notify_drop_seq(uint16_t seq); }; #endif