From 2263f2a610053a54e93c0cb293fa4bce61dd3637 Mon Sep 17 00:00:00 2001 From: zhengfl Date: Thu, 9 Jul 2015 15:45:55 +0800 Subject: [PATCH] =?UTF-8?q?refine=20code:=20=20=20=20=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=20=E5=88=A4=E6=96=AD=E9=87=8D=E5=A4=8D=E6=8E=A8=E6=B5=81=20?= =?UTF-8?q?=E5=8F=8A=20=E6=8E=A8=E6=B5=81=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- trunk/src/app/srs_app_rtmp_conn.cpp | 170 +++++++++------------------- trunk/src/app/srs_app_rtmp_conn.hpp | 5 +- trunk/src/app/srs_app_source.cpp | 20 ---- trunk/src/app/srs_app_source.hpp | 7 -- 4 files changed, 59 insertions(+), 143 deletions(-) mode change 100644 => 100755 trunk/src/app/srs_app_rtmp_conn.cpp mode change 100644 => 100755 trunk/src/app/srs_app_rtmp_conn.hpp mode change 100644 => 100755 trunk/src/app/srs_app_source.cpp mode change 100644 => 100755 trunk/src/app/srs_app_source.hpp diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp old mode 100644 new mode 100755 index 3ff909724..9cf15db59 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -414,8 +414,6 @@ int SrsRtmpConn::stream_service_cycle() } srs_info("set chunk_size=%d success", chunk_size); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - // find a source to serve. SrsSource* source = SrsSource::fetch(req); if (!source) { @@ -432,19 +430,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - // check ASAP, to fail it faster if invalid. - if (type != SrsRtmpConnPlay) { - // check publish available - if (!source->can_publish(vhost_is_edge)) { - ret = ERROR_SYSTEM_STREAM_BUSY; - srs_warn("stream %s is already publishing. ret=%d", - req->get_stream_url().c_str(), ret); - // to delay request - st_usleep(SRS_STREAM_BUSY_SLEEP_US); - return ret; - } - } - + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, @@ -479,19 +465,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if (!vhost_is_edge) { - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { - return ret; - } - } - - ret = fmle_publishing(source); - - if (!vhost_is_edge) { - source->release_publish(); - } - - return ret; + return publishing(source); } case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); @@ -501,19 +475,7 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - if (!vhost_is_edge) { - if ((ret = source->acquire_publish()) != ERROR_SUCCESS) { - return ret; - } - } - - ret = flash_publishing(source); - - if (!vhost_is_edge) { - source->release_publish(); - } - - return ret; + return publishing(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; @@ -767,69 +729,35 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe return ret; } -int SrsRtmpConn::fmle_publishing(SrsSource* source) +int SrsRtmpConn::publishing(SrsSource* source) { int ret = ERROR_SUCCESS; - - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - - if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { - srs_error("http hook on_publish failed. ret=%d", ret); - return ret; - } - - // use isolate thread to recv, - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); - srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_publishing(source, &trd); - - // stop isolate recv thread - trd.stop(); - - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { + srs_error("check publish_refer failed. ret=%d", ret); + return ret; } - - http_hooks_on_unpublish(); - - return ret; -} - -int SrsRtmpConn::flash_publishing(SrsSource* source) -{ - int ret = ERROR_SUCCESS; - - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + srs_verbose("check publish_refer success."); if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } - // use isolate thread to recv, - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, - st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) { + // use isolate thread to recv, + // @see: https://github.com/simple-rtmp-server/srs/issues/237 + SrsPublishRecvThread trd(rtmp, req, + st_netfd_fileno(stfd), 0, this, source, true, vhost_is_edge); - srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_publishing(source, &trd); + srs_info("start to publish stream %s success", req->stream.c_str()); + ret = do_publishing(source, &trd); - // stop isolate recv thread - trd.stop(); + // stop isolate recv thread + trd.stop(); - // when edge, notice edge to change state. - // when origin, notice all service to unpublish. - if (vhost_is_edge) { - source->on_edge_proxy_unpublish(); - } else { - source->on_unpublish(); + release_publish(source, vhost_is_edge); } http_hooks_on_unpublish(); @@ -840,33 +768,10 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) { int ret = ERROR_SUCCESS; - - if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { - srs_error("check publish_refer failed. ret=%d", ret); - return ret; - } - srs_verbose("check publish_refer success."); SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish(); SrsAutoFree(SrsPithyPrint, pprint); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - - // when edge, ignore the publish event, directly proxy it. - if (!vhost_is_edge) { - // notify the hls to prepare when publish start. - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("hls on_publish failed. ret=%d", ret); - return ret; - } - srs_verbose("hls on_publish success."); - } else { - if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { - srs_error("notice edge start publish stream failed. ret=%d", ret); - return ret; - } - } - // start isolate recv thread. if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start isolate recv thread failed. ret=%d", ret); @@ -914,6 +819,43 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) return ret; } +int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) +{ + int ret = ERROR_SUCCESS; + + if (!source->can_publish(is_edge)) { + ret = ERROR_SYSTEM_STREAM_BUSY; + srs_warn("stream %s is already publishing. ret=%d", + req->get_stream_url().c_str(), ret); + return ret; + } + + // when edge, ignore the publish event, directly proxy it. + if (is_edge) { + if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { + srs_error("notice edge start publish stream failed. ret=%d", ret); + } + return ret; + } else { + if ((ret = source->on_publish()) != ERROR_SUCCESS) { + srs_error("notify publish failed. ret=%d", ret); + } + } + + return ret; +} + +void SrsRtmpConn::release_publish(SrsSource* source, bool is_edge) +{ + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } +} + int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp old mode 100644 new mode 100755 index 96db60ac7..ddaa7e349 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -111,9 +111,10 @@ private: virtual int check_vhost(); virtual int playing(SrsSource* source); virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); - virtual int fmle_publishing(SrsSource* source); - virtual int flash_publishing(SrsSource* source); + virtual int publishing(SrsSource* source); virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); + virtual int acquire_publish(SrsSource* source, bool is_edge); + virtual void release_publish(SrsSource* source, bool is_edge); virtual int handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp old mode 100644 new mode 100755 index d63154fe2..4b25901f1 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1958,26 +1958,6 @@ int SrsSource::on_aggregate(SrsCommonMessage* msg) return ret; } -int SrsSource::acquire_publish() -{ - int ret = ERROR_SUCCESS; - - if (!_can_publish) { - ret = ERROR_SYSTEM_STREAM_BUSY; - srs_warn("publish lock stream failed, ret=%d", ret); - return ret; - } - - _can_publish = false; - - return ret; -} - -void SrsSource::release_publish() -{ - _can_publish = true; -} - int SrsSource::on_publish() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp old mode 100644 new mode 100755 index 86661b190..372803dd5 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -550,13 +550,6 @@ private: public: virtual int on_aggregate(SrsCommonMessage* msg); /** - * the pre-publish is we are very sure we are - * trying to publish stream, please lock the resource, - * and we use release_publish() to release the resource. - */ - virtual int acquire_publish(); - virtual void release_publish(); - /** * publish stream event notify. * @param _req the request from client, the source will deep copy it, * for when reload the request of client maybe invalid.