From 439a7fa65513a83c57c73db83fc49d9f381230b7 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 26 Feb 2021 16:36:21 +0800 Subject: [PATCH] RTC: Apply RTP packet cache manager --- trunk/src/app/srs_app_rtc_conn.cpp | 26 ++++++-- trunk/src/app/srs_app_rtc_conn.hpp | 2 + trunk/src/app/srs_app_rtc_queue.cpp | 8 +-- trunk/src/app/srs_app_rtc_source.cpp | 99 ++++++++++++++++------------ trunk/src/app/srs_app_rtc_source.hpp | 23 +++++-- 5 files changed, 100 insertions(+), 58 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 829a447f0..290fbe40f 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -606,7 +606,7 @@ srs_error_t SrsRtcPlayStream::cycle() for (int i = 0; i < msg_count; i++) { SrsRtpPacket2* pkt = pkts[i]; - srs_freep(pkt); + _srs_rtp_cache->recycle(pkt); } pkts.clear(); } @@ -1189,13 +1189,29 @@ srs_error_t SrsRtcPublishStream::on_rtp_plaintext(char* plaintext, int nb_plaint _srs_blackhole->sendto(plaintext, nb_plaintext); } - // Decode the RTP packet from buffer. - SrsRtpPacket2* pkt = new SrsRtpPacket2(); - SrsAutoFree(SrsRtpPacket2, pkt); + // Allocate packet form cache. + SrsRtpPacket2* pkt = _srs_rtp_cache->allocate(); + + // Copy the packet body. + pkt->wrap(plaintext, nb_plaintext); + srs_assert(pkt->cache_buffer()->pos() == 0); + + // Handle the packet. + err = do_on_rtp_plaintext(pkt); + + // Release the packet to cache. + _srs_rtp_cache->recycle(pkt); + + return err; +} + +srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; pkt->set_decode_handler(this); pkt->set_extension_types(&extension_types_); - pkt->wrap(plaintext, nb_plaintext); + if ((err = pkt->decode(pkt->cache_buffer())) != srs_success) { return srs_error_wrap(err, "decode rtp packet"); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 538eca984..3483862cb 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -356,6 +356,8 @@ public: private: // @remark We copy the plaintext, user should free it. srs_error_t on_rtp_plaintext(char* plaintext, int nb_plaintext); +private: + srs_error_t do_on_rtp_plaintext(SrsRtpPacket2* pkt); public: srs_error_t check_send_nacks(); public: diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index 2bba97b57..ee166eacd 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -49,7 +49,7 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer() { for (int i = 0; i < capacity_; ++i) { SrsRtpPacket2* pkt = queue_[i]; - srs_freep(pkt); + _srs_rtp_cache->recycle(pkt); } srs_freepa(queue_); } @@ -76,7 +76,7 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) SrsRtpPacket2* p = queue_[at % capacity_]; if (p) { - srs_freep(p); + _srs_rtp_cache->recycle(p); } queue_[at % capacity_] = pkt; @@ -164,7 +164,7 @@ void SrsRtpRingBuffer::clear_histroy(uint16_t seq) for (uint16_t i = 0; i < capacity_; i++) { SrsRtpPacket2* p = queue_[i]; if (p && p->header.get_sequence() < seq) { - srs_freep(p); + _srs_rtp_cache->recycle(p); queue_[i] = NULL; } } @@ -175,7 +175,7 @@ void SrsRtpRingBuffer::clear_all_histroy() for (uint16_t i = 0; i < capacity_; i++) { SrsRtpPacket2* p = queue_[i]; if (p) { - srs_freep(p); + _srs_rtp_cache->recycle(p); queue_[i] = NULL; } } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index c6a66e8df..8d9bd3cc6 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -169,7 +169,7 @@ SrsRtcConsumer::~SrsRtcConsumer() vector::iterator it; for (it = queue.begin(); it != queue.end(); ++it) { SrsRtpPacket2* pkt = *it; - srs_freep(pkt); + _srs_rtp_cache->recycle(pkt); } srs_cond_destroy(mw_wait); @@ -582,6 +582,16 @@ std::vector SrsRtcStream::get_track_desc(std::string ty return track_descs; } +SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper() +{ + pkt = _srs_rtp_cache->allocate(); +} + +SrsRtpPacketCacheHelper::~SrsRtpPacketCacheHelper() +{ + _srs_rtp_cache->recycle(pkt); +} + #ifdef SRS_FFMPEG_FIT SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) { @@ -785,14 +795,14 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio // TODO: FIXME: Use it to padding audios. nn_max_extra_payload = srs_max(nn_max_extra_payload, size); - SrsRtpPacket2* pkt = NULL; - SrsAutoFree(SrsRtpPacket2, pkt); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + SrsAutoFree(SrsRtpPacketCacheHelper, helper); - if ((err = package_opus(data, size, &pkt)) != srs_success) { + if ((err = package_opus(data, size, helper)) != srs_success) { return srs_error_wrap(err, "package opus"); } - if ((err = source_->on_rtp(pkt)) != srs_success) { + if ((err = source_->on_rtp(helper->pkt)) != srs_success) { return srs_error_wrap(err, "consume opus"); } } @@ -800,11 +810,11 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio return err; } -srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacket2** ppkt) +srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper) { srs_error_t err = srs_success; - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kAudioPayloadType); pkt->header.set_ssrc(audio_ssrc); pkt->frame_type = SrsFrameTypeAudio; @@ -821,8 +831,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack raw->payload = pkt->wrap(data, size); raw->nn_payload = size; - *ppkt = pkt; - return err; } @@ -849,22 +857,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg) // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. if (has_idr) { - SrsRtpPacket2* pkt = NULL; - SrsAutoFree(SrsRtpPacket2, pkt); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + SrsAutoFree(SrsRtpPacketCacheHelper, helper); - if ((err = package_stap_a(source_, msg, &pkt)) != srs_success) { + if ((err = package_stap_a(source_, msg, helper)) != srs_success) { return srs_error_wrap(err, "package stap-a"); } - if ((err = source_->on_rtp(pkt)) != srs_success) { + if ((err = source_->on_rtp(helper->pkt)) != srs_success) { return srs_error_wrap(err, "consume sps/pps"); } } // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. - vector pkts; + vector helpers; if (merge_nalus && nn_samples > 1) { - if ((err = package_nalus(msg, samples, pkts)) != srs_success) { + if ((err = package_nalus(msg, samples, helpers)) != srs_success) { return srs_error_wrap(err, "package nalus as one"); } } else { @@ -879,22 +887,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg) } if (sample->size <= kRtpMaxPayloadSize) { - if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) { + if ((err = package_single_nalu(msg, sample, helpers)) != srs_success) { return srs_error_wrap(err, "package single nalu"); } } else { - if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) { + if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, helpers)) != srs_success) { return srs_error_wrap(err, "package fu-a"); } } } } - if (pkts.size() > 0) { - pkts.back()->header.set_marker(true); + if (!helpers.empty()) { + helpers.back()->pkt->header.set_marker(true); } - return consume_packets(pkts); + return consume_packets(helpers); } srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, vector& samples) @@ -927,7 +935,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f return err; } -srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt) +srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacketCacheHelper* helper) { srs_error_t err = srs_success; @@ -943,7 +951,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); } - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kVideoPayloadType); pkt->header.set_ssrc(video_ssrc); pkt->frame_type = SrsFrameTypeVideo; @@ -982,13 +990,12 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare payload += (int)pps.size(); } - *ppkt = pkt; srs_info("RTC STAP-A seq=%u, sps %d, pps %d bytes", pkt->header.get_sequence(), sps.size(), pps.size()); return err; } -srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector& samples, vector& pkts) +srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector& samples, vector& helpers) { srs_error_t err = srs_success; @@ -1024,7 +1031,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const if (nn_bytes < kRtpMaxPayloadSize) { // Package NALUs in a single RTP packet. - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + helpers.push_back(helper); + + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kVideoPayloadType); pkt->header.set_ssrc(video_ssrc); pkt->frame_type = SrsFrameTypeVideo; @@ -1033,7 +1043,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const pkt->header.set_timestamp(msg->timestamp * 90); pkt->payload = raw; pkt->wrap(msg); - pkts.push_back(pkt); } else { // We must free it, should never use RTP packets to free it, // because more than one RTP packet will refer to it. @@ -1057,7 +1066,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); } - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + helpers.push_back(helper); + + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kVideoPayloadType); pkt->header.set_ssrc(video_ssrc); pkt->frame_type = SrsFrameTypeVideo; @@ -1072,7 +1084,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const pkt->payload = fua; pkt->wrap(msg); - pkts.push_back(pkt); nb_left -= packet_size; } @@ -1082,11 +1093,14 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector& pkts) +srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector& helpers) { srs_error_t err = srs_success; - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + helpers.push_back(helper); + + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kVideoPayloadType); pkt->header.set_ssrc(video_ssrc); pkt->frame_type = SrsFrameTypeVideo; @@ -1101,12 +1115,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, pkt->wrap(msg); - pkts.push_back(pkt); - return err; } -srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& pkts) +srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& helpers) { srs_error_t err = srs_success; @@ -1119,7 +1131,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam for (int i = 0; i < num_of_packet; ++i) { int packet_size = srs_min(nb_left, fu_payload_size); - SrsRtpPacket2* pkt = new SrsRtpPacket2(); + SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); + helpers.push_back(helper); + + SrsRtpPacket2* pkt = helper->pkt; pkt->header.set_payload_type(kVideoPayloadType); pkt->header.set_ssrc(video_ssrc); pkt->frame_type = SrsFrameTypeVideo; @@ -1139,8 +1154,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam pkt->wrap(msg); - pkts.push_back(pkt); - p += packet_size; nb_left -= packet_size; } @@ -1148,22 +1161,22 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam return err; } -srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector& pkts) +srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector& helpers) { srs_error_t err = srs_success; // TODO: FIXME: Consume a range of packets. - for (int i = 0; i < (int)pkts.size(); i++) { - SrsRtpPacket2* pkt = pkts[i]; - if ((err = source_->on_rtp(pkt)) != srs_success) { + for (int i = 0; i < (int)helpers.size(); i++) { + SrsRtpPacketCacheHelper* helper = helpers[i]; + if ((err = source_->on_rtp(helper->pkt)) != srs_success) { err = srs_error_wrap(err, "consume sps/pps"); break; } } - for (int i = 0; i < (int)pkts.size(); i++) { - SrsRtpPacket2* pkt = pkts[i]; - srs_freep(pkt); + for (int i = 0; i < (int)helpers.size(); i++) { + SrsRtpPacketCacheHelper* helper = helpers[i]; + srs_freep(helper); } return err; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 6d2334083..5d8f03f99 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -47,6 +47,7 @@ class SrsRtcStream; class SrsRtcFromRtmpBridger; class SrsAudioRecode; class SrsRtpPacket2; +class SrsRtpPacketCacheHelper; class SrsSample; class SrsRtcStreamDescription; class SrsRtcTrackDescription; @@ -222,6 +223,16 @@ public: std::vector get_track_desc(std::string type, std::string media_type); }; +// A helper class, to release the packet to cache. +class SrsRtpPacketCacheHelper +{ +public: + SrsRtpPacket2* pkt; +public: + SrsRtpPacketCacheHelper(); + virtual ~SrsRtpPacketCacheHelper(); +}; + #ifdef SRS_FFMPEG_FIT class SrsRtcFromRtmpBridger : public ISrsSourceBridger { @@ -252,16 +263,16 @@ public: virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); private: srs_error_t transcode(char* adts_audio, int nn_adts_audio); - srs_error_t package_opus(char* data, int size, SrsRtpPacket2** ppkt); + srs_error_t package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper); public: virtual srs_error_t on_video(SrsSharedPtrMessage* msg); private: srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector& samples); - srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt); - srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector& samples, std::vector& pkts); - srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& pkts); - srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& pkts); - srs_error_t consume_packets(std::vector& pkts); + srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacketCacheHelper* helper); + srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector& samples, std::vector& helpers); + srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& helpers); + srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& helpers); + srs_error_t consume_packets(std::vector& helpers); }; #endif