diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b6c35d67d..c8f5cf106 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -913,6 +913,21 @@ SrsConfDirective* SrsConfig::get_vhost(string vhost) return NULL; } +void SrsConfig::get_vhosts(std::vector& vhosts) +{ + srs_assert(root); + + for (int i = 0; i < (int)root->directives.size(); i++) { + SrsConfDirective* conf = root->at(i); + + if (!conf->is_vhost()) { + continue; + } + + vhosts.push_back(conf); + } +} + SrsConfDirective* SrsConfig::get_vhost_on_connect(string vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -1608,6 +1623,28 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector& return; } +bool SrsConfig::get_ingest_enabled(SrsConfDirective* ingest) +{ + SrsConfDirective* conf = ingest->get("enable"); + + if (!conf || conf->arg0() != "on") { + return false; + } + + return true; +} + +string SrsConfig::get_ingest_ffmpeg(SrsConfDirective* ingest) +{ + SrsConfDirective* conf = ingest->get("ffmpeg"); + + if (!conf) { + return ""; + } + + return conf->arg0(); +} + string SrsConfig::get_srs_log_file() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 50b822458..e351f4f6a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -139,6 +139,7 @@ public: // vhost section public: virtual SrsConfDirective* get_vhost(std::string vhost); + virtual void get_vhosts(std::vector& vhosts); virtual bool get_vhost_enabled(std::string vhost); virtual bool get_vhost_enabled(SrsConfDirective* vhost); virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); @@ -184,9 +185,11 @@ public: virtual int get_engine_achannels(SrsConfDirective* engine); virtual void get_engine_aparams(SrsConfDirective* engine, std::vector& aparams); virtual std::string get_engine_output(SrsConfDirective* engine); -// vhost ingest section +// ingest section public: virtual void get_ingesters(std::string vhost, std::vector& ingeters); + virtual bool get_ingest_enabled(SrsConfDirective* ingest); + virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest); // log section public: virtual bool get_srs_log_tank_file(); diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index c5cfb8697..af018d8e7 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -309,7 +309,10 @@ int SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDir } _transcoded_url.push_back(output); - if ((ret = ffmpeg->initialize(input, output, log_file, engine)) != ERROR_SUCCESS) { + if ((ret = ffmpeg->initialize(input, output, log_file)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = ffmpeg->initialize_transcode(engine)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_ffmpeg.cpp b/trunk/src/app/srs_app_ffmpeg.cpp index 5823a486e..3c1079add 100644 --- a/trunk/src/app/srs_app_ffmpeg.cpp +++ b/trunk/src/app/srs_app_ffmpeg.cpp @@ -78,7 +78,18 @@ string SrsFFMPEG::output() return _output; } -int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* engine) +int SrsFFMPEG::initialize(string in, string out, string log) +{ + int ret = ERROR_SUCCESS; + + input = in; + _output = out; + log_file = log; + + return ret; +} + +int SrsFFMPEG::initialize_transcode(SrsConfDirective* engine) { int ret = ERROR_SUCCESS; @@ -102,10 +113,6 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e vwidth -= vwidth % 2; vheight -= vheight % 2; - input = in; - _output = out; - log_file = log; - if (vcodec == SRS_ENCODER_NO_VIDEO && acodec == SRS_ENCODER_NO_AUDIO) { ret = ERROR_ENCODER_VCODEC; srs_warn("video and audio disabled. ret=%d", ret); @@ -191,6 +198,22 @@ int SrsFFMPEG::initialize(string in, string out, string log, SrsConfDirective* e return ret; } +int SrsFFMPEG::initialize_copy() +{ + int ret = ERROR_SUCCESS; + + vcodec = SRS_ENCODER_COPY; + acodec = SRS_ENCODER_COPY; + + if (_output.empty()) { + ret = ERROR_ENCODER_OUTPUT; + srs_error("invalid empty output, ret=%d", ret); + return ret; + } + + return ret; +} + int SrsFFMPEG::start() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_ffmpeg.hpp b/trunk/src/app/srs_app_ffmpeg.hpp index 9f5ce9498..6121eb3e0 100644 --- a/trunk/src/app/srs_app_ffmpeg.hpp +++ b/trunk/src/app/srs_app_ffmpeg.hpp @@ -74,7 +74,9 @@ public: public: virtual std::string output(); public: - virtual int initialize(std::string in, std::string out, std::string log, SrsConfDirective* engine); + virtual int initialize(std::string in, std::string out, std::string log); + virtual int initialize_transcode(SrsConfDirective* engine); + virtual int initialize_copy(); virtual int start(); virtual int cycle(); virtual void stop(); diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 569fd2367..cc4e3d963 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -26,6 +26,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_INGEST #include +#include +#include +#include // when error, ingester sleep for a while and retry. #define SRS_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) @@ -39,11 +42,71 @@ SrsIngester::SrsIngester() SrsIngester::~SrsIngester() { srs_freep(pthread); + clear_engines(); } int SrsIngester::start() { int ret = ERROR_SUCCESS; + + // parse ingesters + std::vector vhosts; + _srs_config->get_vhosts(vhosts); + + for (int i = 0; i < (int)vhosts.size(); i++) { + SrsConfDirective* vhost = vhosts[i]; + if ((ret = parse_ingesters(vhost)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + +int SrsIngester::parse_ingesters(SrsConfDirective* vhost) +{ + int ret = ERROR_SUCCESS; + + std::vector ingesters; + _srs_config->get_ingesters(vhost->arg0(), ingesters); + + // create engine + for (int i = 0; i < (int)ingesters.size(); i++) { + SrsConfDirective* ingest = ingesters[i]; + if (!_srs_config->get_ingest_enabled(ingest)) { + continue; + } + + std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest); + if (ffmpeg_bin.empty()) { + srs_trace("ignore the empty ffmpeg ingest: %s", ingest->arg0().c_str()); + continue; + } + + // get all engines. + std::vector engines; + _srs_config->get_transcode_engines(ingest, engines); + if (engines.empty()) { + srs_trace("ignore the empty transcode engine: %s", ingest->arg0().c_str()); + continue; + } + + // create engine + for (int i = 0; i < (int)engines.size(); i++) { + SrsConfDirective* engine = engines[i]; + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); + if ((ret = initialize_ffmpeg(ffmpeg, ingest, engine)) != ERROR_SUCCESS) { + srs_freep(ffmpeg); + if (ret != ERROR_ENCODER_LOOP) { + srs_error("invalid ingest engine: %s %s", ingest->arg0().c_str(), engine->arg0().c_str()); + } + return ret; + } + + ffmpegs.push_back(ffmpeg); + } + } + return ret; } @@ -61,4 +124,26 @@ void SrsIngester::on_thread_stop() { } +void SrsIngester::clear_engines() +{ + std::vector::iterator it; + + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { + SrsFFMPEG* ffmpeg = *it; + srs_freep(ffmpeg); + } + + ffmpegs.clear(); +} + +int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine) +{ + int ret = ERROR_SUCCESS; + + if (!_srs_config->get_engine_enabled(engine)) { + } + + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index 54cadc1a1..82fce08ad 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsFFMPEG; +class SrsConfDirective; /** * ingest file/stream/device, @@ -58,6 +59,10 @@ public: public: virtual int cycle(); virtual void on_thread_stop(); +private: + virtual void clear_engines(); + virtual int parse_ingesters(SrsConfDirective* vhost); + virtual int initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* ingest, SrsConfDirective* engine); }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c5859cc3f..e6fde04ec 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -373,7 +373,7 @@ int SrsServer::listen() return ret; } -int SrsServer::cycle() +int SrsServer::ingest() { int ret = ERROR_SUCCESS; @@ -383,6 +383,13 @@ int SrsServer::cycle() return ret; } #endif + + return ret; +} + +int SrsServer::cycle() +{ + int ret = ERROR_SUCCESS; // the deamon thread, update the time cache while (true) { diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index ff8e5fd83..d46982a83 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -100,6 +100,7 @@ public: virtual int acquire_pid_file(); virtual int initialize_st(); virtual int listen(); + virtual int ingest(); virtual int cycle(); virtual void remove(SrsConnection* conn); virtual void on_signal(int signo); diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 1607c1c8c..2888c3ace 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -73,6 +73,10 @@ int run_master() return ret; } + if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) { + return ret; + } + if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) { return ret; }