From 991672bf41e9e06fcf16d8e0b38070d4573ecf2a Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 15 Jul 2020 13:11:35 +0800 Subject: [PATCH] RTC: Refine stream/ssrc/sdp structure --- trunk/src/app/srs_app_rtc_api.cpp | 9 +- trunk/src/app/srs_app_rtc_conn.cpp | 1639 +++++++++++++++++-------- trunk/src/app/srs_app_rtc_conn.hpp | 87 +- trunk/src/app/srs_app_rtc_dtls.cpp | 3 + trunk/src/app/srs_app_rtc_sdp.cpp | 29 +- trunk/src/app/srs_app_rtc_sdp.hpp | 6 + trunk/src/app/srs_app_rtc_server.cpp | 47 +- trunk/src/app/srs_app_rtc_server.hpp | 7 + trunk/src/app/srs_app_rtc_source.cpp | 738 +++++++++++ trunk/src/app/srs_app_rtc_source.hpp | 271 +++- trunk/src/kernel/srs_kernel_error.hpp | 3 + 11 files changed, 2240 insertions(+), 599 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 1760e4a55..c2ff32a96 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -167,10 +167,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(request.vhost); local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(request.vhost); - if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); - } - // Whether enabled. bool server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); @@ -521,14 +517,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt SrsSdp local_sdp; + // TODO: FIXME: move to create_session. // Config for SDP and session. local_sdp.session_config_.dtls_role = _srs_config->get_rtc_dtls_role(request.vhost); local_sdp.session_config_.dtls_version = _srs_config->get_rtc_dtls_version(request.vhost); - if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); - } - // Whether enabled. bool server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 469fc08d4..be25e944c 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -56,42 +56,7 @@ using namespace std; #include #include #include - -uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32; - -SrsNtp::SrsNtp() -{ - system_ms_ = 0; - ntp_ = 0; - ntp_second_ = 0; - ntp_fractions_ = 0; -} - -SrsNtp::~SrsNtp() -{ -} - -SrsNtp SrsNtp::from_time_ms(uint64_t ms) -{ - SrsNtp srs_ntp; - srs_ntp.system_ms_ = ms; - srs_ntp.ntp_second_ = ms / 1000; - srs_ntp.ntp_fractions_ = (static_cast(ms % 1000 / 1000.0)) * kMagicNtpFractionalUnit; - srs_ntp.ntp_ = (static_cast(srs_ntp.ntp_second_) << 32) | srs_ntp.ntp_fractions_; - return srs_ntp; -} - -SrsNtp SrsNtp::to_time_ms(uint64_t ntp) -{ - SrsNtp srs_ntp; - srs_ntp.ntp_ = ntp; - srs_ntp.ntp_second_ = (ntp & 0xFFFFFFFF00000000ULL) >> 32; - srs_ntp.ntp_fractions_ = (ntp & 0x00000000FFFFFFFFULL); - srs_ntp.system_ms_ = (static_cast(srs_ntp.ntp_second_) * 1000) + - (static_cast(static_cast(srs_ntp.ntp_fractions_) * 1000.0) / kMagicNtpFractionalUnit); - return srs_ntp; -} - +#include SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s) { @@ -105,15 +70,8 @@ SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s) SrsSecurityTransport::~SrsSecurityTransport() { - if (dtls_) { - srs_freep(dtls_); - dtls_ = NULL; - } - - if (srtp_) { - srs_freep(srtp_); - srtp_ = NULL; - } + srs_freep(dtls_); + srs_freep(srtp_); } srs_error_t SrsSecurityTransport::initialize(SrsSessionConfig* cfg) @@ -267,10 +225,6 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid) mw_msgs = 0; realtime = true; - // TODO: FIXME: Config the capacity? - audio_queue_ = new SrsRtpRingBuffer(100); - video_queue_ = new SrsRtpRingBuffer(1000); - nn_simulate_nack_drop = 0; nack_enabled_ = false; @@ -282,31 +236,28 @@ SrsRtcPlayStream::~SrsRtcPlayStream() _srs_config->unsubscribe(this); srs_freep(trd); - srs_freep(audio_queue_); - srs_freep(video_queue_); } -srs_error_t SrsRtcPlayStream::initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt) +srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map sub_relations) { srs_error_t err = srs_success; - video_ssrc = vssrc; - audio_ssrc = assrc; - - video_payload_type = v_pt; - audio_payload_type = a_pt; - - // TODO: FIXME: Support reload. - nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost); - srs_trace("RTC player video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), nack=%d", - video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, nack_enabled_); + std::map::iterator it = sub_relations.begin(); + while (it != sub_relations.end()) { + if (it->second->type_ == "audio") { + audio_tracks_.insert(make_pair(it->first, new SrsRtcAudioSendTrack(session_, it->second))); + } - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_start_play(session_, this, session_->req)) != srs_success) { - return srs_error_wrap(err, "on start play"); + if (it->second->type_ == "video") { + video_tracks_.insert(make_pair(it->first, new SrsRtcVideoSendTrack(session_, it->second))); } + ++it; } + // TODO: FIXME: Support reload. + nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); + srs_trace("RTC player nack=%d", nack_enabled_); + return err; } @@ -340,6 +291,12 @@ srs_error_t SrsRtcPlayStream::start() { srs_error_t err = srs_success; + // If player coroutine allocated, we think the player is started. + // To prevent play multiple times for this play stream. + if (is_started) { + return srs_error_new(ERROR_RTC_STREM_STARTED, "playstream start"); + } + srs_freep(trd); trd = new SrsSTCoroutine("rtc_sender", this, _parent_cid); @@ -347,6 +304,14 @@ srs_error_t SrsRtcPlayStream::start() return srs_error_wrap(err, "rtc_sender"); } + if (_srs_rtc_hijacker) { + if ((err = _srs_rtc_hijacker->on_start_play(session_, this, session_->req)) != srs_success) { + return srs_error_wrap(err, "on start play"); + } + } + + is_started = true; + return err; } @@ -397,7 +362,6 @@ srs_error_t SrsRtcPlayStream::cycle() // TODO: FIXME: Use cache for performance? vector pkts; - SrsRtcOutgoingInfo info; if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, session_->req, consumer)) != srs_success) { @@ -465,24 +429,40 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector send_pkts; // Covert kernel messages to RTP packets. for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket2* pkt = pkts[i]; + // TODO: FIXME: Maybe refine for performance issue. + if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { + continue; + } + // Update stats. info.nn_bytes += pkt->nb_bytes(); // For audio, we transcoded AAC to opus in extra payloads. if (pkt->is_audio()) { info.nn_audios++; - pkt->header.set_ssrc(audio_ssrc); - pkt->header.set_payload_type(audio_payload_type); + SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()]; + // TODO: FIXME: Any simple solution? + if ((err = audio_track->on_rtp(send_pkts, pkt)) != srs_success) { + return srs_error_wrap(err, "audio_track on rtp"); + } // TODO: FIXME: Padding audio to the max payload in RTP packets. } else { info.nn_videos++; - pkt->header.set_ssrc(video_ssrc); - pkt->header.set_payload_type(video_payload_type); + + SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()]; + // TODO: FIXME: Any simple solution? + if ((err = video_track->on_rtp(send_pkts, pkt)) != srs_success) { + return srs_error_wrap(err, "audio_track on rtp"); + } } // Detail log, should disable it in release version. @@ -491,7 +471,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector& iov->iov_len = (size_t)nn_encrypt; } - // Put final RTP packet to NACK/ARQ queue. - if (nack_enabled_) { - SrsRtpPacket2* nack = new SrsRtpPacket2(); - nack->header = pkt->header; - - // TODO: FIXME: Should avoid memory copying. - SrsRtpRawPayload* payload = new SrsRtpRawPayload(); - nack->payload = payload; - - payload->nn_payload = (int)iov->iov_len; - payload->payload = new char[payload->nn_payload]; - memcpy((void*)payload->payload, iov->iov_base, iov->iov_len); - - nack->shared_msg = new SrsSharedPtrMessage(); - nack->shared_msg->wrap(payload->payload, payload->nn_payload); - - if (nack->header.get_ssrc() == video_ssrc) { - video_queue_->set(nack->header.get_sequence(), nack); - } else { - audio_queue_->set(nack->header.get_sequence(), nack); - } - } - info.nn_rtp_bytes += (int)iov->iov_len; // When we send out a packet, increase the stat counter. @@ -584,16 +541,26 @@ srs_error_t SrsRtcPlayStream::do_send_packets(const std::vector& void SrsRtcPlayStream::nack_fetch(vector& pkts, uint32_t ssrc, uint16_t seq) { - SrsRtpPacket2* pkt = NULL; - - if (ssrc == video_ssrc) { - pkt = video_queue_->at(seq); - } else if (ssrc == audio_ssrc) { - pkt = audio_queue_->at(seq); + if (true) { + std::map::iterator it; + for (it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) { + if (it->second->has_ssrc(ssrc)) { + SrsRtpPacket2* pkt = it->second->fetch_rtp_packet(seq); + pkts.push_back(pkt); + return; + } + } } - if (pkt) { - pkts.push_back(pkt); + if (true) { + std::map::iterator it; + for (it = video_tracks_.begin(); it != video_tracks_.end(); ++it) { + if (it->second->has_ssrc(ssrc)) { + SrsRtpPacket2* pkt = it->second->fetch_rtp_packet(seq); + pkts.push_back(pkt); + return; + } + } } } @@ -604,8 +571,8 @@ void SrsRtcPlayStream::simulate_nack_drop(int nn) void SrsRtcPlayStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) { - srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, - h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), + srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u, ts=%u, %d bytes", nn_simulate_nack_drop, + h->get_sequence(), h->get_ssrc(), h->get_timestamp(), nn_bytes); nn_simulate_nack_drop--; @@ -758,19 +725,14 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf) for (int i = 0; i < (int)resend_pkts.size(); ++i) { SrsRtpPacket2* pkt = resend_pkts[i]; + info.nn_bytes += pkt->nb_bytes(); + srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, %d bytes", pkt->header.get_sequence(), + pkt->header.get_ssrc(), pkt->header.get_timestamp(), pkt->nb_bytes()); + } - char* data = new char[pkt->nb_bytes()]; - SrsAutoFreeA(char, data); - - SrsBuffer buf(data, pkt->nb_bytes()); - - // TODO: FIXME: Check error. - pkt->encode(&buf); - session_->sendonly_skt->sendto(data, pkt->nb_bytes(), 0); - - SrsRtpHeader* h = &pkt->header; - srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), - h->get_ssrc(), h->get_timestamp(), pkt->nb_bytes()); + // By default, we send packets by sendmmsg. + if ((err = do_send_packets(resend_pkts, info)) != srs_success) { + return srs_error_wrap(err, "raw send"); } return err; @@ -795,14 +757,17 @@ srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(char* buf, int nb_buf) /*uint8_t payload_type = */stream->read_1bytes(); /*uint16_t length = */stream->read_2bytes(); /*uint32_t ssrc_of_sender = */stream->read_4bytes(); - /*uint32_t ssrc_of_media_source = */stream->read_4bytes(); + uint32_t ssrc_of_media_source = stream->read_4bytes(); switch (fmt) { case kPLI: { ISrsRtcPublishStream* publisher = session_->source_->publish_stream(); if (publisher) { - publisher->request_keyframe(); - srs_trace("RTC request PLI"); + uint32_t ssrc = get_video_publish_ssrc(ssrc_of_media_source); + if (ssrc != 0) { + publisher->request_keyframe(ssrc); + srs_trace("RTC request PLI"); + } } break; } @@ -833,16 +798,24 @@ srs_error_t SrsRtcPlayStream::on_rtcp_rr(char* data, int nb_data) return err; } +uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc) +{ + std::map::iterator it; + for (it = video_tracks_.begin(); it != video_tracks_.end(); ++it) { + if (it->second->has_ssrc(play_ssrc)) { + return it->first; + } + } + + return 0; +} + SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session) { report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); session_ = session; request_keyframe_ = false; - video_queue_ = new SrsRtpRingBuffer(1000); - video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3); - audio_queue_ = new SrsRtpRingBuffer(100); - audio_nack_ = new SrsRtpNackForReceiver(audio_queue_, 100 * 2 / 3); source = NULL; nn_simulate_nack_drop = 0; @@ -863,34 +836,50 @@ SrsRtcPublishStream::~SrsRtcPublishStream() source->on_unpublish(); } + srs_freep(req); srs_freep(report_timer); - srs_freep(video_nack_); - srs_freep(video_queue_); - srs_freep(audio_nack_); - srs_freep(audio_queue_); } -srs_error_t SrsRtcPublishStream::initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* r) +srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescription* stream_desc) { srs_error_t err = srs_success; - video_ssrc = vssrc; - audio_ssrc = assrc; - req = r; + req = r->copy(); - // TODO: FIXME: Support reload. - nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost); - pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(session_->req->vhost); - bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost); - if (twcc_enabled) { - twcc_id_ = twcc_id; + audio_tracks_.push_back(new SrsRtcAudioRecvTrack(session_, stream_desc->audio_track_desc_)); + for (int i = 0; i < stream_desc->video_track_descs_.size(); ++i) { + SrsRtcTrackDescription* desc = stream_desc->video_track_descs_.at(i); + video_tracks_.push_back(new SrsRtcVideoRecvTrack(session_, desc)); } - srs_trace("RTC publisher video(ssrc=%u), audio(ssrc=%u), nack=%d, pt-drop=%u, twcc=%u/%d", - video_ssrc, audio_ssrc, nack_enabled_, pt_to_drop_, twcc_enabled, twcc_id); - if (twcc_id_) { + int twcc_id = -1; + uint32_t media_ssrc = 0; + if (stream_desc->audio_track_desc_) { + SrsRtcTrackDescription* desc = stream_desc->audio_track_desc_; + twcc_id = desc->get_rtp_extension_id(kTWCCExt); + media_ssrc = desc->ssrc_; + } + if (twcc_id != -1) { extension_types_.register_by_uri(twcc_id_, kTWCCExt); - rtcp_twcc_.set_media_ssrc(video_ssrc); + rtcp_twcc_.set_media_ssrc(media_ssrc); + } + + nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); + pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost); + bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost); + + srs_trace("RTC publisher nack=%d, pt-drop=%u, twcc=%u/%d", nack_enabled_, pt_to_drop_, twcc_enabled, twcc_id); + + return err; +} + +srs_error_t SrsRtcPublishStream::start() +{ + srs_error_t err = srs_success; + + // If report_timer started, we think the publisher is started. + if (is_started) { + return err; } if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) { @@ -917,212 +906,42 @@ srs_error_t SrsRtcPublishStream::initialize(uint32_t vssrc, uint32_t assrc, int } } - return err; -} - -void SrsRtcPublishStream::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc) -{ - // If DTLS is not OK, drop all messages. - if (!session_->transport_) { - return; - } - - // @see: https://tools.ietf.org/html/rfc4585#section-6.1 - vector nack_seqs; - nack->get_nack_seqs(nack_seqs); - - vector::iterator iter = nack_seqs.begin(); - while (iter != nack_seqs.end()) { - char buf[kRtpPacketSize]; - SrsBuffer stream(buf, sizeof(buf)); - // FIXME: Replace magic number. - stream.write_1bytes(0x81); - stream.write_1bytes(kRtpFb); - stream.write_2bytes(3); - stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1? - stream.write_4bytes(ssrc); // TODO: FIXME: Should be 0? - uint16_t pid = *iter; - uint16_t blp = 0; - while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { - blp |= (1 << (*(iter + 1) - pid - 1)); - ++iter; - } - - stream.write_2bytes(pid); - stream.write_2bytes(blp); - - if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_; - srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); - } - - char protected_buf[kRtpPacketSize]; - int nb_protected_buf = stream.pos(); - - // FIXME: Merge nack rtcp into one packets. - if (session_->transport_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { - // TODO: FIXME: Check error. - session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); - } - - ++iter; - } -} - -srs_error_t SrsRtcPublishStream::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue) -{ - srs_error_t err = srs_success; - - // If DTLS is not OK, drop all messages. - if (!session_->transport_) { - return err; - } - - // @see https://tools.ietf.org/html/rfc3550#section-6.4.2 - char buf[kRtpPacketSize]; - SrsBuffer stream(buf, sizeof(buf)); - stream.write_1bytes(0x81); - stream.write_1bytes(kRR); - stream.write_2bytes(7); - stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1? - - // TODO: FIXME: Implements it. - // TODO: FIXME: See https://github.com/ossrs/srs/blob/f81d35d20f04ebec01915cb78a882e45b7ee8800/trunk/src/app/srs_app_rtc_queue.cpp - uint8_t fraction_lost = 0; - uint32_t cumulative_number_of_packets_lost = 0 & 0x7FFFFF; - uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence(); - uint32_t interarrival_jitter = 0; - - uint32_t rr_lsr = 0; - uint32_t rr_dlsr = 0; - - const uint64_t& lsr_systime = last_sender_report_sys_time[ssrc]; - const SrsNtp& lsr_ntp = last_sender_report_ntp[ssrc]; - - if (lsr_systime > 0) { - rr_lsr = (lsr_ntp.ntp_second_ << 16) | (lsr_ntp.ntp_fractions_ >> 16); - uint32_t dlsr = (srs_update_system_time() - lsr_systime) / 1000; - rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000); - } - - stream.write_4bytes(ssrc); - stream.write_1bytes(fraction_lost); - stream.write_3bytes(cumulative_number_of_packets_lost); - stream.write_4bytes(extended_highest_sequence); - stream.write_4bytes(interarrival_jitter); - stream.write_4bytes(rr_lsr); - stream.write_4bytes(rr_dlsr); - - srs_verbose("RR ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u", - ssrc, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter); - - char protected_buf[kRtpPacketSize]; - int nb_protected_buf = stream.pos(); - if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { - return srs_error_wrap(err, "protect rtcp rr"); - } + is_started = true; - // TDOO: FIXME: Check error. - session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); return err; } -srs_error_t SrsRtcPublishStream::send_rtcp_xr_rrtr(uint32_t ssrc) +srs_error_t SrsRtcPublishStream::send_rtcp_rr() { srs_error_t err = srs_success; - // If DTLS is not OK, drop all messages. - if (!session_->transport_) { - return err; + for (int i = 0; i < video_tracks_.size(); ++i) { + SrsRtcVideoRecvTrack* track = video_tracks_.at(i); + track->send_rtcp_rr(); } - /* - @see: http://www.rfc-editor.org/rfc/rfc3611.html#section-2 - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - |V=2|P|reserved | PT=XR=207 | length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | SSRC | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - : report blocks : - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - @see: http://www.rfc-editor.org/rfc/rfc3611.html#section-4.4 - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | BT=4 | reserved | block length = 2 | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NTP timestamp, most significant word | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NTP timestamp, least significant word | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - srs_utime_t now = srs_update_system_time(); - SrsNtp cur_ntp = SrsNtp::from_time_ms(now / 1000); - - char buf[kRtpPacketSize]; - SrsBuffer stream(buf, sizeof(buf)); - stream.write_1bytes(0x80); - stream.write_1bytes(kXR); - stream.write_2bytes(4); - stream.write_4bytes(ssrc); - stream.write_1bytes(4); - stream.write_1bytes(0); - stream.write_2bytes(2); - stream.write_4bytes(cur_ntp.ntp_second_); - stream.write_4bytes(cur_ntp.ntp_fractions_); - - char protected_buf[kRtpPacketSize]; - int nb_protected_buf = stream.pos(); - if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { - return srs_error_wrap(err, "protect rtcp xr"); + for (int i = 0; i < audio_tracks_.size(); ++i) { + SrsRtcAudioRecvTrack* track = audio_tracks_.at(i); + track->send_rtcp_rr(); } - // TDOO: FIXME: Check error. - session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); - return err; } -srs_error_t SrsRtcPublishStream::send_rtcp_fb_pli(uint32_t ssrc) +srs_error_t SrsRtcPublishStream::send_rtcp_xr_rrtr() { srs_error_t err = srs_success; - // If DTLS is not OK, drop all messages. - if (!session_->transport_) { - return err; - } - - char buf[kRtpPacketSize]; - SrsBuffer stream(buf, sizeof(buf)); - stream.write_1bytes(0x81); - stream.write_1bytes(kPsFb); - stream.write_2bytes(2); - stream.write_4bytes(ssrc); - stream.write_4bytes(ssrc); - - srs_trace("RTC PLI ssrc=%u", ssrc); - - if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream.data(); int len = stream.pos(); SrsRtcConnection* s = session_; - srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + for (int i = 0; i < video_tracks_.size(); ++i) { + SrsRtcVideoRecvTrack* track = video_tracks_.at(i); + track->send_rtcp_xr_rrtr(); } - char protected_buf[kRtpPacketSize]; - int nb_protected_buf = stream.pos(); - if ((err = session_->transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { - return srs_error_wrap(err, "protect rtcp psfb pli"); + for (int i = 0; i < audio_tracks_.size(); ++i) { + SrsRtcAudioRecvTrack* track = audio_tracks_.at(i); + track->send_rtcp_xr_rrtr(); } - // TDOO: FIXME: Check error. - session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); - return err; } @@ -1212,25 +1031,22 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) // For source to consume packet. uint32_t ssrc = pkt->header.get_ssrc(); - if (ssrc == audio_ssrc) { + SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc); + SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); + if (audio_track) { pkt->frame_type = SrsFrameTypeAudio; - if ((err = on_audio(pkt)) != srs_success) { + if ((err = audio_track->on_rtp(source, pkt)) != srs_success) { return srs_error_wrap(err, "on audio"); } - } else if (ssrc == video_ssrc) { + } else if (video_track) { pkt->frame_type = SrsFrameTypeVideo; - if ((err = on_video(pkt)) != srs_success) { + if ((err = video_track->on_rtp(source, pkt)) != srs_success) { return srs_error_wrap(err, "on video"); } } else { return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } - // For NACK to handle packet. - if (nack_enabled_ && (err = on_nack(pkt)) != srs_success) { - return srs_error_wrap(err, "on nack"); - } - if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt->copy())) != srs_success) { return srs_error_wrap(err, "on rtp packet"); @@ -1248,9 +1064,9 @@ void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer } uint32_t ssrc = pkt->header.get_ssrc(); - if (ssrc == audio_ssrc) { + if (get_audio_track(ssrc)) { *ppayload = new SrsRtpRawPayload(); - } else if (ssrc == video_ssrc) { + } else if (get_video_track(ssrc)) { uint8_t v = (uint8_t)pkt->nalu_type; if (v == kStapA) { *ppayload = new SrsRtpSTAPPayload(); @@ -1262,82 +1078,6 @@ void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer } } -srs_error_t SrsRtcPublishStream::on_audio(SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - pkt->frame_type = SrsFrameTypeAudio; - - // TODO: FIXME: Error check. - source->on_rtp(pkt); - - return err; -} - -srs_error_t SrsRtcPublishStream::on_video(SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - pkt->frame_type = SrsFrameTypeVideo; - - // TODO: FIXME: Error check. - source->on_rtp(pkt); - - if (request_keyframe_) { - request_keyframe_ = false; - - // TODO: FIXME: Check error. - send_rtcp_fb_pli(video_ssrc); - } - - return err; -} - -srs_error_t SrsRtcPublishStream::on_nack(SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - SrsRtpNackForReceiver* nack_receiver = audio_nack_; - SrsRtpRingBuffer* ring_queue = audio_queue_; - - // TODO: FIXME: use is_audio() to jugdement - uint32_t ssrc = pkt->header.get_ssrc(); - uint16_t seq = pkt->header.get_sequence(); - bool video = (ssrc == video_ssrc) ? true : false; - if (video) { - nack_receiver = video_nack_; - ring_queue = video_queue_; - } - - // TODO: check whether is necessary? - nack_receiver->remove_timeout_packets(); - - SrsRtpNackInfo* nack_info = nack_receiver->find(seq); - if (nack_info) { - // seq had been received. - nack_receiver->remove(seq); - return err; - } - - // insert check nack list - uint16_t nack_first = 0, nack_last = 0; - if (!ring_queue->update(seq, nack_first, nack_last)) { - srs_warn("too old seq %u, range [%u, %u]", seq, ring_queue->begin, ring_queue->end); - } - if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { - srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); - nack_receiver->insert(nack_first, nack_last); - nack_receiver->check_queue_size(); - } - - // insert into video_queue and audio_queue - ring_queue->set(seq, pkt->copy()); - // send_nack - check_send_nacks(nack_receiver, ssrc); - - return err; -} - srs_error_t SrsRtcPublishStream::send_periodic_twcc() { srs_error_t err = srs_success; @@ -1517,8 +1257,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); } - last_sender_report_ntp[ssrc_of_sender] = srs_ntp; - last_sender_report_sys_time[ssrc_of_sender] = srs_update_system_time(); + update_send_report_time(ssrc_of_sender, srs_ntp); return err; } @@ -1575,11 +1314,7 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(char* buf, int nb_buf) srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d", ssrc, compact_ntp, lrr, dlrr, rtt); - if (ssrc == video_ssrc) { - video_nack_->update_rtt(rtt); - } else if (ssrc == audio_ssrc) { - audio_nack_->update_rtt(rtt); - } + update_rtt(ssrc, rtt); } } } @@ -1709,13 +1444,16 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -void SrsRtcPublishStream::request_keyframe() +void SrsRtcPublishStream::request_keyframe(uint32_t ssrc) { SrsContextId scid = _srs_context->get_id(); SrsContextId pcid = session_->context_id(); srs_trace("RTC play=[%d][%s] request keyframe from publish=[%d][%s]", ::getpid(), scid.c_str(), ::getpid(), pcid.c_str()); - request_keyframe_ = true; + SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); + if (video_track) { + video_track->request_keyframe(); + } } srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick) @@ -1723,16 +1461,14 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim srs_error_t err = srs_success; // TODO: FIXME: Check error. - send_rtcp_rr(video_ssrc, video_queue_); - send_rtcp_rr(audio_ssrc, audio_queue_); - send_rtcp_xr_rrtr(video_ssrc); - send_rtcp_xr_rrtr(audio_ssrc); + send_rtcp_rr(); + send_rtcp_xr_rrtr(); // TODO: FIXME: Check error. // We should not depends on the received packet, // instead we should send feedback every Nms. send_periodic_twcc(); - + return err; } @@ -1744,12 +1480,64 @@ void SrsRtcPublishStream::simulate_nack_drop(int nn) void SrsRtcPublishStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) { srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, - h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), + h->get_sequence(), h->get_ssrc(), (get_video_track(h->get_ssrc())? "Video":"Audio"), h->get_timestamp(), nn_bytes); nn_simulate_nack_drop--; } +SrsRtcVideoRecvTrack* SrsRtcPublishStream::get_video_track(uint32_t ssrc) +{ + for (int i = 0; i < video_tracks_.size(); ++i) { + SrsRtcVideoRecvTrack* track = video_tracks_.at(i); + if (track->has_ssrc(ssrc)) { + return track; + } + } + + return NULL; +} + +SrsRtcAudioRecvTrack* SrsRtcPublishStream::get_audio_track(uint32_t ssrc) +{ + for (int i = 0; i < audio_tracks_.size(); ++i) { + SrsRtcAudioRecvTrack* track = audio_tracks_.at(i); + if (track->has_ssrc(ssrc)) { + return track; + } + } + + return NULL; +} + +void SrsRtcPublishStream::update_rtt(uint32_t ssrc, int rtt) +{ + SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); + if (video_track) { + return video_track->update_rtt(rtt); + } + + SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc); + if (audio_track) { + return audio_track->update_rtt(rtt); + } +} + +void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& ntp) +{ + SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); + if (video_track) { + return video_track->update_send_report_time(ntp); + } + + SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc); + if (audio_track) { + return audio_track->update_send_report_time(ntp); + } +} + +uint32_t SrsRtcConnection::ssrc_num = 0; + SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s) { req = NULL; @@ -1850,6 +1638,71 @@ SrsContextId SrsRtcConnection::context_id() return cid; } +srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) +{ + srs_error_t err = srs_success; + + SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); + SrsAutoFree(SrsRtcStreamDescription, stream_desc); + if ((err = negotiate_publish_capability(req, remote_sdp, stream_desc)) != srs_success) { + return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + } + + if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc)) != srs_success) { + return srs_error_wrap(err, "generate local sdp"); + } + + SrsRtcStream* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + source->set_stream_desc(stream_desc->copy()); + + if ((err = create_publisher(req, stream_desc)) != srs_success) { + return srs_error_wrap(err, "create publish"); + } + + return err; +} + +srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) +{ + srs_error_t err = srs_success; + std::map play_sub_relations; + if ((err = negotiate_play_capability(req, remote_sdp, play_sub_relations)) != srs_success) { + return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + } + + if (!play_sub_relations.size()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "cannot negotiate sub relations"); + } + + SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); + SrsAutoFree(SrsRtcStreamDescription, stream_desc); + std::map::iterator it = play_sub_relations.begin(); + while (it != play_sub_relations.end()) { + if (it->second->type_ == "audio" || !stream_desc->audio_track_desc_) { + stream_desc->audio_track_desc_ = it->second->copy(); + } + + if (it->second->type_ == "video") { + stream_desc->video_track_descs_.push_back(it->second->copy()); + } + ++it; + } + + if ((err = generate_play_local_sdp(req, local_sdp, stream_desc)) != srs_success) { + return srs_error_wrap(err, "generate local sdp"); + } + + if ((err = create_player(req, play_sub_relations)) != srs_success) { + return srs_error_wrap(err, "create player"); + } + + return err; +} + srs_error_t SrsRtcConnection::initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, string username, SrsContextId context_id) { srs_error_t err = srs_success; @@ -1973,6 +1826,7 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); } + //TODO: FIXME: add unprotect_rtcp. return publisher_->on_rtp(data, nb_data); } @@ -2000,34 +1854,8 @@ srs_error_t SrsRtcConnection::start_play() { srs_error_t err = srs_success; - // If player is initialized, we think the session is started. - // To prevent play multiple times for the DTLS ARQ packet. - if (player_) { - return err; - } - player_ = new SrsRtcPlayStream(this, _srs_context->get_id()); - - uint32_t video_ssrc = 0; - uint32_t audio_ssrc = 0; - uint16_t video_payload_type = 0; - uint16_t audio_payload_type = 0; - for (size_t i = 0; i < local_sdp.media_descs_.size(); ++i) { - const SrsMediaDesc& media_desc = local_sdp.media_descs_[i]; - if (media_desc.is_audio()) { - audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; - audio_payload_type = media_desc.payload_types_[0].payload_type_; - } else if (media_desc.is_video()) { - video_ssrc = media_desc.ssrc_infos_[0].ssrc_; - video_payload_type = media_desc.payload_types_[0].payload_type_; - } - } - - if ((err = player_->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { - return srs_error_wrap(err, "SrsRtcPlayStream init"); - } - if ((err = player_->start()) != srs_success) { - return srs_error_wrap(err, "start SrsRtcPlayStream"); + return srs_error_wrap(err, "start"); } return err; @@ -2037,49 +1865,8 @@ srs_error_t SrsRtcConnection::start_publish() { srs_error_t err = srs_success; - // If publisher is initialized, we think the session is started. - // To prevent publish multiple times for the DTLS ARQ packet. - if (publisher_) { - return err; - } - publisher_ = new SrsRtcPublishStream(this); - - // Request PLI for exists players? - //publisher_->request_keyframe(); - - uint32_t video_ssrc = 0; - uint32_t audio_ssrc = 0; - for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { - const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; - if (media_desc.is_audio()) { - if (!media_desc.ssrc_infos_.empty()) { - audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; - } - } else if (media_desc.is_video()) { - if (!media_desc.ssrc_infos_.empty()) { - video_ssrc = media_desc.ssrc_infos_[0].ssrc_; - } - } - } - - uint32_t twcc_ext_id = 0; - for (size_t i = 0; i < local_sdp.media_descs_.size(); ++i) { - const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; - map extmaps = media_desc.get_extmaps(); - for(map::iterator it_ext = extmaps.begin(); it_ext != extmaps.end(); ++it_ext) { - if(kTWCCExt == it_ext->second) { - twcc_ext_id = it_ext->first; - break; - } - } - if (twcc_ext_id != 0){ - break; - } - } - - // FIXME: err process. - if ((err = publisher_->initialize(video_ssrc, audio_ssrc, twcc_ext_id, req)) != srs_success) { - return srs_error_wrap(err, "rtc publisher init"); + if ((err = publisher_->start()) != srs_success) { + return srs_error_wrap(err, "start SrsRtcPlayStream"); } return err; @@ -2101,71 +1888,274 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_skt = skt->copy_sendonly(); } -void SrsRtcConnection::simulate_nack_drop(int nn) +void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc) { - if (player_) { - player_->simulate_nack_drop(nn); + // If DTLS is not OK, drop all messages. + if (!transport_) { + return; } - if (publisher_) { - publisher_->simulate_nack_drop(nn); + // @see: https://tools.ietf.org/html/rfc4585#section-6.1 + vector nack_seqs; + nack->get_nack_seqs(nack_seqs); + + vector::iterator iter = nack_seqs.begin(); + while (iter != nack_seqs.end()) { + char buf[kRtpPacketSize]; + SrsBuffer stream(buf, sizeof(buf)); + // FIXME: Replace magic number. + stream.write_1bytes(0x81); + stream.write_1bytes(kRtpFb); + stream.write_2bytes(3); + stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1? + stream.write_4bytes(ssrc); // TODO: FIXME: Should be 0? + uint16_t pid = *iter; + uint16_t blp = 0; + while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { + blp |= (1 << (*(iter + 1) - pid - 1)); + ++iter; + } + + stream.write_2bytes(pid); + stream.write_2bytes(blp); + + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = stream.data(); int len = stream.pos(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + + // FIXME: Merge nack rtcp into one packets. + if (transport_->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + // TODO: FIXME: Check error. + sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + } + + ++iter; } } -#ifdef SRS_OSX -// These functions are similar to the older byteorder(3) family of functions. -// For example, be32toh() is identical to ntohl(). -// @see https://linux.die.net/man/3/be32toh -#define be32toh ntohl -#endif - -srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) +srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue, const uint64_t& last_send_systime, const SrsNtp& last_send_ntp) { srs_error_t err = srs_success; - bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); - if (strict_check && r->get_ice_controlled()) { - // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 - // TODO: Send 487 (Role Conflict) error response. - return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); + // If DTLS is not OK, drop all messages. + if (!transport_) { + return err; } - SrsStunPacket stun_binding_response; + // @see https://tools.ietf.org/html/rfc3550#section-6.4.2 char buf[kRtpPacketSize]; - SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); - SrsAutoFree(SrsBuffer, stream); + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kRR); + stream.write_2bytes(7); + stream.write_4bytes(ssrc); // TODO: FIXME: Should be 1? - stun_binding_response.set_message_type(BindingResponse); - stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); - stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); - stun_binding_response.set_transcation_id(r->get_transcation_id()); - // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); + // TODO: FIXME: Implements it. + // TODO: FIXME: See https://github.com/ossrs/srs/blob/f81d35d20f04ebec01915cb78a882e45b7ee8800/trunk/src/app/srs_app_rtc_queue.cpp + uint8_t fraction_lost = 0; + uint32_t cumulative_number_of_packets_lost = 0 & 0x7FFFFF; + uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence(); + uint32_t interarrival_jitter = 0; - if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { - return srs_error_wrap(err, "stun binding response encode failed"); - } + uint32_t rr_lsr = 0; + uint32_t rr_dlsr = 0; - if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { - return srs_error_wrap(err, "stun binding response send failed"); + if (last_send_systime > 0) { + rr_lsr = (last_send_ntp.ntp_second_ << 16) | (last_send_ntp.ntp_fractions_ >> 16); + uint32_t dlsr = (srs_update_system_time() - last_send_systime) / 1000; + rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000); } - if (state_ == WAITING_STUN) { - state_ = DOING_DTLS_HANDSHAKE; + stream.write_4bytes(ssrc); + stream.write_1bytes(fraction_lost); + stream.write_3bytes(cumulative_number_of_packets_lost); + stream.write_4bytes(extended_highest_sequence); + stream.write_4bytes(interarrival_jitter); + stream.write_4bytes(rr_lsr); + stream.write_4bytes(rr_dlsr); - peer_id_ = sendonly_skt->peer_id(); - server_->insert_into_id_sessions(peer_id_, this); + srs_verbose("RR ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u", + ssrc, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter); - state_ = DOING_DTLS_HANDSHAKE; - srs_trace("RTC session=%s, STUN done, waitting DTLS handshake.", id().c_str()); + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + return srs_error_wrap(err, "protect rtcp rr"); + } - if((err = transport_->start_active_handshake()) != srs_success) { - return srs_error_wrap(err, "fail to dtls handshake"); - } + // TDOO: FIXME: Check error. + sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + return err; +} + +srs_error_t SrsRtcConnection::send_rtcp_xr_rrtr(uint32_t ssrc) +{ + srs_error_t err = srs_success; + + // If DTLS is not OK, drop all messages. + if (!transport_) { + return err; } - if (blackhole && blackhole_addr && blackhole_stfd) { + /* + @see: http://www.rfc-editor.org/rfc/rfc3611.html#section-2 + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|reserved | PT=XR=207 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : report blocks : + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + @see: http://www.rfc-editor.org/rfc/rfc3611.html#section-4.4 + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | BT=4 | reserved | block length = 2 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp, most significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp, least significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + srs_utime_t now = srs_update_system_time(); + SrsNtp cur_ntp = SrsNtp::from_time_ms(now / 1000); + + char buf[kRtpPacketSize]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x80); + stream.write_1bytes(kXR); + stream.write_2bytes(4); + stream.write_4bytes(ssrc); + stream.write_1bytes(4); + stream.write_1bytes(0); + stream.write_2bytes(2); + stream.write_4bytes(cur_ntp.ntp_second_); + stream.write_4bytes(cur_ntp.ntp_fractions_); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + return srs_error_wrap(err, "protect rtcp xr"); + } + + // TDOO: FIXME: Check error. + sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + + return err; +} + +srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc) +{ + srs_error_t err = srs_success; + + // If DTLS is not OK, drop all messages. + if (!transport_) { + return err; + } + + char buf[kRtpPacketSize]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kPsFb); + stream.write_2bytes(2); + stream.write_4bytes(ssrc); + stream.write_4bytes(ssrc); + + srs_trace("RTC PLI ssrc=%u", ssrc); + + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = stream.data(); int len = stream.pos(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) { + return srs_error_wrap(err, "protect rtcp psfb pli"); + } + + // TDOO: FIXME: Check error. + sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); + + return err; +} + +void SrsRtcConnection::simulate_nack_drop(int nn) +{ + if (player_) { + player_->simulate_nack_drop(nn); + } + + if (publisher_) { + publisher_->simulate_nack_drop(nn); + } +} + +#ifdef SRS_OSX +// These functions are similar to the older byteorder(3) family of functions. +// For example, be32toh() is identical to ntohl(). +// @see https://linux.die.net/man/3/be32toh +#define be32toh ntohl +#endif + +srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) +{ + srs_error_t err = srs_success; + + bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); + if (strict_check && r->get_ice_controlled()) { + // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 + // TODO: Send 487 (Role Conflict) error response. + return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); + } + + SrsStunPacket stun_binding_response; + char buf[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); + stun_binding_response.set_transcation_id(r->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); + + if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); + } + + if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (state_ == WAITING_STUN) { + state_ = DOING_DTLS_HANDSHAKE; + + peer_id_ = sendonly_skt->peer_id(); + server_->insert_into_id_sessions(peer_id_, this); + + state_ = DOING_DTLS_HANDSHAKE; + srs_trace("RTC session=%s, STUN done, waitting DTLS handshake.", id().c_str()); + + if((err = transport_->start_active_handshake()) != srs_success) { + return srs_error_wrap(err, "fail to dtls handshake"); + } + } + + if (blackhole && blackhole_addr && blackhole_stfd) { // Ignore any error for black-hole. void* p = stream->data(); int len = stream->pos(); srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); @@ -2174,6 +2164,573 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) return err; } +srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRequest* req, const SrsSdp& remote_sdp, SrsRtcStreamDescription* stream_desc) +{ + srs_error_t err = srs_success; + + if (!stream_desc) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "stream description is NULL"); + } + + bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost); + bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost); + + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; + + SrsRtcTrackDescription* track_desc = new SrsRtcTrackDescription(); + SrsAutoFree(SrsRtcTrackDescription, track_desc); + + track_desc->set_direction("recvonly"); + track_desc->set_mid(remote_media_desc.mid_); + // Whether feature enabled in remote extmap. + int remote_twcc_id = 0; + if (true) { + map extmaps = remote_media_desc.get_extmaps(); + for(map::iterator it = extmaps.begin(); it != extmaps.end(); ++it) { + if (it->second == kTWCCExt) { + remote_twcc_id = it->first; + break; + } + } + } + + if (twcc_enabled && remote_twcc_id) { + track_desc->add_rtp_extension_desc(remote_twcc_id, kTWCCExt); + } + + if (remote_media_desc.is_audio()) { + // TODO: check opus format specific param + std::vector payloads = remote_media_desc.find_media_with_encoding_name("opus"); + if (payloads.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type"); + } + + for (std::vector::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) { + // if the playload is opus, and the encoding_param_ is channel + SrsAudioPayload* audio_payload = new SrsAudioPayload(iter->payload_type_, iter->encoding_name_, iter->clock_rate_, ::atol(iter->encoding_param_.c_str())); + audio_payload->set_opus_param_desc(iter->format_specific_param_); + // TODO: FIXME: Only support some transport algorithms. + for (int j = 0; j < (int)iter->rtcp_fb_.size(); ++j) { + + if (nack_enabled) { + if (iter->rtcp_fb_.at(j) == "nack" || iter->rtcp_fb_.at(j) == "nack pli") { + audio_payload->rtcp_fbs_.push_back(iter->rtcp_fb_.at(j)); + } + } + if (twcc_enabled && remote_twcc_id) { + if (iter->rtcp_fb_.at(j) == "transport-cc") { + audio_payload->rtcp_fbs_.push_back(iter->rtcp_fb_.at(j)); + } + } + } + track_desc->type_ = "audio"; + track_desc->set_codec_payload((SrsCodecPayload*)audio_payload); + // Only choose one match opus codec. + break; + } + } else if (remote_media_desc.is_video()) { + std::vector payloads = remote_media_desc.find_media_with_encoding_name("H264"); + if (payloads.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no found valid H.264 payload type"); + } + + std::deque backup_payloads; + for (std::vector::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) { + if (iter->format_specific_param_.empty()) { + backup_payloads.push_front(*iter); + continue; + } + H264SpecificParam h264_param; + if ((err = srs_parse_h264_fmtp(iter->format_specific_param_, h264_param)) != srs_success) { + srs_error_reset(err); continue; + } + + // Try to pick the "best match" H.264 payload type. + if (h264_param.packetization_mode == "1" && h264_param.level_asymmerty_allow == "1") { + // if the playload is opus, and the encoding_param_ is channel + SrsVideoPayload* video_payload = new SrsVideoPayload(iter->payload_type_, iter->encoding_name_, iter->clock_rate_); + video_payload->set_h264_param_desc(iter->format_specific_param_); + + // TODO: FIXME: Only support some transport algorithms. + for (int j = 0; j < (int)iter->rtcp_fb_.size(); ++j) { + if (nack_enabled) { + if (iter->rtcp_fb_.at(j) == "nack" || iter->rtcp_fb_.at(j) == "nack pli") { + video_payload->rtcp_fbs_.push_back(iter->rtcp_fb_.at(j)); + } + } + if (twcc_enabled && remote_twcc_id) { + if (iter->rtcp_fb_.at(j) == "transport-cc") { + video_payload->rtcp_fbs_.push_back(iter->rtcp_fb_.at(j)); + } + } + } + + track_desc->type_ = "video"; + track_desc->set_codec_payload((SrsCodecPayload*)video_payload); + // Only choose first match H.264 payload type. + break; + } + + backup_payloads.push_back(*iter); + } + + // Try my best to pick at least one media payload type. + if (!track_desc->media_ && ! backup_payloads.empty()) { + SrsMediaPayloadType media_pt= backup_payloads.front(); + // if the playload is opus, and the encoding_param_ is channel + SrsVideoPayload* video_payload = new SrsVideoPayload(media_pt.payload_type_, media_pt.encoding_name_, media_pt.clock_rate_); + + std::vector rtcp_fbs = media_pt.rtcp_fb_; + // TODO: FIXME: Only support some transport algorithms. + for (int j = 0; j < (int)rtcp_fbs.size(); ++j) { + if (nack_enabled) { + if (rtcp_fbs.at(j) == "nack" || rtcp_fbs.at(j) == "nack pli") { + video_payload->rtcp_fbs_.push_back(rtcp_fbs.at(j)); + } + } + + if (twcc_enabled && remote_twcc_id) { + if (rtcp_fbs.at(j) == "transport-cc") { + video_payload->rtcp_fbs_.push_back(rtcp_fbs.at(j)); + } + } + } + + track_desc->set_codec_payload((SrsCodecPayload*)video_payload); + + srs_warn("choose backup H.264 payload type=%d", backup_payloads.front().payload_type_); + } + + // TODO: FIXME: Support RRTR? + //local_media_desc.payload_types_.back().rtcp_fb_.push_back("rrtr"); + } + + // TODO: FIXME: use one parse paylod from sdp. + + track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("red")); + track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("rtx")); + track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("ulpfec")); + track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("rsfec")); + + std::string track_id; + for (int i = 0; i < remote_media_desc.ssrc_infos_.size(); ++i) { + SrsSSRCInfo ssrc_info = remote_media_desc.ssrc_infos_.at(i); + // ssrc have same track id, will be description in the same track description. + if(track_id != ssrc_info.msid_tracker_) { + SrsRtcTrackDescription* track_desc_copy = track_desc->copy(); + track_desc_copy->ssrc_ = ssrc_info.ssrc_; + track_desc_copy->id_ = ssrc_info.msid_tracker_; + + if (remote_media_desc.is_audio() && !stream_desc->audio_track_desc_) { + stream_desc->audio_track_desc_ = track_desc_copy; + } else if (remote_media_desc.is_video()) { + stream_desc->video_track_descs_.push_back(track_desc_copy); + } + } + track_id = ssrc_info.msid_tracker_; + } + + // set track fec_ssrc and rtx_ssrc + for (int i = 0; i < remote_media_desc.ssrc_groups_.size(); ++i) { + SrsSSRCGroup ssrc_group = remote_media_desc.ssrc_groups_.at(i); + SrsRtcTrackDescription* track_desc = stream_desc->find_track_description_by_ssrc(ssrc_group.ssrcs_[0]); + if (!track_desc) { + continue; + } + + if (ssrc_group.semantic_ == "FID") { + track_desc->set_rtx_ssrc(ssrc_group.ssrcs_[1]); + } else if (ssrc_group.semantic_ == "FEC") { + track_desc->set_fec_ssrc(ssrc_group.ssrcs_[1]); + } + } + } + + return err; +} + +srs_error_t SrsRtcConnection::generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc) +{ + srs_error_t err = srs_success; + + if (!stream_desc) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "stream description is NULL"); + } + + local_sdp.version_ = "0"; + + local_sdp.username_ = RTMP_SIG_SRS_SERVER; + local_sdp.session_id_ = srs_int2str((int64_t)this); + local_sdp.session_version_ = "2"; + local_sdp.nettype_ = "IN"; + local_sdp.addrtype_ = "IP4"; + local_sdp.unicast_address_ = "0.0.0.0"; + + local_sdp.session_name_ = "SRSPublishSession"; + + local_sdp.msid_semantic_ = "WMS"; + std::string stream_id = req->app + "/" + req->stream; + local_sdp.msids_.push_back(stream_id); + + local_sdp.group_policy_ = "BUNDLE"; + + // generate audio media desc + if (stream_desc->audio_track_desc_) { + SrsRtcTrackDescription* audio_track = stream_desc->audio_track_desc_; + + local_sdp.media_descs_.push_back(SrsMediaDesc("audio")); + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + local_media_desc.rtcp_mux_ = true; + local_media_desc.rtcp_rsize_ = true; + + local_media_desc.mid_ = audio_track->mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + // anwer not need set stream_id and track_id; + // local_media_desc.msid_ = stream_id; + // local_media_desc.msid_tracker_ = audio_track->id_; + local_media_desc.extmaps_ = audio_track->extmaps_; + + if (audio_track->direction_ == "recvonly") { + local_media_desc.recvonly_ = true; + } else if (audio_track->direction_ == "sendonly") { + local_media_desc.sendonly_ = true; + } else if (audio_track->direction_ == "sendrecv") { + local_media_desc.sendrecv_ = true; + } else if (audio_track->direction_ == "inactive_") { + local_media_desc.inactive_ = true; + } + + SrsAudioPayload* payload = (SrsAudioPayload*)audio_track->media_; + local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + } + + for (int i = 0; i < stream_desc->video_track_descs_.size(); ++i) { + SrsRtcTrackDescription* video_track = stream_desc->video_track_descs_.at(i); + + local_sdp.media_descs_.push_back(SrsMediaDesc("video")); + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + local_media_desc.rtcp_mux_ = true; + local_media_desc.rtcp_rsize_ = true; + + local_media_desc.mid_ = video_track->mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + // anwer not need set stream_id and track_id; + //local_media_desc.msid_ = stream_id; + //local_media_desc.msid_tracker_ = video_track->id_; + local_media_desc.extmaps_ = video_track->extmaps_; + + if (video_track->direction_ == "recvonly") { + local_media_desc.recvonly_ = true; + } else if (video_track->direction_ == "sendonly") { + local_media_desc.sendonly_ = true; + } else if (video_track->direction_ == "sendrecv") { + local_media_desc.sendrecv_ = true; + } else if (video_track->direction_ == "inactive_") { + local_media_desc.inactive_ = true; + } + + SrsVideoPayload* payload = (SrsVideoPayload*)video_track->media_; + local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + + // only need media desc info, not ssrc info; + break; + } + + return err; +} + +srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRequest* req, const SrsSdp& remote_sdp, std::map& sub_relations) +{ + srs_error_t err = srs_success; + + bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost); + bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost); + + SrsRtcStream* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + return srs_error_wrap(err, "fetch rtc source"); + } + + // TODO: FIXME: Avoid SSRC collision. + if (!ssrc_num) { + ssrc_num = ::getpid() * 10000 + ::getpid() * 100 + ::getpid(); + } + + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; + // Whether feature enabled in remote extmap. + int remote_twcc_id = 0; + if (true) { + map extmaps = remote_media_desc.get_extmaps(); + for(map::iterator it = extmaps.begin(); it != extmaps.end(); ++it) { + if (it->second == kTWCCExt) { + remote_twcc_id = it->first; + break; + } + } + } + + std::vector track_descs; + if (remote_media_desc.is_audio()) { + // TODO: check opus format specific param + std::vector payloads = remote_media_desc.find_media_with_encoding_name("opus"); + if (payloads.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type"); + } + track_descs = source->get_track_desc("audio", "opus"); + } else if (remote_media_desc.is_video()) { + // TODO: check opus format specific param + std::vector payloads = remote_media_desc.find_media_with_encoding_name("H264"); + if (payloads.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type"); + } + track_descs = source->get_track_desc("video", "H264"); + } + + for (int i = 0; i < track_descs.size(); ++i) { + SrsRtcTrackDescription* track = track_descs[i]->copy(); + uint32_t publish_ssrc = track->ssrc_; + + track->media_->rtcp_fbs_.clear(); + if (nack_enabled) { + track->media_->rtcp_fbs_.push_back("nack"); + track->media_->rtcp_fbs_.push_back("nack pli"); + } + + track->extmaps_.clear(); + if (twcc_enabled && remote_twcc_id) { + track->add_rtp_extension_desc(remote_twcc_id, kTWCCExt); + } + + track->ssrc_ = ++ssrc_num; + // TODO: FIXME: set audio_payload rtcp_fbs_, + // according by whether downlink is support transport algorithms. + // TODO: FIXME: if we support downlink RTX, MUST assign rtx_ssrc_, rtx_pt, rtx_apt + // not support rtx + if (true) { + if (track->rtx_) { + srs_freep(track->rtx_); + } + track->rtx_ssrc_ = 0; + } + // TODO: FIXME: if we support downlink ulpfec, MUST assign ulpfec params + // set_ulpfec_config; + if (true) { + if (track->ulpfec_) { + srs_freep(track->ulpfec_); + } + track->fec_ssrc_ = 0; + } + // TODO: FIXME: if we support downlink , MUST assign fec_ssrc_ + // set_rsfec_config; + if (true) { + if (track->rsfec_) { + srs_freep(track->rsfec_); + } + track->fec_ssrc_ = 0; + } + + track->set_direction("sendonly"); + sub_relations.insert(make_pair(publish_ssrc, track)); + } + } + + return err; +} + +srs_error_t SrsRtcConnection::generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc) +{ + srs_error_t err = srs_success; + + if (!stream_desc) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "stream description is NULL"); + } + + local_sdp.version_ = "0"; + + local_sdp.username_ = RTMP_SIG_SRS_SERVER; + local_sdp.session_id_ = srs_int2str((int64_t)this); + local_sdp.session_version_ = "2"; + local_sdp.nettype_ = "IN"; + local_sdp.addrtype_ = "IP4"; + local_sdp.unicast_address_ = "0.0.0.0"; + + local_sdp.session_name_ = "SRSPlaySession"; + + local_sdp.msid_semantic_ = "WMS"; + std::string stream_id = req->app + "/" + req->stream; + local_sdp.msids_.push_back(stream_id); + + local_sdp.group_policy_ = "BUNDLE"; + + std::string cname = srs_random_str(16); + + // generate audio media desc + if (stream_desc->audio_track_desc_) { + SrsRtcTrackDescription* audio_track = stream_desc->audio_track_desc_; + + local_sdp.media_descs_.push_back(SrsMediaDesc("audio")); + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + local_media_desc.rtcp_mux_ = true; + local_media_desc.rtcp_rsize_ = true; + + local_media_desc.mid_ = audio_track->mid_; + local_media_desc.msid_ = stream_id; + local_media_desc.msid_tracker_ = audio_track->id_; + local_media_desc.extmaps_ = audio_track->extmaps_; + + local_media_desc.mid_ = audio_track->mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + if (audio_track->direction_ == "recvonly") { + local_media_desc.recvonly_ = true; + } else if (audio_track->direction_ == "sendonly") { + local_media_desc.sendonly_ = true; + } else if (audio_track->direction_ == "sendrecv") { + local_media_desc.sendrecv_ = true; + } else if (audio_track->direction_ == "inactive_") { + local_media_desc.inactive_ = true; + } + + SrsAudioPayload* payload = (SrsAudioPayload*)audio_track->media_; + local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + + //TODO: FIXME: add red, rtx, ulpfec, rsfec..., payload_types_. + //local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + + + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(audio_track->ssrc_, cname, stream_id, audio_track->id_)); + + if (audio_track->rtx_) { + std::vector group_ssrcs; + group_ssrcs.push_back(audio_track->ssrc_); + group_ssrcs.push_back(audio_track->rtx_ssrc_); + + local_media_desc.ssrc_groups_.push_back(SrsSSRCGroup("FID", group_ssrcs)); + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(audio_track->rtx_ssrc_, cname, stream_id, audio_track->id_)); + } + + if (audio_track->ulpfec_ || audio_track->rsfec_) { + std::vector group_ssrcs; + group_ssrcs.push_back(audio_track->ssrc_); + group_ssrcs.push_back(audio_track->fec_ssrc_); + local_media_desc.ssrc_groups_.push_back(SrsSSRCGroup("FEC", group_ssrcs)); + + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(audio_track->fec_ssrc_, cname, stream_id, audio_track->id_)); + } + } + + for (int i = 0; i < stream_desc->video_track_descs_.size(); ++i) { + SrsRtcTrackDescription* track = stream_desc->video_track_descs_[i]; + + // for plan b, we only add one m= + if (i == 0) { + local_sdp.media_descs_.push_back(SrsMediaDesc("video")); + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + local_media_desc.rtcp_mux_ = true; + local_media_desc.rtcp_rsize_ = true; + + local_media_desc.mid_ = track->mid_; + local_media_desc.msid_ = stream_id; + local_media_desc.msid_tracker_ = track->id_; + local_media_desc.extmaps_ = track->extmaps_; + + local_media_desc.mid_ = track->mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + if (track->direction_ == "recvonly") { + local_media_desc.recvonly_ = true; + } else if (track->direction_ == "sendonly") { + local_media_desc.sendonly_ = true; + } else if (track->direction_ == "sendrecv") { + local_media_desc.sendrecv_ = true; + } else if (track->direction_ == "inactive_") { + local_media_desc.inactive_ = true; + } + + SrsVideoPayload* payload = (SrsVideoPayload*)track->media_; + + local_media_desc.payload_types_.push_back(payload->generate_media_payload_type()); + } + + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(track->ssrc_, cname, stream_id, track->id_)); + + if (track->rtx_ && track->rtx_ssrc_) { + std::vector group_ssrcs; + group_ssrcs.push_back(track->ssrc_); + group_ssrcs.push_back(track->rtx_ssrc_); + + local_media_desc.ssrc_groups_.push_back(SrsSSRCGroup("FID", group_ssrcs)); + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(track->rtx_ssrc_, cname, stream_id, track->id_)); + } + + if ((track->ulpfec_ || track->rsfec_) && track->fec_ssrc_) { + std::vector group_ssrcs; + group_ssrcs.push_back(track->ssrc_); + group_ssrcs.push_back(track->fec_ssrc_); + local_media_desc.ssrc_groups_.push_back(SrsSSRCGroup("FEC", group_ssrcs)); + + local_media_desc.ssrc_infos_.push_back(SrsSSRCInfo(track->fec_ssrc_, cname, stream_id, track->id_)); + } + // only need media desc info, not ssrc info; + break; + } + + return err; +} + +srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map sub_relations) +{ + srs_error_t err = srs_success; + + if (player_) { + return err; + } + + player_ = new SrsRtcPlayStream(this, _srs_context->get_id()); + if ((err = player_->initialize(req, sub_relations)) != srs_success) { + return srs_error_wrap(err, "SrsRtcPlayStream init"); + } + + return err; +} + +srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcStreamDescription* stream_desc) +{ + srs_error_t err = srs_success; + + if (!stream_desc) { + return srs_error_new(ERROR_RTC_STREAM_DESC, "rtc publisher init"); + } + + if (publisher_) { + return err; + } + + publisher_ = new SrsRtcPublishStream(this); + if ((err = publisher_->initialize(req, stream_desc)) != srs_success) { + return srs_error_wrap(err, "rtc publisher init"); + } + + return err; +} + ISrsRtcHijacker::ISrsRtcHijacker() { } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e9a501bc7..0e47c1707 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -57,6 +57,8 @@ class SrsRtpNackForReceiver; class SrsRtpIncommingVideoFrame; class SrsRtpRingBuffer; class SrsRtcConsumer; +class SrsRtcAudioSendTrack; +class SrsRtcVideoSendTrack; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -75,23 +77,6 @@ const uint8_t kSLI = 2; const uint8_t kRPSI = 3; const uint8_t kAFB = 15; -class SrsNtp -{ -public: - uint64_t system_ms_; - uint64_t ntp_; - uint32_t ntp_second_; - uint32_t ntp_fractions_; -public: - SrsNtp(); - virtual ~SrsNtp(); -public: - static SrsNtp from_time_ms(uint64_t ms); - static SrsNtp to_time_ms(uint64_t ntp); -public: - static uint64_t kMagicNtpFractionalUnit; -}; - enum SrsRtcConnectionStateType { // TODO: FIXME: Should prefixed by enum name. @@ -187,16 +172,9 @@ protected: SrsCoroutine* trd; SrsRtcConnection* session_; private: - // TODO: FIXME: How to handle timestamp overflow? - // Information for audio. - uint32_t audio_ssrc; - uint16_t audio_payload_type; - // Information for video. - uint16_t video_payload_type; - uint32_t video_ssrc; - // NACK ARQ ring buffer. - SrsRtpRingBuffer* audio_queue_; - SrsRtpRingBuffer* video_queue_; + // key: publish_ssrc, value: send track to process rtp/rtcp + std::map audio_tracks_; + std::map video_tracks_; // Simulators. int nn_simulate_nack_drop; private: @@ -205,11 +183,15 @@ private: bool realtime; // Whether enabled nack. bool nack_enabled_; + // Whether palyer started. + bool is_started; + // statistic send packets. + SrsRtcOutgoingInfo info; public: SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid); virtual ~SrsRtcPlayStream(); public: - srs_error_t initialize(uint32_t vssrc, uint32_t assrc, uint16_t v_pt, uint16_t a_pt); + srs_error_t initialize(SrsRequest* request, std::map sub_relations); // interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); @@ -238,6 +220,7 @@ private: srs_error_t on_rtcp_feedback(char* data, int nb_data); srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); srs_error_t on_rtcp_rr(char* data, int nb_data); + uint32_t get_video_publish_ssrc(uint32_t play_ssrc); }; // A RTC publish stream, client push and publish stream to SRS. @@ -248,48 +231,41 @@ private: uint64_t nn_audio_frames; private: SrsRtcConnection* session_; - uint32_t video_ssrc; - uint32_t audio_ssrc; uint16_t pt_to_drop_; // Whether enabled nack. bool nack_enabled_; private: bool request_keyframe_; - SrsRtpRingBuffer* video_queue_; - SrsRtpNackForReceiver* video_nack_; - SrsRtpRingBuffer* audio_queue_; - SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; SrsRtcStream* source; // Simulators. int nn_simulate_nack_drop; private: - std::map last_sender_report_sys_time; - std::map last_sender_report_ntp; + // track vector + std::vector audio_tracks_; + std::vector video_tracks_; private: srs_utime_t last_twcc_feedback_time_; int twcc_id_; uint8_t twcc_fb_count_; SrsRtcpTWCC rtcp_twcc_; SrsRtpExtensionTypes extension_types_; + bool is_started; public: SrsRtcPublishStream(SrsRtcConnection* session); virtual ~SrsRtcPublishStream(); public: - srs_error_t initialize(uint32_t vssrc, uint32_t assrc, int twcc_id, SrsRequest* req); + srs_error_t initialize(SrsRequest* req, SrsRtcStreamDescription* stream_desc); + srs_error_t start(); private: void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc); - srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue); - srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); - srs_error_t send_rtcp_fb_pli(uint32_t ssrc); + srs_error_t send_rtcp_rr(); + srs_error_t send_rtcp_xr_rrtr(); public: srs_error_t on_rtp(char* buf, int nb_buf); virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); private: - srs_error_t on_audio(SrsRtpPacket2* pkt); - srs_error_t on_video(SrsRtpPacket2* pkt); - srs_error_t on_nack(SrsRtpPacket2* pkt); srs_error_t send_periodic_twcc(); public: srs_error_t on_rtcp(char* data, int nb_data); @@ -300,7 +276,7 @@ private: srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); srs_error_t on_rtcp_rr(char* data, int nb_data); public: - void request_keyframe(); + void request_keyframe(uint32_t ssrc); // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); @@ -310,6 +286,10 @@ private: void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes); private: srs_error_t on_twcc(uint16_t sn); + SrsRtcAudioRecvTrack* get_audio_track(uint32_t ssrc); + SrsRtcVideoRecvTrack* get_video_track(uint32_t ssrc); + void update_rtt(uint32_t ssrc, int rtt); + void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp); }; // A RTC Peer Connection, SDP level object. @@ -320,6 +300,8 @@ class SrsRtcConnection friend class SrsRtcPublishStream; public: bool disposing_; +private: + static uint32_t ssrc_num; private: SrsRtcServer* server_; SrsRtcConnectionStateType state_; @@ -360,6 +342,7 @@ public: SrsRtcConnection(SrsRtcServer* s); virtual ~SrsRtcConnection(); public: + // TODO: FIXME: save only connection info. SrsSdp* get_local_sdp(); void set_local_sdp(const SrsSdp& sdp); SrsSdp* get_remote_sdp(); @@ -373,6 +356,8 @@ public: void set_encrypt(bool v); void switch_to_context(); SrsContextId context_id(); + srs_error_t add_publisher(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); + srs_error_t add_player(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp); public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. srs_error_t initialize(SrsRtcStream* source, SrsRequest* r, bool is_publisher, std::string username, SrsContextId context_id); @@ -387,11 +372,25 @@ public: srs_error_t start_publish(); bool is_stun_timeout(); void update_sendonly_socket(SrsUdpMuxSocket* skt); +public: + // send rtcp + void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc); + srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue, const uint64_t& last_send_systime, const SrsNtp& last_send_ntp); + srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); + srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: // Simulate the NACK to drop nn packets. void simulate_nack_drop(int nn); private: srs_error_t on_binding_request(SrsStunPacket* r); + // publish media capabilitiy negotiate + srs_error_t negotiate_publish_capability(SrsRequest* req, const SrsSdp& remote_sdp, SrsRtcStreamDescription* stream_desc); + srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc); + // play media capabilitiy negotiate + srs_error_t negotiate_play_capability(SrsRequest* req, const SrsSdp& remote_sdp, std::map& sub_relations); + srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcStreamDescription* stream_desc); + srs_error_t create_player(SrsRequest* request, std::map sub_relations); + srs_error_t create_publisher(SrsRequest* request, SrsRtcStreamDescription* stream_desc); }; class ISrsRtcHijacker diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index 80c7cc0ae..cbee6c0d6 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -243,6 +243,9 @@ ISrsDtlsCallback::~ISrsDtlsCallback() SrsDtls::SrsDtls(ISrsDtlsCallback* cb) { + dtls_ctx = NULL; + dtls = NULL; + callback = cb; handshake_done = false; diff --git a/trunk/src/app/srs_app_rtc_sdp.cpp b/trunk/src/app/srs_app_rtc_sdp.cpp index 5e2a584fd..b5b942675 100644 --- a/trunk/src/app/srs_app_rtc_sdp.cpp +++ b/trunk/src/app/srs_app_rtc_sdp.cpp @@ -46,7 +46,7 @@ if (!getline(is,word,delim)) {\ return srs_error_new(ERROR_RTC_SDP_DECODE, "fetch with delim failed");\ }\ -static std::vector split_str(const std::string& str, const std::string& delim) +std::vector split_str(const std::string& str, const std::string& delim) { std::vector ret; size_t pre_pos = 0; @@ -176,6 +176,16 @@ SrsSSRCInfo::SrsSSRCInfo() ssrc_ = 0; } +SrsSSRCInfo::SrsSSRCInfo(uint32_t ssrc, std::string cname, std::string stream_id, std::string track_id) +{ + ssrc_ = ssrc; + cname_ = cname; + msid_ = stream_id; + msid_tracker_ = track_id; + mslabel_ = msid_; + label_ = msid_tracker_; +} + SrsSSRCInfo::~SrsSSRCInfo() { } @@ -802,6 +812,13 @@ void SrsSdp::set_ice_pwd(const std::string& pwd) } } +void SrsSdp::set_dtls_role(const std::string& dtls_role) +{ + for (std::vector::iterator iter = media_descs_.begin(); iter != media_descs_.end(); ++iter) { + iter->session_info_.setup_ = dtls_role; + } +} + void SrsSdp::set_fingerprint_algo(const std::string& algo) { for (std::vector::iterator iter = media_descs_.begin(); iter != media_descs_.end(); ++iter) { @@ -849,6 +866,16 @@ std::string SrsSdp::get_ice_pwd() const return ""; } +std::string SrsSdp::get_dtls_role() const +{ + // Becaues we use BUNDLE, so we can choose the first element. + for (std::vector::const_iterator iter = media_descs_.begin(); iter != media_descs_.end(); ++iter) { + return iter->session_info_.setup_; + } + + return ""; +} + srs_error_t SrsSdp::parse_line(const std::string& line) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_sdp.hpp b/trunk/src/app/srs_app_rtc_sdp.hpp index c09af3ee0..443923efc 100644 --- a/trunk/src/app/srs_app_rtc_sdp.hpp +++ b/trunk/src/app/srs_app_rtc_sdp.hpp @@ -34,6 +34,9 @@ #include const std::string kTWCCExt = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; +// TDOO: FIXME: Rename it, and add utest. +extern std::vector split_str(const std::string& str, const std::string& delim); + struct SrsSessionConfig { public: @@ -64,6 +67,7 @@ class SrsSSRCInfo { public: SrsSSRCInfo(); + SrsSSRCInfo(uint32_t ssrc, std::string cname, std::string stream_id, std::string track_id); virtual ~SrsSSRCInfo(); public: srs_error_t encode(std::ostringstream& os); @@ -192,12 +196,14 @@ public: public: void set_ice_ufrag(const std::string& ufrag); void set_ice_pwd(const std::string& pwd); + void set_dtls_role(const std::string& dtls_role); void set_fingerprint_algo(const std::string& algo); void set_fingerprint(const std::string& fingerprint); void add_candidate(const std::string& ip, const int& port, const std::string& type); std::string get_ice_ufrag() const; std::string get_ice_pwd() const; + std::string get_dtls_role() const; private: srs_error_t parse_line(const std::string& line); private: diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index d049ea9e5..a3a073eb1 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -311,6 +311,36 @@ srs_error_t SrsRtcServer::create_session( return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); } + // TODO: FIXME: add do_create_session to error process. + SrsRtcConnection* session = new SrsRtcConnection(this); + if ((err = do_create_session(session, req, remote_sdp, local_sdp, mock_eip, publish, source)) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "create session"); + } + + *psession = session; + + return err; +} + +srs_error_t SrsRtcServer::do_create_session( + SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, + SrsRtcStream* source +) +{ + srs_error_t err = srs_success; + + // first add publisher/player for negotiate sdp media info + if (publish) { + if ((err = session->add_publisher(req, remote_sdp, local_sdp)) != srs_success) { + return srs_error_wrap(err, "add publisher"); + } + } else { + if ((err = session->add_player(req, remote_sdp, local_sdp)) != srs_success) { + return srs_error_wrap(err, "add publisher"); + } + } + std::string local_pwd = srs_random_str(32); std::string local_ufrag = ""; // TODO: FIXME: Rename for a better name, it's not an username. @@ -339,7 +369,19 @@ srs_error_t SrsRtcServer::create_session( } } - SrsRtcConnection* session = new SrsRtcConnection(this); + if (remote_sdp.get_dtls_role() == "active") { + local_sdp.set_dtls_role("passive"); + } else if (remote_sdp.get_dtls_role() == "passive") { + local_sdp.set_dtls_role("active"); + } else if (remote_sdp.get_dtls_role() == "actpass") { + local_sdp.set_dtls_role(local_sdp.session_config_.dtls_role); + } else { + // @see: https://tools.ietf.org/html/rfc4145#section-4.1 + // The default value of the setup attribute in an offer/answer exchange + // is 'active' in the offer and 'passive' in the answer. + local_sdp.set_dtls_role("passive"); + } + session->set_remote_sdp(remote_sdp); // We must setup the local SDP, then initialize the session object. session->set_local_sdp(local_sdp); @@ -348,13 +390,10 @@ srs_error_t SrsRtcServer::create_session( SrsContextId cid = _srs_context->get_id(); // Before session initialize, we must setup the local SDP. if ((err = session->initialize(source, req, publish, username, cid)) != srs_success) { - srs_freep(session); return srs_error_wrap(err, "init"); } map_username_session.insert(make_pair(username, session)); - *psession = session; - return err; } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 1c0e97f94..8ea233a24 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -39,6 +39,7 @@ class SrsHourGlass; class SrsRtcConnection; class SrsRequest; class SrsSdp; +class SrsRtcStream; class ISrsRtcServerHandler { @@ -80,6 +81,12 @@ public: SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, SrsRtcConnection** psession ); +private: + srs_error_t do_create_session( + SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, + const std::string& mock_eip, bool publish, SrsRtcStream* source + ); +public: // We start offering, create_session2 to generate offer, setup_session2 to handle answer. srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcConnection** psession); srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 7550a2e71..ab0a6c15f 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -35,6 +35,8 @@ #include #include #include +#include +#include #ifdef SRS_FFMPEG_FIT #include @@ -90,6 +92,41 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor return err; } +uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32; + +SrsNtp::SrsNtp() +{ + system_ms_ = 0; + ntp_ = 0; + ntp_second_ = 0; + ntp_fractions_ = 0; +} + +SrsNtp::~SrsNtp() +{ +} + +SrsNtp SrsNtp::from_time_ms(uint64_t ms) +{ + SrsNtp srs_ntp; + srs_ntp.system_ms_ = ms; + srs_ntp.ntp_second_ = ms / 1000; + srs_ntp.ntp_fractions_ = (static_cast(ms % 1000 / 1000.0)) * kMagicNtpFractionalUnit; + srs_ntp.ntp_ = (static_cast(srs_ntp.ntp_second_) << 32) | srs_ntp.ntp_fractions_; + return srs_ntp; +} + +SrsNtp SrsNtp::to_time_ms(uint64_t ntp) +{ + SrsNtp srs_ntp; + srs_ntp.ntp_ = ntp; + srs_ntp.ntp_second_ = (ntp & 0xFFFFFFFF00000000ULL) >> 32; + srs_ntp.ntp_fractions_ = (ntp & 0x00000000FFFFFFFFULL); + srs_ntp.system_ms_ = (static_cast(srs_ntp.ntp_second_) * 1000) + + (static_cast(static_cast(srs_ntp.ntp_fractions_) * 1000.0) / kMagicNtpFractionalUnit); + return srs_ntp; +} + SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s) { source = s; @@ -254,6 +291,7 @@ SrsRtcStream::SrsRtcStream() #else bridger_ = new SrsRtcDummyBridger(); #endif + stream_desc_ = NULL; } SrsRtcStream::~SrsRtcStream() @@ -264,6 +302,7 @@ SrsRtcStream::~SrsRtcStream() srs_freep(req); srs_freep(bridger_); + srs_freep(stream_desc_); } srs_error_t SrsRtcStream::initialize(SrsRequest* r) @@ -423,6 +462,36 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) return err; } +void SrsRtcStream::set_stream_desc(SrsRtcStreamDescription* stream_desc) +{ + srs_freep(stream_desc_); + stream_desc_ = stream_desc->copy(); +} + +std::vector SrsRtcStream::get_track_desc(std::string type, std::string media_name) +{ + std::vector track_descs; + if (!stream_desc_) { + return track_descs; + } + + if (type == "audio") { + if (stream_desc_->audio_track_desc_->media_->name_ == media_name) { + track_descs.push_back(stream_desc_->audio_track_desc_); + } + } + + if (type == "video") { + std::vector::iterator it = stream_desc_->video_track_descs_.begin(); + while (it != stream_desc_->video_track_descs_.end() ){ + track_descs.push_back(*it); + ++it; + } + } + + return track_descs; +} + #ifdef SRS_FFMPEG_FIT SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) { @@ -975,3 +1044,672 @@ void SrsRtcDummyBridger::on_unpublish() { } +SrsCodecPayload::SrsCodecPayload() +{ +} + +SrsCodecPayload::SrsCodecPayload(uint8_t pt, std::string encode_name, int sample) +{ + pt_ = pt; + name_ = encode_name; + sample_ = sample; +} + +SrsCodecPayload::~SrsCodecPayload() +{ +} + +SrsCodecPayload* SrsCodecPayload::copy() +{ + SrsCodecPayload* cp = new SrsCodecPayload(); + + cp->type_ = type_; + cp->pt_ = pt_; + cp->name_ = name_; + cp->sample_ = sample_; + cp->rtcp_fbs_ = rtcp_fbs_; + + return cp; +} + +SrsMediaPayloadType SrsCodecPayload::generate_media_payload_type() +{ + SrsMediaPayloadType media_payload_type(pt_); + + media_payload_type.encoding_name_ = name_; + media_payload_type.clock_rate_ = sample_; + media_payload_type.rtcp_fb_ = rtcp_fbs_; + + return media_payload_type; +} + +SrsVideoPayload::SrsVideoPayload() +{ +} + +SrsVideoPayload::SrsVideoPayload(uint8_t pt, std::string encode_name, int sample) + :SrsCodecPayload(pt, encode_name, sample) +{ + h264_param_.profile_level_id = ""; + h264_param_.packetization_mode = ""; + h264_param_.level_asymmerty_allow = ""; +} + +SrsVideoPayload::~SrsVideoPayload() +{ +} + +SrsVideoPayload* SrsVideoPayload::copy() +{ + SrsVideoPayload* cp = new SrsVideoPayload(); + + cp->type_ = type_; + cp->pt_ = pt_; + cp->name_ = name_; + cp->sample_ = sample_; + cp->rtcp_fbs_ = rtcp_fbs_; + cp->h264_param_ = h264_param_; + + return cp; +} + +SrsMediaPayloadType SrsVideoPayload::generate_media_payload_type() +{ + SrsMediaPayloadType media_payload_type(pt_); + + media_payload_type.encoding_name_ = name_; + media_payload_type.clock_rate_ = sample_; + media_payload_type.rtcp_fb_ = rtcp_fbs_; + + std::ostringstream format_specific_param; + if (!h264_param_.level_asymmerty_allow.empty()) { + format_specific_param << "level-asymmetry-allowed=" << h264_param_.level_asymmerty_allow; + } + if (!h264_param_.packetization_mode.empty()) { + format_specific_param << ";packetization-mode=" << h264_param_.packetization_mode; + } + if (!h264_param_.profile_level_id.empty()) { + format_specific_param << ";profile-level-id=" << h264_param_.profile_level_id; + } + + media_payload_type.format_specific_param_ = format_specific_param.str(); + + return media_payload_type; +} + +srs_error_t SrsVideoPayload::set_h264_param_desc(std::string fmtp) +{ + srs_error_t err = srs_success; + std::vector vec = split_str(fmtp, ";"); + for (size_t i = 0; i < vec.size(); ++i) { + std::vector kv = split_str(vec[i], "="); + if (kv.size() == 2) { + if (kv[0] == "profile-level-id") { + h264_param_.profile_level_id = kv[1]; + } else if (kv[0] == "packetization-mode") { + // 6.3. Non-Interleaved Mode + // This mode is in use when the value of the OPTIONAL packetization-mode + // media type parameter is equal to 1. This mode SHOULD be supported. + // It is primarily intended for low-delay applications. Only single NAL + // unit packets, STAP-As, and FU-As MAY be used in this mode. STAP-Bs, + // MTAPs, and FU-Bs MUST NOT be used. The transmission order of NAL + // units MUST comply with the NAL unit decoding order. + // @see https://tools.ietf.org/html/rfc6184#section-6.3 + h264_param_.packetization_mode = kv[1]; + } else if (kv[0] == "level-asymmetry-allowed") { + h264_param_.level_asymmerty_allow = kv[1]; + } else { + return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid h264 param=%s", kv[0].c_str()); + } + } else { + return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid h264 param=%s", vec[i].c_str()); + } + } + + return err; +} + +SrsAudioPayload::SrsAudioPayload() +{ +} + +SrsAudioPayload::SrsAudioPayload(uint8_t pt, std::string encode_name, int sample, int channel) + :SrsCodecPayload(pt, encode_name, sample) +{ + channel_ = channel; + opus_param_.minptime = 0; + opus_param_.use_inband_fec = false; + opus_param_.usedtx = false; +} + +SrsAudioPayload::~SrsAudioPayload() +{ +} + +SrsAudioPayload* SrsAudioPayload::copy() +{ + SrsAudioPayload* cp = new SrsAudioPayload(); + + cp->type_ = type_; + cp->pt_ = pt_; + cp->name_ = name_; + cp->sample_ = sample_; + cp->rtcp_fbs_ = rtcp_fbs_; + cp->channel_ = channel_; + cp->opus_param_ = opus_param_; + + return cp; +} + +SrsMediaPayloadType SrsAudioPayload::generate_media_payload_type() +{ + SrsMediaPayloadType media_payload_type(pt_); + + media_payload_type.encoding_name_ = name_; + media_payload_type.clock_rate_ = sample_; + media_payload_type.encoding_param_ = srs_int2str(channel_); + media_payload_type.rtcp_fb_ = rtcp_fbs_; + + std::ostringstream format_specific_param; + if (opus_param_.minptime) { + format_specific_param << "minptime=" << opus_param_.minptime; + } + if (opus_param_.use_inband_fec) { + format_specific_param << ";useinbandfec=1"; + } + if (opus_param_.usedtx) { + format_specific_param << ";usedtx=1"; + } + + media_payload_type.format_specific_param_ = format_specific_param.str(); + + return media_payload_type; +} + +srs_error_t SrsAudioPayload::set_opus_param_desc(std::string fmtp) +{ + srs_error_t err = srs_success; + std::vector vec = split_str(fmtp, ";"); + for (size_t i = 0; i < vec.size(); ++i) { + std::vector kv = split_str(vec[i], "="); + if (kv.size() == 2) { + if (kv[0] == "minptime") { + opus_param_.minptime = (int)::atol(kv[1].c_str()); + } else if (kv[0] == "useinbandfec") { + opus_param_.use_inband_fec = (kv[1] == "1") ? true : false; + } else if (kv[0] == "usedtx") { + opus_param_.usedtx = (kv[1] == "1") ? true : false; + } + } else { + return srs_error_new(ERROR_RTC_SDP_DECODE, "invalid opus param=%s", vec[i].c_str()); + } + } + + return err; +} + +SrsRtcTrackDescription::SrsRtcTrackDescription() +{ + ssrc_ = 0; + rtx_ssrc_ = 0; + fec_ssrc_ = 0; + is_active_ = true; + + media_ = NULL; + red_ = NULL; + rtx_ = NULL; + ulpfec_ = NULL; + rsfec_ = NULL; +} + +SrsRtcTrackDescription::~SrsRtcTrackDescription() +{ + srs_freep(media_); + srs_freep(red_); + srs_freep(rtx_); + srs_freep(ulpfec_); + srs_freep(rsfec_); +} + +bool SrsRtcTrackDescription::has_ssrc(uint32_t ssrc) +{ + if (ssrc == ssrc_ || ssrc == rtx_ssrc_ || ssrc == fec_ssrc_) { + return true; + } + return false; +} + +void SrsRtcTrackDescription::add_rtp_extension_desc(int id, std::string uri) +{ + extmaps_[id] = uri; +} + +void SrsRtcTrackDescription::set_direction(std::string direction) +{ + direction_ = direction; +} + +void SrsRtcTrackDescription::set_codec_payload(SrsCodecPayload* payload) +{ + media_ = payload; +} + +void SrsRtcTrackDescription::create_auxiliary_payload(const std::vector payloads) +{ + if (!payloads.size()) { + return; + } + + SrsMediaPayloadType payload = payloads.at(0); + if (payload.encoding_name_ == "red"){ + srs_freep(red_); + red_ = new SrsCodecPayload(payload.payload_type_, "red", payload.clock_rate_); + } else if (payload.encoding_name_ == "rtx") { + srs_freep(rtx_); + rtx_ = new SrsCodecPayload(payload.payload_type_, "rtx", payload.clock_rate_); + } else if (payload.encoding_name_ == "ulpfec") { + srs_freep(ulpfec_); + ulpfec_ = new SrsCodecPayload(payload.payload_type_, "ulpfec", payload.clock_rate_); + } else if (payload.encoding_name_ == "rsfec") { + srs_freep(rsfec_); + rsfec_ = new SrsCodecPayload(payload.payload_type_, "rsfec", payload.clock_rate_); + } +} + +void SrsRtcTrackDescription::set_rtx_ssrc(uint32_t ssrc) +{ + rtx_ssrc_ = ssrc; +} + +void SrsRtcTrackDescription::set_fec_ssrc(uint32_t ssrc) +{ + fec_ssrc_ = ssrc; +} + +void SrsRtcTrackDescription::set_mid(std::string mid) +{ + mid_ = mid; +} + +int SrsRtcTrackDescription::get_rtp_extension_id(std::string uri) +{ + for(std::map::iterator it = extmaps_.begin(); it != extmaps_.end(); ++it) { + if(uri == it->second) { + return it->first; + } + } + + return -1; +} + +SrsRtcTrackDescription* SrsRtcTrackDescription::copy() +{ + SrsRtcTrackDescription* cp = new SrsRtcTrackDescription(); + + cp->type_ = type_; + cp->id_ = id_; + cp->ssrc_ = ssrc_; + cp->fec_ssrc_ = fec_ssrc_; + cp->rtx_ssrc_ = rtx_ssrc_; + cp->extmaps_ = extmaps_; + cp->direction_ = direction_; + cp->mid_ = mid_; + cp->is_active_ = is_active_; + cp->media_ = media_ ? media_->copy():NULL; + cp->red_ = red_ ? red_->copy():NULL; + cp->rtx_ = rtx_ ? rtx_->copy():NULL; + cp->ulpfec_ = ulpfec_ ? ulpfec_->copy():NULL; + cp->rsfec_ = rsfec_ ? rsfec_->copy():NULL; + + return cp; +} + +SrsRtcStreamDescription::SrsRtcStreamDescription() +{ + audio_track_desc_ = NULL; +} + +SrsRtcStreamDescription::~SrsRtcStreamDescription() +{ + srs_freep(audio_track_desc_); + + for (int i = 0; i < video_track_descs_.size(); ++i) { + srs_freep(video_track_descs_.at(i)); + } + video_track_descs_.clear(); +} + +SrsRtcStreamDescription* SrsRtcStreamDescription::copy() +{ + SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription(); + + if (audio_track_desc_) { + stream_desc->audio_track_desc_ = audio_track_desc_->copy(); + } + + for (int i = 0; i < video_track_descs_.size(); ++i) { + stream_desc->video_track_descs_.push_back(video_track_descs_.at(i)->copy()); + } + + return stream_desc; +} + +SrsRtcTrackDescription* SrsRtcStreamDescription::find_track_description_by_ssrc(uint32_t ssrc) +{ + if (audio_track_desc_->has_ssrc(ssrc)) { + return audio_track_desc_; + } + + for (int i = 0; i < video_track_descs_.size(); ++i) { + if (video_track_descs_.at(i)->has_ssrc(ssrc)) { + return video_track_descs_.at(i); + } + } + + return NULL; +} + +ISrsRtcTrack::ISrsRtcTrack() +{ +} + +ISrsRtcTrack::~ISrsRtcTrack() +{ +} + +SrsRtcRecvTrack::SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio) +{ + session_ = session; + track_desc_ = track_desc->copy(); + if (is_audio) { + rtp_queue_ = new SrsRtpRingBuffer(100); + nack_receiver_ = new SrsRtpNackForReceiver(rtp_queue_, 100 * 2 / 3); + } else { + rtp_queue_ = new SrsRtpRingBuffer(1000); + nack_receiver_ = new SrsRtpNackForReceiver(rtp_queue_, 1000 * 2 / 3); + } +} + +SrsRtcRecvTrack::~SrsRtcRecvTrack() +{ + srs_freep(rtp_queue_); + srs_freep(nack_receiver_); + srs_freep(track_desc_); +} + +bool SrsRtcRecvTrack::has_ssrc(uint32_t ssrc) +{ + if (track_desc_) { + return track_desc_->has_ssrc(ssrc); + } + + return false; +} + +void SrsRtcRecvTrack::update_rtt(int rtt) +{ + if (nack_receiver_) { + nack_receiver_->update_rtt(rtt); + } +} + +void SrsRtcRecvTrack::update_send_report_time(const SrsNtp& ntp) +{ + last_sender_report_ntp = ntp; + last_sender_report_sys_time = srs_update_system_time();; +} + +srs_error_t SrsRtcRecvTrack::send_rtcp_rr() +{ + srs_error_t err = srs_success; + + if (session_) { + return session_->send_rtcp_rr(track_desc_->ssrc_, rtp_queue_, last_sender_report_sys_time, last_sender_report_ntp); + } + + return err; +} + +srs_error_t SrsRtcRecvTrack::send_rtcp_xr_rrtr() +{ + srs_error_t err = srs_success; + + if (track_desc_) { + return session_->send_rtcp_xr_rrtr(track_desc_->ssrc_); + } + + return err; +} + +srs_error_t SrsRtcRecvTrack::on_nack(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + uint32_t ssrc = pkt->header.get_ssrc(); + uint16_t seq = pkt->header.get_sequence(); + + // TODO: check whether is necessary? + nack_receiver_->remove_timeout_packets(); + SrsRtpNackInfo* nack_info = nack_receiver_->find(seq); + if (nack_info) { + // seq had been received. + nack_receiver_->remove(seq); + return err; + } + + // insert check nack list + uint16_t nack_first = 0, nack_last = 0; + if (!rtp_queue_->update(seq, nack_first, nack_last)) { + srs_warn("too old seq %u, range [%u, %u]", seq, rtp_queue_->begin, rtp_queue_->end); + } + if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { + srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); + nack_receiver_->insert(nack_first, nack_last); + nack_receiver_->check_queue_size(); + } + + // insert into video_queue and audio_queue + rtp_queue_->set(seq, pkt->copy()); + // send_nack + session_->check_send_nacks(nack_receiver_, ssrc); + + return err; +} + +srs_error_t SrsRtcRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) +{ + return srs_success; +} + +SrsRtcAudioRecvTrack::SrsRtcAudioRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) + : SrsRtcRecvTrack(session, track_desc, true) +{ +} + +SrsRtcAudioRecvTrack::~SrsRtcAudioRecvTrack() +{ +} + +srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + // uint8_t pt = pkt->header.get_payload_type(); + + // SrsRtcTrackDescription track = rtc_stream_desc_->get_audio_tracks(); + // // process red packet. + // if (pt == red_pt) { + + // } else if (pt == rtx_pt) { // process rtx_pt. + // // restore retranmission packet. + // } else if (pt == fec_pt) { + + // } + + if (source) { + if ((err = source->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "source on rtp"); + } + } + + // For NACK to handle packet. + if ((err = on_nack(pkt)) != srs_success) { + return srs_error_wrap(err, "on nack"); + } + + return err; +} + +SrsRtcVideoRecvTrack::SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) + : SrsRtcRecvTrack(session, track_desc, false) +{ + request_key_frame_ = false; +} + +SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack() +{ +} + +srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->frame_type = SrsFrameTypeVideo; + + // TODO: FIXME: add rtp process + if (request_key_frame_) { + // TODO: FIXME: add coroutine to request key frame. + request_key_frame_ = false; + // TODO: FIXME: Check error. + session_->send_rtcp_fb_pli(track_desc_->ssrc_); + } + + if (source) { + if ((err = source->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "source on rtp"); + } + } + + // For NACK to handle packet. + if ((err = on_nack(pkt)) != srs_success) { + return srs_error_wrap(err, "on nack"); + } + + return err; +} + +void SrsRtcVideoRecvTrack::request_keyframe() +{ + request_key_frame_ = true; +} + + +SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio) +{ + session_ = session; + track_desc_ = track_desc->copy(); + if (is_audio) { + rtp_queue_ = new SrsRtpRingBuffer(100); + } else { + rtp_queue_ = new SrsRtpRingBuffer(1000); + } +} +SrsRtcSendTrack::~SrsRtcSendTrack() +{ + srs_freep(rtp_queue_); + srs_freep(track_desc_); +} + +bool SrsRtcSendTrack::has_ssrc(uint32_t ssrc) +{ + if (track_desc_) { + return track_desc_->has_ssrc(ssrc); + } + + return false; +} +SrsRtpPacket2* SrsRtcSendTrack::fetch_rtp_packet(uint16_t seq) +{ + if (rtp_queue_) { + return rtp_queue_->at(seq); + } + + return NULL; +} + +srs_error_t SrsRtcSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +{ + return srs_success; +} + +srs_error_t SrsRtcSendTrack::on_rtcp(SrsRtpPacket2* pkt) +{ + return srs_success; +} + +SrsRtcAudioSendTrack::SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) + : SrsRtcSendTrack(session, track_desc, true) +{ +} + +SrsRtcAudioSendTrack::~SrsRtcAudioSendTrack() +{ +} + +srs_error_t SrsRtcAudioSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->header.set_ssrc(track_desc_->ssrc_); + pkt->header.set_payload_type(track_desc_->media_->pt_); + + // Put rtp packet to NACK/ARQ queue + if (true) { + SrsRtpPacket2* nack = pkt->copy(); + rtp_queue_->set(nack->header.get_sequence(), nack); + } + send_packets.push_back(pkt); + + return err; +} + +srs_error_t SrsRtcAudioSendTrack::on_rtcp(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + // process rtcp + return err; +} + +SrsRtcVideoSendTrack::SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) + : SrsRtcSendTrack(session, track_desc, false) +{ +} + +SrsRtcVideoSendTrack::~SrsRtcVideoSendTrack() +{ +} + +srs_error_t SrsRtcVideoSendTrack::on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->header.set_ssrc(track_desc_->ssrc_); + pkt->header.set_payload_type(track_desc_->media_->pt_); + + // Put rtp packet to NACK/ARQ queue + if (true) { + SrsRtpPacket2* nack = pkt->copy(); + rtp_queue_->set(nack->header.get_sequence(), nack); + } + + send_packets.push_back(pkt); + + return err; +} + +srs_error_t SrsRtcVideoSendTrack::on_rtcp(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + // process rtcp + return err; +} + diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 66215dbab..036798a8c 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -28,7 +28,12 @@ #include #include +#include +#include +#include +#include +#include #include #include @@ -43,6 +48,28 @@ class SrsRtcFromRtmpBridger; class SrsAudioRecode; class SrsRtpPacket2; class SrsSample; +class SrsRtcStreamDescription; +class SrsRtcTrackDescription; +class SrsRtcConnection; +class SrsRtpRingBuffer; +class SrsRtpNackForReceiver; + +class SrsNtp +{ +public: + uint64_t system_ms_; + uint64_t ntp_; + uint32_t ntp_second_; + uint32_t ntp_fractions_; +public: + SrsNtp(); + virtual ~SrsNtp(); +public: + static SrsNtp from_time_ms(uint64_t ms); + static SrsNtp to_time_ms(uint64_t ntp); +public: + static uint64_t kMagicNtpFractionalUnit; +}; class SrsRtcConsumer { @@ -100,7 +127,7 @@ public: ISrsRtcPublishStream(); virtual ~ISrsRtcPublishStream(); public: - virtual void request_keyframe() = 0; + virtual void request_keyframe(uint32_t ssrc) = 0; }; // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. @@ -118,6 +145,8 @@ private: ISrsRtcPublishStream* publish_stream_; // Transmux RTMP to RTC. ISrsSourceBridger* bridger_; + // Steam description for this steam. + SrsRtcStreamDescription* stream_desc_; private: // To delivery stream to clients. std::vector consumers; @@ -159,6 +188,9 @@ public: void set_publish_stream(ISrsRtcPublishStream* v); // Consume the shared RTP packet, user must free it. srs_error_t on_rtp(SrsRtpPacket2* pkt); + // Set and get stream description for souce + void set_stream_desc(SrsRtcStreamDescription* stream_desc); + std::vector get_track_desc(std::string type, std::string media_type); }; #ifdef SRS_FFMPEG_FIT @@ -214,5 +246,242 @@ public: virtual void on_unpublish(); }; +// TODO: FIXME: Rename it. +class SrsCodecPayload +{ +public: + std::string type_; + uint8_t pt_; + std::string name_; + int sample_; + + std::vector rtcp_fbs_; +public: + SrsCodecPayload(); + SrsCodecPayload(uint8_t pt, std::string encode_name, int sample); + virtual ~SrsCodecPayload(); +public: + virtual SrsCodecPayload* copy(); + virtual SrsMediaPayloadType generate_media_payload_type(); +}; + +// TODO: FIXME: Rename it. +class SrsVideoPayload : public SrsCodecPayload +{ +public: + struct H264SpecificParameter + { + std::string profile_level_id; + std::string packetization_mode; + std::string level_asymmerty_allow; + }; + H264SpecificParameter h264_param_; + +public: + SrsVideoPayload(); + SrsVideoPayload(uint8_t pt, std::string encode_name, int sample); + virtual ~SrsVideoPayload(); +public: + virtual SrsVideoPayload* copy(); + virtual SrsMediaPayloadType generate_media_payload_type(); +public: + srs_error_t set_h264_param_desc(std::string fmtp); +}; + +// TODO: FIXME: Rename it. +class SrsAudioPayload : public SrsCodecPayload +{ + struct SrsOpusParameter + { + int minptime; + bool use_inband_fec; + bool usedtx; + }; + +public: + int channel_; + SrsOpusParameter opus_param_; +public: + SrsAudioPayload(); + SrsAudioPayload(uint8_t pt, std::string encode_name, int sample, int channel); + virtual ~SrsAudioPayload(); +public: + virtual SrsAudioPayload* copy(); + virtual SrsMediaPayloadType generate_media_payload_type(); +public: + srs_error_t set_opus_param_desc(std::string fmtp); +}; + +class SrsRtcTrackDescription +{ +public: + // type: audio, video + std::string type_; + // track_id + std::string id_; + // ssrc is the primary ssrc for this track, + // if sdp has ssrc-group, it is the first ssrc of the ssrc-group + uint32_t ssrc_; + // rtx ssrc is the second ssrc of "FEC" src-group, + // if no rtx ssrc, rtx_ssrc_ = 0. + uint32_t fec_ssrc_; + // rtx ssrc is the second ssrc of "FID" src-group, + // if no rtx ssrc, rtx_ssrc_ = 0. + uint32_t rtx_ssrc_; + // key: rtp header extension id, value: rtp header extension uri. + std::map extmaps_; + // Whether this track active. default: active. + bool is_active_; + // direction + std::string direction_; + // TODO: FIXME: whether mid is needed? + std::string mid_; + + // meida payload, such as opus, h264. + SrsCodecPayload* media_; + SrsCodecPayload* red_; + SrsCodecPayload* rtx_; + SrsCodecPayload* ulpfec_; + SrsCodecPayload* rsfec_; +public: + SrsRtcTrackDescription(); + virtual ~SrsRtcTrackDescription(); +public: + // whether or not the track has ssrc. + // for example: + // we need check track has the ssrc in the ssrc_group, then add ssrc_group to the track, + bool has_ssrc(uint32_t ssrc); +public: + void add_rtp_extension_desc(int id, std::string uri); + void set_direction(std::string direction); + void set_codec_payload(SrsCodecPayload* payload); + // auxiliary paylod include red, rtx, ulpfec, rsfec. + void create_auxiliary_payload(const std::vector payload_types); + void set_rtx_ssrc(uint32_t ssrc); + void set_fec_ssrc(uint32_t ssrc); + void set_mid(std::string mid); + int get_rtp_extension_id(std::string uri); +public: + SrsRtcTrackDescription* copy(); +public: + // find media with payload type. + SrsMediaPayloadType generate_media_payload_type(int payload_type); +}; + +class SrsRtcStreamDescription +{ +public: + // the id for this stream; + std::string id_; + + SrsRtcTrackDescription* audio_track_desc_; + std::vector video_track_descs_; +public: + SrsRtcStreamDescription(); + virtual ~SrsRtcStreamDescription(); + +public: + SrsRtcStreamDescription* copy(); + SrsRtcTrackDescription* find_track_description_by_ssrc(uint32_t ssrc); +}; + +class ISrsRtcTrack +{ +public: + ISrsRtcTrack(); + virtual ~ISrsRtcTrack(); +public: + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt) = 0; +}; + +class SrsRtcRecvTrack +{ +protected: + SrsRtcTrackDescription* track_desc_; + + SrsRtcConnection* session_; + SrsRtpRingBuffer* rtp_queue_; + SrsRtpNackForReceiver* nack_receiver_; + + // send report ntp and received time. + SrsNtp last_sender_report_ntp; + uint64_t last_sender_report_sys_time; +public: + SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs, bool is_audio); + virtual ~SrsRtcRecvTrack(); +public: + bool has_ssrc(uint32_t ssrc); + void update_rtt(int rtt); + void update_send_report_time(const SrsNtp& ntp); + srs_error_t send_rtcp_rr(); + srs_error_t send_rtcp_xr_rrtr(); +protected: + srs_error_t on_nack(SrsRtpPacket2* pkt); +public: + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); +}; + +class SrsRtcAudioRecvTrack : public SrsRtcRecvTrack +{ +public: + SrsRtcAudioRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); + virtual ~SrsRtcAudioRecvTrack(); +public: + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); +}; + +class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack +{ +private: + bool request_key_frame_; +public: + SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs); + virtual ~SrsRtcVideoRecvTrack(); +public: + virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); +public: + void request_keyframe(); +}; + +class SrsRtcSendTrack +{ +protected: + // send track description + SrsRtcTrackDescription* track_desc_; + + SrsRtcConnection* session_; + // NACK ARQ ring buffer. + SrsRtpRingBuffer* rtp_queue_; +public: + SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio); + virtual ~SrsRtcSendTrack(); +public: + bool has_ssrc(uint32_t ssrc); + SrsRtpPacket2* fetch_rtp_packet(uint16_t seq); +public: + virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); +}; + +class SrsRtcAudioSendTrack : public SrsRtcSendTrack +{ +public: + SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); + virtual ~SrsRtcAudioSendTrack(); +public: + virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); +}; + +class SrsRtcVideoSendTrack : public SrsRtcSendTrack +{ +public: + SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); + virtual ~SrsRtcVideoSendTrack(); +public: + virtual srs_error_t on_rtp(std::vector& send_packets, SrsRtpPacket2* pkt); + virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); +}; + #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 8231fbd05..111338f04 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -353,6 +353,9 @@ #define ERROR_RTC_NO_SESSION 5022 #define ERROR_RTC_INVALID_PARAMS 5023 #define ERROR_RTC_DUMMY_BRIDGER 5024 +#define ERROR_RTC_STREM_STARTED 5025 +#define ERROR_RTC_STREAM_DESC 5026 +#define ERROR_RTC_TRACK_CODEC 5027 /////////////////////////////////////////////////////// // GB28181 API error.