RTC: Refine NACK, remove dead code

pull/1809/head
winlin 5 years ago
parent f81d35d20f
commit ab6e3cae52

@ -507,8 +507,8 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
realtime = true;
// TODO: FIXME: Config the capacity?
audio_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(100);
video_queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(1000);
audio_queue_ = new SrsRtpRingBuffer(100);
video_queue_ = new SrsRtpRingBuffer(1000);
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
@ -1378,9 +1378,10 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
session_ = session;
video_queue_ = new SrsRtpVideoQueue(1000);
request_keyframe_ = false;
video_queue_ = new SrsRtpRingBuffer(1000);
video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3);
audio_queue_ = new SrsRtpAudioQueue(100);
audio_queue_ = new SrsRtpRingBuffer(100);
audio_nack_ = new SrsRtpNackForReceiver(audio_queue_, 100 * 2 / 3);
source = NULL;
@ -1490,7 +1491,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr
}
}
srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue)
srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue)
{
srs_error_t err = srs_success;
@ -1507,10 +1508,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue)
stream.write_2bytes(7);
stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1?
uint8_t fraction_lost = rtp_queue->get_fraction_lost();
uint32_t cumulative_number_of_packets_lost = rtp_queue->get_cumulative_number_of_packets_lost() & 0x7FFFFF;
// TODO: FIXME: Implements it.
// TODO: FIXME: See https://github.com/ossrs/srs/blob/f81d35d20f04ebec01915cb78a882e45b7ee8800/trunk/src/app/srs_app_rtc_queue.cpp
uint8_t fraction_lost = 0;
uint32_t cumulative_number_of_packets_lost = 0 & 0x7FFFFF;
uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence();
uint32_t interarrival_jitter = rtp_queue->get_interarrival_jitter();
uint32_t interarrival_jitter = 0;
uint32_t rr_lsr = 0;
uint32_t rr_dlsr = 0;
@ -1648,15 +1651,17 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
// Decode the RTP packet from buffer.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
if (true) {
pkt->set_decode_handler(this);
pkt->shared_msg = new SrsSharedPtrMessage();
pkt->shared_msg->wrap(buf, nb_buf);
pkt->set_decode_handler(this);
pkt->shared_msg = new SrsSharedPtrMessage();
pkt->shared_msg->wrap(buf, nb_buf);
SrsBuffer b(buf, nb_buf);
if ((err = pkt->decode(&b)) != srs_success) {
return srs_error_wrap(err, "decode rtp packet");
SrsBuffer b(buf, nb_buf);
if ((err = pkt->decode(&b)) != srs_success) {
return srs_error_wrap(err, "decode rtp packet");
}
}
// For NACK simulator, drop packet.
@ -1666,15 +1671,27 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
return err;
}
// For source to consume packet.
uint32_t ssrc = pkt->rtp_header.get_ssrc();
if (ssrc == audio_ssrc) {
return on_audio(pkt);
if ((err = on_audio(pkt)) != srs_success) {
return srs_error_wrap(err, "on audio");
}
} else if (ssrc == video_ssrc) {
return on_video(pkt);
if ((err = on_video(pkt)) != srs_success) {
return srs_error_wrap(err, "on video");
}
} else {
srs_freep(pkt);
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
}
// For NACK to handle packet.
if (nack_enabled_ && (err = on_nack(pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
return err;
}
void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
@ -1709,40 +1726,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
source->on_rtp(pkt);
return err;
// TODO: FIXME: Directly dispatch to consumer for performance?
std::vector<SrsRtpPacket2*> frames;
if (nack_enabled_) {
// TODO: FIXME: Error check.
audio_queue_->consume(audio_nack_, pkt);
check_send_nacks(audio_nack_, audio_ssrc);
// Collect all audio frames.
audio_queue_->collect_frames(audio_nack_, frames);
} else {
// TODO: FIXME: Error check.
audio_queue_->consume(NULL, pkt);
// Collect all audio frames.
audio_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i];
// TODO: FIXME: Check error.
source->on_rtp(frame);
if (nn_audio_frames++ == 0) {
SrsRtpHeader* h = &frame->rtp_header;
SrsRtpRawPayload* payload = dynamic_cast<SrsRtpRawPayload*>(frame->payload);
srs_trace("RTC got Opus seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), payload->nn_payload);
}
}
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
@ -1754,152 +1737,38 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
// TODO: FIXME: Error check.
source->on_rtp(pkt);
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
return err;
std::vector<SrsRtpPacket2*> frames;
if (nack_enabled_) {
// TODO: FIXME: Error check.
video_queue_->consume(video_nack_, pkt);
check_send_nacks(video_nack_, video_ssrc);
// Collect video frames.
video_queue_->collect_frames(video_nack_, frames);
} else {
// TODO: FIXME: Error check.
video_queue_->consume(NULL, pkt);
// Collect video frames.
video_queue_->collect_frames(NULL, frames);
}
for (size_t i = 0; i < frames.size(); ++i) {
SrsRtpPacket2* frame = frames[i];
if (request_keyframe_) {
request_keyframe_ = false;
// TODO: FIXME: Check error.
on_video_frame(frame);
srs_freep(frame);
}
if (video_queue_->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(video_ssrc);
}
return srs_success;
return err;
}
srs_error_t SrsRtcPublisher::on_video_frame(SrsRtpPacket2* frame)
srs_error_t SrsRtcPublisher::on_nack(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
int64_t timestamp = frame->rtp_header.get_timestamp();
// No FU-A, because we convert it to RAW RTP packet.
if (frame->nalu_type == (SrsAvcNaluType)kFuA) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid FU-A");
}
// For STAP-A, it must be SPS/PPS, and only one packet.
if (frame->nalu_type == (SrsAvcNaluType)kStapA) {
SrsRtpSTAPPayload* payload = dynamic_cast<SrsRtpSTAPPayload*>(frame->payload);
if (!payload) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload");
}
SrsSample* sps = payload->get_sps();
SrsSample* pps = payload->get_pps();
if (!sps || !sps->size) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no sps");
}
if (!pps || !pps->size) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no pps");
}
// TODO: FIXME: Directly covert to sample for performance.
// 5 bytes flv tag header.
// 5 bytes sps/pps sequence header.
// 6 bytes size for sps/pps, each is 3 bytes.
int nn_payload = sps->size + pps->size + 5 + 5 + 6;
char* data = new char[nn_payload];
SrsBuffer buf(data, nn_payload);
buf.write_1bytes(0x17); // Keyframe.
buf.write_1bytes(0x00); // Sequence header.
buf.write_3bytes(0x00); // CTS.
// FIXME: Replace magic number for avc_demux_sps_pps.
buf.write_1bytes(0x01); // configurationVersion
buf.write_1bytes(0x42); // AVCProfileIndication, 0x42 = Baseline
buf.write_1bytes(0xC0); // profile_compatibility
buf.write_1bytes(0x1f); // AVCLevelIndication, 0x1f = Level3.1
buf.write_1bytes(0x03); // lengthSizeMinusOne, size of length for NALU.
buf.write_1bytes(0x01); // numOfSequenceParameterSets
buf.write_2bytes(sps->size); // sequenceParameterSetLength
buf.write_bytes(sps->bytes, sps->size); // sps
buf.write_1bytes(0x01); // numOfPictureParameterSets
buf.write_2bytes(pps->size); // pictureParameterSetLength
buf.write_bytes(pps->bytes, pps->size); // pps
SrsMessageHeader header;
header.message_type = RTMP_MSG_VideoMessage;
// TODO: FIXME: Maybe the tbn is not 90k.
header.timestamp = (timestamp / 90) & 0x3fffffff;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
// TODO: FIXME: Check error.
shared_video->create(&header, data, nn_payload);
return source->on_video(shared_video);
}
// For RAW NALU, should be one RAW packet.
SrsRtpRawPayload* payload = dynamic_cast<SrsRtpRawPayload*>(frame->payload);
if (!payload) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "RAW-NALU payload");
}
if (!payload->nn_payload) {
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = audio_nack_->find(seq);
if (nack_info) {
return err;
}
// TODO: FIXME: Directly covert to sample for performance.
// 5 bytes FLV tag header.
// 4 bytes NALU IBMF header, define by sequence header.
int nn_payload = payload->nn_payload + 5 + 4;
char* data = new char[nn_payload];
SrsBuffer buf(data, nn_payload);
if (frame->nalu_type == SrsAvcNaluTypeIDR) {
buf.write_1bytes(0x17); // Keyframe.
SrsRtpHeader* h = &frame->rtp_header;
srs_trace("RTC got IDR seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), nn_payload);
} else {
buf.write_1bytes(0x27); // Not-Keyframe.
uint16_t nack_first = 0, nack_last = 0;
if (!audio_queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, audio_queue_->begin, audio_queue_->end);
}
buf.write_1bytes(0x01); // Not-SequenceHeader.
buf.write_3bytes(0x00); // CTS.
buf.write_4bytes(payload->nn_payload); // Size of NALU.
buf.write_bytes(payload->payload, payload->nn_payload); // NALU.
if (srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
audio_nack_->insert(nack_first, nack_last);
audio_nack_->check_queue_size();
}
SrsMessageHeader header;
header.message_type = RTMP_MSG_VideoMessage;
// TODO: FIXME: Maybe the tbn is not 90k.
header.timestamp = (timestamp / 90) & 0x3fffffff;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
// TODO: FIXME: Check error.
shared_video->create(&header, data, nn_payload);
return source->on_video(shared_video);
return err;
}
srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data)
@ -2250,7 +2119,7 @@ void SrsRtcPublisher::request_keyframe()
int pcid = session_->context_id();
srs_trace("RTC play=[%d][%d] request keyframe from publish=[%d][%d]", ::getpid(), scid, ::getpid(), pcid);
video_queue_->request_keyframe();
request_keyframe_ = true;
}
srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick)

@ -53,13 +53,11 @@ class SrsSharedPtrMessage;
class SrsRtcSource;
class SrsRtpPacket2;
class ISrsUdpSender;
class SrsRtpQueue;
class SrsRtpAudioQueue;
class SrsRtpVideoQueue;
class SrsRtpPacket2;
class ISrsCodec;
class SrsRtpNackForReceiver;
class SrsRtpIncommingVideoFrame;
class SrsRtpRingBuffer;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -208,8 +206,8 @@ private:
uint16_t video_payload_type;
uint32_t video_ssrc;
// NACK ARQ ring buffer.
SrsRtpRingBuffer<SrsRtpPacket2*>* audio_queue_;
SrsRtpRingBuffer<SrsRtpPacket2*>* video_queue_;
SrsRtpRingBuffer* audio_queue_;
SrsRtpRingBuffer* video_queue_;
// Simulators.
int nn_simulate_nack_drop;
private:
@ -271,9 +269,10 @@ private:
uint32_t video_ssrc;
uint32_t audio_ssrc;
private:
SrsRtpVideoQueue* video_queue_;
bool request_keyframe_;
SrsRtpRingBuffer* video_queue_;
SrsRtpNackForReceiver* video_nack_;
SrsRtpAudioQueue* audio_queue_;
SrsRtpRingBuffer* audio_queue_;
SrsRtpNackForReceiver* audio_nack_;
private:
SrsRequest* req;
@ -292,7 +291,7 @@ public:
srs_error_t initialize(uint32_t vssrc, uint32_t assrc, SrsRequest* req);
private:
void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc);
srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue);
srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue);
srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc);
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
public:
@ -301,7 +300,7 @@ public:
private:
srs_error_t on_audio(SrsRtpPacket2* pkt);
srs_error_t on_video(SrsRtpPacket2* pkt);
srs_error_t on_video_frame(SrsRtpPacket2* frame);
srs_error_t on_nack(SrsRtpPacket2* pkt);
public:
srs_error_t on_rtcp(char* data, int nb_data);
private:

@ -41,13 +41,13 @@ SrsRtpNackInfo::SrsRtpNackInfo()
req_nack_count_ = 0;
}
SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size)
SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size)
{
max_queue_size_ = queue_size;
rtp_queue_ = rtp_queue;
rtp_ = rtp;
pre_check_time_ = 0;
srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld"
srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64,
max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval);
}
@ -55,10 +55,11 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()
{
}
void SrsRtpNackForReceiver::insert(uint16_t seq)
void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last)
{
// FIXME: full, drop packet, and request key frame.
queue_[seq] = SrsRtpNackInfo();
for (uint16_t s = first; s != last; ++s) {
queue_[s] = SrsRtpNackInfo();
}
}
void SrsRtpNackForReceiver::remove(uint16_t seq)
@ -80,7 +81,7 @@ SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq)
void SrsRtpNackForReceiver::check_queue_size()
{
if (queue_.size() >= max_queue_size_) {
rtp_queue_->notify_nack_list_full();
rtp_->notify_nack_list_full();
}
}
@ -100,7 +101,7 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
int alive_time = now - nack_info.generate_time_;
if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) {
rtp_queue_->notify_drop_seq(seq);
rtp_->notify_drop_seq(seq);
queue_.erase(iter++);
continue;
}
@ -127,550 +128,110 @@ void SrsRtpNackForReceiver::update_rtt(int rtt)
opts_.nack_interval = rtt_;
}
SrsRtpQueue::SrsRtpQueue()
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
{
jitter_ = 0;
last_trans_time_ = -1;
pre_number_of_packet_received_ = 0;
pre_number_of_packet_lossed_ = 0;
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
num_of_packet_received_ = 0;
number_of_packet_lossed_ = 0;
queue_ = new SrsRtpPacket2*[capacity_];
memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity);
}
SrsRtpQueue::~SrsRtpQueue()
SrsRtpRingBuffer::~SrsRtpRingBuffer()
{
srs_freepa(queue_);
}
uint8_t SrsRtpQueue::get_fraction_lost()
bool SrsRtpRingBuffer::empty()
{
int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_);
uint8_t loss = 0;
if (total > 0) {
loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total;
}
pre_number_of_packet_lossed_ = number_of_packet_lossed_;
pre_number_of_packet_received_ = num_of_packet_received_;
return loss;
return begin == end;
}
uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost()
int SrsRtpRingBuffer::size()
{
return number_of_packet_lossed_;
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
uint32_t SrsRtpQueue::get_interarrival_jitter()
void SrsRtpRingBuffer::advance_to(uint16_t seq)
{
return static_cast<uint32_t>(jitter_);
}
srs_error_t SrsRtpQueue::on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Update time for each packet, may hurt performance.
srs_utime_t now = srs_update_system_time();
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt;
nack->remove(seq);
}
// Calc jitter time, ignore nack packets.
// TODO: FIXME: Covert time to srs_utime_t.
if (last_trans_time_ == -1) {
last_trans_time_ = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
} else if (!nack_info) {
int trans_time = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
int cur_jitter = trans_time - last_trans_time_;
if (cur_jitter < 0) {
cur_jitter = -cur_jitter;
}
last_trans_time_ = trans_time;
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
}
// OK, got new RTP packet.
if (!nack_info) {
++num_of_packet_received_;
}
return err;
begin = seq;
}
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last)
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
{
if (!nack) {
return;
}
SrsRtpPacket2* p = queue_[at % capacity_];
for (uint16_t s = first; s != last; ++s) {
nack->insert(s);
++number_of_packet_lossed_;
if (p) {
srs_freep(p);
}
nack->check_queue_size();
}
SrsRtpAudioPacket::SrsRtpAudioPacket()
{
pkt = NULL;
}
SrsRtpAudioPacket::~SrsRtpAudioPacket()
{
srs_freep(pkt);
}
SrsRtpPacket2* SrsRtpAudioPacket::detach()
{
SrsRtpPacket2* p = pkt;
pkt = NULL;
return p;
queue_[at % capacity_] = pkt;
}
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity)
void SrsRtpRingBuffer::remove(uint16_t at)
{
queue_ = new SrsRtpRingBuffer<SrsRtpAudioPacket*>(capacity);
set(at, NULL);
}
SrsRtpAudioQueue::~SrsRtpAudioQueue()
bool SrsRtpRingBuffer::overflow()
{
srs_freep(queue_);
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq)
uint32_t SrsRtpRingBuffer::get_extended_highest_sequence()
{
uint16_t next = seq + 1;
if (srs_rtp_seq_distance(queue_->end, seq) > 0) {
seq = queue_->end;
}
srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end);
queue_->advance_to(next);
return nn_seq_flip_backs * 65536 + end - 1;
}
void SrsRtpAudioQueue::notify_nack_list_full()
bool SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last)
{
// TODO: FIXME: Maybe we should not drop all packets.
queue_->advance_to(queue_->end);
}
uint32_t SrsRtpAudioQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "consume audio");
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
SrsRtpAudioPacket* apkt = new SrsRtpAudioPacket();
apkt->pkt = pkt;
queue_->set(seq, apkt);
return err;
}
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames)
{
// When done, next point to the next available packet.
uint16_t next = queue_->begin;
// If nack disabled, we ignore any empty packet.
if (!nack) {
for (; next != queue_->end; ++next) {
SrsRtpAudioPacket* pkt = queue_->at(next);
if (pkt) {
frames.push_back(pkt->detach());
}
}
} else {
for (; next != queue_->end; ++next) {
SrsRtpAudioPacket* pkt = queue_->at(next);
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
break;
}
frames.push_back(pkt->detach());
}
}
// Reap packets from begin to next.
if (next != queue_->begin) {
srs_verbose("RTC collect audio [%u, %u, %u]", queue_->begin, next, queue_->end);
queue_->advance_to(next);
}
// For audio, if overflow, clear all packets.
// TODO: FIXME: Should notify nack?
if (queue_->overflow()) {
queue_->advance_to(queue_->end);
}
}
SrsRtpVideoPacket::SrsRtpVideoPacket()
{
video_is_first_packet = false;
video_is_last_packet = false;
video_is_idr = false;
pkt = NULL;
}
SrsRtpVideoPacket::~SrsRtpVideoPacket()
{
srs_freep(pkt);
}
SrsRtpPacket2* SrsRtpVideoPacket::detach()
{
SrsRtpPacket2* p = pkt;
pkt = NULL;
return p;
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity)
{
request_key_frame_ = false;
queue_ = new SrsRtpRingBuffer<SrsRtpVideoPacket*>(capacity);
}
SrsRtpVideoQueue::~SrsRtpVideoQueue()
{
srs_freep(queue_);
}
void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq)
{
// If not found start frame, return the end, and we will clear queue.
uint16_t next = next_start_of_frame(seq);
srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end);
queue_->advance_to(next);
}
void SrsRtpVideoQueue::notify_nack_list_full()
{
// If not found start frame, return the end, and we will clear queue.
uint16_t next = next_keyframe();
srs_trace("nack overflow, drop range [%u, %u, %u]", queue_->begin, next, queue_->end);
queue_->advance_to(next);
}
uint32_t SrsRtpVideoQueue::get_extended_highest_sequence()
{
return queue_->get_extended_highest_sequence();
}
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
SrsRtpVideoPacket* vpkt = new SrsRtpVideoPacket();
vpkt->pkt = pkt;
uint8_t v = (uint8_t)pkt->nalu_type;
if (v == kFuA) {
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
if (!payload) {
srs_freep(pkt); srs_freep(vpkt);
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
}
vpkt->video_is_first_packet = payload->start;
vpkt->video_is_last_packet = payload->end;
vpkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
} else {
vpkt->video_is_first_packet = true;
vpkt->video_is_last_packet = true;
if (v == kStapA) {
vpkt->video_is_idr = true;
} else {
vpkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
}
}
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = NULL;
if (nack) {
nack_info = nack->find(seq);
}
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
srs_freep(pkt); srs_freep(vpkt);
return srs_error_wrap(err, "consume video");
}
// OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) {
uint16_t nack_first = 0, nack_last = 0;
if (!queue_->update(seq, nack_first, nack_last)) {
srs_warn("too old seq %u, range [%u, %u]", seq, queue_->begin, queue_->end);
}
if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_first, nack_last);
}
}
// Save packet at the position seq.
queue_->set(seq, vpkt);
return err;
}
void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames)
{
while (true) {
SrsRtpPacket2* pkt = NULL;
collect_frame(nack, &pkt);
if (!pkt) {
break;
}
frames.push_back(pkt);
}
if (queue_->overflow()) {
on_overflow(nack);
}
}
bool SrsRtpVideoQueue::should_request_key_frame()
{
if (request_key_frame_) {
request_key_frame_ = false;
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return true;
}
return request_key_frame_;
}
void SrsRtpVideoQueue::request_keyframe()
{
request_key_frame_ = true;
}
void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
{
// If not found start frame, return the end, and we will clear queue.
uint16_t next = next_start_of_frame(queue_->begin);
srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end);
for (uint16_t s = queue_->begin; s != next; ++s) {
if (nack) {
nack->remove(s);
}
queue_->remove(s);
}
queue_->advance_to(next);
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end;
nack_last = seq;
// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet.
void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt)
{
bool found = false;
vector<SrsRtpVideoPacket*> frame;
// When done, next point to the next available packet.
uint16_t next = queue_->begin;
// If nack disabled, we ignore any empty packet.
if (!nack) {
for (; next != queue_->end; ++next) {
SrsRtpVideoPacket* vpkt = queue_->at(next);
if (!vpkt) {
continue;
}
if (frame.empty() && !vpkt->video_is_first_packet) {
continue;
}
frame.push_back(vpkt);
if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true;
next++;
break;
}
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
} else {
for (; next != queue_->end; ++next) {
SrsRtpVideoPacket* vpkt = queue_->at(next);
// TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame.
if (!vpkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next);
return;
}
// Ignore when the first packet not the start.
if (frame.empty() && !vpkt->video_is_first_packet) {
return;
}
// OK, collect packet to frame.
frame.push_back(vpkt);
// Done, we got the last packet of frame.
// @remark Note that the STAP-A is marker false and it's the last packet.
if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true;
next++;
break;
}
}
}
if (!found || frame.empty()) {
return;
end = seq + 1;
return true;
}
if (next != queue_->begin) {
srs_verbose("RTC collect video [%u, %u, %u]", queue_->begin, next, queue_->end);
queue_->advance_to(next);
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
return false;
}
// Merge packets to one packet.
covert_frame(frame, ppkt);
return;
return true;
}
void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt)
{
if (frame.size() == 1) {
*ppkt = frame[0]->detach();
return;
}
// If more than one packet in a frame, it must be FU-A.
SrsRtpPacket2* head = frame.at(0)->pkt;
SrsAvcNaluType nalu_type = head->nalu_type;
// Covert FU-A to one RAW RTP packet.
int nn_nalus = 0;
for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
if (!payload) {
nn_nalus = 0; break;
}
nn_nalus += payload->size;
}
// Invalid packets, ignore.
if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) {
return;
}
// Merge to one RAW RTP packet.
// TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header = head->rtp_header;
SrsRtpFUAPayload2* head_payload = dynamic_cast<SrsRtpFUAPayload2*>(head->payload);
pkt->nalu_type = head_payload->nalu_type;
SrsRtpRawPayload* payload = new SrsRtpRawPayload();
pkt->payload = payload;
payload->nn_payload = nn_nalus + 1;
payload->payload = new char[payload->nn_payload];
SrsBuffer buf(payload->payload, payload->nn_payload);
buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header.
for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
buf.write_bytes(payload->payload, payload->size);
}
*ppkt = pkt;
SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) {
return queue_[seq % capacity_];
}
uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq)
void SrsRtpRingBuffer::notify_nack_list_full()
{
uint16_t s = seq;
if (srs_rtp_seq_distance(seq, queue_->begin) >= 0) {
s = queue_->begin + 1;
}
for (; s != queue_->end; ++s) {
SrsRtpVideoPacket* vpkt = queue_->at(s);
if (vpkt && vpkt->video_is_first_packet) {
return s;
}
}
return queue_->end;
}
uint16_t SrsRtpVideoQueue::next_keyframe()
void SrsRtpRingBuffer::notify_drop_seq(uint16_t seq)
{
uint16_t s = queue_->begin + 1;
for (; s != queue_->end; ++s) {
SrsRtpVideoPacket* vpkt = queue_->at(s);
if (vpkt && vpkt->video_is_idr && vpkt->video_is_first_packet) {
return s;
}
}
return queue_->end;
}

@ -32,6 +32,7 @@
class SrsRtpPacket2;
class SrsRtpQueue;
class SrsRtpRingBuffer;
struct SrsNackOption
{
@ -84,17 +85,17 @@ private:
std::map<uint16_t, SrsRtpNackInfo, SeqComp> queue_;
// Max nack count.
size_t max_queue_size_;
SrsRtpQueue* rtp_queue_;
SrsRtpRingBuffer* rtp_;
SrsNackOption opts_;
private:
srs_utime_t pre_check_time_;
private:
int rtt_;
public:
SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size);
SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size);
virtual ~SrsRtpNackForReceiver();
public:
void insert(uint16_t seq);
void insert(uint16_t first, uint16_t last);
void remove(uint16_t seq);
SrsRtpNackInfo* find(uint16_t seq);
void check_queue_size();
@ -113,14 +114,13 @@ public:
// but not an entire video frame right now.
// * seq10: This packet is lost or not received, we put it in the nack list.
// We store the received packets in ring buffer.
template<typename T>
class SrsRtpRingBuffer
{
private:
// Capacity of the ring-buffer.
uint16_t capacity_;
// Ring bufer.
T* queue_;
SrsRtpPacket2** queue_;
// Increase one when uint16 flip back, for get_extended_highest_sequence.
uint64_t nn_seq_flip_backs;
// Whether initialized, because we use uint16 so we can't use -1.
@ -133,186 +133,31 @@ public:
// For example, when got 1 elems, the end is 1.
uint16_t end;
public:
SrsRtpRingBuffer(int capacity) {
nn_seq_flip_backs = 0;
begin = end = 0;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new T[capacity_];
memset(queue_, 0, sizeof(T) * capacity);
}
virtual ~SrsRtpRingBuffer() {
srs_freepa(queue_);
}
SrsRtpRingBuffer(int capacity);
virtual ~SrsRtpRingBuffer();
public:
// Whether the ring buffer is empty.
bool empty() {
return begin == end;
}
bool empty();
// Get the count of elems in ring buffer.
int size() {
int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
}
int size();
// Move the low position of buffer to seq.
void advance_to(uint16_t seq) {
begin = seq;
}
void advance_to(uint16_t seq);
// Free the packet at position.
void set(uint16_t at, T pkt) {
T p = queue_[at % capacity_];
if (p) {
srs_freep(p);
}
queue_[at % capacity_] = pkt;
}
void remove(uint16_t at) {
set(at, NULL);
}
void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at);
// Whether queue overflow or heavy(too many packets and need clear).
bool overflow() {
return srs_rtp_seq_distance(begin, end) >= capacity_;
}
bool overflow();
// The highest sequence number, calculate the flip back base.
uint32_t get_extended_highest_sequence() {
return nn_seq_flip_backs * 65536 + end - 1;
}
uint32_t get_extended_highest_sequence();
// Update the sequence, got the nack range by [first, last).
// @return If false, the seq is too old.
bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last) {
if (!initialized_) {
initialized_ = true;
begin = seq;
end = seq + 1;
return true;
}
// Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_first = end;
nack_last = seq;
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
// TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs;
}
end = seq + 1;
return true;
}
// Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher.
return false;
}
return true;
}
bool update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last);
// Get the packet by seq.
T at(uint16_t seq) {
return queue_[seq % capacity_];
}
};
class SrsRtpQueue
{
private:
double jitter_;
// TODO: FIXME: Covert time to srs_utime_t.
int64_t last_trans_time_;
uint64_t pre_number_of_packet_received_;
uint64_t pre_number_of_packet_lossed_;
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
public:
SrsRtpQueue();
virtual ~SrsRtpQueue();
SrsRtpPacket2* at(uint16_t seq);
public:
virtual void notify_drop_seq(uint16_t seq) = 0;
virtual void notify_nack_list_full() = 0;
virtual uint32_t get_extended_highest_sequence() = 0;
uint8_t get_fraction_lost();
uint32_t get_cumulative_number_of_packets_lost();
uint32_t get_interarrival_jitter();
protected:
srs_error_t on_consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last);
};
class SrsRtpAudioPacket
{
public:
SrsRtpPacket2* pkt;
public:
SrsRtpAudioPacket();
virtual ~SrsRtpAudioPacket();
public:
SrsRtpPacket2* detach();
};
class SrsRtpAudioQueue : public SrsRtpQueue
{
private:
SrsRtpRingBuffer<SrsRtpAudioPacket*>* queue_;
public:
SrsRtpAudioQueue(int capacity);
virtual ~SrsRtpAudioQueue();
public:
virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
public:
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames);
};
class SrsRtpVideoPacket
{
public:
// Helper information for video decoder only.
bool video_is_first_packet;
bool video_is_last_packet;
bool video_is_idr;
public:
SrsRtpPacket2* pkt;
public:
SrsRtpVideoPacket();
virtual ~SrsRtpVideoPacket();
public:
SrsRtpPacket2* detach();
};
class SrsRtpVideoQueue : public SrsRtpQueue
{
private:
bool request_key_frame_;
SrsRtpRingBuffer<SrsRtpVideoPacket*>* queue_;
public:
SrsRtpVideoQueue(int capacity);
virtual ~SrsRtpVideoQueue();
public:
virtual void notify_drop_seq(uint16_t seq);
virtual void notify_nack_list_full();
virtual uint32_t get_extended_highest_sequence();
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
bool should_request_key_frame();
void request_keyframe();
private:
virtual void on_overflow(SrsRtpNackForReceiver* nack);
virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt);
virtual void covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt);
uint16_t next_start_of_frame(uint16_t seq);
uint16_t next_keyframe();
// TODO: FIXME: Move it?
void notify_nack_list_full();
void notify_drop_seq(uint16_t seq);
};
#endif

Loading…
Cancel
Save