From a5a9f7c8c5ceb6b41fc0fed7c43c126fb2e1b7c4 Mon Sep 17 00:00:00 2001 From: zhengfl Date: Thu, 16 Jul 2015 11:36:14 +0800 Subject: [PATCH 1/4] =?UTF-8?q?refine=20code:=E4=BC=98=E5=8C=96edge?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E5=9B=9E=E6=BA=90=E6=92=AD=E6=94=BE=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=90=AF=E5=8A=A8=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- trunk/src/app/srs_app_http_stream.cpp | 9 --------- trunk/src/app/srs_app_rtmp_conn.cpp | 9 --------- trunk/src/app/srs_app_source.cpp | 16 ++++++++++------ trunk/src/app/srs_app_source.hpp | 2 -- 4 files changed, 10 insertions(+), 26 deletions(-) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 5d99d9917..e74660ac0 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -1210,15 +1210,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]", r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id()); - // TODO: FIXME: disconnect when all connection closed. - if (vhost_is_edge) { - // notice edge to start for the first client. - if ((ret = s->on_edge_start_play()) != ERROR_SUCCESS) { - srs_error("notice edge start play stream failed. ret=%d", ret); - return ret; - } - } - return ret; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 9cf15db59..1a936cacf 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -537,14 +537,6 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); - if (_srs_config->get_vhost_is_edge(req->vhost)) { - // notice edge to start for the first client. - if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { - srs_error("notice edge start play stream failed. ret=%d", ret); - return ret; - } - } - // use isolate thread to recv, // @see: https://github.com/simple-rtmp-server/srs/issues/217 SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); @@ -835,7 +827,6 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { srs_error("notice edge start publish stream failed. ret=%d", ret); } - return ret; } else { if ((ret = source->on_publish()) != ERROR_SUCCESS) { srs_error("notify publish failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 5dd98c204..1c38fbaa9 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -765,7 +765,7 @@ SrsSource* SrsSource::fetch(SrsRequest* r) SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream) { SrsSource* source = NULL; - string stream_url = srs_generate_stream_url(vhost, app, stream); + string stream_url = srs_generate_stream_url(vhost, app, stream); if (pool.find(stream_url) == pool.end()) { return NULL; @@ -2135,6 +2135,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg } else { srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm); } + + // for edge, when play edge stream, check the state + if (_srs_config->get_vhost_is_edge(_req->vhost)) { + // notice edge to start for the first client. + if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) { + srs_error("notice edge start play stream failed. ret=%d", ret); + return ret; + } + } return ret; } @@ -2163,11 +2172,6 @@ SrsRtmpJitterAlgorithm SrsSource::jitter() return jitter_algorithm; } -int SrsSource::on_edge_start_play() -{ - return play_edge->on_client_play(); -} - int SrsSource::on_edge_start_publish() { return publish_edge->on_client_publish(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 372803dd5..523115964 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -574,8 +574,6 @@ public: virtual SrsRtmpJitterAlgorithm jitter(); // internal public: - // for edge, when play edge stream, check the state - virtual int on_edge_start_play(); // for edge, when publish edge stream, check the state virtual int on_edge_start_publish(); // for edge, proxy the publish From 366d3a3f560be157e7070f36a5bc604d1993c5b5 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 16 Jul 2015 17:30:27 +0800 Subject: [PATCH 2/4] for #441, use 30s timeout for first msg. 2.0.178 --- README.md | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 10 ++++++++-- trunk/src/core/srs_core.hpp | 2 +- trunk/src/kernel/srs_kernel_consts.hpp | 2 ++ 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1fd12e5ee..094532eca 100755 --- a/README.md +++ b/README.md @@ -344,6 +344,7 @@ Remark: ### SRS 2.0 history +* v2.0, 2015-07-16, for [#441](https://github.com/simple-rtmp-server/srs/issues/441) use 30s timeout for first msg. 2.0.178 * v2.0, 2015-07-14, refine hls disable the time jitter, support not mix monotonically increase. 2.0.177 * v2.0, 2015-07-01, fix [#433](https://github.com/simple-rtmp-server/srs/issues/433) fix the sps parse bug. 2.0.176 * v2.0, 2015-06-10, fix [#425](https://github.com/simple-rtmp-server/srs/issues/425) refine the time jitter, correct (-inf,-250)+(250,+inf) to 10ms. 2.0.175 diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 9cf15db59..7e3adef37 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -782,8 +782,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) while (!disposed) { pprint->elapse(); - // cond wait for error. - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + // cond wait for timeout. + if (nb_msgs == 0) { + // when not got msgs, wait for a larger timeout. + // @see https://github.com/simple-rtmp-server/srs/issues/441 + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000); + } else { + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + } // check the thread error code. if ((ret = trd->error_code()) != ERROR_SUCCESS) { diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 10d9e060c..bafab467e 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 177 +#define VERSION_REVISION 178 // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 4c2428352..1cf9b5b99 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -78,6 +78,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // we must use more smaller timeout, for the recv never know the status // of underlayer socket. #define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL) +// when no msg recevied for publisher, use larger timeout. +#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US // the timeout to wait for client control message, // if timeout, we generally ignore and send the data to client, From 99db2888e771500295293e12ca508650a479909c Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 16 Jul 2015 18:42:27 +0800 Subject: [PATCH 3/4] refine code for hstrs. --- trunk/src/app/srs_app_http_stream.cpp | 24 +++++++++++++----------- trunk/src/app/srs_app_http_stream.hpp | 2 ++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index e74660ac0..21ebfda4e 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -63,6 +63,9 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) source = s; queue = new SrsMessageQueue(true); pthread = new SrsEndlessThread("http-stream", this); + + // TODO: FIXME: support reload. + fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); } SrsStreamCache::~SrsStreamCache() @@ -82,8 +85,6 @@ int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jit { int ret = ERROR_SUCCESS; - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - if (fast_cache <= 0) { srs_info("http: ignore dump fast cache."); return ret; @@ -104,6 +105,13 @@ int SrsStreamCache::cycle() { int ret = ERROR_SUCCESS; + // TODO: FIXME: support reload. + if (fast_cache <= 0) { + return ret; + } + + // the stream cache will create consumer to cache stream, + // which will trigger to fetch stream from origin for edge. SrsConsumer* consumer = NULL; if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); @@ -116,11 +124,9 @@ int SrsStreamCache::cycle() SrsMessageArray msgs(SRS_PERF_MW_MSGS); + // set the queue size, which used for max cache. // TODO: FIXME: support reload. - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - if (fast_cache > 0) { - queue->set_queue_size(fast_cache); - } + queue->set_queue_size(fast_cache); while (true) { pprint->elapse(); @@ -150,11 +156,7 @@ int SrsStreamCache::cycle() // free the messages. for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; - if (fast_cache > 0) { - queue->enqueue(msg); - } else { - srs_freep(msg); - } + queue->enqueue(msg); } } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 3df97f4b3..b07dd448c 100644 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -41,6 +41,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ class SrsStreamCache : public ISrsEndlessThreadHandler { +private: + double fast_cache; private: SrsMessageQueue* queue; SrsSource* source; From 3b65af9bd20354d44f4147deca88bb1e4a2cc02a Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 17 Jul 2015 14:05:34 +0800 Subject: [PATCH 4/4] fix the hstrs bug on edge. --- trunk/src/app/srs_app_http_stream.cpp | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 21ebfda4e..cc41986cb 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#define SRS_STREAM_CACHE_CYCLE_SECONDS 30 + #if defined(SRS_AUTO_HTTP_CORE) #include @@ -107,6 +109,7 @@ int SrsStreamCache::cycle() // TODO: FIXME: support reload. if (fast_cache <= 0) { + st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS); return ret; } @@ -1139,8 +1142,10 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) } // hstrs not enabled, ignore. - // for origin: generally set hstrs to 'off' and mount while stream is pushed to origin. - // for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount. + // for origin, the http stream will be mount already when publish, + // so it must never enter this line for stream already mounted. + // for edge, the http stream is trigger by hstrs and mount by it, + // so we only hijack when only edge and hstrs is on. entry = it->second; if (!entry->hstrs) { return ret; @@ -1177,12 +1182,18 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) SrsAutoFree(SrsRequest, r); std::string sid = r->get_stream_url(); - // check if the stream is enabled. + // check whether the http remux is enabled, + // for example, user disable the http flv then reload. if (sflvs.find(sid) != sflvs.end()) { SrsLiveEntry* s_entry = sflvs[sid]; if (!s_entry->stream->entry->enabled) { - srs_error("stream is disabled, hijack failed. ret=%d", ret); - return ret; + // only when the http entry is disabled, check the config whether http flv disable, + // for the http flv edge use hijack to trigger the edge ingester, we always mount it + // eventhough the origin does not exists the specified stream. + if (!_srs_config->get_vhost_http_remux_enabled(r->vhost)) { + srs_error("stream is disabled, hijack failed. ret=%d", ret); + return ret; + } } }