From a2dd63b67835db41498c3534f1eb42056339ba5d Mon Sep 17 00:00:00 2001 From: khjiang <2693305483@qq.com> Date: Wed, 8 Jan 2025 16:09:09 +0800 Subject: [PATCH] fix: Problem that the source is regularly cleared before setting the status after the stream publishing completes the create_of_fatch() operation --- trunk/src/app/srs_app_rtc_source.cpp | 21 +++++++++++++++++++-- trunk/src/app/srs_app_rtc_source.hpp | 1 + trunk/src/app/srs_app_source.cpp | 22 +++++++++++++++++++--- trunk/src/app/srs_app_source.hpp | 1 + trunk/src/app/srs_app_srt_source.cpp | 20 ++++++++++++++++++-- trunk/src/app/srs_app_srt_source.hpp | 1 + 6 files changed, 59 insertions(+), 7 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 5ad61e3f9..b390b9f6c 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -287,6 +287,7 @@ srs_error_t SrsRtcSourceManager::notify(int event, srs_utime_t interval, srs_uti // When source expired, remove it. // @see https://github.com/ossrs/srs/issues/713 + SrsLocker(lock); if (source->stream_is_dead()) { SrsContextId cid = source->source_id(); if (cid.empty()) cid = source->pre_source_id(); @@ -318,6 +319,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtrupdate_auth(r); + source->update_stream_die_at(); pps = source; return err; @@ -387,7 +389,7 @@ SrsRtcSource::SrsRtcSource() #endif pli_for_rtmp_ = pli_elapsed_ = 0; - stream_die_at_ = 0; + stream_die_at_ = srs_get_system_time(); } SrsRtcSource::~SrsRtcSource() @@ -493,6 +495,21 @@ void SrsRtcSource::update_auth(SrsRequest* r) req->update_auth(r); } +void SrsRtcSource::update_stream_die_at() +{ + // already publishing + if (!is_created_) { + return; + } + + // has consumers + if (!consumers.empty()) { + return; + } + + stream_die_at_ = srs_get_system_time(); +} + srs_error_t SrsRtcSource::on_source_changed() { srs_error_t err = srs_success; @@ -554,7 +571,7 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) consumer = new SrsRtcConsumer(this); consumers.push_back(consumer); - stream_die_at_ = 0; + // stream_die_at_ = 0; // TODO: FIXME: Implements edge cluster. diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index b8450a51b..6ea1dc6ac 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -215,6 +215,7 @@ private: public: // Update the authentication information in request. virtual void update_auth(SrsRequest* r); + virtual void update_stream_die_at(); private: // The stream source changed. virtual srs_error_t on_source_changed(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 956f7b2e3..644b8c1a4 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1785,6 +1785,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); + source->update_stream_die_at(); pps = source; return err; } @@ -1861,6 +1862,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut // When source expired, remove it. // @see https://github.com/ossrs/srs/issues/713 + SrsLocker(lock); if (source->stream_is_dead()) { SrsContextId cid = source->source_id(); if (cid.empty()) cid = source->pre_source_id(); @@ -1887,7 +1889,7 @@ SrsLiveSource::SrsLiveSource() mix_queue = new SrsMixQueue(); can_publish_ = true; - stream_die_at_ = 0; + stream_die_at_ = srs_get_system_time(); //SrsLiveSource should have a die time. publisher_idle_at_ = 0; handler = NULL; @@ -2159,6 +2161,21 @@ void SrsLiveSource::update_auth(SrsRequest* r) req->update_auth(r); } +void SrsLiveSource::update_stream_die_at() +{ + // already publishing + if (!can_publish_ || !publish_edge->can_publish()) { + return; + } + + // has consumers + if (!consumers.empty()) { + return; + } + + stream_die_at_ = srs_get_system_time(); +} + bool SrsLiveSource::can_publish(bool is_edge) { // TODO: FIXME: Should check the status of bridge. @@ -2676,8 +2693,7 @@ srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer) consumer = new SrsLiveConsumer(this); consumers.push_back(consumer); - // There are more than one consumer, so reset the timeout. - stream_die_at_ = 0; + // There are more than one consumer, so reset the publisher idle timeout. publisher_idle_at_ = 0; return err; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index e10b441de..658a4dbaa 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -563,6 +563,7 @@ public: virtual bool inactive(); // Update the authentication information in request. virtual void update_auth(SrsRequest* r); + virtual void update_stream_die_at(); public: virtual bool can_publish(bool is_edge); virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index 98470c879..006c22bbb 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -137,6 +137,7 @@ srs_error_t SrsSrtSourceManager::notify(int event, srs_utime_t interval, srs_uti // When source expired, remove it. // @see https://github.com/ossrs/srs/issues/713 + SrsLocker(lock); if (source->stream_is_dead()) { SrsContextId cid = source->source_id(); if (cid.empty()) cid = source->pre_source_id(); @@ -167,6 +168,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtrupdate_auth(r); + source->update_stream_die_at(); pps = source; return err; @@ -900,7 +902,7 @@ SrsSrtSource::SrsSrtSource() can_publish_ = true; frame_builder_ = NULL; bridge_ = NULL; - stream_die_at_ = 0; + stream_die_at_ = srs_get_system_time(); } SrsSrtSource::~SrsSrtSource() @@ -986,6 +988,20 @@ void SrsSrtSource::update_auth(SrsRequest* r) req->update_auth(r); } +void SrsSrtSource::update_stream_die_at() +{ + if (!can_publish_) { + return; + } + + // has consumers + if (!consumers.empty()) { + return; + } + + stream_die_at_ = srs_get_system_time(); +} + void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge) { srs_freep(bridge_); @@ -1002,7 +1018,7 @@ srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer) consumer = new SrsSrtConsumer(this); consumers.push_back(consumer); - stream_die_at_ = 0; + // stream_die_at_ = 0; return err; } diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index a1c53c0df..1c8316b90 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -173,6 +173,7 @@ public: virtual SrsContextId pre_source_id(); // Update the authentication information in request. virtual void update_auth(SrsRequest* r); + virtual void update_stream_die_at(); public: void set_bridge(ISrsStreamBridge* bridge); public: