RTC: Extract RTC Source and Consumer

pull/1804/head
winlin 5 years ago
parent d0d8f676c3
commit 25496b734b

2
trunk/configure vendored

@ -280,7 +280,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_coworkers" "srs_app_hybrid")
if [[ $SRS_RTC == YES ]]; then
MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp"
"srs_app_rtc_queue" "srs_app_rtc_server")
"srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source")
fi
if [[ $SRS_GB28181 == YES ]]; then
MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip")

@ -215,7 +215,6 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char*
SrsRtc::SrsRtc()
{
req = NULL;
hub = NULL;
enabled = false;
disposable = false;
@ -244,11 +243,10 @@ srs_error_t SrsRtc::cycle()
return err;
}
srs_error_t SrsRtc::initialize(SrsOriginHub* h, SrsRequest* r)
srs_error_t SrsRtc::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
hub = h;
req = r;
rtp_h264_muxer = new SrsRtpH264Muxer();

@ -87,7 +87,6 @@ private:
srs_utime_t last_update_time;
SrsRtpH264Muxer* rtp_h264_muxer;
SrsRtpOpusMuxer* rtp_opus_muxer;
SrsOriginHub* hub;
public:
SrsRtc();
virtual ~SrsRtc();
@ -95,7 +94,7 @@ public:
virtual void dispose();
virtual srs_error_t cycle();
public:
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
virtual srs_error_t initialize(SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);

@ -64,6 +64,7 @@ using namespace std;
#include <srs_app_pithy_print.hpp>
#include <srs_service_st.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
// The RTP payload max size, reserved some paddings for SRTP as such:
// kRtpPacketSize = kRtpMaxPayloadSize + paddings
@ -683,17 +684,15 @@ srs_error_t SrsRtcPlayer::cycle()
{
srs_error_t err = srs_success;
SrsSource* source = NULL;
SrsRtcSource* source = NULL;
SrsRequest* req = session_->req;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "rtc fetch source failed");
}
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
SrsRtcConsumer* consumer = NULL;
SrsAutoFree(SrsRtcConsumer, consumer);
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str());
}
@ -807,7 +806,7 @@ srs_error_t SrsRtcPlayer::cycle()
}
srs_error_t SrsRtcPlayer::send_messages(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
) {
srs_error_t err = srs_success;
@ -841,7 +840,7 @@ srs_error_t SrsRtcPlayer::send_messages(
}
srs_error_t SrsRtcPlayer::messages_to_packets(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
) {
srs_error_t err = srs_success;
@ -1428,7 +1427,7 @@ srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSampl
return err;
}
srs_error_t SrsRtcPlayer::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets)
srs_error_t SrsRtcPlayer::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets)
{
srs_error_t err = srs_success;
@ -1783,9 +1782,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque
return srs_error_wrap(err, "start report_timer");
}
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -2751,7 +2748,7 @@ int SrsRtcSession::context_id()
return cid;
}
srs_error_t SrsRtcSession::initialize(SrsSource* source, SrsRequest* r, bool is_publisher, string username, int context_id)
srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, string username, int context_id)
{
srs_error_t err = srs_success;

@ -50,7 +50,7 @@ class SrsStunPacket;
class SrsRtcServer;
class SrsRtcSession;
class SrsSharedPtrMessage;
class SrsSource;
class SrsRtcSource;
class SrsRtpPacket2;
class ISrsUdpSender;
class SrsRtpQueue;
@ -251,8 +251,8 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t send_messages(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t messages_to_packets(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t send_packets(SrsRtcOutgoingPackets& packets);
srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets);
private:
@ -261,7 +261,7 @@ private:
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets);
srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn);
@ -293,7 +293,7 @@ private:
SrsRtpNackForReceiver* audio_nack_;
private:
SrsRequest* req;
SrsSource* source;
SrsRtcSource* source;
// Whether enabled nack.
bool nack_enabled_;
// Simulators.
@ -365,7 +365,7 @@ private:
// TODO: FIXME: Support reload.
bool encrypt;
SrsRequest* req;
SrsSource* source_;
SrsRtcSource* source_;
SrsSdp remote_sdp;
SrsSdp local_sdp;
private:
@ -390,7 +390,7 @@ public:
void switch_to_context();
int context_id();
public:
srs_error_t initialize(SrsSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id);
srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);
srs_error_t on_dtls(char* data, int nb_data);

@ -38,6 +38,7 @@
#include <srs_app_http_api.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_service_utility.hpp>
#include <srs_app_rtc_source.hpp>
using namespace std;
@ -569,11 +570,8 @@ srs_error_t SrsRtcServer::create_session(
) {
srs_error_t err = srs_success;
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -663,11 +661,8 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req
return err;
}
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

@ -0,0 +1,567 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_rtc_source.hpp>
#include <srs_app_conn.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_config.hpp>
#include <srs_app_source.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_rtmp_msg_array.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_format.hpp>
#include <srs_app_rtc.hpp>
using namespace std;
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c)
{
source = s;
conn = c;
should_update_source_id = false;
queue = new SrsMessageQueue();
#ifdef SRS_PERF_QUEUE_COND_WAIT
mw_wait = srs_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
#endif
}
SrsRtcConsumer::~SrsRtcConsumer()
{
source->on_consumer_destroy(this);
srs_freep(queue);
#ifdef SRS_PERF_QUEUE_COND_WAIT
srs_cond_destroy(mw_wait);
#endif
}
void SrsRtcConsumer::enable_pass_timestamp()
{
}
void SrsRtcConsumer::set_queue_size(srs_utime_t queue_size)
{
}
void SrsRtcConsumer::update_source_id()
{
should_update_source_id = true;
}
srs_error_t SrsRtcConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
SrsSharedPtrMessage* msg = shared_msg->copy();
if ((err = queue->enqueue(msg, NULL, true)) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
if (queue->size() > mw_min_msgs) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
return err;
}
#endif
return err;
}
srs_error_t SrsRtcConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
srs_error_t err = srs_success;
srs_assert(count >= 0);
srs_assert(msgs->max > 0);
// the count used as input to reset the max if positive.
int max = count? srs_min(count, msgs->max) : msgs->max;
// the count specifies the max acceptable count,
// here maybe 1+, and we must set to 0 when got nothing.
count = 0;
if (should_update_source_id) {
srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
should_update_source_id = false;
}
// pump msgs from queue.
if ((err = queue->dump_packets(max, msgs->msgs, count, true)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}
return err;
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
{
mw_min_msgs = nb_msgs;
mw_duration = msgs_duration;
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration) {
return;
}
// the enqueue will notify this cond.
mw_waiting = true;
// use cond block wait for high performance mode.
srs_cond_wait(mw_wait);
}
#endif
SrsRtcSource::SrsRtcSource()
{
req = NULL;
_source_id = _pre_source_id = -1;
meta = new SrsMetaCache();
format = new SrsRtmpFormat();
rtc = new SrsRtc();
_can_publish = true;
}
SrsRtcSource::~SrsRtcSource()
{
// never free the consumers,
// for all consumers are auto free.
consumers.clear();
srs_freep(meta);
srs_freep(format);
srs_freep(rtc);
srs_freep(req);
}
srs_error_t SrsRtcSource::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
req = r->copy();
if ((err = format->initialize()) != srs_success) {
return srs_error_wrap(err, "format initialize");
}
if ((err = rtc->initialize(req)) != srs_success) {
return srs_error_wrap(err, "rtc initialize");
}
return err;
}
void SrsRtcSource::update_auth(SrsRequest* r)
{
req->update_auth(r);
}
srs_error_t SrsRtcSource::on_source_id_changed(int id)
{
srs_error_t err = srs_success;
if (_source_id == id) {
return err;
}
if (_pre_source_id == -1) {
_pre_source_id = id;
} else if (_pre_source_id != _source_id) {
_pre_source_id = _source_id;
}
_source_id = id;
// notice all consumer
std::vector<SrsRtcConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsRtcConsumer* consumer = *it;
consumer->update_source_id();
}
return err;
}
int SrsRtcSource::source_id()
{
return _source_id;
}
int SrsRtcSource::pre_source_id()
{
return _pre_source_id;
}
srs_error_t SrsRtcSource::create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsRtcConsumer(this, conn);
consumers.push_back(consumer);
// TODO: FIXME: Implements edge cluster.
return err;
}
srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg)
{
srs_error_t err = srs_success;
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
consumer->set_queue_size(queue_size);
// Copy metadata and sequence header to consumer.
// TODO: FIXME: Maybe should not do this for RTC?
if ((err = meta->dumps(consumer, true, SrsRtmpJitterAlgorithmOFF, dm, dg)) != srs_success) {
return srs_error_wrap(err, "meta dumps");
}
// print status.
if (dg) {
srs_trace("create consumer, queue_size=%.2f", queue_size);
} else {
srs_trace("create consumer, ignore gop cache");
}
return err;
}
void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)
{
std::vector<SrsRtcConsumer*>::iterator it;
it = std::find(consumers.begin(), consumers.end(), consumer);
if (it != consumers.end()) {
consumers.erase(it);
}
}
bool SrsRtcSource::can_publish(bool is_edge)
{
return _can_publish;
}
srs_error_t SrsRtcSource::on_publish()
{
srs_error_t err = srs_success;
// update the request object.
srs_assert(req);
_can_publish = false;
if ((err = rtc->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtc publish");
}
// whatever, the publish thread is the source or edge source,
// save its id to srouce id.
if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "source id change");
}
// Reset the metadata cache, to make VLC happy when disable/enable stream.
// @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448
meta->clear();
// TODO: FIXME: Handle by statistic.
return err;
}
void SrsRtcSource::on_unpublish()
{
// ignore when already unpublished.
if (_can_publish) {
return;
}
rtc->on_unpublish();
// Reset the metadata cache, to make VLC happy when disable/enable stream.
// @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448
meta->update_previous_vsh();
meta->update_previous_ash();
srs_trace("cleanup when unpublish");
_can_publish = true;
_source_id = -1;
// TODO: FIXME: Handle by statistic.
}
SrsMetaCache* SrsRtcSource::cached_meta()
{
return meta;
}
SrsRtcPublisher* SrsRtcSource::rtc_publisher()
{
return rtc_publisher_;
}
void SrsRtcSource::set_rtc_publisher(SrsRtcPublisher* v)
{
rtc_publisher_ = v;
}
srs_error_t SrsRtcSource::on_rtc_audio(SrsSharedPtrMessage* audio)
{
// TODO: FIXME: Merge with on_audio.
// TODO: FIXME: Print key information.
return on_audio_imp(audio);
}
srs_error_t SrsRtcSource::on_video(SrsCommonMessage* shared_video)
{
srs_error_t err = srs_success;
// drop any unknown header video.
// @see https://github.com/ossrs/srs/issues/421
if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
char b0 = 0x00;
if (shared_video->size > 0) {
b0 = shared_video->payload[0];
}
srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
return err;
}
// convert shared_video to msg, user should not use shared_video again.
// the payload is transfer to msg, and set to NULL in shared_video.
SrsSharedPtrMessage msg;
if ((err = msg.create(shared_video)) != srs_success) {
return srs_error_wrap(err, "create message");
}
// directly process the video message.
return on_video_imp(&msg);
}
srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
bool is_sequence_header = is_aac_sequence_header;
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_ash()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
}
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
}
// cache the sequence header of aac, or first packet of mp3.
// for example, the mp3 is used for hls to write the "right" audio codec.
// TODO: FIXME: to refine the stream info system.
if (is_aac_sequence_header || !meta->ash()) {
if ((err = meta->update_ash(msg)) != srs_success) {
return srs_error_wrap(err, "meta consume audio");
}
}
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return err;
}
// if atc, update the sequence header to abs time.
if (meta->ash()) {
meta->ash()->timestamp = msg->timestamp;
}
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
return err;
}
srs_error_t SrsRtcSource::on_video_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
if (is_sequence_header) {
format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
}
if ((err = format->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "format consume video");
}
// Parse RTMP message to RTP packets, in FU-A if too large.
if ((err = rtc->on_video(msg, format)) != srs_success) {
// TODO: We should support more strategies.
srs_warn("rtc: ignore video error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
rtc->on_unpublish();
}
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_vsh()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
}
}
// cache the sequence header if h264
if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
return srs_error_wrap(err, "meta update video");
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
return srs_error_wrap(err, "consume video");
}
}
}
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return err;
}
// if atc, update the sequence header to abs time.
if (meta->vsh()) {
meta->vsh()->timestamp = msg->timestamp;
}
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
return err;
}
SrsRtcSourceManager::SrsRtcSourceManager()
{
lock = NULL;
}
SrsRtcSourceManager::~SrsRtcSourceManager()
{
srs_mutex_destroy(lock);
}
srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps)
{
srs_error_t err = srs_success;
// Lazy create lock, because ST is not ready in SrsRtcSourceManager constructor.
if (!lock) {
lock = srs_mutex_new();
}
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsRtcSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return err;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsRtcSource();
if ((err = source->initialize(r)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
}
pool[stream_url] = source;
*pps = source;
return err;
}
SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r)
{
SrsRtcSource* source = NULL;
string stream_url = r->get_stream_url();
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
source = pool[stream_url];
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->update_auth(r);
return source;
}
SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager();

@ -0,0 +1,181 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTC_SOURCE_HPP
#define SRS_APP_RTC_SOURCE_HPP
#include <srs_core.hpp>
#include <vector>
#include <map>
#include <srs_service_st.hpp>
#include <srs_app_source.hpp>
class SrsRequest;
class SrsConnection;
class SrsMetaCache;
class SrsRtcPublisher;
class SrsSharedPtrMessage;
class SrsCommonMessage;
class SrsMessageArray;
class SrsRtcSource;
class SrsRtcConsumer : public ISrsConsumerQueue
{
private:
SrsRtcSource* source;
// The owner connection for debug, maybe NULL.
SrsConnection* conn;
SrsMessageQueue* queue;
// when source id changed, notice all consumers
bool should_update_source_id;
#ifdef SRS_PERF_QUEUE_COND_WAIT
// The cond wait for mw.
// @see https://github.com/ossrs/srs/issues/251
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
srs_utime_t mw_duration;
#endif
public:
SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c);
virtual ~SrsRtcConsumer();
public:
// Use pass timestamp mode.
// TODO: FIXME: Remove it.
void enable_pass_timestamp();
// Set the size of queue.
// TODO: FIXME: Remove it.
virtual void set_queue_size(srs_utime_t queue_size);
// when source id changed, notice client to print.
virtual void update_source_id();
// Enqueue an shared ptr message.
// @param shared_msg, directly ptr, copy it if need to save it.
// @param whether atc, donot use jitter correct if true.
// @param ag the algorithm of time jitter.
virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag);
// Get packets in consumer queue.
// @param msgs the msgs array to dump packets to send.
// @param count the count in array, intput and output param.
// @remark user can specifies the count to get specified msgs; 0 to get all if possible.
virtual srs_error_t dump_packets(SrsMessageArray* msgs, int& count);
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for messages incomming, atleast nb_msgs and in duration.
// @param nb_msgs the messages count to wait.
// @param msgs_duration the messages duration to wait.
virtual void wait(int nb_msgs, srs_utime_t msgs_duration);
#endif
};
class SrsRtcSource
{
private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
int _source_id;
// previous source id.
int _pre_source_id;
SrsRequest* req;
SrsRtcPublisher* rtc_publisher_;
private:
// To delivery stream to clients.
std::vector<SrsRtcConsumer*> consumers;
// The metadata cache.
SrsMetaCache* meta;
// Whether source is avaiable for publishing.
bool _can_publish;
private:
// The format, codec information.
SrsRtmpFormat* format;
// rtc handler
SrsRtc* rtc;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
// The source id changed.
virtual srs_error_t on_source_id_changed(int id);
// Get current source id.
virtual int source_id();
virtual int pre_source_id();
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.
// @param dg, whether dumps the gop cache.
virtual srs_error_t consumer_dumps(SrsRtcConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsRtcConsumer* consumer);
// TODO: FIXME: Remove the param is_edge.
virtual bool can_publish(bool is_edge);
// When start publish stream.
virtual srs_error_t on_publish();
// When stop publish stream.
virtual void on_unpublish();
public:
// For RTC, we need to package SPS/PPS(in cached meta) before each IDR.
SrsMetaCache* cached_meta();
// Get and set the publisher, passed to consumer to process requests such as PLI.
SrsRtcPublisher* rtc_publisher();
void set_rtc_publisher(SrsRtcPublisher* v);
// When got RTC audio message, which is encoded in opus.
// TODO: FIXME: Merge with on_audio.
srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio);
virtual srs_error_t on_video(SrsCommonMessage* video);
private:
virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio);
virtual srs_error_t on_video_imp(SrsSharedPtrMessage* video);
};
class SrsRtcSourceManager
{
private:
srs_mutex_t lock;
std::map<std::string, SrsRtcSource*> pool;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
virtual SrsRtcSource* fetch(SrsRequest* r);
};
// Global singleton instance.
extern SrsRtcSourceManager* _srs_rtc_sources;
#endif

@ -427,6 +427,14 @@ ISrsWakable::~ISrsWakable()
{
}
ISrsConsumerQueue::ISrsConsumerQueue()
{
}
ISrsConsumerQueue::~ISrsConsumerQueue()
{
}
SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
{
source = s;
@ -907,7 +915,7 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
}
#ifdef SRS_RTC
if ((err = rtc->initialize(this, req)) != srs_success) {
if ((err = rtc->initialize(req)) != srs_success) {
return srs_error_wrap(err, "rtc initialize");
}
#endif
@ -1629,7 +1637,7 @@ SrsFormat* SrsMetaCache::ash_format()
return aformat;
}
srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
srs_error_t SrsMetaCache::dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{
srs_error_t err = srs_success;

@ -186,8 +186,18 @@ public:
virtual void wakeup() = 0;
};
// Enqueue the packet to consumer.
class ISrsConsumerQueue
{
public:
ISrsConsumerQueue();
virtual ~ISrsConsumerQueue();
public:
virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) = 0;
};
// The consumer for SrsSource, that is a play client.
class SrsConsumer : public ISrsWakable
class SrsConsumer : virtual public ISrsWakable, virtual public ISrsConsumerQueue
{
private:
SrsRtmpJitter* jitter;
@ -207,6 +217,7 @@ private:
srs_utime_t mw_duration;
#endif
private:
// TODO: FIXME: Move to RTC consumer.
// For RTC, we never use jitter to correct timestamp.
// But we should not change the atc or time_jitter for source or RTMP.
// @remark In this mode, we also never check the queue by timstamp, but only by count.
@ -450,7 +461,7 @@ public:
// Dumps cached metadata to consumer.
// @param dm Whether dumps the metadata.
// @param ds Whether dumps the sequence header.
virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
virtual srs_error_t dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
public:
// Previous exists sequence header.
virtual SrsSharedPtrMessage* previous_vsh();

Loading…
Cancel
Save