Refine RTP ring buffer, change to template

pull/1753/head
winlin 5 years ago
parent 42ae71e96c
commit 80d45e5982

@ -616,8 +616,8 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
realtime = true;
// TODO: FIXME: Config the capacity?
audio_queue_ = new SrsRtpRingBuffer(100);
video_queue_ = new SrsRtpRingBuffer(1000);
audio_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(100);
video_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(1000);
nn_simulate_nack_drop = 0;
nack_enabled_ = false;

@ -34,6 +34,7 @@
#include <srs_app_sdp.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_app_rtp_queue.hpp>
#include <string>
#include <map>
@ -59,7 +60,6 @@ class SrsRtpPacket2;
class ISrsCodec;
class SrsRtpNackForReceiver;
class SrsRtpIncommingVideoFrame;
class SrsRtpRingBuffer;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -214,8 +214,8 @@ private:
uint16_t video_payload_type;
uint32_t video_ssrc;
// NACK ARQ ring buffer.
SrsRtpRingBuffer* audio_queue_;
SrsRtpRingBuffer* video_queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* audio_queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* video_queue_;
// Simulators.
int nn_simulate_nack_drop;
private:

@ -127,114 +127,9 @@ void SrsRtpNackForReceiver::update_rtt(int rtt)
opts_.nack_interval = rtt_;
}
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
{
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new SrsRtpPacket2*[capacity_];
memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity);
}
SrsRtpRingBuffer::~SrsRtpRingBuffer()
{
srs_freepa(queue_);
}
bool SrsRtpRingBuffer::empty()
{
return begin == end;
}
int SrsRtpRingBuffer::size()
{
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
void SrsRtpRingBuffer::advance_to(uint16_t seq)
{
begin = seq;
}
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
{
SrsRtpPacket2* p = queue_[at % capacity_];
if (p) {
srs_freep(p);
}
queue_[at % capacity_] = pkt;
}
void SrsRtpRingBuffer::remove(uint16_t at)
{
set(at, NULL);
}
void SrsRtpRingBuffer::reset(uint16_t first, uint16_t last)
{
for (uint16_t s = first; s != last; ++s) {
queue_[s % capacity_] = NULL;
}
}
bool SrsRtpRingBuffer::overflow()
{
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
uint32_t SrsRtpRingBuffer::get_extended_highest_sequence()
{
return nn_seq_flip_backs * 65536 + end - 1;
}
void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last)
{
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return;
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end;
nack_last = seq;
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
end = seq + 1;
return;
}
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
srs_warn("too old seq %u, range [%u, %u]", seq, begin, end);
}
}
SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq)
{
return queue_[seq % capacity_];
}
SrsRtpQueue::SrsRtpQueue(int capacity)
{
queue_ = new SrsRtpRingBuffer(capacity);
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity);
jitter_ = 0;
last_trans_time_ = -1;
@ -293,8 +188,12 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
++num_of_packet_received_;
uint16_t nack_first = 0, nack_last = 0;
queue_->update(seq, nack_first, nack_last);
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);

@ -113,13 +113,14 @@ public:
// but not an entire video frame right now.
// * seq10: This packet is lost or not received, we put it in the nack list.
// We store the received packets in ring buffer.
template<typename T>
class SrsRtpRingBuffer
{
private:
// Capacity of the ring-buffer.
uint16_t capacity_;
// Ring bufer.
SrsRtpPacket2** queue_;
T* queue_;
// Increase one when uint16 flip back, for get_extended_highest_sequence.
uint64_t nn_seq_flip_backs;
// Whether initialized, because we use uint16 so we can't use -1.
@ -132,28 +133,100 @@ public:
// For example, when got 1 elems, the end is 1.
uint16_t end;
public:
SrsRtpRingBuffer(int capacity);
virtual ~SrsRtpRingBuffer();
SrsRtpRingBuffer(int capacity) {
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new T[capacity_];
memset(queue_, 0, sizeof(T) * capacity);
}
virtual ~SrsRtpRingBuffer() {
srs_freepa(queue_);
}
public:
// Whether the ring buffer is empty.
bool empty();
bool empty() {
return begin == end;
}
// Get the count of elems in ring buffer.
int size();
int size() {
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
// Move the low position of buffer to seq.
void advance_to(uint16_t seq);
void advance_to(uint16_t seq) {
begin = seq;
}
// Free the packet at position.
void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at);
void set(uint16_t at, T pkt) {
T p = queue_[at % capacity_];
if (p) {
srs_freep(p);
}
queue_[at % capacity_] = pkt;
}
void remove(uint16_t at) {
set(at, NULL);
}
// Directly reset range [first, last) to NULL.
void reset(uint16_t first, uint16_t last);
void reset(uint16_t first, uint16_t last) {
for (uint16_t s = first; s != last; ++s) {
queue_[s % capacity_] = NULL;
}
}
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow();
bool overflow() {
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
// The highest sequence number, calculate the flip back base.
uint32_t get_extended_highest_sequence();
uint32_t get_extended_highest_sequence() {
return nn_seq_flip_backs * 65536 + end - 1;
}
// Update the sequence, got the nack range by [first, last).
void update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last);
// @return If false, the seq is too old.
bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) {
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return true;
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end;
nack_last = seq;
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
end = seq + 1;
return true;
}
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
return false;
}
return true;
}
// Get the packet by seq.
SrsRtpPacket2* at(uint16_t seq);
T at(uint16_t seq) {
return queue_[seq % capacity_];
}
};
class SrsRtpQueue
@ -167,7 +240,7 @@ private:
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
protected:
SrsRtpRingBuffer* queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_;
public:
SrsRtpQueue(int capacity);
virtual ~SrsRtpQueue();

Loading…
Cancel
Save