Refactor RTC publisher, audio/video use its special ring buffer

pull/1753/head
winlin 5 years ago
parent 80d45e5982
commit 899dddb624

@ -127,10 +127,8 @@ void SrsRtpNackForReceiver::update_rtt(int rtt)
opts_.nack_interval = rtt_; opts_.nack_interval = rtt_;
} }
SrsRtpQueue::SrsRtpQueue(int capacity) SrsRtpQueue::SrsRtpQueue()
{ {
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity);
jitter_ = 0; jitter_ = 0;
last_trans_time_ = -1; last_trans_time_ = -1;
@ -143,10 +141,33 @@ SrsRtpQueue::SrsRtpQueue(int capacity)
SrsRtpQueue::~SrsRtpQueue() SrsRtpQueue::~SrsRtpQueue()
{ {
srs_freep(queue_);
} }
srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) uint8_t SrsRtpQueue::get_fraction_lost()
{
int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_);
uint8_t loss = 0;
if (total > 0) {
loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total;
}
pre_number_of_packet_lossed_ = number_of_packet_lossed_;
pre_number_of_packet_received_ = num_of_packet_received_;
return loss;
}
uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost()
{
return number_of_packet_lossed_;
}
uint32_t SrsRtpQueue::get_interarrival_jitter()
{
return static_cast<uint32_t>(jitter_);
}
srs_error_t SrsRtpQueue::on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -156,8 +177,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
uint16_t seq = pkt->rtp_header.get_sequence(); uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL; SrsRtpNackInfo* nack_info = NULL;
// If no NACK, disable nack.
if (nack) { if (nack) {
nack_info = nack->find(seq); nack_info = nack->find(seq);
} }
@ -185,56 +204,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0); jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
} }
// OK, we got one new RTP packet, which is not in NACK. // OK, got new RTP packet.
if (!nack_info) { if (!nack_info) {
++num_of_packet_received_; ++num_of_packet_received_;
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
} }
// Save packet at the position seq.
queue_->set(seq, pkt);
return err; return err;
} }
uint32_t SrsRtpQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
uint8_t SrsRtpQueue::get_fraction_lost()
{
int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_);
uint8_t loss = 0;
if (total > 0) {
loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total;
}
pre_number_of_packet_lossed_ = number_of_packet_lossed_;
pre_number_of_packet_received_ = num_of_packet_received_;
return loss;
}
uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost()
{
return number_of_packet_lossed_;
}
uint32_t SrsRtpQueue::get_interarrival_jitter()
{
return static_cast<uint32_t>(jitter_);
}
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last)
{ {
if (!nack) { if (!nack) {
@ -249,12 +226,14 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t fi
nack->check_queue_size(); nack->check_queue_size();
} }
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue(capacity) SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity)
{ {
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity);
} }
SrsRtpAudioQueue::~SrsRtpAudioQueue() SrsRtpAudioQueue::~SrsRtpAudioQueue()
{ {
srs_freep(queue_);
} }
void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq) void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq)
@ -274,9 +253,43 @@ void SrsRtpAudioQueue::notify_nack_list_full()
queue_->advance_to(queue_->end); queue_->advance_to(queue_->end);
} }
uint32_t SrsRtpAudioQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{ {
return SrsRtpQueue::consume(nack, pkt); srs_error_t err = srs_success;
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return err;
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, pkt);
return err;
} }
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames) void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames)
@ -323,13 +336,15 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
} }
} }
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue(capacity) SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity)
{ {
request_key_frame_ = false; request_key_frame_ = false;
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity);
} }
SrsRtpVideoQueue::~SrsRtpVideoQueue() SrsRtpVideoQueue::~SrsRtpVideoQueue()
{ {
srs_freep(queue_);
} }
void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq) void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq)
@ -350,6 +365,11 @@ void SrsRtpVideoQueue::notify_nack_list_full()
queue_->advance_to(next); queue_->advance_to(next);
} }
uint32_t SrsRtpVideoQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -376,10 +396,33 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
} }
} }
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { uint16_t seq = pkt->rtp_header.get_sequence();
return srs_error_wrap(err, "video consume");
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return err;
} }
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, pkt);
return err; return err;
} }

@ -239,33 +239,34 @@ private:
uint64_t pre_number_of_packet_lossed_; uint64_t pre_number_of_packet_lossed_;
uint64_t num_of_packet_received_; uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_; uint64_t number_of_packet_lossed_;
protected:
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_;
public: public:
SrsRtpQueue(int capacity); SrsRtpQueue();
virtual ~SrsRtpQueue(); virtual ~SrsRtpQueue();
public: public:
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void notify_drop_seq(uint16_t seq) = 0; virtual void notify_drop_seq(uint16_t seq) = 0;
virtual void notify_nack_list_full() = 0; virtual void notify_nack_list_full() = 0;
public: virtual uint32_t get_extended_highest_sequence() = 0;
uint32_t get_extended_highest_sequence();
uint8_t get_fraction_lost(); uint8_t get_fraction_lost();
uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_cumulative_number_of_packets_lost();
uint32_t get_interarrival_jitter(); uint32_t get_interarrival_jitter();
private: protected:
srs_error_t on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last); void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last);
}; };
class SrsRtpAudioQueue : public SrsRtpQueue class SrsRtpAudioQueue : public SrsRtpQueue
{ {
private:
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_;
public: public:
SrsRtpAudioQueue(int capacity); SrsRtpAudioQueue(int capacity);
virtual ~SrsRtpAudioQueue(); virtual ~SrsRtpAudioQueue();
public: public:
virtual void notify_drop_seq(uint16_t seq); virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full(); virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
public:
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames);
}; };
@ -273,12 +274,14 @@ class SrsRtpVideoQueue : public SrsRtpQueue
{ {
private: private:
bool request_key_frame_; bool request_key_frame_;
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_;
public: public:
SrsRtpVideoQueue(int capacity); SrsRtpVideoQueue(int capacity);
virtual ~SrsRtpVideoQueue(); virtual ~SrsRtpVideoQueue();
public: public:
virtual void notify_drop_seq(uint16_t seq); virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full(); virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
bool should_request_key_frame(); bool should_request_key_frame();

Loading…
Cancel
Save