diff --git a/trunk/configure b/trunk/configure index ed76da4d5..974a9163b 100755 --- a/trunk/configure +++ b/trunk/configure @@ -279,7 +279,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid") if [[ $SRS_RTC == YES ]]; then - MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_audio_recode" "srs_app_sdp") + MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_audio_recode" "srs_app_sdp" "srs_app_rtp_queue") fi if [[ $SRS_GB28181 == YES ]]; then MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip") diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 071a0b8b5..388420374 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1068,6 +1068,11 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str local_media_desc.session_info_.setup_ = "active"; } else if (remote_media_desc.session_info_.setup_ == "actpass") { local_media_desc.session_info_.setup_ = "passive"; + } else { + // @see: https://tools.ietf.org/html/rfc4145#section-4.1 + // The default value of the setup attribute in an offer/answer exchange + // is 'active' in the offer and 'passive' in the answer. + local_media_desc.session_info_.setup_ = "passive"; } if (remote_media_desc.sendonly_) { @@ -1091,6 +1096,299 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str return err; } +uint32_t SrsGoApiRtcPublish::ssrc_num = 0; + +SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsRtcServer* rtc_svr) +{ + rtc_server = rtc_svr; +} + +SrsGoApiRtcPublish::~SrsGoApiRtcPublish() +{ +} + + +// Request: +// POST /rtc/v1/publish/ +// { +// "sdp":"offer...", "streamurl":"webrtc://r.ossrs.net/live/livestream", +// "api":'http...", "clientip":"..." +// } +// Response: +// {"sdp":"answer...", "sid":"..."} +// @see https://github.com/rtcdn/rtcdn-draft +srs_error_t SrsGoApiRtcPublish::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + srs_error_t err = srs_success; + + SrsJsonObject* res = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, res); + + if ((err = do_serve_http(w, r, res)) != srs_success) { + srs_warn("RTC error %s", srs_error_desc(err).c_str()); srs_freep(err); + return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); + } + + return srs_api_response(w, r, res->dumps()); +} + +srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res) +{ + srs_error_t err = srs_success; + + // For each RTC session, we use short-term HTTP connection. + SrsHttpHeader* hdr = w->header(); + hdr->set("Connection", "Close"); + + // Parse req, the request json object, from body. + SrsJsonObject* req = NULL; + if (true) { + string req_json; + if ((err = r->body_read_all(req_json)) != srs_success) { + return srs_error_wrap(err, "read body"); + } + + SrsJsonAny* json = SrsJsonAny::loads(req_json); + if (!json || !json->is_object()) { + return srs_error_wrap(err, "not json"); + } + + req = json->to_object(); + } + + // Fetch params from req object. + SrsJsonAny* prop = NULL; + if ((prop = req->ensure_property_string("sdp")) == NULL) { + return srs_error_wrap(err, "not sdp"); + } + string remote_sdp_str = prop->to_str(); + + if ((prop = req->ensure_property_string("streamurl")) == NULL) { + return srs_error_wrap(err, "not streamurl"); + } + string streamurl = prop->to_str(); + + string clientip; + if ((prop = req->ensure_property_string("clientip")) != NULL) { + clientip = prop->to_str(); + } + + string api; + if ((prop = req->ensure_property_string("api")) != NULL) { + api = prop->to_str(); + } + + // Parse app and stream from streamurl. + string app; + string stream_name; + if (true) { + string tcUrl; + srs_parse_rtmp_url(streamurl, tcUrl, stream_name); + + int port; + string schema, host, vhost, param; + srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param); + } + + srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB", + streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length()); + + // TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information. + SrsSdp remote_sdp; + if ((err = remote_sdp.parse(remote_sdp_str)) != srs_success) { + return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str()); + } + + if ((err = check_remote_sdp(remote_sdp)) != srs_success) { + return srs_error_wrap(err, "remote sdp check failed"); + } + + SrsSdp local_sdp; + if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) { + return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + } + + SrsRequest request; + request.app = app; + request.stream = stream_name; + + // TODO: FIXME: Maybe need a better name? + // TODO: FIXME: When server enabled, but vhost disabled, should report error. + SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, ""); + + ostringstream os; + if ((err = local_sdp.encode(os)) != srs_success) { + return srs_error_wrap(err, "encode sdp"); + } + + string local_sdp_str = os.str(); + + srs_trace("local_sdp=%s", local_sdp_str.c_str()); + + res->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); + res->set("server", SrsJsonAny::integer(SrsStatistic::instance()->server_id())); + + // TODO: add candidates in response json? + + res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str())); + res->set("sessionid", SrsJsonAny::str(rtc_session->id().c_str())); + + srs_trace("RTC sid=%s, offer=%dB, answer=%dB", rtc_session->id().c_str(), remote_sdp_str.length(), local_sdp_str.length()); + + return err; +} + +srs_error_t SrsGoApiRtcPublish::check_remote_sdp(const SrsSdp& remote_sdp) +{ + srs_error_t err = srs_success; + + if (remote_sdp.group_policy_ != "BUNDLE") { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "now only support BUNDLE, group policy=%s", remote_sdp.group_policy_.c_str()); + } + + if (remote_sdp.media_descs_.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no media descriptions"); + } + + for (std::vector::const_iterator iter = remote_sdp.media_descs_.begin(); iter != remote_sdp.media_descs_.end(); ++iter) { + if (iter->type_ != "audio" && iter->type_ != "video") { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "unsupport media type=%s", iter->type_.c_str()); + } + + if (! iter->rtcp_mux_) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "now only suppor rtcp-mux"); + } + + for (std::vector::const_iterator iter_media = iter->payload_types_.begin(); iter_media != iter->payload_types_.end(); ++iter_media) { + if (iter->recvonly_) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "publish API only support sendrecv/sendonly"); + } + } + } + + return err; +} + +srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp) +{ + srs_error_t err = srs_success; + local_sdp.version_ = "0"; + + local_sdp.username_ = RTMP_SIG_SRS_SERVER; + local_sdp.session_id_ = srs_int2str((int64_t)this); + local_sdp.session_version_ = "2"; + local_sdp.nettype_ = "IN"; + local_sdp.addrtype_ = "IP4"; + local_sdp.unicast_address_ = "0.0.0.0"; + + local_sdp.session_name_ = "live_publish_session"; + + local_sdp.msid_semantic_ = "WMS"; + local_sdp.msids_.push_back(app + "/" + stream); + + local_sdp.group_policy_ = "BUNDLE"; + + int mid = 0; + + for (int i = 0; i < remote_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; + + if (remote_media_desc.is_audio()) { + local_sdp.media_descs_.push_back(SrsMediaDesc("audio")); + } else if (remote_media_desc.is_video()) { + local_sdp.media_descs_.push_back(SrsMediaDesc("video")); + } + + SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back(); + + if (remote_media_desc.is_audio()) { + // TODO: check opus format specific param + std::vector payloads = remote_media_desc.find_media_with_encoding_name("opus"); + for (std::vector::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) { + // Only choose one match opus codec. + local_media_desc.payload_types_.push_back(*iter); + break; + } + + if (local_media_desc.payload_types_.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type"); + } + + } else if (remote_media_desc.is_video()) { + std::deque backup_payloads; + std::vector payloads = remote_media_desc.find_media_with_encoding_name("H264"); + for (std::vector::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) { + if (iter->format_specific_param_.empty()) { + backup_payloads.push_front(*iter); + continue; + } + H264SpecificParam h264_param; + if ((err = parse_h264_fmtp(iter->format_specific_param_, h264_param)) != srs_success) { + srs_error_reset(err); continue; + } + + // Try to pick the "best match" H.264 payload type. + if (h264_param.packetization_mode == "1" && h264_param.level_asymmerty_allow == "1") { + // Only choose first match H.264 payload type. + local_media_desc.payload_types_.push_back(*iter); + break; + } + + backup_payloads.push_back(*iter); + } + + // Try my best to pick at least one media payload type. + if (local_media_desc.payload_types_.empty() && ! backup_payloads.empty()) { + srs_warn("choose backup H.264 payload type=%d", backup_payloads.front().payload_type_); + local_media_desc.payload_types_.push_back(backup_payloads.front()); + } + + if (local_media_desc.payload_types_.empty()) { + return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no found valid H.264 payload type"); + } + + local_media_desc.payload_types_.back().rtcp_fb_.push_back("rrtr"); + } + + local_media_desc.mid_ = remote_media_desc.mid_; + local_sdp.groups_.push_back(local_media_desc.mid_); + + local_media_desc.port_ = 9; + local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF"; + + if (remote_media_desc.session_info_.setup_ == "active") { + local_media_desc.session_info_.setup_ = "passive"; + } else if (remote_media_desc.session_info_.setup_ == "passive") { + local_media_desc.session_info_.setup_ = "active"; + } else if (remote_media_desc.session_info_.setup_ == "actpass") { + local_media_desc.session_info_.setup_ = "passive"; + } else { + // @see: https://tools.ietf.org/html/rfc4145#section-4.1 + // The default value of the setup attribute in an offer/answer exchange + // is 'active' in the offer and 'passive' in the answer. + local_media_desc.session_info_.setup_ = "passive"; + } + + local_sdp.media_descs_.back().session_info_.ice_options_ = "trickle"; + + if (remote_media_desc.sendonly_) { + local_media_desc.recvonly_ = true; + } else if (remote_media_desc.recvonly_) { + local_media_desc.sendonly_ = true; + } else if (remote_media_desc.sendrecv_) { + local_media_desc.sendrecv_ = true; + } + + local_media_desc.rtcp_mux_ = true; + + SrsSSRCInfo ssrc_info; + ssrc_info.ssrc_ = ++ssrc_num; + ssrc_info.cname_ = "test_sdp_cname"; + local_media_desc.ssrc_infos_.push_back(ssrc_info); + } + + return err; +} #endif SrsGoApiClients::SrsGoApiClients() diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index b94dfa46c..f38e4a58f 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -184,6 +184,23 @@ private: srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); }; + +class SrsGoApiRtcPublish : public ISrsHttpHandler +{ +public: + static uint32_t ssrc_num; +private: + SrsRtcServer* rtc_server; +public: + SrsGoApiRtcPublish(SrsRtcServer* rtc_svr); + virtual ~SrsGoApiRtcPublish(); +public: + virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +private: + virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); + srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); + srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); +}; #endif class SrsGoApiClients : public ISrsHttpHandler diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp index 95cfea597..288f892e1 100644 --- a/trunk/src/app/srs_app_rtc.cpp +++ b/trunk/src/app/srs_app_rtc.cpp @@ -212,6 +212,80 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* return err; } +SrsRtpH264Demuxer::SrsRtpH264Demuxer() +{ +} + +SrsRtpH264Demuxer::~SrsRtpH264Demuxer() +{ +} + +srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + + SrsRtpH264Header* rtp_h264_header = dynamic_cast(rtp_pkt->rtp_payload_header); + if (rtp_h264_header == NULL) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid rtp packet"); + } + + uint8_t* rtp_payload = reinterpret_cast(rtp_pkt->rtp_payload()); + int rtp_payload_size = rtp_pkt->rtp_payload_size(); + + if (rtp_payload_size == 0) { + srs_verbose("seq=%u, empty payload", rtp_pkt->rtp_header.get_sequence()); + return err; + } + + uint8_t nal_type = rtp_payload[0] & kNalTypeMask; + + if (nal_type >= 1 && nal_type <= 23) { + srs_verbose("seq=%u, single nalu", rtp_pkt->rtp_header.get_sequence()); + rtp_h264_header->is_first_packet_of_frame = true; + rtp_h264_header->is_last_packet_of_frame = true; + rtp_h264_header->nalu_type = nal_type; + rtp_h264_header->nalu_header = rtp_payload[0]; + rtp_h264_header->nalu_offset.push_back(make_pair(0, rtp_payload_size)); + } else if (nal_type == kFuA) { + srs_verbose("seq=%u, fu-a", rtp_pkt->rtp_header.get_sequence()); + if ((rtp_payload[1] & kStart)) { + rtp_h264_header->is_first_packet_of_frame = true; + } + if ((rtp_payload[1] & kEnd)) { + rtp_h264_header->is_last_packet_of_frame = true; + } + rtp_h264_header->nalu_type = nal_type; + rtp_h264_header->nalu_header = (rtp_payload[0] & (~kNalTypeMask)) | (rtp_payload[1] & kNalTypeMask); + rtp_h264_header->nalu_offset.push_back(make_pair(2, rtp_payload_size - 2)); + } else if (nal_type == kStapA) { + srs_verbose("seq=%u, stap-a", rtp_pkt->rtp_header.get_sequence()); + int i = 1; + rtp_h264_header->is_first_packet_of_frame = true; + rtp_h264_header->is_last_packet_of_frame = true; + rtp_h264_header->nalu_type = nal_type; + while (i < rtp_payload_size) { + srs_verbose("stap-a cur index=%s", srs_string_dumps_hex(reinterpret_cast(rtp_payload + i), 2).c_str()); + uint16_t nal_len = (rtp_payload[i]) << 8 | rtp_payload[i + 1]; + if (nal_len > rtp_payload_size - i) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet, nal len=%u, i=%d, rtp_payload_size=%d", nal_len, i, rtp_payload_size); + } + + rtp_h264_header->nalu_offset.push_back(make_pair(i + 2, nal_len)); + + i += nal_len + 2; + } + + if (i != rtp_payload_size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet"); + } + } else { + srs_verbose("payload size=%d, payload=%s", rtp_payload_size, srs_string_dumps_hex(rtp_pkt->payload, rtp_pkt->size).c_str()); + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid h264 rtp packet"); + } + + return err; +} + SrsRtc::SrsRtc() { req = NULL; diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp index d232ca31f..7c22e04dd 100644 --- a/trunk/src/app/srs_app_rtc.hpp +++ b/trunk/src/app/srs_app_rtc.hpp @@ -78,6 +78,15 @@ public: srs_error_t transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio); }; +class SrsRtpH264Demuxer +{ +public: + SrsRtpH264Demuxer(); + virtual ~SrsRtpH264Demuxer(); +public: + srs_error_t parse(SrsRtpSharedPacket* rtp_pkt); +}; + class SrsRtc { private: diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 813885821..eba344f55 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -54,6 +54,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -123,6 +124,44 @@ static std::vector get_candidate_ips() return candidate_ips; } +static map ssrc_lxr; + +uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32; + +SrsNtp::SrsNtp() +{ + system_ms_ = 0; + ntp_ = 0; + ntp_second_ = 0; + ntp_fractions_ = 0; +} + +SrsNtp::~SrsNtp() +{ +} + +SrsNtp SrsNtp::from_time_ms(uint64_t ms) +{ + SrsNtp srs_ntp; + srs_ntp.system_ms_ = ms; + srs_ntp.ntp_second_ = ms / 1000; + srs_ntp.ntp_fractions_ = (static_cast(ms % 1000 / 1000.0)) * kMagicNtpFractionalUnit; + srs_ntp.ntp_ = (static_cast(srs_ntp.ntp_second_) << 32) | srs_ntp.ntp_fractions_; + return srs_ntp; +} + +SrsNtp SrsNtp::to_time_ms(uint64_t ntp) +{ + SrsNtp srs_ntp; + srs_ntp.ntp_ = ntp; + srs_ntp.ntp_second_ = (ntp & 0xFFFFFFFF00000000ULL) >> 32; + srs_ntp.ntp_fractions_ = (ntp & 0x00000000FFFFFFFFULL); + srs_ntp.system_ms_ = (static_cast(srs_ntp.ntp_second_) * 1000) + + (static_cast(static_cast(srs_ntp.ntp_fractions_) * 1000.0) / kMagicNtpFractionalUnit); + return srs_ntp; +} + + SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s) { rtc_session = s; @@ -1415,6 +1454,356 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes return err; } +SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) +{ + rtc_session = session; + rtp_h264_demuxer = new SrsRtpH264Demuxer(); + rtp_video_queue = new SrsRtpQueue(1000); + rtp_audio_queue = new SrsRtpQueue(100, true); + + source = NULL; +} + +SrsRtcPublisher::~SrsRtcPublisher() +{ + srs_freep(rtp_h264_demuxer); + srs_freep(rtp_video_queue); + srs_freep(rtp_audio_queue); +} + +void SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsRequest request) +{ + video_ssrc = vssrc; + audio_ssrc = assrc; + this->request = request; + + srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc); +} + +srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + + SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket(); + SrsAutoFree(SrsRtpSharedPacket, rtp_shared_pkt); + if ((err = rtp_shared_pkt->decode(buf, nb_buf)) != srs_success) { + return srs_error_wrap(err, "rtp packet decode failed"); + } + + uint32_t ssrc = rtp_shared_pkt->rtp_header.get_ssrc(); + + if (ssrc == audio_ssrc) { + return on_audio(skt, rtp_shared_pkt); + } else if (ssrc == video_ssrc) { + return on_video(skt, rtp_shared_pkt); + } + + return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); +} + +srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + return err; + + vector nack_seqs; + rtp_audio_queue->nack_.get_nack_seqs(nack_seqs); + vector::iterator iter = nack_seqs.begin(); + while (iter != nack_seqs.end()) { + char buf[1024]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kRtpFb); + stream.write_2bytes(3); + stream.write_4bytes(audio_ssrc); + stream.write_4bytes(audio_ssrc); + uint16_t pid = *iter; + uint16_t blp = 0; + srs_verbose("pid=%u", pid); + while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { + blp |= (1 << (*(iter + 1) - pid - 1)); + srs_verbose("blp=%u", *(iter+1)); + ++iter; + } + stream.write_2bytes(pid); + stream.write_2bytes(blp); + + srs_verbose("nack dump=%s", srs_string_dumps_hex(stream.data(), stream.pos()).c_str()); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + + if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + skt->sendto(protected_buf, nb_protected_buf, 0); + //skt->sendto(stream.data(), stream.pos(), 0); + srs_verbose("send nack req, len=%d", nb_protected_buf); + } else { + srs_verbose("send nack failed, because of protect rtcp failed"); + } + ++iter; + } + + rtp_pkt->rtp_payload_header = new SrsRtpOpusHeader(); + rtp_pkt->rtp_payload_header->is_first_packet_of_frame = true; + rtp_pkt->rtp_payload_header->is_last_packet_of_frame = true; + + rtp_audio_queue->insert(rtp_pkt); + + std::vector > frames; + rtp_audio_queue->get_and_clean_collected_frames(frames); + + for (size_t i = 0; i < frames.size(); ++i) { + if (! frames[i].empty()) { + srs_verbose("collect %d audio frames, seq range %u,%u", + frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); + } + + for (size_t n = 0; n < frames[i].size(); ++n) { + srs_freep(frames[i][n]); + } + } + + return err; +} + +srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + + vector nack_seqs; + rtp_video_queue->nack_.get_nack_seqs(nack_seqs); + vector::iterator iter = nack_seqs.begin(); + while (iter != nack_seqs.end()) { + char buf[1024]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kRtpFb); + stream.write_2bytes(3); + stream.write_4bytes(video_ssrc); + stream.write_4bytes(video_ssrc); + uint16_t pid = *iter; + uint16_t blp = 0; + srs_verbose("pid=%u", pid); + while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { + blp |= (1 << (*(iter + 1) - pid - 1)); + srs_verbose("blp=%u", *(iter+1)); + ++iter; + } + stream.write_2bytes(pid); + stream.write_2bytes(blp); + + srs_verbose("nack dump=%s", srs_string_dumps_hex(stream.data(), stream.pos()).c_str()); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + + if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + skt->sendto(protected_buf, nb_protected_buf, 0); + //skt->sendto(stream.data(), stream.pos(), 0); + srs_verbose("send nack req, len=%d", nb_protected_buf); + } else { + srs_verbose("send nack failed, because of protect rtcp failed"); + } + ++iter; + } + + rtp_pkt->rtp_payload_header = new SrsRtpH264Header(); + + if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) { + return srs_error_wrap(err, "rtp h264 demux failed"); + } + + rtp_video_queue->insert(rtp_pkt); + + std::vector > frames; + rtp_video_queue->get_and_clean_collected_frames(frames); + + for (size_t i = 0; i < frames.size(); ++i) { + if (! frames[i].empty()) { + srs_verbose("collect %d video frames, seq range %u,%u", frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); + } + int frame_size = 5; + vector nalu_len; + uint32_t len = 0; + for (size_t n = 0; n < frames[i].size(); ++n) { + SrsRtpH264Header* rtp_h264_header = dynamic_cast(frames[i][n]->rtp_payload_header); + for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) { + if (rtp_h264_header->nalu_type != kFuA) { + uint8_t* p = reinterpret_cast(frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first); + if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) && + ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI) && + ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSPS) && + ((p[0] & kNalTypeMask) != SrsAvcNaluTypePPS)) { + frame_size += rtp_h264_header->nalu_offset[j].second + 4; + nalu_len.push_back(rtp_h264_header->nalu_offset[j].second); + } + } else { + if (frames[i][n]->rtp_payload_header->is_first_packet_of_frame) { + frame_size += 5; + len += 1; + } + frame_size += rtp_h264_header->nalu_offset[j].second; + len += rtp_h264_header->nalu_offset[j].second; + + if (frames[i][n]->rtp_payload_header->is_last_packet_of_frame) { + nalu_len.push_back(len); + len = 0; + } + } + } + } + + uint8_t* frame = new uint8_t[frame_size]; + int frame_len = 5; + + bool video_header_change = false; + int64_t timestamp = 0; + + bool idr = false; + size_t len_index = 0; + for (size_t n = 0; n < frames[i].size(); ++n) { + SrsRtpH264Header* rtp_h264_header = dynamic_cast(frames[i][n]->rtp_payload_header); + for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) { + timestamp = frames[i][n]->rtp_header.get_timestamp(); + + uint8_t* p = reinterpret_cast(frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first); + srs_verbose("nalu_type=%u, %02X", rtp_h264_header->nalu_type, p[0]); + if (rtp_h264_header->nalu_type != kFuA) { + if ((p[0] & kNalTypeMask) == SrsAvcNaluTypeSPS) { + srs_verbose("sps"); + string cur_sps = string((char*)p, rtp_h264_header->nalu_offset[j].second); + if (! cur_sps.empty() && sps != cur_sps) { + video_header_change = true; + sps = cur_sps; + } + } else if ((p[0] & kNalTypeMask) == SrsAvcNaluTypePPS) { + srs_verbose("pps"); + string cur_pps = string((char*)p, rtp_h264_header->nalu_offset[j].second); + if (! cur_pps.empty() && pps != cur_pps) { + video_header_change = true; + pps = cur_pps; + } + } else if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) && ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI)) { + uint32_t len = nalu_len[len_index++]; + srs_verbose("nalu len=%u", len); + SrsBuffer stream((char*)frame + frame_len, 4); + stream.write_4bytes(len); + frame_len += 4; + memcpy(frame + frame_len, p, rtp_h264_header->nalu_offset[j].second); + frame_len += rtp_h264_header->nalu_offset[j].second; + } + } else { + if (frames[i][n]->rtp_payload_header->is_first_packet_of_frame) { + uint32_t len = nalu_len[len_index++]; + srs_verbose("nalu len=%u", len); + SrsBuffer stream((char*)frame + frame_len, 4); + stream.write_4bytes(len); + frame_len += 4; + frame[frame_len++] = rtp_h264_header->nalu_header; + + if ((rtp_h264_header->nalu_header & kNalTypeMask) == SrsAvcNaluTypeIDR) { + srs_verbose("idr"); + idr = true; + } + } + memcpy(frame + frame_len, frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first, + rtp_h264_header->nalu_offset[j].second); + frame_len += rtp_h264_header->nalu_offset[j].second; + } + } + srs_freep(frames[i][n]); + } + + if (video_header_change) { + srs_verbose("sps/pps change or init"); + if (source == NULL) { + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + source->on_publish(); + } + + uint8_t* video_header = new uint8_t[1500]; + SrsBuffer *stream = new SrsBuffer((char*)video_header, 1500); + SrsAutoFree(SrsBuffer, stream); + stream->write_1bytes(0x17); + stream->write_1bytes(0x00); + stream->write_1bytes(0x00); + stream->write_1bytes(0x00); + stream->write_1bytes(0x00); + + // NAL SIZE 61 76 63 43 01 42 C0 1E FF E1 SPS_LEN SPS 01 PPS_LEN PPS + stream->write_1bytes(0x01); + stream->write_1bytes(0x42); + stream->write_1bytes(0xC0); + stream->write_1bytes(0x1E); + stream->write_1bytes(0xFF); + stream->write_1bytes(0xE1); + + stream->write_2bytes(sps.size()); + stream->write_string(sps); + + stream->write_1bytes(0x01); + + stream->write_2bytes(pps.size()); + stream->write_string(pps); + + SrsMessageHeader header; + header.message_type = 9; + header.timestamp = timestamp / 90; + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + shared_video->create(&header, reinterpret_cast(video_header), stream->pos()); + srs_error_t e = source->on_video(shared_video); + if (e != srs_success) { + srs_warn("on video header err=%s", srs_error_desc(e).c_str()); + } + + srs_verbose("rtp on video header"); + } + + if (! sps.empty() && ! pps.empty()) { + if (source == NULL) { + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + source->on_publish(); + } + + if (idr) { + frame[0] = 0x17; + } else { + frame[0] = 0x27; + } + frame[1] = 0x01; + frame[2] = 0x00; + frame[3] = 0x00; + frame[4] = 0x00; + + SrsMessageHeader header; + header.message_type = 9; + header.timestamp = timestamp / 90; + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + shared_video->create(&header, reinterpret_cast(frame), frame_len); + srs_error_t e = source->on_video(shared_video); + if (e != srs_success) { + srs_warn("on video err=%s", srs_error_desc(e).c_str()); + } + + srs_verbose("rtp on video"); + } + } + + return err; +} + + SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) { rtc_server = rtc_svr; @@ -1435,6 +1824,8 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const // TODO: FIXME: Support reload. sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req.vhost); + + rtc_publisher = new SrsRtcPublisher(this); } SrsRtcSession::~SrsRtcSession() @@ -1686,6 +2077,263 @@ srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxS return err; } +srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +{ + srs_error_t err = srs_success; + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|reserved | PT=XR=207 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : report blocks : + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + SrsBuffer stream(buf, nb_buf); + uint8_t first = stream.read_1bytes(); + uint8_t pt = stream.read_1bytes(); + uint16_t length = (stream.read_2bytes() + 1) * 4; + uint32_t ssrc = stream.read_4bytes(); + + if (length != nb_buf) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf); + } + + while (stream.pos() + 4 < length) { + uint8_t bt = stream.read_1bytes(); + stream.skip(1); + uint16_t block_length = (stream.read_2bytes() + 1) * 4; + srs_verbose("XR, bt=%u", bt); + + if (stream.pos() + block_length - 4 > nb_buf) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, nb_buf); + } + + if (bt == 5) { + for (int i = 4; i < block_length; i += 12) { + uint32_t ssrc = stream.read_4bytes(); + uint32_t lrr = stream.read_4bytes(); + uint32_t dlrr = stream.read_4bytes(); + + SrsNtp cur_ntp = SrsNtp::from_time_ms(srs_update_system_time()/1000); + uint32_t compact_ntp = (cur_ntp.ntp_second_ << 16) | (cur_ntp.ntp_fractions_ >> 16); + + int rtt_ntp = compact_ntp - lrr - dlrr; + int rtt = ((rtt_ntp * 1000) >> 16) + ((rtt_ntp >> 16) * 1000); + /* + lsr = (srs_ntp.ntp_second_ << 16) | (srs_ntp.ntp_fractions_ >> 16); + uint32_t dlsr = (srs_update_system_time() - ssrc_lsr[ssrc_of_sender]) / 1000; + rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000); + */ + srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d", + ssrc, compact_ntp, lrr, dlrr, rtt); + + if (ssrc == rtc_publisher->video_ssrc) { + rtc_publisher->rtp_video_queue->update_rtt(rtt); + } else if (ssrc == rtc_publisher->audio_ssrc) { + rtc_publisher->rtp_audio_queue->update_rtt(rtt); + } + } + } + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 28) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp sender report packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + srs_verbose("SS=%s", srs_string_dumps_hex(buf, nb_buf).c_str()); + + // @see: https://tools.ietf.org/html/rfc3550#section-6.4.1 + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +header |V=2|P| RC | PT=SR=200 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of sender | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +sender | NTP timestamp, most significant word | +info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp, least significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | RTP timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | sender's packet count | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | sender's octet count | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_1 (SSRC of first source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1 | fraction lost | cumulative number of packets lost | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | extended highest sequence number received | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | interarrival jitter | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | last SR (LSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | delay since last SR (DLSR) | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_2 (SSRC of second source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 2 : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | profile-specific extensions | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + uint8_t first = stream->read_1bytes(); + //uint8_t version = first & 0xC0; + //uint8_t padding = first & 0x20; + uint8_t rc = first & 0x1F; + + /*uint8_t payload_type = */stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + + if (((length + 1) * 4) != (rc * 24 + 28)) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp sender report packet, length=%u, rc=%u", length, rc); + } + + uint32_t ssrc_of_sender = stream->read_4bytes(); + uint64_t ntp = stream->read_8bytes(); + SrsNtp srs_ntp = SrsNtp::to_time_ms(ntp); + uint32_t rtp_time = stream->read_4bytes(); + uint32_t sender_packet_count = stream->read_4bytes(); + uint32_t sender_octec_count = stream->read_4bytes(); + + SrsNtp cur_ntp = SrsNtp::from_time_ms(srs_update_system_time()/1000); + + srs_verbose("sender report, ssrc_of_sender=%u, ntp={%lu,%lu}, cur_ntp={%lu,%lu},rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u", + ssrc_of_sender, srs_ntp.ntp_, srs_ntp.system_ms_, cur_ntp.ntp_, cur_ntp.system_ms_, rtp_time, sender_packet_count, sender_octec_count); + + for (int i = 0; i < rc; ++i) { + uint32_t ssrc = stream->read_4bytes(); + uint8_t fraction_lost = stream->read_1bytes(); + uint32_t cumulative_number_of_packets_lost = stream->read_3bytes(); + uint32_t highest_seq = stream->read_4bytes(); + uint32_t jitter = stream->read_4bytes(); + uint32_t lst = stream->read_4bytes(); + uint32_t dlsr = stream->read_4bytes(); + + (void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr; + srs_verbose("sender report, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u", + ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); + } + + static map ssrc_lsr; + + // Response RR + { + char buf[1024]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x81); + stream.write_1bytes(kRR); + stream.write_2bytes(7); + stream.write_4bytes(ssrc_of_sender); + + SrsRtpQueue* rtp_queue = NULL; + string type = "unknown"; + if (ssrc_of_sender == rtc_publisher->video_ssrc) { + rtp_queue = rtc_publisher->rtp_video_queue; + type = "video"; + } else if (ssrc_of_sender == rtc_publisher->audio_ssrc) { + rtp_queue = rtc_publisher->rtp_audio_queue; + type = "audio"; + } + + uint8_t fraction_lost = rtp_queue->get_fraction_lost(); + uint32_t cumulative_number_of_packets_lost = rtp_queue->get_cumulative_number_of_packets_lost() & 0x7FFFFF; + uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence(); + uint32_t interarrival_jitter = rtp_queue->get_interarrival_jitter(); + + uint32_t lsr = 0; + uint32_t rr_dlsr = 0; + if (ssrc_lsr[ssrc_of_sender] > 0) { + lsr = (srs_ntp.ntp_second_ << 16) | (srs_ntp.ntp_fractions_ >> 16); + uint32_t dlsr = (srs_update_system_time() - ssrc_lsr[ssrc_of_sender]) / 1000; + rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000); + } + + + stream.write_4bytes(ssrc_of_sender); + stream.write_1bytes(fraction_lost); + stream.write_3bytes(cumulative_number_of_packets_lost); + stream.write_4bytes(extended_highest_sequence); + stream.write_4bytes(interarrival_jitter); + stream.write_4bytes(lsr); + stream.write_4bytes(rr_dlsr); + + srs_verbose("RR type=%s, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u", + type.c_str(), ssrc_of_sender, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if (dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + skt->sendto(protected_buf, nb_protected_buf, 0); + } + + // XR + { + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|reserved | PT=XR=207 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : report blocks : + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | BT=4 | reserved | block length = 2 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp, most significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NTP timestamp, least significant word | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + char buf[1024]; + SrsBuffer stream(buf, sizeof(buf)); + stream.write_1bytes(0x80); + stream.write_1bytes(kXR); + stream.write_2bytes(4); + stream.write_4bytes(ssrc_of_sender); + stream.write_1bytes(4); + stream.write_1bytes(0); + stream.write_2bytes(2); + stream.write_4bytes(cur_ntp.ntp_second_); + stream.write_4bytes(cur_ntp.ntp_fractions_); + ssrc_lxr[ssrc_of_sender] = srs_ntp.system_ms_; + //skt->sendto(stream.data(), stream.pos(), 0); + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + if (dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + skt->sendto(protected_buf, nb_protected_buf, 0); + } + } + } + + ssrc_lsr[ssrc_of_sender] = srs_update_system_time(); + + return err; +} + srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -1757,6 +2405,31 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) { + // FIXME: + if (true) + { + uint32_t video_ssrc = 0; + uint32_t audio_ssrc = 0; + uint16_t video_payload_type = 0; + uint16_t audio_payload_type = 0; + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; + if (media_desc.is_audio()) { + if (! media_desc.ssrc_infos_.empty()) { + audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; + audio_payload_type = media_desc.payload_types_[0].payload_type_; + } + } else if (media_desc.is_video()) { + if (! media_desc.ssrc_infos_.empty()) { + video_ssrc = media_desc.ssrc_infos_[0].ssrc_; + video_payload_type = media_desc.payload_types_[0].payload_type_; + } + } + } + + rtc_publisher->initialize(video_ssrc, audio_ssrc, request); + } + srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); return start_play(skt); } @@ -1834,6 +2507,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) switch (payload_type) { case kSR: { + err = on_rtcp_sender_report(ph, length, skt); break; } case kRR: { @@ -1856,6 +2530,10 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) case kPsFb: { err = on_rtcp_ps_feedback(ph, length, skt); break; + } + case kXR: { + err = on_rtcp_xr(ph, length, skt); + break; } default:{ return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); @@ -1874,6 +2552,23 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) return err; } +srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt) +{ + srs_error_t err = srs_success; + + if (dtls_session == NULL) { + return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); + } + + char* unprotected_buf = new char[1460]; + int nb_unprotected_buf = skt->size(); + if ((err = dtls_session->unprotect_rtp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtp unprotect failed"); + } + + return rtc_publisher->on_rtp(skt, unprotected_buf, nb_unprotected_buf); +} + SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) { lfd = NULL; @@ -2256,7 +2951,11 @@ srs_error_t SrsRtcServer::listen_api() // TODO: FIXME: Fetch api from hybrid manager. SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server(); if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) { - return srs_error_wrap(err, "handle sdp"); + return srs_error_wrap(err, "handle play"); + } + + if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) { + return srs_error_wrap(err, "handle publish"); } return err; @@ -2369,9 +3068,7 @@ srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* skt) if (is_rtcp(reinterpret_cast(skt->data()), skt->size())) { err = rtc_session->on_rtcp(skt); } else { - // We disable it because no RTP for player. - // see https://github.com/ossrs/srs/blob/018577e685a07d9de7a47354e7a9c5f77f5f4202/trunk/src/app/srs_app_rtc_conn.cpp#L1081 - // err = rtc_session->on_rtp(skt); + err = rtc_session->on_rtp(skt); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index c3796c711..6f6d390f7 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -51,6 +51,8 @@ class SrsSharedPtrMessage; class SrsSource; class SrsRtpPacket2; class ISrsUdpSender; +class SrsRtpQueue; +class SrsRtpH264Demuxer; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -61,6 +63,7 @@ const uint8_t kApp = 204; // @see: https://tools.ietf.org/html/rfc4585#section-6.1 const uint8_t kRtpFb = 205; const uint8_t kPsFb = 206; +const uint8_t kXR = 207; // @see: https://tools.ietf.org/html/rfc4585#section-6.3 const uint8_t kPLI = 1; @@ -68,6 +71,23 @@ const uint8_t kSLI = 2; const uint8_t kRPSI = 3; const uint8_t kAFB = 15; +class SrsNtp +{ +public: + uint64_t system_ms_; + uint64_t ntp_; + uint32_t ntp_second_; + uint32_t ntp_fractions_; +public: + SrsNtp(); + virtual ~SrsNtp(); +public: + static SrsNtp from_time_ms(uint64_t ms); + static SrsNtp to_time_ms(uint64_t ntp); +public: + static uint64_t kMagicNtpFractionalUnit; +}; + enum SrsRtcSessionStateType { // TODO: FIXME: Should prefixed by enum name. @@ -232,9 +252,37 @@ private: srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets); }; +class SrsRtcPublisher +{ + friend class SrsRtcSession; +private: + SrsRtcSession* rtc_session; + uint32_t video_ssrc; + uint32_t audio_ssrc; +private: + SrsRtpH264Demuxer* rtp_h264_demuxer; + SrsRtpQueue* rtp_video_queue; + SrsRtpQueue* rtp_audio_queue; +private: + SrsRequest request; + SrsSource* source; + std::string sps; + std::string pps; +public: + SrsRtcPublisher(SrsRtcSession* session); + virtual ~SrsRtcPublisher(); +public: + void initialize(uint32_t vssrc, uint32_t assrc, SrsRequest request); + srs_error_t on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf); +private: + srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); + srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); +}; + class SrsRtcSession { friend class SrsRtcSenderThread; + friend class SrsRtcPublisher; private: SrsRtcServer* rtc_server; SrsSdp remote_sdp; @@ -258,6 +306,8 @@ private: public: SrsRequest request; SrsSource* source; +private: + SrsRtcPublisher* rtc_publisher; public: SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id); virtual ~SrsRtcSession(); @@ -283,6 +333,7 @@ public: srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); srs_error_t on_dtls(SrsUdpMuxSocket* skt); srs_error_t on_rtcp(SrsUdpMuxSocket* skt); + srs_error_t on_rtp(SrsUdpMuxSocket* skt); public: srs_error_t send_client_hello(SrsUdpMuxSocket* skt); srs_error_t on_connection_established(SrsUdpMuxSocket* skt); @@ -296,6 +347,8 @@ private: private: srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); }; diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp new file mode 100644 index 000000000..393c23263 --- /dev/null +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -0,0 +1,390 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include + +using namespace std; + +#include +#include +#include +#include + +SrsRtpNackInfo::SrsRtpNackInfo() +{ + generate_time_ = srs_update_system_time(); + pre_req_nack_time_ = 0; + req_nack_count_ = 0; +} + +SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue) +{ + rtp_queue_ = rtp_queue; + pre_check_time_ = 0; + + srs_info("nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld" + opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); +} + +SrsRtpNackList::~SrsRtpNackList() +{ +} + +void SrsRtpNackList::insert(uint16_t seq) +{ + // FIXME: full, drop packet, and request key frame. + SrsRtpNackInfo& nack_info = queue_[seq]; +} + +void SrsRtpNackList::remove(uint16_t seq) +{ + queue_.erase(seq); +} + +SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq) +{ + std::map::iterator iter = queue_.find(seq); + + if (iter == queue_.end()) { + return NULL; + } + + return &(iter->second); +} + +void SrsRtpNackList::dump() +{ + return; + srs_verbose("@debug, queue size=%u", queue_.size()); + for (std::map::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { + srs_verbose("@debug, nack seq=%u", iter->first); + } +} + +void SrsRtpNackList::get_nack_seqs(vector& seqs) +{ + srs_utime_t now = srs_update_system_time(); + int interval = now - pre_check_time_; + if (interval < opts_.nack_interval / 2) { + return; + } + + pre_check_time_ = now; + std::map::iterator iter = queue_.begin(); + while (iter != queue_.end()) { + const uint16_t& seq = iter->first; + SrsRtpNackInfo& nack_info = iter->second; + + int alive_time = now - nack_info.generate_time_; + if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) { + srs_verbose("NACK, drop seq=%u alive time %d bigger than max_alive_time=%d OR nack count %d bigger than %d", + seq, alive_time, opts_.max_alive_time, nack_info.req_nack_count_, opts_.max_count); + + rtp_queue_->notify_drop_seq(seq); + queue_.erase(iter++); + continue; + } + + // TODO:Statistics unorder packet. + if (now - nack_info.generate_time_ < opts_.first_nack_interval) { + break; + } + + if (now - nack_info.pre_req_nack_time_ >= opts_.nack_interval && nack_info.req_nack_count_ <= opts_.max_count) { + ++nack_info.req_nack_count_; + nack_info.pre_req_nack_time_ = now; + seqs.push_back(seq); + srs_verbose("NACK, resend seq=%u, count=%d", seq, nack_info.req_nack_count_); + } + + ++iter; + } +} + +void SrsRtpNackList::update_rtt(int rtt) +{ + rtt_ = rtt; + srs_verbose("NACK, update rtt from %ld to %d", opts_.nack_interval, rtt); + // FIXME: limit min and max value. + opts_.nack_interval = rtt_; +} + +SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) + : nack_(this) +{ + capacity_ = capacity; + head_sequence_ = 0; + highest_sequence_ = 0; + initialized_ = false; + start_collected_ = false; + queue_ = new SrsRtpSharedPacket*[capacity_]; + memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity); + + cycle_ = 0; + jitter_ = 0; + last_trans_time_ = 0; + + pre_number_of_packet_received_ = 0; + pre_number_of_packet_lossed_ = 0; + + num_of_packet_received_ = 0; + number_of_packet_lossed_ = 0; + + one_packet_per_frame_ = one_packet_per_frame; +} + +SrsRtpQueue::~SrsRtpQueue() +{ + srs_freepa(queue_); +} + +srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + + uint16_t seq = rtp_pkt->rtp_header.get_sequence(); + + srs_utime_t now = srs_update_system_time(); + + // First packet recv, init head_sequence and highest_sequence. + if (! initialized_) { + initialized_ = true; + head_sequence_ = seq; + highest_sequence_ = seq; + + ++num_of_packet_received_; + + last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90; + } else { + SrsRtpNackInfo* nack_info = NULL; + if ((nack_info = nack_.find(seq)) != NULL) { + srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success", seq, now - nack_info->generate_time_, nack_info->req_nack_count_); + nack_.remove(seq); + } else { + // Calc jitter. + { + int trans_time = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90; + + int cur_jitter = trans_time - last_trans_time_; + if (cur_jitter < 0) { + cur_jitter = -cur_jitter; + } + + last_trans_time_ = trans_time; + + jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); + srs_verbose("jitter=%.2f", jitter_); + } + + ++num_of_packet_received_; + // seq > highest_sequence_ + if (seq_cmp(highest_sequence_, seq)) { + insert_into_nack_list(highest_sequence_ + 1, seq); + + if (seq < highest_sequence_) { + srs_verbose("warp around, cycle=%lu", cycle_); + ++cycle_; + } + highest_sequence_ = seq; + } else { + // Because we don't know the ISN(initiazlie sequence number), the first packet + // we received maybe no the first paacet client sented. + if (! start_collected_) { + if (seq_cmp(seq, head_sequence_)) { + srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq); + head_sequence_ = seq; + } + insert_into_nack_list(seq + 1, highest_sequence_); + } else { + srs_verbose("seq=%u, rtx success, too old", seq); + } + } + } + } + + int delay = highest_sequence_ - head_sequence_ + 1; + srs_verbose("seqs range=[%u-%u], delay=%d", head_sequence_, highest_sequence_, delay); + + // Check seqs out of range. + if (head_sequence_ + capacity_ < highest_sequence_) { + srs_verbose("try collect packet becuase seq out of range"); + collect_packet(); + } + while (head_sequence_ + capacity_ < highest_sequence_) { + srs_trace("seqs out of range, head seq=%u, hightest seq=%u", head_sequence_, highest_sequence_); + remove(head_sequence_); + uint16_t s = head_sequence_ + 1; + for ( ; s != highest_sequence_; ++s) { + SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; + // Choose the new head sequence. Must be the first packet of frame. + if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { + srs_trace("find except, update head seq from %u to %u when seqs out of range", head_sequence_, s); + head_sequence_ = s; + break; + } + + // Drop the seq. + nack_.remove(s); + srs_verbose("seqs out of range, drop seq=%u", s); + if (pkt && pkt->rtp_header.get_sequence() == s) { + delete pkt; + pkt = NULL; + } + } + srs_trace("force update, update head seq from %u to %u when seqs out of range", head_sequence_, s); + head_sequence_ = s; + } + + SrsRtpSharedPacket* old_pkt = queue_[seq % capacity_]; + if (old_pkt) { + delete old_pkt; + } + + queue_[seq % capacity_] = rtp_pkt->copy(); + + // Marker bit means the last packet of frame received. + if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) { + collect_packet(); + } + + return err; +} + +srs_error_t SrsRtpQueue::remove(uint16_t seq) +{ + srs_error_t err = srs_success; + + SrsRtpSharedPacket*& pkt = queue_[seq % capacity_]; + if (pkt && pkt->rtp_header.get_sequence() == seq) { + delete pkt; + pkt = NULL; + } + + return err; +} + +void SrsRtpQueue::get_and_clean_collected_frames(std::vector >& frames) +{ + frames.swap(frames_); +} + +void SrsRtpQueue::notify_drop_seq(uint16_t seq) +{ + uint16_t s = seq + 1; + for ( ; s != highest_sequence_; ++s) { + SrsRtpSharedPacket* pkt = queue_[s % capacity_]; + if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { + break; + } + } + + srs_verbose("drop seq=%u, highest seq=%u, update head seq %u to %u", seq, highest_sequence_, head_sequence_, s); + head_sequence_ = s; +} + +uint32_t SrsRtpQueue::get_extended_highest_sequence() +{ + return cycle_ * 65536 + highest_sequence_; +} + +uint8_t SrsRtpQueue::get_fraction_lost() +{ + int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_); + uint8_t loss = 0; + if (total > 0) { + loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total; + } + + pre_number_of_packet_lossed_ = number_of_packet_lossed_; + pre_number_of_packet_received_ = num_of_packet_received_; + + return loss; +} + +uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost() +{ + return number_of_packet_lossed_; +} + +uint32_t SrsRtpQueue::get_interarrival_jitter() +{ + return static_cast(jitter_); +} + +void SrsRtpQueue::update_rtt(int rtt) +{ + nack_.update_rtt(rtt); +} + +void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) +{ + for (uint16_t s = seq_start; s != seq_end; ++s) { + srs_verbose("loss seq=%u, insert into nack list", s); + nack_.insert(s); + ++number_of_packet_lossed_; + } + + // FIXME: Record key frame sequence. + // FIXME: When nack list too long, clear and send PLI. +} + +void SrsRtpQueue::collect_packet() +{ + vector frame; + for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) { + SrsRtpSharedPacket* pkt = queue_[s % capacity_]; + + nack_.dump(); + + if (nack_.find(s) != NULL) { + srs_verbose("seq=%u, found in nack list when collect frame", s); + break; + } + + // We must collect frame from first packet to last packet. + if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && ! pkt->rtp_payload_header->is_first_packet_of_frame) { + break; + } + + frame.push_back(pkt->copy()); + if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { + if (! start_collected_) { + start_collected_ = true; + } + frames_.push_back(frame); + frame.clear(); + + srs_verbose("head seq=%u, update to %u because collect one full farme", head_sequence_, s + 1); + head_sequence_ = s + 1; + } + } + + // remove the tmp buffer + for (size_t i = 0; i < frame.size(); ++i) { + srs_freep(frame[i]); + } +} diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp new file mode 100644 index 000000000..6e71b15cd --- /dev/null +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -0,0 +1,157 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_RTP_QUEUE_HPP +#define SRS_APP_RTP_QUEUE_HPP + +#include + +#include +#include +#include + +class SrsRtpSharedPacket; +class SrsRtpQueue; + +struct SrsNackOption +{ + SrsNackOption() + { + // Default nack option. + max_count = 5; + max_alive_time = 2 * SRS_UTIME_SECONDS; + first_nack_interval = 10 * SRS_UTIME_MILLISECONDS; + nack_interval = 400 * SRS_UTIME_MILLISECONDS; + } + int max_count; + srs_utime_t max_alive_time; + int64_t first_nack_interval; + int64_t nack_interval; +}; + +struct SrsRtpNackInfo +{ + SrsRtpNackInfo(); + + // Use to control the time of first nack req and the life of seq. + srs_utime_t generate_time_; + // Use to control nack interval. + srs_utime_t pre_req_nack_time_; + // Use to control nack times. + int req_nack_count_; +}; + +inline bool seq_cmp(const uint16_t& l, const uint16_t& r) +{ + return ((int16_t)(r - l)) > 0; +} + +struct SeqComp +{ + bool operator()(const uint16_t& l, const uint16_t& r) const + { + return seq_cmp(l, r); + } +}; + +class SrsRtpNackList +{ +private: + // Nack queue, seq order, oldest to newest. + std::map queue_; + SrsRtpQueue* rtp_queue_; + SrsNackOption opts_; +private: + srs_utime_t pre_check_time_; +private: + int rtt_; +public: + SrsRtpNackList(SrsRtpQueue* rtp_queue); + virtual ~SrsRtpNackList(); +public: + void insert(uint16_t seq); + void remove(uint16_t seq); + SrsRtpNackInfo* find(uint16_t seq); +public: + void dump(); +public: + void get_nack_seqs(std::vector& seqs); +public: + void update_rtt(int rtt); +}; + +class SrsRtpQueue +{ +private: + /* + *[seq1|seq2|seq3|seq4|seq5 ... seq10|seq11(loss)|seq12(loss)|seq13] + * \___(head_sequence_) \ \___(highest_sequence_) + * \___(no received, in nack list) + */ + // Capacity of the ring-buffer. + size_t capacity_; + // Thei highest sequence we have receive. + uint16_t highest_sequence_; + // The sequence waitting to read. + uint16_t head_sequence_; + bool initialized_; + bool start_collected_; + // Ring bufer. + SrsRtpSharedPacket** queue_; +private: + uint64_t cycle_; + double jitter_; + int64_t last_trans_time_; + uint64_t pre_number_of_packet_received_; + uint64_t pre_number_of_packet_lossed_; + uint64_t num_of_packet_received_; + uint64_t number_of_packet_lossed_; +private: + bool one_packet_per_frame_; +public: + SrsRtpNackList nack_; +private: + std::vector > frames_; +public: + SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false); + virtual ~SrsRtpQueue(); +public: + srs_error_t insert(SrsRtpSharedPacket* rtp_pkt); + srs_error_t remove(uint16_t seq); +public: + void get_and_clean_collected_frames(std::vector >& frames); + void notify_drop_seq(uint16_t seq); +public: + uint32_t get_extended_highest_sequence(); + uint8_t get_fraction_lost(); + uint32_t get_cumulative_number_of_packets_lost(); + uint32_t get_interarrival_jitter(); +public: + void update_rtt(int rtt); +private: + void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end); +private: + void collect_packet(); +}; + +#endif diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index 5c23cbd34..daf73ac32 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -32,19 +32,10 @@ using namespace std; #include #include -// @see: https://tools.ietf.org/html/rfc6184#section-5.2 -const uint8_t kStapA = 24; - -// @see: https://tools.ietf.org/html/rfc6184#section-5.2 -const uint8_t kFuA = 28; - -// @see: https://tools.ietf.org/html/rfc6184#section-5.8 -const uint8_t kStart = 0x80; // Fu-header start bit -const uint8_t kEnd = 0x40; // Fu-header end bit - SrsRtpHeader::SrsRtpHeader() { padding = false; + padding_length = 0; extension = false; cc = 0; marker = false; @@ -72,9 +63,70 @@ SrsRtpHeader::~SrsRtpHeader() srs_error_t SrsRtpHeader::decode(SrsBuffer* stream) { - srs_error_t err = srs_success; + srs_error_t err = srs_success; + + if (stream->size() < kRtpHeaderFixedSize) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); + } + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + uint8_t first = stream->read_1bytes(); + padding = (first & 0x20); + extension = (first & 0x10); + cc = (first & 0x0F); + + uint8_t second = stream->read_1bytes(); + marker = (second & 0x80); + payload_type = (second & 0x7F); + + sequence = stream->read_2bytes(); + timestamp = stream->read_4bytes(); + ssrc = stream->read_4bytes(); + + if (stream->size() < header_size()) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); + } + + for (uint8_t i = 0; i < cc; ++i) { + csrc[i] = stream->read_4bytes(); + } + + if (extension) { + // TODO: + uint16_t profile_id = stream->read_2bytes(); + extension_length = stream->read_2bytes(); + // @see: https://tools.ietf.org/html/rfc3550#section-5.3.1 + stream->skip(extension_length * 4); + + srs_verbose("extension, profile_id=%u, length=%u", profile_id, extension_length); + + // @see: https://tools.ietf.org/html/rfc5285#section-4.2 + if (profile_id == 0xBEDE) { + } + } + + if (padding) { + padding_length = *(reinterpret_cast(stream->data() + stream->size() - 1)); + if (padding_length > (stream->size() - stream->pos())) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); + } - // TODO: FIXME: Implements it. + srs_verbose("offset=%d, padding_length=%u", stream->size(), padding_length); + } return err; } @@ -173,6 +225,7 @@ SrsRtpPacket2::~SrsRtpPacket2() void SrsRtpPacket2::set_padding(int size) { rtp_header.set_padding(size > 0); + rtp_header.set_padding_length(size); if (cache_payload) { cache_payload += size - padding; } @@ -182,6 +235,7 @@ void SrsRtpPacket2::set_padding(int size) void SrsRtpPacket2::add_padding(int size) { rtp_header.set_padding(padding + size > 0); + rtp_header.set_padding_length(rtp_header.get_padding_length() + size); if (cache_payload) { cache_payload += size; } @@ -552,6 +606,69 @@ srs_error_t SrsRtpFUAPayload2::encode(SrsBuffer* buf) return srs_success; } +SrsRtpPayloadHeader::SrsRtpPayloadHeader() +{ + is_first_packet_of_frame = false; + is_last_packet_of_frame = false; +} + +SrsRtpPayloadHeader::~SrsRtpPayloadHeader() +{ +} + +SrsRtpPayloadHeader::SrsRtpPayloadHeader(const SrsRtpPayloadHeader& rhs) +{ + operator=(rhs); +} + +SrsRtpPayloadHeader& SrsRtpPayloadHeader::operator=(const SrsRtpPayloadHeader& rhs) +{ + is_first_packet_of_frame = rhs.is_first_packet_of_frame; + is_last_packet_of_frame = rhs.is_last_packet_of_frame; +} + +SrsRtpH264Header::SrsRtpH264Header() : SrsRtpPayloadHeader() +{ +} + +SrsRtpH264Header::~SrsRtpH264Header() +{ +} + +SrsRtpH264Header::SrsRtpH264Header(const SrsRtpH264Header& rhs) +{ + operator=(rhs); +} + +SrsRtpH264Header& SrsRtpH264Header::operator=(const SrsRtpH264Header& rhs) +{ + SrsRtpPayloadHeader::operator=(rhs); + nalu_type = rhs.nalu_type; + nalu_header = rhs.nalu_header; + nalu_offset = rhs.nalu_offset; + + return *this; +} + +SrsRtpOpusHeader::SrsRtpOpusHeader() : SrsRtpPayloadHeader() +{ +} + +SrsRtpOpusHeader::~SrsRtpOpusHeader() +{ +} + +SrsRtpOpusHeader::SrsRtpOpusHeader(const SrsRtpOpusHeader& rhs) +{ + operator=(rhs); +} + +SrsRtpOpusHeader& SrsRtpOpusHeader::operator=(const SrsRtpOpusHeader& rhs) +{ + SrsRtpPayloadHeader::operator=(rhs); + return *this; +} + SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() { payload = NULL; @@ -570,6 +687,8 @@ SrsRtpSharedPacket::SrsRtpSharedPacket() payload = NULL; size = 0; + + rtp_payload_header = NULL; } SrsRtpSharedPacket::~SrsRtpSharedPacket() @@ -581,9 +700,11 @@ SrsRtpSharedPacket::~SrsRtpSharedPacket() --payload_ptr->shared_count; } } + + srs_freep(rtp_payload_header); } -srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uint32_t ssrc, uint16_t payload_type, char* p, int s) +srs_error_t SrsRtpSharedPacket::create(SrsRtpHeader* rtp_h, SrsRtpPayloadHeader* rtp_ph, char* p, int s) { srs_error_t err = srs_success; @@ -593,10 +714,8 @@ srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uin srs_assert(!payload_ptr); - rtp_header.set_timestamp(timestamp); - rtp_header.set_sequence(sequence); - rtp_header.set_ssrc(ssrc); - rtp_header.set_payload_type(payload_type); + this->rtp_header = *rtp_h; + this->rtp_payload_header = rtp_ph; // TODO: rtp header padding. size_t buffer_size = rtp_header.header_size() + s; @@ -619,6 +738,25 @@ srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uin return err; } +srs_error_t SrsRtpSharedPacket::decode(char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + + SrsBuffer stream(buf, nb_buf); + if ((err = rtp_header.decode(&stream)) != srs_success) { + return srs_error_wrap(err, "rtp header decode failed"); + } + + payload_ptr = new SrsRtpSharedPacketPayload(); + payload_ptr->payload = buf; + payload_ptr->size = nb_buf; + + this->payload = payload_ptr->payload; + this->size = payload_ptr->size; + + return err; +} + SrsRtpSharedPacket* SrsRtpSharedPacket::copy() { SrsRtpSharedPacket* copy = new SrsRtpSharedPacket(); @@ -627,6 +765,11 @@ SrsRtpSharedPacket* SrsRtpSharedPacket::copy() payload_ptr->shared_count++; copy->rtp_header = rtp_header; + if (dynamic_cast(rtp_payload_header)) { + copy->rtp_payload_header = new SrsRtpH264Header(*(dynamic_cast(rtp_payload_header))); + } else if (dynamic_cast(rtp_payload_header)) { + copy->rtp_payload_header = new SrsRtpOpusHeader(*(dynamic_cast(rtp_payload_header))); + } copy->payload = payload; copy->size = size; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 349c388a5..2ad222bee 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -37,6 +37,17 @@ const uint8_t kRtpMarker = 0x80; // H.264 nalu header type mask. const uint8_t kNalTypeMask = 0x1F; +// @see: https://tools.ietf.org/html/rfc6184#section-5.2 +const uint8_t kStapA = 24; + +// @see: https://tools.ietf.org/html/rfc6184#section-5.2 +const uint8_t kFuA = 28; + +// @see: https://tools.ietf.org/html/rfc6184#section-5.8 +const uint8_t kStart = 0x80; // Fu-header start bit +const uint8_t kEnd = 0x40; // Fu-header end bit + + class SrsBuffer; class SrsRtpRawPayload; class SrsRtpFUAPayload2; @@ -45,6 +56,7 @@ class SrsRtpHeader { private: bool padding; + uint8_t padding_length; bool extension; uint8_t cc; bool marker; @@ -76,6 +88,8 @@ public: inline void set_ssrc(uint32_t v) { ssrc = v; } uint32_t get_ssrc() const { return ssrc; } inline void set_padding(bool v) { padding = v; } + inline void set_padding_length(uint8_t v) { padding_length = v; } + uint8_t get_padding_length() const { return padding_length; } }; class SrsRtpPacket2 @@ -212,6 +226,40 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +class SrsRtpPayloadHeader +{ +public: + bool is_first_packet_of_frame; + bool is_last_packet_of_frame; +public: + SrsRtpPayloadHeader(); + virtual ~SrsRtpPayloadHeader(); + SrsRtpPayloadHeader(const SrsRtpPayloadHeader& rhs); + SrsRtpPayloadHeader& operator=(const SrsRtpPayloadHeader& rhs); +}; + +class SrsRtpOpusHeader : public SrsRtpPayloadHeader +{ +public: + SrsRtpOpusHeader(); + virtual ~SrsRtpOpusHeader(); + SrsRtpOpusHeader(const SrsRtpOpusHeader& rhs); + SrsRtpOpusHeader& operator=(const SrsRtpOpusHeader& rhs); +}; + +class SrsRtpH264Header : public SrsRtpPayloadHeader +{ +public: + uint8_t nalu_type; + uint8_t nalu_header; + std::vector > nalu_offset; // offset, size +public: + SrsRtpH264Header(); + virtual ~SrsRtpH264Header(); + SrsRtpH264Header(const SrsRtpH264Header& rhs); + SrsRtpH264Header& operator=(const SrsRtpH264Header& rhs); +}; + class SrsRtpSharedPacket { private: @@ -230,14 +278,19 @@ private: SrsRtpSharedPacketPayload* payload_ptr; public: SrsRtpHeader rtp_header; + SrsRtpPayloadHeader* rtp_payload_header; char* payload; int size; public: SrsRtpSharedPacket(); virtual ~SrsRtpSharedPacket(); public: - srs_error_t create(int64_t timestamp, uint16_t sequence, uint32_t ssrc, uint16_t payload_type, char* payload, int size); + srs_error_t create(SrsRtpHeader* rtp_h, SrsRtpPayloadHeader* rtp_ph, char* p, int s); + srs_error_t decode(char* buf, int nb_buf); SrsRtpSharedPacket* copy(); +public: + char* rtp_payload() { return payload + rtp_header.header_size(); } + int rtp_payload_size() { return size - rtp_header.header_size() - rtp_header.get_padding_length(); } // Interface to modify rtp header public: srs_error_t modify_rtp_header_marker(bool marker);