From 20b4984af4afd20fba7e1c2ff5c036e103959796 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 3 May 2020 07:43:05 +0800 Subject: [PATCH] Refine consumer dumps, extract dumps API --- trunk/src/app/srs_app_http_api.cpp | 1 + trunk/src/app/srs_app_http_stream.cpp | 17 +++++++++++------ trunk/src/app/srs_app_rtc_conn.cpp | 6 +++++- trunk/src/app/srs_app_rtmp_conn.cpp | 5 ++++- trunk/src/app/srs_app_source.cpp | 27 +++++++++++++++++---------- trunk/src/app/srs_app_source.hpp | 6 ++++-- 6 files changed, 42 insertions(+), 20 deletions(-) diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index d84930a81..6394f7037 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1089,6 +1089,7 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str local_media_desc.rtcp_mux_ = true; local_media_desc.rtcp_rsize_ = true; + // TODO: FIXME: Avoid SSRC collision. if (!ssrc_num) { ssrc_num = ::getpid() * 10000 + ::getpid() * 100 + ::getpid(); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 95a67c21a..4afe83f3e 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -128,10 +128,13 @@ srs_error_t SrsBufferCache::cycle() // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. SrsConsumer* consumer = NULL; - if ((err = source->create_consumer(NULL, consumer, false, false, true)) != srs_success) { + SrsAutoFree(SrsConsumer, consumer); + if ((err = source->create_consumer(NULL, consumer)) != srs_success) { return srs_error_wrap(err, "create consumer"); } - SrsAutoFree(SrsConsumer, consumer); + if ((err = source->consumer_dumps(consumer, false, false, true)) != srs_success) { + return srs_error_wrap(err, "dumps consumer"); + } SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream_cache(); SrsAutoFree(SrsPithyPrint, pprint); @@ -583,12 +586,14 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; - if ((err = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != srs_success) { + SrsAutoFree(SrsConsumer, consumer); + if ((err = source->create_consumer(NULL, consumer)) != srs_success) { return srs_error_wrap(err, "create consumer"); } - SrsAutoFree(SrsConsumer, consumer); - srs_verbose("http: consumer created success."); - + if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) { + return srs_error_wrap(err, "dumps consumer"); + } + SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream(); SrsAutoFree(SrsPithyPrint, pprint); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d2212cff9..efe93ff9f 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -721,7 +721,6 @@ srs_error_t SrsRtcSenderThread::cycle() SrsConsumer* consumer = NULL; SrsAutoFree(SrsConsumer, consumer); - // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames. if ((err = source->create_consumer(NULL, consumer)) != srs_success) { return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } @@ -733,6 +732,11 @@ srs_error_t SrsRtcSenderThread::cycle() // messages and drop them if the shared sender queue is full. consumer->enable_pass_timestamp(); + // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames. + if ((err = source->consumer_dumps(consumer)) != srs_success) { + return srs_error_wrap(err, "dumps consumer, source url=%s", req->get_stream_url().c_str()); + } + realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 0f8b1012a..4bcf0622e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -654,10 +654,13 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) // Create a consumer of source. SrsConsumer* consumer = NULL; + SrsAutoFree(SrsConsumer, consumer); if ((err = source->create_consumer(this, consumer)) != srs_success) { return srs_error_wrap(err, "rtmp: create consumer"); } - SrsAutoFree(SrsConsumer, consumer); + if ((err = source->consumer_dumps(consumer)) != srs_success) { + return srs_error_wrap(err, "rtmp: dumps consumer"); + } // Use receiving thread to receive packets from peer. // @see: https://github.com/ossrs/srs/issues/217 diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 651801600..9fe1e2cdc 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -2594,12 +2594,27 @@ void SrsSource::on_unpublish() } } -srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) +srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer) { srs_error_t err = srs_success; consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); + + // for edge, when play edge stream, check the state + if (_srs_config->get_vhost_is_edge(req->vhost)) { + // notice edge to start for the first client. + if ((err = play_edge->on_client_play()) != srs_success) { + return srs_error_wrap(err, "play edge"); + } + } + + return err; +} + +srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg) +{ + srs_error_t err = srs_success; srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); consumer->set_queue_size(queue_size); @@ -2636,15 +2651,7 @@ srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consum } else { srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm); } - - // for edge, when play edge stream, check the state - if (_srs_config->get_vhost_is_edge(req->vhost)) { - // notice edge to start for the first client. - if ((err = play_edge->on_client_play()) != srs_success) { - return srs_error_wrap(err, "play edge"); - } - } - + return err; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 7b6a96107..1d27fdbbb 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -599,12 +599,14 @@ public: virtual srs_error_t on_publish(); virtual void on_unpublish(); public: - // Create consumer and dumps packets in cache. + // Create consumer // @param consumer, output the create consumer. + virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer); + // Dumps packets in cache to consumer. // @param ds, whether dumps the sequence header. // @param dm, whether dumps the metadata. // @param dg, whether dumps the gop cache. - virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds = true, bool dm = true, bool dg = true); + virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); virtual SrsRtmpJitterAlgorithm jitter();