From bed540bac9bcc08deb4aff76ba6d1d6aa14ff630 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 9 Jul 2015 13:52:41 +0800 Subject: [PATCH] fix #439: http remux support reload --- trunk/src/app/srs_app_config.cpp | 16 ++- trunk/src/app/srs_app_http_stream.cpp | 166 ++++++++++++++++++++------ trunk/src/app/srs_app_http_stream.hpp | 9 +- trunk/src/app/srs_app_reload.cpp | 2 +- trunk/src/app/srs_app_reload.hpp | 2 +- 5 files changed, 153 insertions(+), 42 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index ebebf9e4d..ff5da2d2e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -852,6 +852,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) return ret; } } + + // http_remux, only one per vhost. + if (get_vhost_http_remux_enabled(vhost)) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload http_remux success.", vhost.c_str()); + } srs_trace("reload new vhost %s success.", vhost.c_str()); continue; } @@ -1060,7 +1072,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) if (!srs_directive_equals(new_vhost->get("http_remux"), old_vhost->get("http_remux"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; - if ((ret = subscribe->on_reload_vhost_http_remux_updated()) != ERROR_SUCCESS) { + if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) { srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret); return ret; } @@ -1077,7 +1089,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } continue; } - srs_trace("igreno reload vhost, enabled old: %d, new: %d", + srs_trace("ignore reload vhost, enabled old: %d, new: %d", get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost)); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index d2a2af7d2..43be4bc4d 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -508,8 +508,9 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) #ifdef SRS_PERF_FAST_FLV_ENCODER SrsFastFlvStreamEncoder* ffe = dynamic_cast(enc); #endif - - while (true) { + + // TODO: free and erase the disabled entry after all related connections is closed. + while (entry->enabled) { pprint->elapse(); // get messages from consumer. @@ -593,6 +594,9 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h) stream = NULL; cache = NULL; + + req = NULL; + source = NULL; std::string ext; size_t pos = string::npos; @@ -605,6 +609,11 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h) _is_aac = (ext == ".aac"); } +void SrsLiveEntry::reset_hstrs(bool h) +{ + hstrs = h; +} + bool SrsLiveEntry::is_flv() { return _is_flv; @@ -699,16 +708,19 @@ SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr) server = svr; mux.hijack(this); + _srs_config->subscribe(this); } SrsHttpStreamServer::~SrsHttpStreamServer() { mux.unhijack(this); + _srs_config->unsubscribe(this); if (true) { std::map::iterator it; for (it = tflvs.begin(); it != tflvs.end(); ++it) { SrsLiveEntry* entry = it->second; + srs_freep(entry->req); srs_freep(entry); } tflvs.clear(); @@ -771,23 +783,31 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) srs_info("ignore mount flv stream for disabled"); return ret; } - + SrsLiveEntry* tmpl = tflvs[r->vhost]; std::string mount = tmpl->mount; - + // replace the vhost variable mount = srs_string_replace(mount, "[vhost]", r->vhost); mount = srs_string_replace(mount, "[app]", r->app); mount = srs_string_replace(mount, "[stream]", r->stream); - + // remove the default vhost mount mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + // TODO: FIXME: check match + if (mount.at(0) != '/') { + mount = "/" + mount; + } entry = new SrsLiveEntry(mount, tmpl->hstrs); entry->cache = new SrsStreamCache(s, r); entry->stream = new SrsLiveStream(s, r, entry->cache); + + srs_assert(!tmpl->req); + tmpl->source = s; + tmpl->req = r->copy(); sflvs[sid] = entry; @@ -809,8 +829,7 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) } else { entry = sflvs[sid]; } - - // TODO: FIXME: supports reload. + if (entry->stream) { entry->stream->entry->enabled = true; return ret; @@ -822,7 +841,7 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r) { std::string sid = r->get_stream_url(); - + if (sflvs.find(sid) == sflvs.end()) { srs_info("ignore unmount flv stream for disabled"); return; @@ -832,17 +851,80 @@ void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r) entry->stream->entry->enabled = false; } +int SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (tflvs.find(vhost) == tflvs.end()) { + if ((ret = initialize_flv_entry(vhost)) != ERROR_SUCCESS) { + return ret; + } + + // http mount need SrsRequest and SrsSource param, only create a mapping template entry + // and do mount automatically on playing http flv if this stream is a new http_remux stream. + return ret; + } + + SrsLiveEntry* tmpl = tflvs[vhost]; + SrsRequest* req = tmpl->req; + SrsSource* source = tmpl->source; + + if (source && req) { + // cleanup the exists http remux. + http_unmount(source, req); + } + + if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { + return ret; + } + + string old_tmpl_mount = tmpl->mount; + string new_tmpl_mount = _srs_config->get_vhost_http_remux_mount(vhost); + bool hstrs = _srs_config->get_vhost_http_remux_hstrs(vhost); + + tmpl->reset_hstrs(hstrs); + + /** + * TODO: not support to reload different mount url for the time being. + * if the mount is change, need more logical thing to deal with. + * such as erase stream from sflvs and free all related resource. + */ + srs_assert(old_tmpl_mount == new_tmpl_mount); + + // do http mount directly with SrsRequest and SrsSource if stream is played already. + if (req) { + std::string sid = req->get_stream_url(); + + if (sflvs.find(sid) != sflvs.end()) { + SrsLiveEntry* stream = sflvs[sid]; + stream->reset_hstrs(hstrs); + } + // remount stream. + if ((ret = http_mount(source, req)) != ERROR_SUCCESS) { + srs_trace("vhost %s http_remux reload failed", vhost.c_str()); + return ret; + } + } else { + // for without SrsRequest and SrsSource if stream is not played yet, do http mount automatically + // when start play this http flv stream. + } + + srs_trace("vhost %s http_remux reload success", vhost.c_str()); + + return ret; +} + int SrsHttpStreamServer::mount_hls(SrsRequest* r) { int ret = ERROR_SUCCESS; std::string sid = r->get_stream_url(); - + if (shls.find(sid) == shls.end()) { srs_info("ignore mount hls stream for disabled"); return ret; } - + SrsHlsEntry* entry = shls[sid]; // TODO: FIXME: supports reload. @@ -958,7 +1040,6 @@ int SrsHttpStreamServer::hls_update_ts(SrsRequest* r, string uri, string ts) return ret; } - int SrsHttpStreamServer::hls_remove_ts(SrsRequest* r, string uri) { int ret = ERROR_SUCCESS; @@ -1010,13 +1091,6 @@ void SrsHttpStreamServer::unmount_hls(SrsRequest* r) } } -int SrsHttpStreamServer::on_reload_vhost_http_remux_updated() -{ - int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. - return ret; -} - int SrsHttpStreamServer::on_reload_vhost_hls(string vhost) { int ret = ERROR_SUCCESS; @@ -1056,6 +1130,8 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) } // hstrs not enabled, ignore. + // for origin: generally set hstrs to 'off' and mount while stream is pushed to origin. + // for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount. entry = it->second; if (!entry->hstrs) { return ret; @@ -1090,6 +1166,17 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) // hijack for entry. SrsRequest* r = hreq->to_request(vhost->arg0()); SrsAutoFree(SrsRequest, r); + + std::string sid = r->get_stream_url(); + // check if the stream is enabled. + if (sflvs.find(sid) != sflvs.end()) { + SrsLiveEntry* entry = sflvs[sid]; + if (!entry->stream->entry->enabled) { + srs_error("stream is disabled, hijack failed. ret=%d", ret); + return ret; + } + } + SrsSource* s = SrsSource::fetch(r); if (!s) { if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) { @@ -1097,15 +1184,14 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) } } srs_assert(s != NULL); - + // create http streaming handler. if ((ret = http_mount(s, r)) != ERROR_SUCCESS) { return ret; } - + // use the handler if exists. if (ph) { - std::string sid = r->get_stream_url(); if (sflvs.find(sid) != sflvs.end()) { entry = sflvs[sid]; *ph = entry->stream; @@ -1132,7 +1218,7 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) int SrsHttpStreamServer::initialize_flv_streaming() { int ret = ERROR_SUCCESS; - + // http flv live stream mount for each vhost. SrsConfDirective* root = _srs_config->get_root(); for (int i = 0; i < (int)root->directives.size(); i++) { @@ -1141,21 +1227,29 @@ int SrsHttpStreamServer::initialize_flv_streaming() if (!conf->is_vhost()) { continue; } - - std::string vhost = conf->arg0(); - if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { - continue; - } - - SrsLiveEntry* entry = new SrsLiveEntry( - _srs_config->get_vhost_http_remux_mount(vhost), - _srs_config->get_vhost_http_remux_hstrs(vhost) - ); - tflvs[vhost] = entry; - srs_trace("http flv live stream, vhost=%s, mount=%s", - vhost.c_str(), entry->mount.c_str()); + + initialize_flv_entry(conf->arg0()); } - + return ret; +} + +int SrsHttpStreamServer::initialize_flv_entry(std::string vhost) +{ + int ret = ERROR_SUCCESS; + + if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { + return ret; + } + + SrsLiveEntry* entry = new SrsLiveEntry( + _srs_config->get_vhost_http_remux_mount(vhost), + _srs_config->get_vhost_http_remux_hstrs(vhost) + ); + + tflvs[vhost] = entry; + srs_trace("http flv live stream, vhost=%s, mount=%s", + vhost.c_str(), entry->mount.c_str()); + return ret; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index ed107b3b9..3df97f4b3 100644 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -241,6 +241,9 @@ private: bool _is_ts; bool _is_aac; bool _is_mp3; +public: + SrsRequest* req; + SrsSource* source; public: // for template, the mount contains variables. // for concrete stream, the mount is url to access. @@ -252,7 +255,8 @@ public: SrsStreamCache* cache; SrsLiveEntry(std::string m, bool h); - + void reset_hstrs(bool h); + bool is_flv(); bool is_ts(); bool is_mp3(); @@ -348,13 +352,14 @@ public: virtual void unmount_hls(SrsRequest* r); // interface ISrsReloadHandler. public: - virtual int on_reload_vhost_http_remux_updated(); + virtual int on_reload_vhost_http_remux_updated(std::string vhost); virtual int on_reload_vhost_hls(std::string vhost); // interface ISrsHttpMatchHijacker public: virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph); private: virtual int initialize_flv_streaming(); + virtual int initialize_flv_entry(std::string vhost); virtual int initialize_hls_streaming(); virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl); }; diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index ff856d344..905964462 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -100,7 +100,7 @@ int ISrsReloadHandler::on_reload_vhost_http_updated() return ERROR_SUCCESS; } -int ISrsReloadHandler::on_reload_vhost_http_remux_updated() +int ISrsReloadHandler::on_reload_vhost_http_remux_updated(string vhost) { return ERROR_SUCCESS; } diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 8b6e9e4d8..4d77e882c 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -58,7 +58,7 @@ public: virtual int on_reload_http_stream_updated(); public: virtual int on_reload_vhost_http_updated(); - virtual int on_reload_vhost_http_remux_updated(); + virtual int on_reload_vhost_http_remux_updated(std::string vhost); virtual int on_reload_vhost_added(std::string vhost); virtual int on_reload_vhost_removed(std::string vhost); virtual int on_reload_vhost_atc(std::string vhost);