RTC: Apply RTP packet cache manager

pull/2252/head
winlin 4 years ago
parent 1833780655
commit 439a7fa655

@ -606,7 +606,7 @@ srs_error_t SrsRtcPlayStream::cycle()
for (int i = 0; i < msg_count; i++) {
SrsRtpPacket2* pkt = pkts[i];
srs_freep(pkt);
_srs_rtp_cache->recycle(pkt);
}
pkts.clear();
}
@ -1189,13 +1189,29 @@ srs_error_t SrsRtcPublishStream::on_rtp_plaintext(char* plaintext, int nb_plaint
_srs_blackhole->sendto(plaintext, nb_plaintext);
}
// Decode the RTP packet from buffer.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsAutoFree(SrsRtpPacket2, pkt);
// Allocate packet form cache.
SrsRtpPacket2* pkt = _srs_rtp_cache->allocate();
// Copy the packet body.
pkt->wrap(plaintext, nb_plaintext);
srs_assert(pkt->cache_buffer()->pos() == 0);
// Handle the packet.
err = do_on_rtp_plaintext(pkt);
// Release the packet to cache.
_srs_rtp_cache->recycle(pkt);
return err;
}
srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
pkt->set_decode_handler(this);
pkt->set_extension_types(&extension_types_);
pkt->wrap(plaintext, nb_plaintext);
if ((err = pkt->decode(pkt->cache_buffer())) != srs_success) {
return srs_error_wrap(err, "decode rtp packet");
}

@ -356,6 +356,8 @@ public:
private:
// @remark We copy the plaintext, user should free it.
srs_error_t on_rtp_plaintext(char* plaintext, int nb_plaintext);
private:
srs_error_t do_on_rtp_plaintext(SrsRtpPacket2* pkt);
public:
srs_error_t check_send_nacks();
public:

@ -49,7 +49,7 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer()
{
for (int i = 0; i < capacity_; ++i) {
SrsRtpPacket2* pkt = queue_[i];
srs_freep(pkt);
_srs_rtp_cache->recycle(pkt);
}
srs_freepa(queue_);
}
@ -76,7 +76,7 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
SrsRtpPacket2* p = queue_[at % capacity_];
if (p) {
srs_freep(p);
_srs_rtp_cache->recycle(p);
}
queue_[at % capacity_] = pkt;
@ -164,7 +164,7 @@ void SrsRtpRingBuffer::clear_histroy(uint16_t seq)
for (uint16_t i = 0; i < capacity_; i++) {
SrsRtpPacket2* p = queue_[i];
if (p && p->header.get_sequence() < seq) {
srs_freep(p);
_srs_rtp_cache->recycle(p);
queue_[i] = NULL;
}
}
@ -175,7 +175,7 @@ void SrsRtpRingBuffer::clear_all_histroy()
for (uint16_t i = 0; i < capacity_; i++) {
SrsRtpPacket2* p = queue_[i];
if (p) {
srs_freep(p);
_srs_rtp_cache->recycle(p);
queue_[i] = NULL;
}
}

@ -169,7 +169,7 @@ SrsRtcConsumer::~SrsRtcConsumer()
vector<SrsRtpPacket2*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsRtpPacket2* pkt = *it;
srs_freep(pkt);
_srs_rtp_cache->recycle(pkt);
}
srs_cond_destroy(mw_wait);
@ -582,6 +582,16 @@ std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string ty
return track_descs;
}
SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper()
{
pkt = _srs_rtp_cache->allocate();
}
SrsRtpPacketCacheHelper::~SrsRtpPacketCacheHelper()
{
_srs_rtp_cache->recycle(pkt);
}
#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source)
{
@ -785,14 +795,14 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
// TODO: FIXME: Use it to padding audios.
nn_max_extra_payload = srs_max(nn_max_extra_payload, size);
SrsRtpPacket2* pkt = NULL;
SrsAutoFree(SrsRtpPacket2, pkt);
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
SrsAutoFree(SrsRtpPacketCacheHelper, helper);
if ((err = package_opus(data, size, &pkt)) != srs_success) {
if ((err = package_opus(data, size, helper)) != srs_success) {
return srs_error_wrap(err, "package opus");
}
if ((err = source_->on_rtp(pkt)) != srs_success) {
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
return srs_error_wrap(err, "consume opus");
}
}
@ -800,11 +810,11 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacket2** ppkt)
srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper)
{
srs_error_t err = srs_success;
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kAudioPayloadType);
pkt->header.set_ssrc(audio_ssrc);
pkt->frame_type = SrsFrameTypeAudio;
@ -821,8 +831,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack
raw->payload = pkt->wrap(data, size);
raw->nn_payload = size;
*ppkt = pkt;
return err;
}
@ -849,22 +857,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (has_idr) {
SrsRtpPacket2* pkt = NULL;
SrsAutoFree(SrsRtpPacket2, pkt);
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
SrsAutoFree(SrsRtpPacketCacheHelper, helper);
if ((err = package_stap_a(source_, msg, &pkt)) != srs_success) {
if ((err = package_stap_a(source_, msg, helper)) != srs_success) {
return srs_error_wrap(err, "package stap-a");
}
if ((err = source_->on_rtp(pkt)) != srs_success) {
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
return srs_error_wrap(err, "consume sps/pps");
}
}
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
vector<SrsRtpPacket2*> pkts;
vector<SrsRtpPacketCacheHelper*> helpers;
if (merge_nalus && nn_samples > 1) {
if ((err = package_nalus(msg, samples, pkts)) != srs_success) {
if ((err = package_nalus(msg, samples, helpers)) != srs_success) {
return srs_error_wrap(err, "package nalus as one");
}
} else {
@ -879,22 +887,22 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
}
if (sample->size <= kRtpMaxPayloadSize) {
if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) {
if ((err = package_single_nalu(msg, sample, helpers)) != srs_success) {
return srs_error_wrap(err, "package single nalu");
}
} else {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, helpers)) != srs_success) {
return srs_error_wrap(err, "package fu-a");
}
}
}
}
if (pkts.size() > 0) {
pkts.back()->header.set_marker(true);
if (!helpers.empty()) {
helpers.back()->pkt->header.set_marker(true);
}
return consume_packets(pkts);
return consume_packets(helpers);
}
srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, vector<SrsSample*>& samples)
@ -927,7 +935,7 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* f
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt)
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacketCacheHelper* helper)
{
srs_error_t err = srs_success;
@ -943,7 +951,7 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -982,13 +990,12 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcStream* source, SrsShare
payload += (int)pps.size();
}
*ppkt = pkt;
srs_info("RTC STAP-A seq=%u, sps %d, pps %d bytes", pkt->header.get_sequence(), sps.size(), pps.size());
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector<SrsSample*>& samples, vector<SrsRtpPacket2*>& pkts)
srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const vector<SrsSample*>& samples, vector<SrsRtpPacketCacheHelper*>& helpers)
{
srs_error_t err = srs_success;
@ -1024,7 +1031,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
if (nn_bytes < kRtpMaxPayloadSize) {
// Package NALUs in a single RTP packet.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1033,7 +1043,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
pkt->header.set_timestamp(msg->timestamp * 90);
pkt->payload = raw;
pkt->wrap(msg);
pkts.push_back(pkt);
} else {
// We must free it, should never use RTP packets to free it,
// because more than one RTP packet will refer to it.
@ -1057,7 +1066,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1072,7 +1084,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
pkt->payload = fua;
pkt->wrap(msg);
pkts.push_back(pkt);
nb_left -= packet_size;
}
@ -1082,11 +1093,14 @@ srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, const
}
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacket2*>& pkts)
srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacketCacheHelper*>& helpers)
{
srs_error_t err = srs_success;
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1101,12 +1115,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg,
pkt->wrap(msg);
pkts.push_back(pkt);
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& pkts)
srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacketCacheHelper*>& helpers)
{
srs_error_t err = srs_success;
@ -1119,7 +1131,10 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper();
helpers.push_back(helper);
SrsRtpPacket2* pkt = helper->pkt;
pkt->header.set_payload_type(kVideoPayloadType);
pkt->header.set_ssrc(video_ssrc);
pkt->frame_type = SrsFrameTypeVideo;
@ -1139,8 +1154,6 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
pkt->wrap(msg);
pkts.push_back(pkt);
p += packet_size;
nb_left -= packet_size;
}
@ -1148,22 +1161,22 @@ srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSam
return err;
}
srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacket2*>& pkts)
srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacketCacheHelper*>& helpers)
{
srs_error_t err = srs_success;
// TODO: FIXME: Consume a range of packets.
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
if ((err = source_->on_rtp(pkt)) != srs_success) {
for (int i = 0; i < (int)helpers.size(); i++) {
SrsRtpPacketCacheHelper* helper = helpers[i];
if ((err = source_->on_rtp(helper->pkt)) != srs_success) {
err = srs_error_wrap(err, "consume sps/pps");
break;
}
}
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
srs_freep(pkt);
for (int i = 0; i < (int)helpers.size(); i++) {
SrsRtpPacketCacheHelper* helper = helpers[i];
srs_freep(helper);
}
return err;

@ -47,6 +47,7 @@ class SrsRtcStream;
class SrsRtcFromRtmpBridger;
class SrsAudioRecode;
class SrsRtpPacket2;
class SrsRtpPacketCacheHelper;
class SrsSample;
class SrsRtcStreamDescription;
class SrsRtcTrackDescription;
@ -222,6 +223,16 @@ public:
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
};
// A helper class, to release the packet to cache.
class SrsRtpPacketCacheHelper
{
public:
SrsRtpPacket2* pkt;
public:
SrsRtpPacketCacheHelper();
virtual ~SrsRtpPacketCacheHelper();
};
#ifdef SRS_FFMPEG_FIT
class SrsRtcFromRtmpBridger : public ISrsSourceBridger
{
@ -252,16 +263,16 @@ public:
virtual srs_error_t on_audio(SrsSharedPtrMessage* msg);
private:
srs_error_t transcode(char* adts_audio, int nn_adts_audio);
srs_error_t package_opus(char* data, int size, SrsRtpPacket2** ppkt);
srs_error_t package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper);
public:
virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
private:
srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector<SrsSample*>& samples);
srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector<SrsSample*>& samples, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t consume_packets(std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_stap_a(SrsRtcStream* source, SrsSharedPtrMessage* msg, SrsRtpPacketCacheHelper* helper);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector<SrsSample*>& samples, std::vector<SrsRtpPacketCacheHelper*>& helpers);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacketCacheHelper*>& helpers);
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacketCacheHelper*>& helpers);
srs_error_t consume_packets(std::vector<SrsRtpPacketCacheHelper*>& helpers);
};
#endif

Loading…
Cancel
Save