rtc publish release

pull/1753/head
xiaozhihong 5 years ago
parent 775065175a
commit 8dc0746e2d

@ -1290,9 +1290,7 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::
local_sdp.group_policy_ = "BUNDLE";
int mid = 0;
for (int i = 0; i < remote_sdp.media_descs_.size(); ++i) {
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
if (remote_media_desc.is_audio()) {

@ -238,16 +238,17 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt)
}
uint8_t nal_type = rtp_payload[0] & kNalTypeMask;
if (nal_type == SrsAvcNaluTypeIDR) {
rtp_h264_header->is_key_frame = true;
}
if (nal_type >= 1 && nal_type <= 23) {
srs_verbose("seq=%u, single nalu", rtp_pkt->rtp_header.get_sequence());
rtp_h264_header->is_first_packet_of_frame = true;
rtp_h264_header->is_last_packet_of_frame = true;
rtp_h264_header->nalu_type = nal_type;
rtp_h264_header->nalu_header = rtp_payload[0];
rtp_h264_header->nalu_offset.push_back(make_pair(0, rtp_payload_size));
} else if (nal_type == kFuA) {
srs_verbose("seq=%u, fu-a", rtp_pkt->rtp_header.get_sequence());
if ((rtp_payload[1] & kStart)) {
rtp_h264_header->is_first_packet_of_frame = true;
}
@ -258,13 +259,11 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt)
rtp_h264_header->nalu_header = (rtp_payload[0] & (~kNalTypeMask)) | (rtp_payload[1] & kNalTypeMask);
rtp_h264_header->nalu_offset.push_back(make_pair(2, rtp_payload_size - 2));
} else if (nal_type == kStapA) {
srs_verbose("seq=%u, stap-a", rtp_pkt->rtp_header.get_sequence());
int i = 1;
rtp_h264_header->is_first_packet_of_frame = true;
rtp_h264_header->is_last_packet_of_frame = true;
rtp_h264_header->nalu_type = nal_type;
while (i < rtp_payload_size) {
srs_verbose("stap-a cur index=%s", srs_string_dumps_hex(reinterpret_cast<const char*>(rtp_payload + i), 2).c_str());
uint16_t nal_len = (rtp_payload[i]) << 8 | rtp_payload[i + 1];
if (nal_len > rtp_payload_size - i) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet, nal len=%u, i=%d, rtp_payload_size=%d", nal_len, i, rtp_payload_size);
@ -286,6 +285,30 @@ srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt)
return err;
}
SrsRtpOpusDemuxer::SrsRtpOpusDemuxer()
{
}
SrsRtpOpusDemuxer::~SrsRtpOpusDemuxer()
{
}
srs_error_t SrsRtpOpusDemuxer::parse(SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
SrsRtpOpusHeader* rtp_opus_header = dynamic_cast<SrsRtpOpusHeader*>(rtp_pkt->rtp_payload_header);
if (rtp_opus_header == NULL) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid rtp packet");
}
rtp_opus_header->is_first_packet_of_frame = true;
rtp_opus_header->is_last_packet_of_frame = true;
rtp_opus_header->is_key_frame = true;
return err;
}
SrsRtc::SrsRtc()
{
req = NULL;

@ -87,6 +87,15 @@ public:
srs_error_t parse(SrsRtpSharedPacket* rtp_pkt);
};
class SrsRtpOpusDemuxer
{
public:
SrsRtpOpusDemuxer();
virtual ~SrsRtpOpusDemuxer();
public:
srs_error_t parse(SrsRtpSharedPacket* rtp_pkt);
};
class SrsRtc
{
private:

@ -1495,6 +1495,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
rtc_session = session;
rtp_h264_demuxer = new SrsRtpH264Demuxer();
rtp_opus_demuxer = new SrsRtpOpusDemuxer();
rtp_video_queue = new SrsRtpQueue(1000);
rtp_audio_queue = new SrsRtpQueue(100, true);
@ -1505,6 +1506,7 @@ SrsRtcPublisher::~SrsRtcPublisher()
{
srs_freep(report_timer);
srs_freep(rtp_h264_demuxer);
srs_freep(rtp_opus_demuxer);
srs_freep(rtp_video_queue);
srs_freep(rtp_audio_queue);
}
@ -1718,10 +1720,11 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket*
*/
SrsBuffer stream(buf, nb_buf);
uint8_t first = stream.read_1bytes();
/*uint8_t first = */stream.read_1bytes();
uint8_t pt = stream.read_1bytes();
srs_assert(pt == kXR);
uint16_t length = (stream.read_2bytes() + 1) * 4;
uint32_t ssrc = stream.read_4bytes();
/*uint32_t ssrc = */stream.read_4bytes();
if (length != nb_buf) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf);
@ -1865,18 +1868,46 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ss
return err;
}
srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc)
{
srs_error_t err = srs_success;
char buf[kRtpPacketSize];
SrsBuffer stream(buf, sizeof(buf));
stream.write_1bytes(0x81);
stream.write_1bytes(kPsFb);
stream.write_2bytes(2);
stream.write_4bytes(ssrc);
stream.write_4bytes(ssrc);
srs_verbose("PLI ssrc=%u", ssrc);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp psfb pli");
}
skt->sendto(protected_buf, nb_protected_buf, 0);
return err;
}
srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
rtp_pkt->rtp_payload_header = new SrsRtpOpusHeader();
rtp_pkt->rtp_payload_header->is_first_packet_of_frame = true;
rtp_pkt->rtp_payload_header->is_last_packet_of_frame = true;
if ((err = rtp_opus_demuxer->parse(rtp_pkt)) != srs_success) {
return srs_error_wrap(err, "rtp opus demux failed");
}
rtp_audio_queue->insert(rtp_pkt);
if (rtp_audio_queue->get_and_clean_if_needed_rqeuest_key_frame()) {
send_rtcp_fb_pli(skt, audio_ssrc);
}
check_send_nacks(rtp_audio_queue, audio_ssrc, skt);
return collect_audio_frame();
@ -1915,6 +1946,10 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket*
rtp_video_queue->insert(rtp_pkt);
if (rtp_video_queue->get_and_clean_if_needed_rqeuest_key_frame()) {
send_rtcp_fb_pli(skt, video_ssrc);
}
check_send_nacks(rtp_video_queue, video_ssrc, skt);
return collect_video_frame();
@ -2375,8 +2410,6 @@ srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* sk
srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
if (rtc_publisher == NULL) {
return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null");
}
@ -2517,19 +2550,15 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt)
uint32_t video_ssrc = 0;
uint32_t audio_ssrc = 0;
uint16_t video_payload_type = 0;
uint16_t audio_payload_type = 0;
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i];
if (media_desc.is_audio()) {
if (! media_desc.ssrc_infos_.empty()) {
audio_ssrc = media_desc.ssrc_infos_[0].ssrc_;
audio_payload_type = media_desc.payload_types_[0].payload_type_;
}
} else if (media_desc.is_video()) {
if (! media_desc.ssrc_infos_.empty()) {
video_ssrc = media_desc.ssrc_infos_[0].ssrc_;
video_payload_type = media_desc.payload_types_[0].payload_type_;
}
}
}

@ -53,6 +53,7 @@ class SrsRtpPacket2;
class ISrsUdpSender;
class SrsRtpQueue;
class SrsRtpH264Demuxer;
class SrsRtpOpusDemuxer;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -264,6 +265,7 @@ private:
uint32_t audio_ssrc;
private:
SrsRtpH264Demuxer* rtp_h264_demuxer;
SrsRtpOpusDemuxer* rtp_opus_demuxer;
SrsRtpQueue* rtp_video_queue;
SrsRtpQueue* rtp_audio_queue;
private:
@ -286,6 +288,7 @@ private:
void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt);
srs_error_t send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue);
srs_error_t send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ssrc);
srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc);
private:
srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);

@ -41,13 +41,14 @@ SrsRtpNackInfo::SrsRtpNackInfo()
req_nack_count_ = 0;
}
SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue)
SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size)
{
max_queue_size_ = queue_size;
rtp_queue_ = rtp_queue;
pre_check_time_ = 0;
srs_info("nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld"
opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval);
srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld"
max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval);
}
SrsRtpNackList::~SrsRtpNackList()
@ -58,6 +59,7 @@ void SrsRtpNackList::insert(uint16_t seq)
{
// FIXME: full, drop packet, and request key frame.
SrsRtpNackInfo& nack_info = queue_[seq];
(void)nack_info;
}
void SrsRtpNackList::remove(uint16_t seq)
@ -76,12 +78,11 @@ SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq)
return &(iter->second);
}
void SrsRtpNackList::dump()
void SrsRtpNackList::check_queue_size()
{
return;
srs_verbose("@debug, queue size=%u", queue_.size());
for (std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) {
srs_verbose("@debug, nack seq=%u", iter->first);
if (queue_.size() >= max_queue_size_) {
srs_verbose("NACK list full, queue size=%u, max_queue_size=%u", queue_.size(), max_queue_size_);
rtp_queue_->notify_nack_list_full();
}
}
@ -134,7 +135,7 @@ void SrsRtpNackList::update_rtt(int rtt)
}
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
: nack_(this)
: nack_(this, capacity * 2 / 3)
{
capacity_ = capacity;
head_sequence_ = 0;
@ -155,6 +156,8 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
number_of_packet_lossed_ = 0;
one_packet_per_frame_ = one_packet_per_frame;
request_key_frame_ = false;
}
SrsRtpQueue::~SrsRtpQueue()
@ -214,7 +217,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
highest_sequence_ = seq;
} else {
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first paacet client sented.
// we received maybe no the first packet client sented.
if (! start_collected_) {
if (seq_cmp(seq, head_sequence_)) {
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq);
@ -228,9 +231,6 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
}
}
int delay = highest_sequence_ - head_sequence_ + 1;
srs_verbose("seqs range=[%u-%u], delay=%d", head_sequence_, highest_sequence_, delay);
// Check seqs out of range.
if (head_sequence_ + capacity_ < highest_sequence_) {
srs_verbose("try collect packet becuase seq out of range");
@ -261,12 +261,12 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
head_sequence_ = s;
}
SrsRtpSharedPacket* old_pkt = queue_[seq % capacity_];
SrsRtpSharedPacket*& old_pkt = queue_[seq % capacity_];
if (old_pkt) {
delete old_pkt;
}
queue_[seq % capacity_] = rtp_pkt->copy();
old_pkt = rtp_pkt->copy();
// Marker bit means the last packet of frame received.
if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) {
@ -294,6 +294,15 @@ void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpS
frames.swap(frames_);
}
bool SrsRtpQueue::get_and_clean_if_needed_rqeuest_key_frame()
{
if (request_key_frame_) {
request_key_frame_ = false;
}
return request_key_frame_;
}
void SrsRtpQueue::notify_drop_seq(uint16_t seq)
{
uint16_t s = seq + 1;
@ -308,6 +317,29 @@ void SrsRtpQueue::notify_drop_seq(uint16_t seq)
head_sequence_ = s;
}
void SrsRtpQueue::notify_nack_list_full()
{
bool found_key_frame = false;
while (head_sequence_ <= highest_sequence_) {
SrsRtpSharedPacket* pkt = queue_[head_sequence_ % capacity_];
if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) {
found_key_frame = true;
srs_verbose("found firsr packet of key frame, seq=%u", head_sequence_);
break;
}
nack_.remove(head_sequence_);
remove(head_sequence_);
++head_sequence_;
}
if (! found_key_frame) {
srs_verbose("no found first packet of key frame, request key frame");
request_key_frame_ = true;
head_sequence_ = highest_sequence_;
}
}
uint32_t SrsRtpQueue::get_extended_highest_sequence()
{
return cycle_ * 65536 + highest_sequence_;
@ -350,8 +382,7 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end)
++number_of_packet_lossed_;
}
// FIXME: Record key frame sequence.
// FIXME: When nack list too long, clear and send PLI.
nack_.check_queue_size();
}
void SrsRtpQueue::collect_packet()
@ -360,8 +391,6 @@ void SrsRtpQueue::collect_packet()
for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) {
SrsRtpSharedPacket* pkt = queue_[s % capacity_];
nack_.dump();
if (nack_.find(s) != NULL) {
srs_verbose("seq=%u, found in nack list when collect frame", s);
break;

@ -79,6 +79,8 @@ class SrsRtpNackList
private:
// Nack queue, seq order, oldest to newest.
std::map<uint16_t, SrsRtpNackInfo, SeqComp> queue_;
// Max nack count.
size_t max_queue_size_;
SrsRtpQueue* rtp_queue_;
SrsNackOption opts_;
private:
@ -86,14 +88,13 @@ private:
private:
int rtt_;
public:
SrsRtpNackList(SrsRtpQueue* rtp_queue);
SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size);
virtual ~SrsRtpNackList();
public:
void insert(uint16_t seq);
void remove(uint16_t seq);
SrsRtpNackInfo* find(uint16_t seq);
public:
void dump();
void check_queue_size();
public:
void get_nack_seqs(std::vector<uint16_t>& seqs);
public:
@ -109,7 +110,7 @@ private:
* \___(no received, in nack list)
*/
// Capacity of the ring-buffer.
size_t capacity_;
uint16_t capacity_;
// Thei highest sequence we have receive.
uint16_t highest_sequence_;
// The sequence waitting to read.
@ -132,6 +133,7 @@ public:
SrsRtpNackList nack_;
private:
std::vector<std::vector<SrsRtpSharedPacket*> > frames_;
bool request_key_frame_;
public:
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
virtual ~SrsRtpQueue();
@ -140,7 +142,9 @@ public:
srs_error_t remove(uint16_t seq);
public:
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
bool get_and_clean_if_needed_rqeuest_key_frame();
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();
public:
uint32_t get_extended_highest_sequence();
uint8_t get_fraction_lost();

@ -225,7 +225,10 @@ srs_error_t SrsMediaPayloadType::encode(std::ostringstream& os)
}
if (! format_specific_param_.empty()) {
os << "a=fmtp:" << payload_type_ << " " << format_specific_param_ << kCRLF;
os << "a=fmtp:" << payload_type_ << " " << format_specific_param_
// FIXME:test code.
<< ";x-google-max-bitrate=6000;x-google-min-bitrate=5100;x-google-start-bitrate=5000"
<< kCRLF;
}
return err;

@ -97,7 +97,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* stream)
timestamp = stream->read_4bytes();
ssrc = stream->read_4bytes();
if (stream->size() < header_size()) {
if ((size_t)stream->size() < header_size()) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect");
}
@ -610,6 +610,7 @@ SrsRtpPayloadHeader::SrsRtpPayloadHeader()
{
is_first_packet_of_frame = false;
is_last_packet_of_frame = false;
is_key_frame = false;
}
SrsRtpPayloadHeader::~SrsRtpPayloadHeader()
@ -625,6 +626,8 @@ SrsRtpPayloadHeader& SrsRtpPayloadHeader::operator=(const SrsRtpPayloadHeader& r
{
is_first_packet_of_frame = rhs.is_first_packet_of_frame;
is_last_packet_of_frame = rhs.is_last_packet_of_frame;
return *this;
}
SrsRtpH264Header::SrsRtpH264Header() : SrsRtpPayloadHeader()

@ -231,6 +231,7 @@ class SrsRtpPayloadHeader
public:
bool is_first_packet_of_frame;
bool is_last_packet_of_frame;
bool is_key_frame;
public:
SrsRtpPayloadHeader();
virtual ~SrsRtpPayloadHeader();

Loading…
Cancel
Save