From 18028ad4cbb2bf3259d9f48cd6ec8ca0d28ba423 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 1 Dec 2013 13:04:09 +0800 Subject: [PATCH] update readme --- README.md | 3 +- trunk/src/core/srs_core_encoder.cpp | 1157 ++++++++++++++------------- trunk/src/core/srs_core_forward.cpp | 704 ++++++++-------- 3 files changed, 934 insertions(+), 930 deletions(-) mode change 100644 => 100755 trunk/src/core/srs_core_encoder.cpp mode change 100644 => 100755 trunk/src/core/srs_core_forward.cpp diff --git a/README.md b/README.md index 8eed1b997..d2c7ec404 100755 --- a/README.md +++ b/README.md @@ -36,7 +36,8 @@ FMS URL: rtmp://127.0.0.1:1935/live Stream: livestream For example, use ffmpeg to publish: for((;;)); do \ - ./objs/ffmpeg/bin/ffmpeg -re -i ./doc/source.200kbps.768x320.flv -vcodec copy -acodec copy \ + ./objs/ffmpeg/bin/ffmpeg -re -i ./doc/source.200kbps.768x320.flv \ + -vcodec copy -acodec copy \ -f flv -y rtmp://127.0.0.1:1935/live/livestream; \ sleep 1; \ done diff --git a/trunk/src/core/srs_core_encoder.cpp b/trunk/src/core/srs_core_encoder.cpp old mode 100644 new mode 100755 index 10f4acf31..3c23f289e --- a/trunk/src/core/srs_core_encoder.cpp +++ b/trunk/src/core/srs_core_encoder.cpp @@ -1,578 +1,579 @@ -/* -The MIT License (MIT) - -Copyright (c) 2013 winlin - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -*/ - -#include - -#include -#include - -#include -#include -#include - -#define SRS_ENCODER_SLEEP_MS 2000 - -#define SRS_ENCODER_VCODEC "libx264" -#define SRS_ENCODER_ACODEC "libaacplus" - -SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) -{ - started = false; - pid = -1; - ffmpeg = ffmpeg_bin; - - vbitrate = 0; - vfps = 0; - vwidth = 0; - vheight = 0; - vthreads = 0; - abitrate = 0; - asample_rate = 0; - achannels = 0; -} - -SrsFFMPEG::~SrsFFMPEG() -{ - stop(); -} - -int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine) -{ - int ret = ERROR_SUCCESS; - - config->get_engine_vfilter(engine, vfilter); - vcodec = config->get_engine_vcodec(engine); - vbitrate = config->get_engine_vbitrate(engine); - vfps = config->get_engine_vfps(engine); - vwidth = config->get_engine_vwidth(engine); - vheight = config->get_engine_vheight(engine); - vthreads = config->get_engine_vthreads(engine); - vprofile = config->get_engine_vprofile(engine); - vpreset = config->get_engine_vpreset(engine); - config->get_engine_vparams(engine, vparams); - acodec = config->get_engine_acodec(engine); - abitrate = config->get_engine_abitrate(engine); - asample_rate = config->get_engine_asample_rate(engine); - achannels = config->get_engine_achannels(engine); - config->get_engine_aparams(engine, aparams); - output = config->get_engine_output(engine); - - // ensure the size is even. - vwidth -= vwidth % 2; - vheight -= vheight % 2; - - // input stream, from local. - // ie. rtmp://127.0.0.1:1935/live/livestream - input = "rtmp://127.0.0.1:"; - input += port; - input += "/"; - input += app; - input += "/"; - input += stream; - - // output stream, to other/self server - // ie. rtmp://127.0.0.1:1935/live/livestream_sd - if (vhost == RTMP_VHOST_DEFAULT) { - output = srs_replace(output, "[vhost]", "127.0.0.1"); - } else { - output = srs_replace(output, "[vhost]", vhost); - } - output = srs_replace(output, "[port]", port); - output = srs_replace(output, "[app]", app); - output = srs_replace(output, "[stream]", stream); - - // important: loop check, donot transcode again. - // we think the following is loop circle: - // input: rtmp://127.0.0.1:1935/live/livestream_sd - // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd - std::string tail = ""; // tail="_sd" - if (output.length() > input.length()) { - tail = output.substr(input.length()); - } - // if input also endwiths the tail, loop detected. - if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) { - ret = ERROR_ENCODER_LOOP; - srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d", - input.c_str(), output.c_str(), ret); - return ret; - } - - if (vcodec != SRS_ENCODER_VCODEC) { - ret = ERROR_ENCODER_VCODEC; - srs_error("invalid vcodec, must be %s, actual %s, ret=%d", - SRS_ENCODER_VCODEC, vcodec.c_str(), ret); - return ret; - } - if (vbitrate <= 0) { - ret = ERROR_ENCODER_VBITRATE; - srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret); - return ret; - } - if (vfps <= 0) { - ret = ERROR_ENCODER_VFPS; - srs_error("invalid vfps: %.2f, ret=%d", vfps, ret); - return ret; - } - if (vwidth <= 0) { - ret = ERROR_ENCODER_VWIDTH; - srs_error("invalid vwidth: %d, ret=%d", vwidth, ret); - return ret; - } - if (vheight <= 0) { - ret = ERROR_ENCODER_VHEIGHT; - srs_error("invalid vheight: %d, ret=%d", vheight, ret); - return ret; - } - if (vthreads < 0) { - ret = ERROR_ENCODER_VTHREADS; - srs_error("invalid vthreads: %d, ret=%d", vthreads, ret); - return ret; - } - if (vprofile.empty()) { - ret = ERROR_ENCODER_VPROFILE; - srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret); - return ret; - } - if (vpreset.empty()) { - ret = ERROR_ENCODER_VPRESET; - srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret); - return ret; - } - if (acodec != SRS_ENCODER_ACODEC) { - ret = ERROR_ENCODER_ACODEC; - srs_error("invalid acodec, must be %s, actual %s, ret=%d", - SRS_ENCODER_ACODEC, acodec.c_str(), ret); - return ret; - } - if (abitrate <= 0) { - ret = ERROR_ENCODER_ABITRATE; - srs_error("invalid abitrate: %d, ret=%d", - abitrate, ret); - return ret; - } - if (asample_rate <= 0) { - ret = ERROR_ENCODER_ASAMPLE_RATE; - srs_error("invalid sample rate: %d, ret=%d", - asample_rate, ret); - return ret; - } - if (achannels != 1 && achannels != 2) { - ret = ERROR_ENCODER_ACHANNELS; - srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d", - achannels, ret); - return ret; - } - if (output.empty()) { - ret = ERROR_ENCODER_OUTPUT; - srs_error("invalid empty output, ret=%d", ret); - return ret; - } - - return ret; -} - -int SrsFFMPEG::start() -{ - int ret = ERROR_SUCCESS; - - if (started) { - return ret; - } - - // prepare exec params - char tmp[256]; - std::vector params; - - // argv[0], set to ffmpeg bin. - // The execv() and execvp() functions .... - // The first argument, by convention, should point to - // the filename associated with the file being executed. - params.push_back(ffmpeg); - - // input. - params.push_back("-f"); - params.push_back("flv"); - - params.push_back("-i"); - params.push_back(input); - - // build the filter - if (!vfilter.empty()) { - std::vector::iterator it; - for (it = vfilter.begin(); it != vfilter.end(); ++it) { - std::string p = *it; - if (!p.empty()) { - params.push_back(p); - } - } - } - - // video specified. - params.push_back("-vcodec"); - params.push_back(vcodec); - - params.push_back("-b:v"); - snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000); - params.push_back(tmp); - - params.push_back("-r"); - snprintf(tmp, sizeof(tmp), "%.2f", vfps); - params.push_back(tmp); - - params.push_back("-s"); - snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight); - params.push_back(tmp); - - // TODO: add aspect if needed. - params.push_back("-aspect"); - snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight); - params.push_back(tmp); - - params.push_back("-threads"); - snprintf(tmp, sizeof(tmp), "%d", vthreads); - params.push_back(tmp); - - params.push_back("-profile:v"); - params.push_back(vprofile); - - params.push_back("-preset"); - params.push_back(vpreset); - - // vparams - if (!vparams.empty()) { - std::vector::iterator it; - for (it = vparams.begin(); it != vparams.end(); ++it) { - std::string p = *it; - if (!p.empty()) { - params.push_back(p); - } - } - } - - // audio specified. - params.push_back("-acodec"); - params.push_back(acodec); - - params.push_back("-b:a"); - snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000); - params.push_back(tmp); - - params.push_back("-ar"); - snprintf(tmp, sizeof(tmp), "%d", asample_rate); - params.push_back(tmp); - - params.push_back("-ac"); - snprintf(tmp, sizeof(tmp), "%d", achannels); - params.push_back(tmp); - - // aparams - if (!aparams.empty()) { - std::vector::iterator it; - for (it = aparams.begin(); it != aparams.end(); ++it) { - std::string p = *it; - if (!p.empty()) { - params.push_back(p); - } - } - } - - // output - params.push_back("-f"); - params.push_back("flv"); - - params.push_back("-y"); - params.push_back(output); - - // TODO: fork or vfork? - if ((pid = fork()) < 0) { - ret = ERROR_ENCODER_FORK; - srs_error("vfork process failed. ret=%d", ret); - return ret; - } - - // child process: ffmpeg encoder engine. - if (pid == 0) { - // memory leak in child process, it's ok. - char** charpv_params = new char*[params.size() + 1]; - for (int i = 0; i < (int)params.size(); i++) { - std::string p = params[i]; - charpv_params[i] = (char*)p.c_str(); - } - // EOF: NULL - charpv_params[params.size()] = NULL; - - // TODO: execv or execvp - ret = execv(ffmpeg.c_str(), charpv_params); - if (ret < 0) { - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)", - errno, strerror(errno)); - } - exit(ret); - } - - // parent. - if (pid > 0) { - started = true; - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid); - return ret; - } - - return ret; -} - -void SrsFFMPEG::stop() -{ - if (!started) { - return; - } -} - -SrsEncoder::SrsEncoder() -{ - tid = NULL; - loop = false; -} - -SrsEncoder::~SrsEncoder() -{ - on_unpublish(); -} - -int SrsEncoder::parse_scope_engines() -{ - int ret = ERROR_SUCCESS; - - // parse all transcode engines. - SrsConfDirective* conf = NULL; - - // parse vhost scope engines - std::string scope = ""; - if ((conf = config->get_transcode(vhost, "")) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { - srs_error("parse vhost scope=%s transcode engines failed. " - "ret=%d", scope.c_str(), ret); - return ret; - } - } - // parse app scope engines - scope = app; - if ((conf = config->get_transcode(vhost, app)) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { - srs_error("parse app scope=%s transcode engines failed. " - "ret=%d", scope.c_str(), ret); - return ret; - } - } - // parse stream scope engines - scope += "/"; - scope += stream; - if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { - srs_error("parse stream scope=%s transcode engines failed. " - "ret=%d", scope.c_str(), ret); - return ret; - } - } - - return ret; -} - -int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream) -{ - int ret = ERROR_SUCCESS; - - vhost = _vhost; - port = _port; - app = _app; - stream = _stream; - - ret = parse_scope_engines(); - - // ignore the loop encoder - if (ret == ERROR_ENCODER_LOOP) { - ret = ERROR_SUCCESS; - } - - // return for error or no engine. - if (ret != ERROR_SUCCESS || ffmpegs.empty()) { - return ret; - } - - // start thread to run all encoding engines. - srs_assert(!tid); - if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) { - ret = ERROR_ST_CREATE_FORWARD_THREAD; - srs_error("st_thread_create failed. ret=%d", ret); - return ret; - } - - return ret; -} - -void SrsEncoder::on_unpublish() -{ - if (tid) { - loop = false; - st_thread_interrupt(tid); - st_thread_join(tid, NULL); - tid = NULL; - } - - clear_engines(); -} - -void SrsEncoder::clear_engines() -{ - std::vector::iterator it; - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { - SrsFFMPEG* ffmpeg = *it; - srs_freep(ffmpeg); - } - ffmpegs.clear(); -} - -SrsFFMPEG* SrsEncoder::at(int index) -{ - return ffmpegs[index]; -} - -int SrsEncoder::parse_transcode(SrsConfDirective* conf) -{ - int ret = ERROR_SUCCESS; - - srs_assert(conf); - - // enabled - if (!config->get_transcode_enabled(conf)) { - srs_trace("ignore the disabled transcode: %s", - conf->arg0().c_str()); - return ret; - } - - // ffmpeg - std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf); - if (ffmpeg_bin.empty()) { - srs_trace("ignore the empty ffmpeg transcode: %s", - conf->arg0().c_str()); - return ret; - } - - // get all engines. - std::vector engines; - config->get_transcode_engines(conf, engines); - if (engines.empty()) { - srs_trace("ignore the empty transcode engine: %s", - conf->arg0().c_str()); - return ret; - } - - // create engine - for (int i = 0; i < (int)engines.size(); i++) { - SrsConfDirective* engine = engines[i]; - if (!config->get_engine_enabled(engine)) { - srs_trace("ignore the diabled transcode engine: %s %s", - conf->arg0().c_str(), engine->arg0().c_str()); - continue; - } - - SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); - - if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) { - srs_freep(ffmpeg); - - // if got a loop, donot transcode the whole stream. - if (ret == ERROR_ENCODER_LOOP) { - clear_engines(); - break; - } - - srs_error("invalid transcode engine: %s %s", - conf->arg0().c_str(), engine->arg0().c_str()); - return ret; - } - - ffmpegs.push_back(ffmpeg); - } - - return ret; -} - -int SrsEncoder::cycle() -{ - int ret = ERROR_SUCCESS; - - // start all ffmpegs. - std::vector::iterator it; - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { - SrsFFMPEG* ffmpeg = *it; - if ((ret = ffmpeg->start()) != ERROR_SUCCESS) { - srs_error("ffmpeg start failed. ret=%d", ret); - return ret; - } - } - - return ret; -} - -void SrsEncoder::encoder_cycle() -{ - int ret = ERROR_SUCCESS; - - log_context->generate_id(); - srs_trace("encoder cycle start"); - - while (loop) { - if ((ret = cycle()) != ERROR_SUCCESS) { - srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret); - } else { - srs_info("encoder cycle success, retry"); - } - - if (!loop) { - break; - } - - st_usleep(SRS_ENCODER_SLEEP_MS * 1000); - } - - // kill ffmpeg when finished and it alive - std::vector::iterator it; - for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { - SrsFFMPEG* ffmpeg = *it; - ffmpeg->stop(); - } - - srs_trace("encoder cycle finished"); -} - -void* SrsEncoder::encoder_thread(void* arg) -{ - SrsEncoder* obj = (SrsEncoder*)arg; - srs_assert(obj != NULL); - - obj->loop = true; - obj->encoder_cycle(); - - return NULL; -} - +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include + +#include +#include +#include + +#define SRS_ENCODER_SLEEP_MS 2000 + +#define SRS_ENCODER_VCODEC "libx264" +#define SRS_ENCODER_ACODEC "libaacplus" + +SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) +{ + started = false; + pid = -1; + ffmpeg = ffmpeg_bin; + + vbitrate = 0; + vfps = 0; + vwidth = 0; + vheight = 0; + vthreads = 0; + abitrate = 0; + asample_rate = 0; + achannels = 0; +} + +SrsFFMPEG::~SrsFFMPEG() +{ + stop(); +} + +int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine) +{ + int ret = ERROR_SUCCESS; + + config->get_engine_vfilter(engine, vfilter); + vcodec = config->get_engine_vcodec(engine); + vbitrate = config->get_engine_vbitrate(engine); + vfps = config->get_engine_vfps(engine); + vwidth = config->get_engine_vwidth(engine); + vheight = config->get_engine_vheight(engine); + vthreads = config->get_engine_vthreads(engine); + vprofile = config->get_engine_vprofile(engine); + vpreset = config->get_engine_vpreset(engine); + config->get_engine_vparams(engine, vparams); + acodec = config->get_engine_acodec(engine); + abitrate = config->get_engine_abitrate(engine); + asample_rate = config->get_engine_asample_rate(engine); + achannels = config->get_engine_achannels(engine); + config->get_engine_aparams(engine, aparams); + output = config->get_engine_output(engine); + + // ensure the size is even. + vwidth -= vwidth % 2; + vheight -= vheight % 2; + + // input stream, from local. + // ie. rtmp://127.0.0.1:1935/live/livestream + input = "rtmp://127.0.0.1:"; + input += port; + input += "/"; + input += app; + input += "/"; + input += stream; + + // output stream, to other/self server + // ie. rtmp://127.0.0.1:1935/live/livestream_sd + if (vhost == RTMP_VHOST_DEFAULT) { + output = srs_replace(output, "[vhost]", "127.0.0.1"); + } else { + output = srs_replace(output, "[vhost]", vhost); + } + output = srs_replace(output, "[port]", port); + output = srs_replace(output, "[app]", app); + output = srs_replace(output, "[stream]", stream); + + // important: loop check, donot transcode again. + // we think the following is loop circle: + // input: rtmp://127.0.0.1:1935/live/livestream_sd + // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd + std::string tail = ""; // tail="_sd" + if (output.length() > input.length()) { + tail = output.substr(input.length()); + } + // TODO: better dead loop check. + // if input also endwiths the tail, loop detected. + if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) { + ret = ERROR_ENCODER_LOOP; + srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d", + input.c_str(), output.c_str(), ret); + return ret; + } + + if (vcodec != SRS_ENCODER_VCODEC) { + ret = ERROR_ENCODER_VCODEC; + srs_error("invalid vcodec, must be %s, actual %s, ret=%d", + SRS_ENCODER_VCODEC, vcodec.c_str(), ret); + return ret; + } + if (vbitrate <= 0) { + ret = ERROR_ENCODER_VBITRATE; + srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret); + return ret; + } + if (vfps <= 0) { + ret = ERROR_ENCODER_VFPS; + srs_error("invalid vfps: %.2f, ret=%d", vfps, ret); + return ret; + } + if (vwidth <= 0) { + ret = ERROR_ENCODER_VWIDTH; + srs_error("invalid vwidth: %d, ret=%d", vwidth, ret); + return ret; + } + if (vheight <= 0) { + ret = ERROR_ENCODER_VHEIGHT; + srs_error("invalid vheight: %d, ret=%d", vheight, ret); + return ret; + } + if (vthreads < 0) { + ret = ERROR_ENCODER_VTHREADS; + srs_error("invalid vthreads: %d, ret=%d", vthreads, ret); + return ret; + } + if (vprofile.empty()) { + ret = ERROR_ENCODER_VPROFILE; + srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret); + return ret; + } + if (vpreset.empty()) { + ret = ERROR_ENCODER_VPRESET; + srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret); + return ret; + } + if (acodec != SRS_ENCODER_ACODEC) { + ret = ERROR_ENCODER_ACODEC; + srs_error("invalid acodec, must be %s, actual %s, ret=%d", + SRS_ENCODER_ACODEC, acodec.c_str(), ret); + return ret; + } + if (abitrate <= 0) { + ret = ERROR_ENCODER_ABITRATE; + srs_error("invalid abitrate: %d, ret=%d", + abitrate, ret); + return ret; + } + if (asample_rate <= 0) { + ret = ERROR_ENCODER_ASAMPLE_RATE; + srs_error("invalid sample rate: %d, ret=%d", + asample_rate, ret); + return ret; + } + if (achannels != 1 && achannels != 2) { + ret = ERROR_ENCODER_ACHANNELS; + srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d", + achannels, ret); + return ret; + } + if (output.empty()) { + ret = ERROR_ENCODER_OUTPUT; + srs_error("invalid empty output, ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsFFMPEG::start() +{ + int ret = ERROR_SUCCESS; + + if (started) { + return ret; + } + + // prepare exec params + char tmp[256]; + std::vector params; + + // argv[0], set to ffmpeg bin. + // The execv() and execvp() functions .... + // The first argument, by convention, should point to + // the filename associated with the file being executed. + params.push_back(ffmpeg); + + // input. + params.push_back("-f"); + params.push_back("flv"); + + params.push_back("-i"); + params.push_back(input); + + // build the filter + if (!vfilter.empty()) { + std::vector::iterator it; + for (it = vfilter.begin(); it != vfilter.end(); ++it) { + std::string p = *it; + if (!p.empty()) { + params.push_back(p); + } + } + } + + // video specified. + params.push_back("-vcodec"); + params.push_back(vcodec); + + params.push_back("-b:v"); + snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000); + params.push_back(tmp); + + params.push_back("-r"); + snprintf(tmp, sizeof(tmp), "%.2f", vfps); + params.push_back(tmp); + + params.push_back("-s"); + snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight); + params.push_back(tmp); + + // TODO: add aspect if needed. + params.push_back("-aspect"); + snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight); + params.push_back(tmp); + + params.push_back("-threads"); + snprintf(tmp, sizeof(tmp), "%d", vthreads); + params.push_back(tmp); + + params.push_back("-profile:v"); + params.push_back(vprofile); + + params.push_back("-preset"); + params.push_back(vpreset); + + // vparams + if (!vparams.empty()) { + std::vector::iterator it; + for (it = vparams.begin(); it != vparams.end(); ++it) { + std::string p = *it; + if (!p.empty()) { + params.push_back(p); + } + } + } + + // audio specified. + params.push_back("-acodec"); + params.push_back(acodec); + + params.push_back("-b:a"); + snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000); + params.push_back(tmp); + + params.push_back("-ar"); + snprintf(tmp, sizeof(tmp), "%d", asample_rate); + params.push_back(tmp); + + params.push_back("-ac"); + snprintf(tmp, sizeof(tmp), "%d", achannels); + params.push_back(tmp); + + // aparams + if (!aparams.empty()) { + std::vector::iterator it; + for (it = aparams.begin(); it != aparams.end(); ++it) { + std::string p = *it; + if (!p.empty()) { + params.push_back(p); + } + } + } + + // output + params.push_back("-f"); + params.push_back("flv"); + + params.push_back("-y"); + params.push_back(output); + + // TODO: fork or vfork? + if ((pid = fork()) < 0) { + ret = ERROR_ENCODER_FORK; + srs_error("vfork process failed. ret=%d", ret); + return ret; + } + + // child process: ffmpeg encoder engine. + if (pid == 0) { + // memory leak in child process, it's ok. + char** charpv_params = new char*[params.size() + 1]; + for (int i = 0; i < (int)params.size(); i++) { + std::string p = params[i]; + charpv_params[i] = (char*)p.c_str(); + } + // EOF: NULL + charpv_params[params.size()] = NULL; + + // TODO: execv or execvp + ret = execv(ffmpeg.c_str(), charpv_params); + if (ret < 0) { + fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)", + errno, strerror(errno)); + } + exit(ret); + } + + // parent. + if (pid > 0) { + started = true; + srs_trace("vfored ffmpeg encoder engine, pid=%d", pid); + return ret; + } + + return ret; +} + +void SrsFFMPEG::stop() +{ + if (!started) { + return; + } +} + +SrsEncoder::SrsEncoder() +{ + tid = NULL; + loop = false; +} + +SrsEncoder::~SrsEncoder() +{ + on_unpublish(); +} + +int SrsEncoder::parse_scope_engines() +{ + int ret = ERROR_SUCCESS; + + // parse all transcode engines. + SrsConfDirective* conf = NULL; + + // parse vhost scope engines + std::string scope = ""; + if ((conf = config->get_transcode(vhost, "")) != NULL) { + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + srs_error("parse vhost scope=%s transcode engines failed. " + "ret=%d", scope.c_str(), ret); + return ret; + } + } + // parse app scope engines + scope = app; + if ((conf = config->get_transcode(vhost, app)) != NULL) { + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + srs_error("parse app scope=%s transcode engines failed. " + "ret=%d", scope.c_str(), ret); + return ret; + } + } + // parse stream scope engines + scope += "/"; + scope += stream; + if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) { + if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + srs_error("parse stream scope=%s transcode engines failed. " + "ret=%d", scope.c_str(), ret); + return ret; + } + } + + return ret; +} + +int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream) +{ + int ret = ERROR_SUCCESS; + + vhost = _vhost; + port = _port; + app = _app; + stream = _stream; + + ret = parse_scope_engines(); + + // ignore the loop encoder + if (ret == ERROR_ENCODER_LOOP) { + ret = ERROR_SUCCESS; + } + + // return for error or no engine. + if (ret != ERROR_SUCCESS || ffmpegs.empty()) { + return ret; + } + + // start thread to run all encoding engines. + srs_assert(!tid); + if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) { + ret = ERROR_ST_CREATE_FORWARD_THREAD; + srs_error("st_thread_create failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void SrsEncoder::on_unpublish() +{ + if (tid) { + loop = false; + st_thread_interrupt(tid); + st_thread_join(tid, NULL); + tid = NULL; + } + + clear_engines(); +} + +void SrsEncoder::clear_engines() +{ + std::vector::iterator it; + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { + SrsFFMPEG* ffmpeg = *it; + srs_freep(ffmpeg); + } + ffmpegs.clear(); +} + +SrsFFMPEG* SrsEncoder::at(int index) +{ + return ffmpegs[index]; +} + +int SrsEncoder::parse_transcode(SrsConfDirective* conf) +{ + int ret = ERROR_SUCCESS; + + srs_assert(conf); + + // enabled + if (!config->get_transcode_enabled(conf)) { + srs_trace("ignore the disabled transcode: %s", + conf->arg0().c_str()); + return ret; + } + + // ffmpeg + std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf); + if (ffmpeg_bin.empty()) { + srs_trace("ignore the empty ffmpeg transcode: %s", + conf->arg0().c_str()); + return ret; + } + + // get all engines. + std::vector engines; + config->get_transcode_engines(conf, engines); + if (engines.empty()) { + srs_trace("ignore the empty transcode engine: %s", + conf->arg0().c_str()); + return ret; + } + + // create engine + for (int i = 0; i < (int)engines.size(); i++) { + SrsConfDirective* engine = engines[i]; + if (!config->get_engine_enabled(engine)) { + srs_trace("ignore the diabled transcode engine: %s %s", + conf->arg0().c_str(), engine->arg0().c_str()); + continue; + } + + SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); + + if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) { + srs_freep(ffmpeg); + + // if got a loop, donot transcode the whole stream. + if (ret == ERROR_ENCODER_LOOP) { + clear_engines(); + break; + } + + srs_error("invalid transcode engine: %s %s", + conf->arg0().c_str(), engine->arg0().c_str()); + return ret; + } + + ffmpegs.push_back(ffmpeg); + } + + return ret; +} + +int SrsEncoder::cycle() +{ + int ret = ERROR_SUCCESS; + + // start all ffmpegs. + std::vector::iterator it; + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { + SrsFFMPEG* ffmpeg = *it; + if ((ret = ffmpeg->start()) != ERROR_SUCCESS) { + srs_error("ffmpeg start failed. ret=%d", ret); + return ret; + } + } + + return ret; +} + +void SrsEncoder::encoder_cycle() +{ + int ret = ERROR_SUCCESS; + + log_context->generate_id(); + srs_trace("encoder cycle start"); + + while (loop) { + if ((ret = cycle()) != ERROR_SUCCESS) { + srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret); + } else { + srs_info("encoder cycle success, retry"); + } + + if (!loop) { + break; + } + + st_usleep(SRS_ENCODER_SLEEP_MS * 1000); + } + + // kill ffmpeg when finished and it alive + std::vector::iterator it; + for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) { + SrsFFMPEG* ffmpeg = *it; + ffmpeg->stop(); + } + + srs_trace("encoder cycle finished"); +} + +void* SrsEncoder::encoder_thread(void* arg) +{ + SrsEncoder* obj = (SrsEncoder*)arg; + srs_assert(obj != NULL); + + obj->loop = true; + obj->encoder_cycle(); + + return NULL; +} + diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp old mode 100644 new mode 100755 index 967ba807b..b706c43b8 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -1,351 +1,353 @@ -/* -The MIT License (MIT) - -Copyright (c) 2013 winlin - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -*/ - -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#define SRS_PULSE_TIMEOUT_MS 100 -#define SRS_FORWARDER_SLEEP_MS 2000 -#define SRS_SEND_TIMEOUT_US 3000000L -#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US - -SrsForwarder::SrsForwarder() -{ - client = NULL; - stfd = NULL; - stream_id = 0; - - tid = NULL; - loop = false; -} - -SrsForwarder::~SrsForwarder() -{ - on_unpublish(); - - std::vector::iterator it; - for (it = msgs.begin(); it != msgs.end(); ++it) { - SrsSharedPtrMessage* msg = *it; - srs_freep(msg); - } - msgs.clear(); -} - -int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server) -{ - int ret = ERROR_SUCCESS; - - app = _app; - - tc_url = "rtmp://"; - tc_url += vhost; - tc_url += "/"; - tc_url += app; - - stream_name = stream; - server = forward_server; - port = 1935; - - size_t pos = forward_server.find(":"); - if (pos != std::string::npos) { - port = ::atoi(forward_server.substr(pos + 1).c_str()); - server = forward_server.substr(0, pos); - } - - if ((ret = open_socket()) != ERROR_SUCCESS) { - return ret; - } - - srs_assert(!tid); - if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){ - ret = ERROR_ST_CREATE_FORWARD_THREAD; - srs_error("st_thread_create failed. ret=%d", ret); - return ret; - } - - return ret; -} - -void SrsForwarder::on_unpublish() -{ - if (tid) { - loop = false; - st_thread_interrupt(tid); - st_thread_join(tid, NULL); - tid = NULL; - } - - if (stfd) { - int fd = st_netfd_fileno(stfd); - st_netfd_close(stfd); - stfd = NULL; - - // st does not close it sometimes, - // close it manually. - close(fd); - } - - srs_freep(client); -} - -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) -{ - int ret = ERROR_SUCCESS; - - msgs.push_back(metadata); - - return ret; -} - -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) -{ - int ret = ERROR_SUCCESS; - - msgs.push_back(msg); - - return ret; -} - -int SrsForwarder::on_video(SrsSharedPtrMessage* msg) -{ - int ret = ERROR_SUCCESS; - - msgs.push_back(msg); - - return ret; -} - -int SrsForwarder::open_socket() -{ - int ret = ERROR_SUCCESS; - - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", - stream_name.c_str(), tc_url.c_str(), server.c_str(), port); - - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - srs_freep(client); - client = new SrsRtmpClient(stfd); - - return ret; -} - -int SrsForwarder::connect_server() -{ - int ret = ERROR_SUCCESS; - - std::string ip = parse_server(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; - } - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); - - return ret; -} - -std::string SrsForwarder::parse_server(std::string host) -{ - if (inet_addr(host.c_str()) != INADDR_NONE) { - return host; - } - - hostent* answer = gethostbyname(host.c_str()); - if (answer == NULL) { - srs_error("dns resolve host %s error.", host.c_str()); - return ""; - } - - char ipv4[16]; - memset(ipv4, 0, sizeof(ipv4)); - for (int i = 0; i < answer->h_length; i++) { - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); - srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); - break; - } - - return ipv4; -} - -int SrsForwarder::cycle() -{ - int ret = ERROR_SUCCESS; - - client->set_recv_timeout(SRS_RECV_TIMEOUT_US); - client->set_send_timeout(SRS_SEND_TIMEOUT_US); - - if ((ret = connect_server()) != ERROR_SUCCESS) { - return ret; - } - srs_assert(client); - - if ((ret = client->handshake()) != ERROR_SUCCESS) { - srs_error("handshake with server failed. ret=%d", ret); - return ret; - } - if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) { - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); - return ret; - } - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); - return ret; - } - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", - stream_name.c_str(), stream_id, ret); - return ret; - } - - if ((ret = forward()) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsForwarder::forward() -{ - int ret = ERROR_SUCCESS; - - client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000); - - SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER); - - while (loop) { - pithy_print.elapse(SRS_PULSE_TIMEOUT_MS); - - // switch to other st-threads. - st_usleep(0); - - // read from client. - if (true) { - SrsCommonMessage* msg = NULL; - ret = client->recv_message(&msg); - - srs_verbose("play loop recv message. ret=%d", ret); - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { - srs_error("recv server control message failed. ret=%d", ret); - return ret; - } - } - - int count = (int)msgs.size(); - - // reportable - if (pithy_print.can_print()) { - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); - } - - // all msgs to forward. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; - msgs[i] = NULL; - - if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { - srs_error("forwarder send message to server failed. ret=%d", ret); - return ret; - } - } - msgs.clear(); - } - - return ret; -} - -void SrsForwarder::forward_cycle() -{ - int ret = ERROR_SUCCESS; - - log_context->generate_id(); - srs_trace("forward cycle start"); - - while (loop) { - if ((ret = cycle()) != ERROR_SUCCESS) { - srs_warn("forward cycle failed, ignored and retry, ret=%d", ret); - } else { - srs_info("forward cycle success, retry"); - } - - if (!loop) { - break; - } - - st_usleep(SRS_FORWARDER_SLEEP_MS * 1000); - - if ((ret = open_socket()) != ERROR_SUCCESS) { - srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret); - } else { - srs_info("forward cycle reopen success"); - } - } - srs_trace("forward cycle finished"); -} - -void* SrsForwarder::forward_thread(void* arg) -{ - SrsForwarder* obj = (SrsForwarder*)arg; - srs_assert(obj != NULL); - - obj->loop = true; - obj->forward_cycle(); - - return NULL; -} - +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define SRS_PULSE_TIMEOUT_MS 100 +#define SRS_FORWARDER_SLEEP_MS 2000 +#define SRS_SEND_TIMEOUT_US 3000000L +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US + +SrsForwarder::SrsForwarder() +{ + client = NULL; + stfd = NULL; + stream_id = 0; + + tid = NULL; + loop = false; +} + +SrsForwarder::~SrsForwarder() +{ + on_unpublish(); + + std::vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + msgs.clear(); +} + +int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server) +{ + int ret = ERROR_SUCCESS; + + app = _app; + + tc_url = "rtmp://"; + tc_url += vhost; + tc_url += "/"; + tc_url += app; + + stream_name = stream; + server = forward_server; + port = 1935; + + // TODO: dead loop check. + + size_t pos = forward_server.find(":"); + if (pos != std::string::npos) { + port = ::atoi(forward_server.substr(pos + 1).c_str()); + server = forward_server.substr(0, pos); + } + + if ((ret = open_socket()) != ERROR_SUCCESS) { + return ret; + } + + srs_assert(!tid); + if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){ + ret = ERROR_ST_CREATE_FORWARD_THREAD; + srs_error("st_thread_create failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void SrsForwarder::on_unpublish() +{ + if (tid) { + loop = false; + st_thread_interrupt(tid); + st_thread_join(tid, NULL); + tid = NULL; + } + + if (stfd) { + int fd = st_netfd_fileno(stfd); + st_netfd_close(stfd); + stfd = NULL; + + // st does not close it sometimes, + // close it manually. + close(fd); + } + + srs_freep(client); +} + +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) +{ + int ret = ERROR_SUCCESS; + + msgs.push_back(metadata); + + return ret; +} + +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + msgs.push_back(msg); + + return ret; +} + +int SrsForwarder::on_video(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + msgs.push_back(msg); + + return ret; +} + +int SrsForwarder::open_socket() +{ + int ret = ERROR_SUCCESS; + + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", + stream_name.c_str(), tc_url.c_str(), server.c_str(), port); + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + srs_freep(client); + client = new SrsRtmpClient(stfd); + + return ret; +} + +int SrsForwarder::connect_server() +{ + int ret = ERROR_SUCCESS; + + std::string ip = parse_server(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + + return ret; +} + +std::string SrsForwarder::parse_server(std::string host) +{ + if (inet_addr(host.c_str()) != INADDR_NONE) { + return host; + } + + hostent* answer = gethostbyname(host.c_str()); + if (answer == NULL) { + srs_error("dns resolve host %s error.", host.c_str()); + return ""; + } + + char ipv4[16]; + memset(ipv4, 0, sizeof(ipv4)); + for (int i = 0; i < answer->h_length; i++) { + inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); + srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); + break; + } + + return ipv4; +} + +int SrsForwarder::cycle() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_SEND_TIMEOUT_US); + + if ((ret = connect_server()) != ERROR_SUCCESS) { + return ret; + } + srs_assert(client); + + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", + stream_name.c_str(), stream_id, ret); + return ret; + } + + if ((ret = forward()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsForwarder::forward() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000); + + SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER); + + while (loop) { + pithy_print.elapse(SRS_PULSE_TIMEOUT_MS); + + // switch to other st-threads. + st_usleep(0); + + // read from client. + if (true) { + SrsCommonMessage* msg = NULL; + ret = client->recv_message(&msg); + + srs_verbose("play loop recv message. ret=%d", ret); + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { + srs_error("recv server control message failed. ret=%d", ret); + return ret; + } + } + + int count = (int)msgs.size(); + + // reportable + if (pithy_print.can_print()) { + srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + } + + // all msgs to forward. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + msgs[i] = NULL; + + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { + srs_error("forwarder send message to server failed. ret=%d", ret); + return ret; + } + } + msgs.clear(); + } + + return ret; +} + +void SrsForwarder::forward_cycle() +{ + int ret = ERROR_SUCCESS; + + log_context->generate_id(); + srs_trace("forward cycle start"); + + while (loop) { + if ((ret = cycle()) != ERROR_SUCCESS) { + srs_warn("forward cycle failed, ignored and retry, ret=%d", ret); + } else { + srs_info("forward cycle success, retry"); + } + + if (!loop) { + break; + } + + st_usleep(SRS_FORWARDER_SLEEP_MS * 1000); + + if ((ret = open_socket()) != ERROR_SUCCESS) { + srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret); + } else { + srs_info("forward cycle reopen success"); + } + } + srs_trace("forward cycle finished"); +} + +void* SrsForwarder::forward_thread(void* arg) +{ + SrsForwarder* obj = (SrsForwarder*)arg; + srs_assert(obj != NULL); + + obj->loop = true; + obj->forward_cycle(); + + return NULL; +} +