diff --git a/trunk/configure b/trunk/configure index f34c47e30..648911fb1 100755 --- a/trunk/configure +++ b/trunk/configure @@ -280,7 +280,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_coworkers" "srs_app_hybrid") if [[ $SRS_RTC == YES ]]; then MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp" - "srs_app_rtc_queue" "srs_app_rtc_server") + "srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source") fi if [[ $SRS_GB28181 == YES ]]; then MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip") diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp index 2e05100b8..1ddcc3842 100644 --- a/trunk/src/app/srs_app_rtc.cpp +++ b/trunk/src/app/srs_app_rtc.cpp @@ -215,8 +215,7 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* SrsRtc::SrsRtc() { req = NULL; - hub = NULL; - + enabled = false; disposable = false; last_update_time = 0; @@ -244,11 +243,10 @@ srs_error_t SrsRtc::cycle() return err; } -srs_error_t SrsRtc::initialize(SrsOriginHub* h, SrsRequest* r) +srs_error_t SrsRtc::initialize(SrsRequest* r) { srs_error_t err = srs_success; - hub = h; req = r; rtp_h264_muxer = new SrsRtpH264Muxer(); diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp index df3f85306..914576d5a 100644 --- a/trunk/src/app/srs_app_rtc.hpp +++ b/trunk/src/app/srs_app_rtc.hpp @@ -87,7 +87,6 @@ private: srs_utime_t last_update_time; SrsRtpH264Muxer* rtp_h264_muxer; SrsRtpOpusMuxer* rtp_opus_muxer; - SrsOriginHub* hub; public: SrsRtc(); virtual ~SrsRtc(); @@ -95,7 +94,7 @@ public: virtual void dispose(); virtual srs_error_t cycle(); public: - virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r); + virtual srs_error_t initialize(SrsRequest* r); virtual srs_error_t on_publish(); virtual void on_unpublish(); virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b4352f236..9653b314d 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -64,6 +64,7 @@ using namespace std; #include #include #include +#include // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings @@ -683,17 +684,15 @@ srs_error_t SrsRtcPlayer::cycle() { srs_error_t err = srs_success; - SrsSource* source = NULL; + SrsRtcSource* source = NULL; SrsRequest* req = session_->req; - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } - SrsConsumer* consumer = NULL; - SrsAutoFree(SrsConsumer, consumer); + SrsRtcConsumer* consumer = NULL; + SrsAutoFree(SrsRtcConsumer, consumer); if ((err = source->create_consumer(NULL, consumer)) != srs_success) { return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } @@ -807,7 +806,7 @@ srs_error_t SrsRtcPlayer::cycle() } srs_error_t SrsRtcPlayer::send_messages( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets + SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets ) { srs_error_t err = srs_success; @@ -841,7 +840,7 @@ srs_error_t SrsRtcPlayer::send_messages( } srs_error_t SrsRtcPlayer::messages_to_packets( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets + SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets ) { srs_error_t err = srs_success; @@ -1428,7 +1427,7 @@ srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSampl return err; } -srs_error_t SrsRtcPlayer::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets) +srs_error_t SrsRtcPlayer::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets) { srs_error_t err = srs_success; @@ -1783,9 +1782,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque return srs_error_wrap(err, "start report_timer"); } - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -2751,7 +2748,7 @@ int SrsRtcSession::context_id() return cid; } -srs_error_t SrsRtcSession::initialize(SrsSource* source, SrsRequest* r, bool is_publisher, string username, int context_id) +srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, string username, int context_id) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index b47dc3091..f7303dd2c 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -50,7 +50,7 @@ class SrsStunPacket; class SrsRtcServer; class SrsRtcSession; class SrsSharedPtrMessage; -class SrsSource; +class SrsRtcSource; class SrsRtpPacket2; class ISrsUdpSender; class SrsRtpQueue; @@ -251,8 +251,8 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); - srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); + srs_error_t send_messages(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); + srs_error_t messages_to_packets(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); srs_error_t send_packets(SrsRtcOutgoingPackets& packets); srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets); private: @@ -261,7 +261,7 @@ private: srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets); srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets); - srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); + srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); void simulate_nack_drop(int nn); @@ -293,7 +293,7 @@ private: SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; - SrsSource* source; + SrsRtcSource* source; // Whether enabled nack. bool nack_enabled_; // Simulators. @@ -365,7 +365,7 @@ private: // TODO: FIXME: Support reload. bool encrypt; SrsRequest* req; - SrsSource* source_; + SrsRtcSource* source_; SrsSdp remote_sdp; SrsSdp local_sdp; private: @@ -390,7 +390,7 @@ public: void switch_to_context(); int context_id(); public: - srs_error_t initialize(SrsSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id); + srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id); // The peer address may change, we can identify that by STUN messages. srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); srs_error_t on_dtls(char* data, int nb_data); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index f4d3dcd5a..992319c7e 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -38,6 +38,7 @@ #include #include #include +#include using namespace std; @@ -569,11 +570,8 @@ srs_error_t SrsRtcServer::create_session( ) { srs_error_t err = srs_success; - SrsSource* source = NULL; - - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + SrsRtcSource* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -663,11 +661,8 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req return err; } - SrsSource* source = NULL; - - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + SrsRtcSource* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp new file mode 100644 index 000000000..88f2e946c --- /dev/null +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -0,0 +1,567 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c) +{ + source = s; + conn = c; + should_update_source_id = false; + queue = new SrsMessageQueue(); + +#ifdef SRS_PERF_QUEUE_COND_WAIT + mw_wait = srs_cond_new(); + mw_min_msgs = 0; + mw_duration = 0; + mw_waiting = false; +#endif +} + +SrsRtcConsumer::~SrsRtcConsumer() +{ + source->on_consumer_destroy(this); + + srs_freep(queue); + +#ifdef SRS_PERF_QUEUE_COND_WAIT + srs_cond_destroy(mw_wait); +#endif +} + +void SrsRtcConsumer::enable_pass_timestamp() +{ +} + +void SrsRtcConsumer::set_queue_size(srs_utime_t queue_size) +{ +} + +void SrsRtcConsumer::update_source_id() +{ + should_update_source_id = true; +} + +srs_error_t SrsRtcConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) +{ + srs_error_t err = srs_success; + + SrsSharedPtrMessage* msg = shared_msg->copy(); + + if ((err = queue->enqueue(msg, NULL, true)) != srs_success) { + return srs_error_wrap(err, "enqueue message"); + } + +#ifdef SRS_PERF_QUEUE_COND_WAIT + // fire the mw when msgs is enough. + if (mw_waiting) { + if (queue->size() > mw_min_msgs) { + srs_cond_signal(mw_wait); + mw_waiting = false; + return err; + } + return err; + } +#endif + + return err; +} + +srs_error_t SrsRtcConsumer::dump_packets(SrsMessageArray* msgs, int& count) +{ + srs_error_t err = srs_success; + + srs_assert(count >= 0); + srs_assert(msgs->max > 0); + + // the count used as input to reset the max if positive. + int max = count? srs_min(count, msgs->max) : msgs->max; + + // the count specifies the max acceptable count, + // here maybe 1+, and we must set to 0 when got nothing. + count = 0; + + if (should_update_source_id) { + srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id()); + should_update_source_id = false; + } + + // pump msgs from queue. + if ((err = queue->dump_packets(max, msgs->msgs, count, true)) != srs_success) { + return srs_error_wrap(err, "dump packets"); + } + + return err; +} + +#ifdef SRS_PERF_QUEUE_COND_WAIT +void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) +{ + mw_min_msgs = nb_msgs; + mw_duration = msgs_duration; + + srs_utime_t duration = queue->duration(); + bool match_min_msgs = queue->size() > mw_min_msgs; + + // when duration ok, signal to flush. + if (match_min_msgs && duration > mw_duration) { + return; + } + + // the enqueue will notify this cond. + mw_waiting = true; + + // use cond block wait for high performance mode. + srs_cond_wait(mw_wait); +} +#endif + +SrsRtcSource::SrsRtcSource() +{ + req = NULL; + _source_id = _pre_source_id = -1; + + meta = new SrsMetaCache(); + format = new SrsRtmpFormat(); + rtc = new SrsRtc(); + + _can_publish = true; +} + +SrsRtcSource::~SrsRtcSource() +{ + // never free the consumers, + // for all consumers are auto free. + consumers.clear(); + + srs_freep(meta); + srs_freep(format); + srs_freep(rtc); + + srs_freep(req); +} + +srs_error_t SrsRtcSource::initialize(SrsRequest* r) +{ + srs_error_t err = srs_success; + + req = r->copy(); + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + if ((err = rtc->initialize(req)) != srs_success) { + return srs_error_wrap(err, "rtc initialize"); + } + + return err; +} + +void SrsRtcSource::update_auth(SrsRequest* r) +{ + req->update_auth(r); +} + +srs_error_t SrsRtcSource::on_source_id_changed(int id) +{ + srs_error_t err = srs_success; + + if (_source_id == id) { + return err; + } + + if (_pre_source_id == -1) { + _pre_source_id = id; + } else if (_pre_source_id != _source_id) { + _pre_source_id = _source_id; + } + + _source_id = id; + + // notice all consumer + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsRtcConsumer* consumer = *it; + consumer->update_source_id(); + } + + return err; +} + +int SrsRtcSource::source_id() +{ + return _source_id; +} + +int SrsRtcSource::pre_source_id() +{ + return _pre_source_id; +} + +srs_error_t SrsRtcSource::create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer) +{ + srs_error_t err = srs_success; + + consumer = new SrsRtcConsumer(this, conn); + consumers.push_back(consumer); + + // TODO: FIXME: Implements edge cluster. + + return err; +} + +srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg) +{ + srs_error_t err = srs_success; + + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + consumer->set_queue_size(queue_size); + + // Copy metadata and sequence header to consumer. + // TODO: FIXME: Maybe should not do this for RTC? + if ((err = meta->dumps(consumer, true, SrsRtmpJitterAlgorithmOFF, dm, dg)) != srs_success) { + return srs_error_wrap(err, "meta dumps"); + } + + // print status. + if (dg) { + srs_trace("create consumer, queue_size=%.2f", queue_size); + } else { + srs_trace("create consumer, ignore gop cache"); + } + + return err; +} + +void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) +{ + std::vector::iterator it; + it = std::find(consumers.begin(), consumers.end(), consumer); + if (it != consumers.end()) { + consumers.erase(it); + } +} + +bool SrsRtcSource::can_publish(bool is_edge) +{ + return _can_publish; +} + +srs_error_t SrsRtcSource::on_publish() +{ + srs_error_t err = srs_success; + + // update the request object. + srs_assert(req); + + _can_publish = false; + + if ((err = rtc->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtc publish"); + } + + // whatever, the publish thread is the source or edge source, + // save its id to srouce id. + if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) { + return srs_error_wrap(err, "source id change"); + } + + // Reset the metadata cache, to make VLC happy when disable/enable stream. + // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 + meta->clear(); + + // TODO: FIXME: Handle by statistic. + + return err; +} + +void SrsRtcSource::on_unpublish() +{ + // ignore when already unpublished. + if (_can_publish) { + return; + } + + rtc->on_unpublish(); + + // Reset the metadata cache, to make VLC happy when disable/enable stream. + // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 + meta->update_previous_vsh(); + meta->update_previous_ash(); + + srs_trace("cleanup when unpublish"); + + _can_publish = true; + _source_id = -1; + + // TODO: FIXME: Handle by statistic. +} + +SrsMetaCache* SrsRtcSource::cached_meta() +{ + return meta; +} + +SrsRtcPublisher* SrsRtcSource::rtc_publisher() +{ + return rtc_publisher_; +} + +void SrsRtcSource::set_rtc_publisher(SrsRtcPublisher* v) +{ + rtc_publisher_ = v; +} + +srs_error_t SrsRtcSource::on_rtc_audio(SrsSharedPtrMessage* audio) +{ + // TODO: FIXME: Merge with on_audio. + // TODO: FIXME: Print key information. + return on_audio_imp(audio); +} + +srs_error_t SrsRtcSource::on_video(SrsCommonMessage* shared_video) +{ + srs_error_t err = srs_success; + + // drop any unknown header video. + // @see https://github.com/ossrs/srs/issues/421 + if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { + char b0 = 0x00; + if (shared_video->size > 0) { + b0 = shared_video->payload[0]; + } + + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); + return err; + } + + // convert shared_video to msg, user should not use shared_video again. + // the payload is transfer to msg, and set to NULL in shared_video. + SrsSharedPtrMessage msg; + if ((err = msg.create(shared_video)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + // directly process the video message. + return on_video_imp(&msg); +} + +srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size); + bool is_sequence_header = is_aac_sequence_header; + + // whether consumer should drop for the duplicated sequence header. + bool drop_for_reduce = false; + if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { + if (meta->previous_ash()->size == msg->size) { + drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size); + srs_warn("drop for reduce sh audio, size=%d", msg->size); + } + } + + // copy to all consumer + if (!drop_for_reduce) { + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "consume message"); + } + } + } + + // cache the sequence header of aac, or first packet of mp3. + // for example, the mp3 is used for hls to write the "right" audio codec. + // TODO: FIXME: to refine the stream info system. + if (is_aac_sequence_header || !meta->ash()) { + if ((err = meta->update_ash(msg)) != srs_success) { + return srs_error_wrap(err, "meta consume audio"); + } + } + + // when sequence header, donot push to gop cache and adjust the timestamp. + if (is_sequence_header) { + return err; + } + + // if atc, update the sequence header to abs time. + if (meta->ash()) { + meta->ash()->timestamp = msg->timestamp; + } + if (meta->data()) { + meta->data()->timestamp = msg->timestamp; + } + + return err; +} + +srs_error_t SrsRtcSource::on_video_imp(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + + // user can disable the sps parse to workaround when parse sps failed. + // @see https://github.com/ossrs/srs/issues/474 + if (is_sequence_header) { + format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost); + } + + if ((err = format->on_video(msg)) != srs_success) { + return srs_error_wrap(err, "format consume video"); + } + + // Parse RTMP message to RTP packets, in FU-A if too large. + if ((err = rtc->on_video(msg, format)) != srs_success) { + // TODO: We should support more strategies. + srs_warn("rtc: ignore video error %s", srs_error_desc(err).c_str()); + srs_error_reset(err); + rtc->on_unpublish(); + } + + // whether consumer should drop for the duplicated sequence header. + bool drop_for_reduce = false; + if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) { + if (meta->previous_vsh()->size == msg->size) { + drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size); + srs_warn("drop for reduce sh video, size=%d", msg->size); + } + } + + // cache the sequence header if h264 + if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { + return srs_error_wrap(err, "meta update video"); + } + + // copy to all consumer + if (!drop_for_reduce) { + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "consume video"); + } + } + } + + // when sequence header, donot push to gop cache and adjust the timestamp. + if (is_sequence_header) { + return err; + } + + // if atc, update the sequence header to abs time. + if (meta->vsh()) { + meta->vsh()->timestamp = msg->timestamp; + } + if (meta->data()) { + meta->data()->timestamp = msg->timestamp; + } + + return err; +} + +SrsRtcSourceManager::SrsRtcSourceManager() +{ + lock = NULL; +} + +SrsRtcSourceManager::~SrsRtcSourceManager() +{ + srs_mutex_destroy(lock); +} + +srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps) +{ + srs_error_t err = srs_success; + + // Lazy create lock, because ST is not ready in SrsRtcSourceManager constructor. + if (!lock) { + lock = srs_mutex_new(); + } + + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + SrsLocker(lock); + + SrsRtcSource* source = NULL; + if ((source = fetch(r)) != NULL) { + *pps = source; + return err; + } + + string stream_url = r->get_stream_url(); + string vhost = r->vhost; + + // should always not exists for create a source. + srs_assert (pool.find(stream_url) == pool.end()); + + srs_trace("new source, stream_url=%s", stream_url.c_str()); + + source = new SrsRtcSource(); + if ((err = source->initialize(r)) != srs_success) { + return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + } + + pool[stream_url] = source; + + *pps = source; + + return err; +} + +SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) +{ + SrsRtcSource* source = NULL; + + string stream_url = r->get_stream_url(); + if (pool.find(stream_url) == pool.end()) { + return NULL; + } + + source = pool[stream_url]; + + // we always update the request of resource, + // for origin auth is on, the token in request maybe invalid, + // and we only need to update the token of request, it's simple. + source->update_auth(r); + + return source; +} + +SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager(); + diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp new file mode 100644 index 000000000..8cbaf115e --- /dev/null +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -0,0 +1,181 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_RTC_SOURCE_HPP +#define SRS_APP_RTC_SOURCE_HPP + +#include + +#include +#include + +#include +#include + +class SrsRequest; +class SrsConnection; +class SrsMetaCache; +class SrsRtcPublisher; +class SrsSharedPtrMessage; +class SrsCommonMessage; +class SrsMessageArray; +class SrsRtcSource; + +class SrsRtcConsumer : public ISrsConsumerQueue +{ +private: + SrsRtcSource* source; + // The owner connection for debug, maybe NULL. + SrsConnection* conn; + SrsMessageQueue* queue; + // when source id changed, notice all consumers + bool should_update_source_id; +#ifdef SRS_PERF_QUEUE_COND_WAIT + // The cond wait for mw. + // @see https://github.com/ossrs/srs/issues/251 + srs_cond_t mw_wait; + bool mw_waiting; + int mw_min_msgs; + srs_utime_t mw_duration; +#endif +public: + SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c); + virtual ~SrsRtcConsumer(); +public: + // Use pass timestamp mode. + // TODO: FIXME: Remove it. + void enable_pass_timestamp(); + // Set the size of queue. + // TODO: FIXME: Remove it. + virtual void set_queue_size(srs_utime_t queue_size); + // when source id changed, notice client to print. + virtual void update_source_id(); + // Enqueue an shared ptr message. + // @param shared_msg, directly ptr, copy it if need to save it. + // @param whether atc, donot use jitter correct if true. + // @param ag the algorithm of time jitter. + virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag); + // Get packets in consumer queue. + // @param msgs the msgs array to dump packets to send. + // @param count the count in array, intput and output param. + // @remark user can specifies the count to get specified msgs; 0 to get all if possible. + virtual srs_error_t dump_packets(SrsMessageArray* msgs, int& count); +#ifdef SRS_PERF_QUEUE_COND_WAIT + // wait for messages incomming, atleast nb_msgs and in duration. + // @param nb_msgs the messages count to wait. + // @param msgs_duration the messages duration to wait. + virtual void wait(int nb_msgs, srs_utime_t msgs_duration); +#endif +}; + +class SrsRtcSource +{ +private: + // For publish, it's the publish client id. + // For edge, it's the edge ingest id. + // when source id changed, for example, the edge reconnect, + // invoke the on_source_id_changed() to let all clients know. + int _source_id; + // previous source id. + int _pre_source_id; + SrsRequest* req; + SrsRtcPublisher* rtc_publisher_; +private: + // To delivery stream to clients. + std::vector consumers; + // The metadata cache. + SrsMetaCache* meta; + // Whether source is avaiable for publishing. + bool _can_publish; +private: + // The format, codec information. + SrsRtmpFormat* format; + // rtc handler + SrsRtc* rtc; +public: + SrsRtcSource(); + virtual ~SrsRtcSource(); +public: + virtual srs_error_t initialize(SrsRequest* r); + // Update the authentication information in request. + virtual void update_auth(SrsRequest* r); + // The source id changed. + virtual srs_error_t on_source_id_changed(int id); + // Get current source id. + virtual int source_id(); + virtual int pre_source_id(); +public: + // Create consumer + // @param consumer, output the create consumer. + virtual srs_error_t create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer); + // Dumps packets in cache to consumer. + // @param ds, whether dumps the sequence header. + // @param dm, whether dumps the metadata. + // @param dg, whether dumps the gop cache. + virtual srs_error_t consumer_dumps(SrsRtcConsumer* consumer, bool ds = true, bool dm = true, bool dg = true); + virtual void on_consumer_destroy(SrsRtcConsumer* consumer); + // TODO: FIXME: Remove the param is_edge. + virtual bool can_publish(bool is_edge); + // When start publish stream. + virtual srs_error_t on_publish(); + // When stop publish stream. + virtual void on_unpublish(); +public: + // For RTC, we need to package SPS/PPS(in cached meta) before each IDR. + SrsMetaCache* cached_meta(); + // Get and set the publisher, passed to consumer to process requests such as PLI. + SrsRtcPublisher* rtc_publisher(); + void set_rtc_publisher(SrsRtcPublisher* v); + // When got RTC audio message, which is encoded in opus. + // TODO: FIXME: Merge with on_audio. + srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio); + virtual srs_error_t on_video(SrsCommonMessage* video); +private: + virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio); + virtual srs_error_t on_video_imp(SrsSharedPtrMessage* video); +}; + +class SrsRtcSourceManager +{ +private: + srs_mutex_t lock; + std::map pool; +public: + SrsRtcSourceManager(); + virtual ~SrsRtcSourceManager(); +public: + // create source when fetch from cache failed. + // @param r the client request. + // @param pps the matched source, if success never be NULL. + virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps); +private: + // Get the exists source, NULL when not exists. + // update the request and return the exists source. + virtual SrsRtcSource* fetch(SrsRequest* r); +}; + +// Global singleton instance. +extern SrsRtcSourceManager* _srs_rtc_sources; + +#endif + diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d881ef140..9f1faa2a8 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -427,6 +427,14 @@ ISrsWakable::~ISrsWakable() { } +ISrsConsumerQueue::ISrsConsumerQueue() +{ +} + +ISrsConsumerQueue::~ISrsConsumerQueue() +{ +} + SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) { source = s; @@ -907,7 +915,7 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r) } #ifdef SRS_RTC - if ((err = rtc->initialize(this, req)) != srs_success) { + if ((err = rtc->initialize(req)) != srs_success) { return srs_error_wrap(err, "rtc initialize"); } #endif @@ -1629,7 +1637,7 @@ SrsFormat* SrsMetaCache::ash_format() return aformat; } -srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) +srs_error_t SrsMetaCache::dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 5aba081d1..d8f1b38db 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -186,8 +186,18 @@ public: virtual void wakeup() = 0; }; +// Enqueue the packet to consumer. +class ISrsConsumerQueue +{ +public: + ISrsConsumerQueue(); + virtual ~ISrsConsumerQueue(); +public: + virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) = 0; +}; + // The consumer for SrsSource, that is a play client. -class SrsConsumer : public ISrsWakable +class SrsConsumer : virtual public ISrsWakable, virtual public ISrsConsumerQueue { private: SrsRtmpJitter* jitter; @@ -207,6 +217,7 @@ private: srs_utime_t mw_duration; #endif private: + // TODO: FIXME: Move to RTC consumer. // For RTC, we never use jitter to correct timestamp. // But we should not change the atc or time_jitter for source or RTMP. // @remark In this mode, we also never check the queue by timstamp, but only by count. @@ -450,7 +461,7 @@ public: // Dumps cached metadata to consumer. // @param dm Whether dumps the metadata. // @param ds Whether dumps the sequence header. - virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); + virtual srs_error_t dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); public: // Previous exists sequence header. virtual SrsSharedPtrMessage* previous_vsh();