From 8dc0746e2dd9d41c100bd1eac8104fe57772461d Mon Sep 17 00:00:00 2001 From: xiaozhihong Date: Fri, 24 Apr 2020 16:19:08 +0800 Subject: [PATCH] rtc publish release --- trunk/src/app/srs_app_http_api.cpp | 4 +- trunk/src/app/srs_app_rtc.cpp | 31 +++++++++++-- trunk/src/app/srs_app_rtc.hpp | 9 ++++ trunk/src/app/srs_app_rtc_conn.cpp | 51 ++++++++++++++++----- trunk/src/app/srs_app_rtc_conn.hpp | 3 ++ trunk/src/app/srs_app_rtp_queue.cpp | 69 ++++++++++++++++++++--------- trunk/src/app/srs_app_rtp_queue.hpp | 12 +++-- trunk/src/app/srs_app_sdp.cpp | 5 ++- trunk/src/kernel/srs_kernel_rtp.cpp | 5 ++- trunk/src/kernel/srs_kernel_rtp.hpp | 1 + 10 files changed, 146 insertions(+), 44 deletions(-) diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 81e45183c..694caf038 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1290,9 +1290,7 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std:: local_sdp.group_policy_ = "BUNDLE"; - int mid = 0; - - for (int i = 0; i < remote_sdp.media_descs_.size(); ++i) { + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; if (remote_media_desc.is_audio()) { diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp index 288f892e1..a038f3c02 100644 --- a/trunk/src/app/srs_app_rtc.cpp +++ b/trunk/src/app/srs_app_rtc.cpp @@ -238,16 +238,17 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt) } uint8_t nal_type = rtp_payload[0] & kNalTypeMask; + if (nal_type == SrsAvcNaluTypeIDR) { + rtp_h264_header->is_key_frame = true; + } if (nal_type >= 1 && nal_type <= 23) { - srs_verbose("seq=%u, single nalu", rtp_pkt->rtp_header.get_sequence()); rtp_h264_header->is_first_packet_of_frame = true; rtp_h264_header->is_last_packet_of_frame = true; rtp_h264_header->nalu_type = nal_type; rtp_h264_header->nalu_header = rtp_payload[0]; rtp_h264_header->nalu_offset.push_back(make_pair(0, rtp_payload_size)); } else if (nal_type == kFuA) { - srs_verbose("seq=%u, fu-a", rtp_pkt->rtp_header.get_sequence()); if ((rtp_payload[1] & kStart)) { rtp_h264_header->is_first_packet_of_frame = true; } @@ -258,13 +259,11 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt) rtp_h264_header->nalu_header = (rtp_payload[0] & (~kNalTypeMask)) | (rtp_payload[1] & kNalTypeMask); rtp_h264_header->nalu_offset.push_back(make_pair(2, rtp_payload_size - 2)); } else if (nal_type == kStapA) { - srs_verbose("seq=%u, stap-a", rtp_pkt->rtp_header.get_sequence()); int i = 1; rtp_h264_header->is_first_packet_of_frame = true; rtp_h264_header->is_last_packet_of_frame = true; rtp_h264_header->nalu_type = nal_type; while (i < rtp_payload_size) { - srs_verbose("stap-a cur index=%s", srs_string_dumps_hex(reinterpret_cast(rtp_payload + i), 2).c_str()); uint16_t nal_len = (rtp_payload[i]) << 8 | rtp_payload[i + 1]; if (nal_len > rtp_payload_size - i) { return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet, nal len=%u, i=%d, rtp_payload_size=%d", nal_len, i, rtp_payload_size); @@ -286,6 +285,30 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt) return err; } +SrsRtpOpusDemuxer::SrsRtpOpusDemuxer() +{ +} + +SrsRtpOpusDemuxer::~SrsRtpOpusDemuxer() +{ +} + +srs_error_t SrsRtpOpusDemuxer::parse(SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + + SrsRtpOpusHeader* rtp_opus_header = dynamic_cast(rtp_pkt->rtp_payload_header); + if (rtp_opus_header == NULL) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid rtp packet"); + } + + rtp_opus_header->is_first_packet_of_frame = true; + rtp_opus_header->is_last_packet_of_frame = true; + rtp_opus_header->is_key_frame = true; + + return err; +} + SrsRtc::SrsRtc() { req = NULL; diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp index 7c22e04dd..99db8bd78 100644 --- a/trunk/src/app/srs_app_rtc.hpp +++ b/trunk/src/app/srs_app_rtc.hpp @@ -87,6 +87,15 @@ public: srs_error_t parse(SrsRtpSharedPacket* rtp_pkt); }; +class SrsRtpOpusDemuxer +{ +public: + SrsRtpOpusDemuxer(); + virtual ~SrsRtpOpusDemuxer(); +public: + srs_error_t parse(SrsRtpSharedPacket* rtp_pkt); +}; + class SrsRtc { private: diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 21e749c48..4b43f5444 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1495,6 +1495,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) rtc_session = session; rtp_h264_demuxer = new SrsRtpH264Demuxer(); + rtp_opus_demuxer = new SrsRtpOpusDemuxer(); rtp_video_queue = new SrsRtpQueue(1000); rtp_audio_queue = new SrsRtpQueue(100, true); @@ -1505,6 +1506,7 @@ SrsRtcPublisher::~SrsRtcPublisher() { srs_freep(report_timer); srs_freep(rtp_h264_demuxer); + srs_freep(rtp_opus_demuxer); srs_freep(rtp_video_queue); srs_freep(rtp_audio_queue); } @@ -1718,10 +1720,11 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* */ SrsBuffer stream(buf, nb_buf); - uint8_t first = stream.read_1bytes(); + /*uint8_t first = */stream.read_1bytes(); uint8_t pt = stream.read_1bytes(); + srs_assert(pt == kXR); uint16_t length = (stream.read_2bytes() + 1) * 4; - uint32_t ssrc = stream.read_4bytes(); + /*uint32_t ssrc = */stream.read_4bytes(); if (length != nb_buf) { return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf); @@ -1865,18 +1868,46 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ss return err; } +srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc) +{ + srs_error_t err = srs_success; + + char buf[kRtpPacketSize]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kPsFb); + stream.write_2bytes(2); + stream.write_4bytes(ssrc); + stream.write_4bytes(ssrc); + + srs_verbose("PLI ssrc=%u", ssrc); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if ((err = rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) { + return srs_error_wrap(err, "protect rtcp psfb pli"); + } + + skt->sendto(protected_buf, nb_protected_buf, 0); + + return err; +} srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) { srs_error_t err = srs_success; - rtp_pkt->rtp_payload_header = new SrsRtpOpusHeader(); - rtp_pkt->rtp_payload_header->is_first_packet_of_frame = true; - rtp_pkt->rtp_payload_header->is_last_packet_of_frame = true; + if ((err = rtp_opus_demuxer->parse(rtp_pkt)) != srs_success) { + return srs_error_wrap(err, "rtp opus demux failed"); + } rtp_audio_queue->insert(rtp_pkt); + if (rtp_audio_queue->get_and_clean_if_needed_rqeuest_key_frame()) { + send_rtcp_fb_pli(skt, audio_ssrc); + } + check_send_nacks(rtp_audio_queue, audio_ssrc, skt); return collect_audio_frame(); @@ -1915,6 +1946,10 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_video_queue->insert(rtp_pkt); + if (rtp_video_queue->get_and_clean_if_needed_rqeuest_key_frame()) { + send_rtcp_fb_pli(skt, video_ssrc); + } + check_send_nacks(rtp_video_queue, video_ssrc, skt); return collect_video_frame(); @@ -2375,8 +2410,6 @@ srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* sk srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { - srs_error_t err = srs_success; - if (rtc_publisher == NULL) { return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); } @@ -2517,19 +2550,15 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt) uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; - uint16_t video_payload_type = 0; - uint16_t audio_payload_type = 0; for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; if (media_desc.is_audio()) { if (! media_desc.ssrc_infos_.empty()) { audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; - audio_payload_type = media_desc.payload_types_[0].payload_type_; } } else if (media_desc.is_video()) { if (! media_desc.ssrc_infos_.empty()) { video_ssrc = media_desc.ssrc_infos_[0].ssrc_; - video_payload_type = media_desc.payload_types_[0].payload_type_; } } } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 718d62d9e..1a1630d17 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -53,6 +53,7 @@ class SrsRtpPacket2; class ISrsUdpSender; class SrsRtpQueue; class SrsRtpH264Demuxer; +class SrsRtpOpusDemuxer; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -264,6 +265,7 @@ private: uint32_t audio_ssrc; private: SrsRtpH264Demuxer* rtp_h264_demuxer; + SrsRtpOpusDemuxer* rtp_opus_demuxer; SrsRtpQueue* rtp_video_queue; SrsRtpQueue* rtp_audio_queue; private: @@ -286,6 +288,7 @@ private: void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt); srs_error_t send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue); srs_error_t send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ssrc); + srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc); private: srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 74390314b..47f8587a9 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -41,13 +41,14 @@ SrsRtpNackInfo::SrsRtpNackInfo() req_nack_count_ = 0; } -SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue) +SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size) { + max_queue_size_ = queue_size; rtp_queue_ = rtp_queue; pre_check_time_ = 0; - srs_info("nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld" - opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); + srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld" + max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); } SrsRtpNackList::~SrsRtpNackList() @@ -58,6 +59,7 @@ void SrsRtpNackList::insert(uint16_t seq) { // FIXME: full, drop packet, and request key frame. SrsRtpNackInfo& nack_info = queue_[seq]; + (void)nack_info; } void SrsRtpNackList::remove(uint16_t seq) @@ -76,12 +78,11 @@ SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq) return &(iter->second); } -void SrsRtpNackList::dump() +void SrsRtpNackList::check_queue_size() { - return; - srs_verbose("@debug, queue size=%u", queue_.size()); - for (std::map::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { - srs_verbose("@debug, nack seq=%u", iter->first); + if (queue_.size() >= max_queue_size_) { + srs_verbose("NACK list full, queue size=%u, max_queue_size=%u", queue_.size(), max_queue_size_); + rtp_queue_->notify_nack_list_full(); } } @@ -134,7 +135,7 @@ void SrsRtpNackList::update_rtt(int rtt) } SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) - : nack_(this) + : nack_(this, capacity * 2 / 3) { capacity_ = capacity; head_sequence_ = 0; @@ -155,6 +156,8 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) number_of_packet_lossed_ = 0; one_packet_per_frame_ = one_packet_per_frame; + + request_key_frame_ = false; } SrsRtpQueue::~SrsRtpQueue() @@ -189,7 +192,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) } else { // Calc jitter. { - int trans_time = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90; + int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90; int cur_jitter = trans_time - last_trans_time_; if (cur_jitter < 0) { @@ -214,7 +217,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) highest_sequence_ = seq; } else { // Because we don't know the ISN(initiazlie sequence number), the first packet - // we received maybe no the first paacet client sented. + // we received maybe no the first packet client sented. if (! start_collected_) { if (seq_cmp(seq, head_sequence_)) { srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq); @@ -228,9 +231,6 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) } } - int delay = highest_sequence_ - head_sequence_ + 1; - srs_verbose("seqs range=[%u-%u], delay=%d", head_sequence_, highest_sequence_, delay); - // Check seqs out of range. if (head_sequence_ + capacity_ < highest_sequence_) { srs_verbose("try collect packet becuase seq out of range"); @@ -261,12 +261,12 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) head_sequence_ = s; } - SrsRtpSharedPacket* old_pkt = queue_[seq % capacity_]; + SrsRtpSharedPacket*& old_pkt = queue_[seq % capacity_]; if (old_pkt) { delete old_pkt; } - queue_[seq % capacity_] = rtp_pkt->copy(); + old_pkt = rtp_pkt->copy(); // Marker bit means the last packet of frame received. if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) { @@ -294,6 +294,15 @@ void SrsRtpQueue::get_and_clean_collected_frames(std::vectorrtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) { + found_key_frame = true; + srs_verbose("found firsr packet of key frame, seq=%u", head_sequence_); + break; + } + + nack_.remove(head_sequence_); + remove(head_sequence_); + ++head_sequence_; + } + + if (! found_key_frame) { + srs_verbose("no found first packet of key frame, request key frame"); + request_key_frame_ = true; + head_sequence_ = highest_sequence_; + } +} + uint32_t SrsRtpQueue::get_extended_highest_sequence() { return cycle_ * 65536 + highest_sequence_; @@ -350,8 +382,7 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) ++number_of_packet_lossed_; } - // FIXME: Record key frame sequence. - // FIXME: When nack list too long, clear and send PLI. + nack_.check_queue_size(); } void SrsRtpQueue::collect_packet() @@ -360,8 +391,6 @@ void SrsRtpQueue::collect_packet() for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) { SrsRtpSharedPacket* pkt = queue_[s % capacity_]; - nack_.dump(); - if (nack_.find(s) != NULL) { srs_verbose("seq=%u, found in nack list when collect frame", s); break; diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index afe747795..f8a31d08f 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -79,6 +79,8 @@ class SrsRtpNackList private: // Nack queue, seq order, oldest to newest. std::map queue_; + // Max nack count. + size_t max_queue_size_; SrsRtpQueue* rtp_queue_; SrsNackOption opts_; private: @@ -86,14 +88,13 @@ private: private: int rtt_; public: - SrsRtpNackList(SrsRtpQueue* rtp_queue); + SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size); virtual ~SrsRtpNackList(); public: void insert(uint16_t seq); void remove(uint16_t seq); SrsRtpNackInfo* find(uint16_t seq); -public: - void dump(); + void check_queue_size(); public: void get_nack_seqs(std::vector& seqs); public: @@ -109,7 +110,7 @@ private: * \___(no received, in nack list) */ // Capacity of the ring-buffer. - size_t capacity_; + uint16_t capacity_; // Thei highest sequence we have receive. uint16_t highest_sequence_; // The sequence waitting to read. @@ -132,6 +133,7 @@ public: SrsRtpNackList nack_; private: std::vector > frames_; + bool request_key_frame_; public: SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false); virtual ~SrsRtpQueue(); @@ -140,7 +142,9 @@ public: srs_error_t remove(uint16_t seq); public: void get_and_clean_collected_frames(std::vector >& frames); + bool get_and_clean_if_needed_rqeuest_key_frame(); void notify_drop_seq(uint16_t seq); + void notify_nack_list_full(); public: uint32_t get_extended_highest_sequence(); uint8_t get_fraction_lost(); diff --git a/trunk/src/app/srs_app_sdp.cpp b/trunk/src/app/srs_app_sdp.cpp index 7b89e0ce6..da4e8226a 100644 --- a/trunk/src/app/srs_app_sdp.cpp +++ b/trunk/src/app/srs_app_sdp.cpp @@ -225,7 +225,10 @@ srs_error_t SrsMediaPayloadType::encode(std::ostringstream& os) } if (! format_specific_param_.empty()) { - os << "a=fmtp:" << payload_type_ << " " << format_specific_param_ << kCRLF; + os << "a=fmtp:" << payload_type_ << " " << format_specific_param_ + // FIXME:test code. + << ";x-google-max-bitrate=6000;x-google-min-bitrate=5100;x-google-start-bitrate=5000" + << kCRLF; } return err; diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index daf73ac32..eb1938a87 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -97,7 +97,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* stream) timestamp = stream->read_4bytes(); ssrc = stream->read_4bytes(); - if (stream->size() < header_size()) { + if ((size_t)stream->size() < header_size()) { return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); } @@ -610,6 +610,7 @@ SrsRtpPayloadHeader::SrsRtpPayloadHeader() { is_first_packet_of_frame = false; is_last_packet_of_frame = false; + is_key_frame = false; } SrsRtpPayloadHeader::~SrsRtpPayloadHeader() @@ -625,6 +626,8 @@ SrsRtpPayloadHeader& SrsRtpPayloadHeader::operator=(const SrsRtpPayloadHeader& r { is_first_packet_of_frame = rhs.is_first_packet_of_frame; is_last_packet_of_frame = rhs.is_last_packet_of_frame; + + return *this; } SrsRtpH264Header::SrsRtpH264Header() : SrsRtpPayloadHeader() diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 2ad222bee..42fda10c0 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -231,6 +231,7 @@ class SrsRtpPayloadHeader public: bool is_first_packet_of_frame; bool is_last_packet_of_frame; + bool is_key_frame; public: SrsRtpPayloadHeader(); virtual ~SrsRtpPayloadHeader();