From 8d99ef27cf35c1167962414ffced3bbc6e2fdec6 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 27 May 2014 17:59:59 +0800 Subject: [PATCH] fix #84: unpublish source when edge stop, clear gop cache --- trunk/src/app/srs_app_edge.cpp | 7 +++-- trunk/src/app/srs_app_rtmp_conn.cpp | 42 ++++++++++++++++------------- trunk/src/app/srs_app_source.cpp | 24 +++++++++++++++++ trunk/src/app/srs_app_source.hpp | 13 +++++++++ trunk/src/core/srs_core.hpp | 2 +- 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 11fc13645..aa54b4cd6 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -105,6 +105,9 @@ void SrsEdgeIngester::stop() srs_freep(client); srs_freep(io); kbps->set_io(NULL, NULL); + + // notice to unpublish. + _source->on_unpublish(); } int SrsEdgeIngester::cycle() @@ -285,8 +288,8 @@ int SrsEdgeIngester::connect_server() } // open socket. - srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d", - _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); + srs_trace("edge connected, can_publish=%d, url=%s/%s, server=%s:%d", + _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port); // TODO: FIXME: extract utility method int sock = socket(AF_INET, SOCK_STREAM, 0); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 88f81f0f7..cd368b98c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -271,7 +271,7 @@ int SrsRtmpConn::stream_service_cycle() srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret); return ret; } - srs_trace("set chunk_size=%d success", chunk_size); + srs_info("set chunk_size=%d success", chunk_size); // find a source to serve. SrsSource* source = NULL; @@ -296,16 +296,16 @@ int SrsRtmpConn::stream_service_cycle() } bool enabled_cache = _srs_config->get_gop_cache(req->vhost); - srs_trace("source found, ip=%s, url=%s, enabled_cache=%d, edge=%d", - ip.c_str(), req->get_stream_url().c_str(), enabled_cache, vhost_is_edge); + srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, id=%d", + req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id()); source->set_cache(enabled_cache); switch (type) { case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); - // notice edge to start for the first client. if (vhost_is_edge) { + // notice edge to start for the first client. if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { srs_error("notice edge start play stream failed. ret=%d", ret); return ret; @@ -568,15 +568,18 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); - // notify the hls to prepare when publish start. - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("fmle hls on_publish failed. ret=%d", ret); - return ret; - } - srs_verbose("fmle hls on_publish success."); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + // when edge, ignore the publish event, directly proxy it. + if (vhost_is_edge) { + // notify the hls to prepare when publish start. + if ((ret = source->on_publish()) != ERROR_SUCCESS) { + srs_error("fmle hls on_publish failed. ret=%d", ret); + return ret; + } + srs_verbose("fmle hls on_publish success."); + } + while (true) { // switch to other st-threads. st_usleep(0); @@ -644,15 +647,18 @@ int SrsRtmpConn::flash_publish(SrsSource* source) SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); - // notify the hls to prepare when publish start. - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("flash hls on_publish failed. ret=%d", ret); - return ret; - } - srs_verbose("flash hls on_publish success."); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + // when edge, ignore the publish event, directly proxy it. + if (vhost_is_edge) { + // notify the hls to prepare when publish start. + if ((ret = source->on_publish()) != ERROR_SUCCESS) { + srs_error("flash hls on_publish failed. ret=%d", ret); + return ret; + } + srs_verbose("flash hls on_publish success."); + } + while (true) { // switch to other st-threads. st_usleep(0); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 78182b5a8..0c96a7a6d 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -465,6 +465,7 @@ SrsSource::SrsSource(SrsRequest* req) frame_rate = sample_rate = 0; _can_publish = true; + _source_id = -1; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -791,6 +792,24 @@ int SrsSource::on_dvr_request_sh() return ret; } +int SrsSource::on_source_id_changed(int id) +{ + int ret = ERROR_SUCCESS; + + if (_source_id == id) { + return ret; + } + + _source_id = id; + + return ret; +} + +int SrsSource::source_id() +{ + return _source_id; +} + bool SrsSource::can_publish() { return _can_publish; @@ -1181,6 +1200,10 @@ int SrsSource::on_publish() _can_publish = false; + // whatever, the publish thread is the source or edge source, + // save its id to srouce id. + on_source_id_changed(_srs_context->get_id()); + // create forwarders if ((ret = create_forwarders()) != ERROR_SUCCESS) { srs_error("create forwarders failed. ret=%d", ret); @@ -1239,6 +1262,7 @@ void SrsSource::on_unpublish() srs_trace("clear cache/metadata/sequence-headers when unpublish."); _can_publish = true; + _source_id = -1; } int SrsSource::create_consumer(SrsConsumer*& consumer) diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index b9bfa248f..48a454223 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -229,6 +229,12 @@ public: */ static void destroy(); private: + // source id, + // for publish, it's the publish client id. + // for edge, it's the edge ingest id. + // when source id changed, for example, the edge reconnect, + // invoke the on_source_id_changed() to let all clients know. + int _source_id; // deep copy of client request. SrsRequest* _req; // to delivery stream to clients. @@ -298,6 +304,7 @@ public: virtual int on_reload_vhost_hls(std::string vhost); virtual int on_reload_vhost_dvr(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); +// for the tools callback public: // for the SrsForwarder to callback to request the sequence headers. virtual int on_forwarder_start(SrsForwarder* forwarder); @@ -305,6 +312,11 @@ public: virtual int on_hls_start(); // for the SrsDvr to callback to request the sequence headers. virtual int on_dvr_request_sh(); + // source id changed. + virtual int on_source_id_changed(int id); + // get current source id. + virtual int source_id(); +// logic data methods public: virtual bool can_publish(); virtual int on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata); @@ -318,6 +330,7 @@ public: */ virtual int on_publish(); virtual void on_unpublish(); +// consumer methods public: virtual int create_consumer(SrsConsumer*& consumer); virtual void on_consumer_destroy(SrsConsumer* consumer); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 241fc9b13..39304e46d 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "118" +#define VERSION_REVISION "119" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS"