diff --git a/README.md b/README.md
index d2c7ec404..633188ca9 100755
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ step 2: start srs
./objs/simple_rtmp_server -c conf/srs.conf
-step 3(optinal): start srs listen at 19350 for forward
+step 3(optinal): start srs listen at 19350 to forward to
./objs/simple_rtmp_server -c conf/srs.19350.conf
diff --git a/trunk/src/core/srs_core_encoder.cpp b/trunk/src/core/srs_core_encoder.cpp
old mode 100755
new mode 100644
index 3c23f289e..53d042299
--- a/trunk/src/core/srs_core_encoder.cpp
+++ b/trunk/src/core/srs_core_encoder.cpp
@@ -1,579 +1,579 @@
-/*
-The MIT License (MIT)
-
-Copyright (c) 2013 winlin
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-*/
-
-#include
-
-#include
-#include
-
-#include
-#include
-#include
-
-#define SRS_ENCODER_SLEEP_MS 2000
-
-#define SRS_ENCODER_VCODEC "libx264"
-#define SRS_ENCODER_ACODEC "libaacplus"
-
-SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
-{
- started = false;
- pid = -1;
- ffmpeg = ffmpeg_bin;
-
- vbitrate = 0;
- vfps = 0;
- vwidth = 0;
- vheight = 0;
- vthreads = 0;
- abitrate = 0;
- asample_rate = 0;
- achannels = 0;
-}
-
-SrsFFMPEG::~SrsFFMPEG()
-{
- stop();
-}
-
-int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
-{
- int ret = ERROR_SUCCESS;
-
- config->get_engine_vfilter(engine, vfilter);
- vcodec = config->get_engine_vcodec(engine);
- vbitrate = config->get_engine_vbitrate(engine);
- vfps = config->get_engine_vfps(engine);
- vwidth = config->get_engine_vwidth(engine);
- vheight = config->get_engine_vheight(engine);
- vthreads = config->get_engine_vthreads(engine);
- vprofile = config->get_engine_vprofile(engine);
- vpreset = config->get_engine_vpreset(engine);
- config->get_engine_vparams(engine, vparams);
- acodec = config->get_engine_acodec(engine);
- abitrate = config->get_engine_abitrate(engine);
- asample_rate = config->get_engine_asample_rate(engine);
- achannels = config->get_engine_achannels(engine);
- config->get_engine_aparams(engine, aparams);
- output = config->get_engine_output(engine);
-
- // ensure the size is even.
- vwidth -= vwidth % 2;
- vheight -= vheight % 2;
-
- // input stream, from local.
- // ie. rtmp://127.0.0.1:1935/live/livestream
- input = "rtmp://127.0.0.1:";
- input += port;
- input += "/";
- input += app;
- input += "/";
- input += stream;
-
- // output stream, to other/self server
- // ie. rtmp://127.0.0.1:1935/live/livestream_sd
- if (vhost == RTMP_VHOST_DEFAULT) {
- output = srs_replace(output, "[vhost]", "127.0.0.1");
- } else {
- output = srs_replace(output, "[vhost]", vhost);
- }
- output = srs_replace(output, "[port]", port);
- output = srs_replace(output, "[app]", app);
- output = srs_replace(output, "[stream]", stream);
-
- // important: loop check, donot transcode again.
- // we think the following is loop circle:
- // input: rtmp://127.0.0.1:1935/live/livestream_sd
- // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
- std::string tail = ""; // tail="_sd"
- if (output.length() > input.length()) {
- tail = output.substr(input.length());
- }
- // TODO: better dead loop check.
- // if input also endwiths the tail, loop detected.
- if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
- ret = ERROR_ENCODER_LOOP;
- srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
- input.c_str(), output.c_str(), ret);
- return ret;
- }
-
- if (vcodec != SRS_ENCODER_VCODEC) {
- ret = ERROR_ENCODER_VCODEC;
- srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
- SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
- return ret;
- }
- if (vbitrate <= 0) {
- ret = ERROR_ENCODER_VBITRATE;
- srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
- return ret;
- }
- if (vfps <= 0) {
- ret = ERROR_ENCODER_VFPS;
- srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
- return ret;
- }
- if (vwidth <= 0) {
- ret = ERROR_ENCODER_VWIDTH;
- srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
- return ret;
- }
- if (vheight <= 0) {
- ret = ERROR_ENCODER_VHEIGHT;
- srs_error("invalid vheight: %d, ret=%d", vheight, ret);
- return ret;
- }
- if (vthreads < 0) {
- ret = ERROR_ENCODER_VTHREADS;
- srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
- return ret;
- }
- if (vprofile.empty()) {
- ret = ERROR_ENCODER_VPROFILE;
- srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
- return ret;
- }
- if (vpreset.empty()) {
- ret = ERROR_ENCODER_VPRESET;
- srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
- return ret;
- }
- if (acodec != SRS_ENCODER_ACODEC) {
- ret = ERROR_ENCODER_ACODEC;
- srs_error("invalid acodec, must be %s, actual %s, ret=%d",
- SRS_ENCODER_ACODEC, acodec.c_str(), ret);
- return ret;
- }
- if (abitrate <= 0) {
- ret = ERROR_ENCODER_ABITRATE;
- srs_error("invalid abitrate: %d, ret=%d",
- abitrate, ret);
- return ret;
- }
- if (asample_rate <= 0) {
- ret = ERROR_ENCODER_ASAMPLE_RATE;
- srs_error("invalid sample rate: %d, ret=%d",
- asample_rate, ret);
- return ret;
- }
- if (achannels != 1 && achannels != 2) {
- ret = ERROR_ENCODER_ACHANNELS;
- srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
- achannels, ret);
- return ret;
- }
- if (output.empty()) {
- ret = ERROR_ENCODER_OUTPUT;
- srs_error("invalid empty output, ret=%d", ret);
- return ret;
- }
-
- return ret;
-}
-
-int SrsFFMPEG::start()
-{
- int ret = ERROR_SUCCESS;
-
- if (started) {
- return ret;
- }
-
- // prepare exec params
- char tmp[256];
- std::vector params;
-
- // argv[0], set to ffmpeg bin.
- // The execv() and execvp() functions ....
- // The first argument, by convention, should point to
- // the filename associated with the file being executed.
- params.push_back(ffmpeg);
-
- // input.
- params.push_back("-f");
- params.push_back("flv");
-
- params.push_back("-i");
- params.push_back(input);
-
- // build the filter
- if (!vfilter.empty()) {
- std::vector::iterator it;
- for (it = vfilter.begin(); it != vfilter.end(); ++it) {
- std::string p = *it;
- if (!p.empty()) {
- params.push_back(p);
- }
- }
- }
-
- // video specified.
- params.push_back("-vcodec");
- params.push_back(vcodec);
-
- params.push_back("-b:v");
- snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
- params.push_back(tmp);
-
- params.push_back("-r");
- snprintf(tmp, sizeof(tmp), "%.2f", vfps);
- params.push_back(tmp);
-
- params.push_back("-s");
- snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
- params.push_back(tmp);
-
- // TODO: add aspect if needed.
- params.push_back("-aspect");
- snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
- params.push_back(tmp);
-
- params.push_back("-threads");
- snprintf(tmp, sizeof(tmp), "%d", vthreads);
- params.push_back(tmp);
-
- params.push_back("-profile:v");
- params.push_back(vprofile);
-
- params.push_back("-preset");
- params.push_back(vpreset);
-
- // vparams
- if (!vparams.empty()) {
- std::vector::iterator it;
- for (it = vparams.begin(); it != vparams.end(); ++it) {
- std::string p = *it;
- if (!p.empty()) {
- params.push_back(p);
- }
- }
- }
-
- // audio specified.
- params.push_back("-acodec");
- params.push_back(acodec);
-
- params.push_back("-b:a");
- snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
- params.push_back(tmp);
-
- params.push_back("-ar");
- snprintf(tmp, sizeof(tmp), "%d", asample_rate);
- params.push_back(tmp);
-
- params.push_back("-ac");
- snprintf(tmp, sizeof(tmp), "%d", achannels);
- params.push_back(tmp);
-
- // aparams
- if (!aparams.empty()) {
- std::vector::iterator it;
- for (it = aparams.begin(); it != aparams.end(); ++it) {
- std::string p = *it;
- if (!p.empty()) {
- params.push_back(p);
- }
- }
- }
-
- // output
- params.push_back("-f");
- params.push_back("flv");
-
- params.push_back("-y");
- params.push_back(output);
-
- // TODO: fork or vfork?
- if ((pid = fork()) < 0) {
- ret = ERROR_ENCODER_FORK;
- srs_error("vfork process failed. ret=%d", ret);
- return ret;
- }
-
- // child process: ffmpeg encoder engine.
- if (pid == 0) {
- // memory leak in child process, it's ok.
- char** charpv_params = new char*[params.size() + 1];
- for (int i = 0; i < (int)params.size(); i++) {
- std::string p = params[i];
- charpv_params[i] = (char*)p.c_str();
- }
- // EOF: NULL
- charpv_params[params.size()] = NULL;
-
- // TODO: execv or execvp
- ret = execv(ffmpeg.c_str(), charpv_params);
- if (ret < 0) {
- fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
- errno, strerror(errno));
- }
- exit(ret);
- }
-
- // parent.
- if (pid > 0) {
- started = true;
- srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
- return ret;
- }
-
- return ret;
-}
-
-void SrsFFMPEG::stop()
-{
- if (!started) {
- return;
- }
-}
-
-SrsEncoder::SrsEncoder()
-{
- tid = NULL;
- loop = false;
-}
-
-SrsEncoder::~SrsEncoder()
-{
- on_unpublish();
-}
-
-int SrsEncoder::parse_scope_engines()
-{
- int ret = ERROR_SUCCESS;
-
- // parse all transcode engines.
- SrsConfDirective* conf = NULL;
-
- // parse vhost scope engines
- std::string scope = "";
- if ((conf = config->get_transcode(vhost, "")) != NULL) {
- if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
- srs_error("parse vhost scope=%s transcode engines failed. "
- "ret=%d", scope.c_str(), ret);
- return ret;
- }
- }
- // parse app scope engines
- scope = app;
- if ((conf = config->get_transcode(vhost, app)) != NULL) {
- if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
- srs_error("parse app scope=%s transcode engines failed. "
- "ret=%d", scope.c_str(), ret);
- return ret;
- }
- }
- // parse stream scope engines
- scope += "/";
- scope += stream;
- if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
- if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
- srs_error("parse stream scope=%s transcode engines failed. "
- "ret=%d", scope.c_str(), ret);
- return ret;
- }
- }
-
- return ret;
-}
-
-int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
-{
- int ret = ERROR_SUCCESS;
-
- vhost = _vhost;
- port = _port;
- app = _app;
- stream = _stream;
-
- ret = parse_scope_engines();
-
- // ignore the loop encoder
- if (ret == ERROR_ENCODER_LOOP) {
- ret = ERROR_SUCCESS;
- }
-
- // return for error or no engine.
- if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
- return ret;
- }
-
- // start thread to run all encoding engines.
- srs_assert(!tid);
- if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
- ret = ERROR_ST_CREATE_FORWARD_THREAD;
- srs_error("st_thread_create failed. ret=%d", ret);
- return ret;
- }
-
- return ret;
-}
-
-void SrsEncoder::on_unpublish()
-{
- if (tid) {
- loop = false;
- st_thread_interrupt(tid);
- st_thread_join(tid, NULL);
- tid = NULL;
- }
-
- clear_engines();
-}
-
-void SrsEncoder::clear_engines()
-{
- std::vector::iterator it;
- for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
- SrsFFMPEG* ffmpeg = *it;
- srs_freep(ffmpeg);
- }
- ffmpegs.clear();
-}
-
-SrsFFMPEG* SrsEncoder::at(int index)
-{
- return ffmpegs[index];
-}
-
-int SrsEncoder::parse_transcode(SrsConfDirective* conf)
-{
- int ret = ERROR_SUCCESS;
-
- srs_assert(conf);
-
- // enabled
- if (!config->get_transcode_enabled(conf)) {
- srs_trace("ignore the disabled transcode: %s",
- conf->arg0().c_str());
- return ret;
- }
-
- // ffmpeg
- std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
- if (ffmpeg_bin.empty()) {
- srs_trace("ignore the empty ffmpeg transcode: %s",
- conf->arg0().c_str());
- return ret;
- }
-
- // get all engines.
- std::vector engines;
- config->get_transcode_engines(conf, engines);
- if (engines.empty()) {
- srs_trace("ignore the empty transcode engine: %s",
- conf->arg0().c_str());
- return ret;
- }
-
- // create engine
- for (int i = 0; i < (int)engines.size(); i++) {
- SrsConfDirective* engine = engines[i];
- if (!config->get_engine_enabled(engine)) {
- srs_trace("ignore the diabled transcode engine: %s %s",
- conf->arg0().c_str(), engine->arg0().c_str());
- continue;
- }
-
- SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
-
- if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
- srs_freep(ffmpeg);
-
- // if got a loop, donot transcode the whole stream.
- if (ret == ERROR_ENCODER_LOOP) {
- clear_engines();
- break;
- }
-
- srs_error("invalid transcode engine: %s %s",
- conf->arg0().c_str(), engine->arg0().c_str());
- return ret;
- }
-
- ffmpegs.push_back(ffmpeg);
- }
-
- return ret;
-}
-
-int SrsEncoder::cycle()
-{
- int ret = ERROR_SUCCESS;
-
- // start all ffmpegs.
- std::vector::iterator it;
- for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
- SrsFFMPEG* ffmpeg = *it;
- if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
- srs_error("ffmpeg start failed. ret=%d", ret);
- return ret;
- }
- }
-
- return ret;
-}
-
-void SrsEncoder::encoder_cycle()
-{
- int ret = ERROR_SUCCESS;
-
- log_context->generate_id();
- srs_trace("encoder cycle start");
-
- while (loop) {
- if ((ret = cycle()) != ERROR_SUCCESS) {
- srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
- } else {
- srs_info("encoder cycle success, retry");
- }
-
- if (!loop) {
- break;
- }
-
- st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
- }
-
- // kill ffmpeg when finished and it alive
- std::vector::iterator it;
- for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
- SrsFFMPEG* ffmpeg = *it;
- ffmpeg->stop();
- }
-
- srs_trace("encoder cycle finished");
-}
-
-void* SrsEncoder::encoder_thread(void* arg)
-{
- SrsEncoder* obj = (SrsEncoder*)arg;
- srs_assert(obj != NULL);
-
- obj->loop = true;
- obj->encoder_cycle();
-
- return NULL;
-}
-
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+#define SRS_ENCODER_SLEEP_MS 2000
+
+#define SRS_ENCODER_VCODEC "libx264"
+#define SRS_ENCODER_ACODEC "libaacplus"
+
+SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
+{
+ started = false;
+ pid = -1;
+ ffmpeg = ffmpeg_bin;
+
+ vbitrate = 0;
+ vfps = 0;
+ vwidth = 0;
+ vheight = 0;
+ vthreads = 0;
+ abitrate = 0;
+ asample_rate = 0;
+ achannels = 0;
+}
+
+SrsFFMPEG::~SrsFFMPEG()
+{
+ stop();
+}
+
+int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine)
+{
+ int ret = ERROR_SUCCESS;
+
+ config->get_engine_vfilter(engine, vfilter);
+ vcodec = config->get_engine_vcodec(engine);
+ vbitrate = config->get_engine_vbitrate(engine);
+ vfps = config->get_engine_vfps(engine);
+ vwidth = config->get_engine_vwidth(engine);
+ vheight = config->get_engine_vheight(engine);
+ vthreads = config->get_engine_vthreads(engine);
+ vprofile = config->get_engine_vprofile(engine);
+ vpreset = config->get_engine_vpreset(engine);
+ config->get_engine_vparams(engine, vparams);
+ acodec = config->get_engine_acodec(engine);
+ abitrate = config->get_engine_abitrate(engine);
+ asample_rate = config->get_engine_asample_rate(engine);
+ achannels = config->get_engine_achannels(engine);
+ config->get_engine_aparams(engine, aparams);
+ output = config->get_engine_output(engine);
+
+ // ensure the size is even.
+ vwidth -= vwidth % 2;
+ vheight -= vheight % 2;
+
+ // input stream, from local.
+ // ie. rtmp://127.0.0.1:1935/live/livestream
+ input = "rtmp://127.0.0.1:";
+ input += port;
+ input += "/";
+ input += app;
+ input += "/";
+ input += stream;
+
+ // output stream, to other/self server
+ // ie. rtmp://127.0.0.1:1935/live/livestream_sd
+ if (vhost == RTMP_VHOST_DEFAULT) {
+ output = srs_replace(output, "[vhost]", "127.0.0.1");
+ } else {
+ output = srs_replace(output, "[vhost]", vhost);
+ }
+ output = srs_replace(output, "[port]", port);
+ output = srs_replace(output, "[app]", app);
+ output = srs_replace(output, "[stream]", stream);
+
+ // important: loop check, donot transcode again.
+ // we think the following is loop circle:
+ // input: rtmp://127.0.0.1:1935/live/livestream_sd
+ // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd
+ std::string tail = ""; // tail="_sd"
+ if (output.length() > input.length()) {
+ tail = output.substr(input.length());
+ }
+ // TODO: better dead loop check.
+ // if input also endwiths the tail, loop detected.
+ if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) {
+ ret = ERROR_ENCODER_LOOP;
+ srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d",
+ input.c_str(), output.c_str(), ret);
+ return ret;
+ }
+
+ if (vcodec != SRS_ENCODER_VCODEC) {
+ ret = ERROR_ENCODER_VCODEC;
+ srs_error("invalid vcodec, must be %s, actual %s, ret=%d",
+ SRS_ENCODER_VCODEC, vcodec.c_str(), ret);
+ return ret;
+ }
+ if (vbitrate <= 0) {
+ ret = ERROR_ENCODER_VBITRATE;
+ srs_error("invalid vbitrate: %d, ret=%d", vbitrate, ret);
+ return ret;
+ }
+ if (vfps <= 0) {
+ ret = ERROR_ENCODER_VFPS;
+ srs_error("invalid vfps: %.2f, ret=%d", vfps, ret);
+ return ret;
+ }
+ if (vwidth <= 0) {
+ ret = ERROR_ENCODER_VWIDTH;
+ srs_error("invalid vwidth: %d, ret=%d", vwidth, ret);
+ return ret;
+ }
+ if (vheight <= 0) {
+ ret = ERROR_ENCODER_VHEIGHT;
+ srs_error("invalid vheight: %d, ret=%d", vheight, ret);
+ return ret;
+ }
+ if (vthreads < 0) {
+ ret = ERROR_ENCODER_VTHREADS;
+ srs_error("invalid vthreads: %d, ret=%d", vthreads, ret);
+ return ret;
+ }
+ if (vprofile.empty()) {
+ ret = ERROR_ENCODER_VPROFILE;
+ srs_error("invalid vprofile: %s, ret=%d", vprofile.c_str(), ret);
+ return ret;
+ }
+ if (vpreset.empty()) {
+ ret = ERROR_ENCODER_VPRESET;
+ srs_error("invalid vpreset: %s, ret=%d", vpreset.c_str(), ret);
+ return ret;
+ }
+ if (acodec != SRS_ENCODER_ACODEC) {
+ ret = ERROR_ENCODER_ACODEC;
+ srs_error("invalid acodec, must be %s, actual %s, ret=%d",
+ SRS_ENCODER_ACODEC, acodec.c_str(), ret);
+ return ret;
+ }
+ if (abitrate <= 0) {
+ ret = ERROR_ENCODER_ABITRATE;
+ srs_error("invalid abitrate: %d, ret=%d",
+ abitrate, ret);
+ return ret;
+ }
+ if (asample_rate <= 0) {
+ ret = ERROR_ENCODER_ASAMPLE_RATE;
+ srs_error("invalid sample rate: %d, ret=%d",
+ asample_rate, ret);
+ return ret;
+ }
+ if (achannels != 1 && achannels != 2) {
+ ret = ERROR_ENCODER_ACHANNELS;
+ srs_error("invalid achannels, must be 1 or 2, actual %d, ret=%d",
+ achannels, ret);
+ return ret;
+ }
+ if (output.empty()) {
+ ret = ERROR_ENCODER_OUTPUT;
+ srs_error("invalid empty output, ret=%d", ret);
+ return ret;
+ }
+
+ return ret;
+}
+
+int SrsFFMPEG::start()
+{
+ int ret = ERROR_SUCCESS;
+
+ if (started) {
+ return ret;
+ }
+
+ // prepare exec params
+ char tmp[256];
+ std::vector params;
+
+ // argv[0], set to ffmpeg bin.
+ // The execv() and execvp() functions ....
+ // The first argument, by convention, should point to
+ // the filename associated with the file being executed.
+ params.push_back(ffmpeg);
+
+ // input.
+ params.push_back("-f");
+ params.push_back("flv");
+
+ params.push_back("-i");
+ params.push_back(input);
+
+ // build the filter
+ if (!vfilter.empty()) {
+ std::vector::iterator it;
+ for (it = vfilter.begin(); it != vfilter.end(); ++it) {
+ std::string p = *it;
+ if (!p.empty()) {
+ params.push_back(p);
+ }
+ }
+ }
+
+ // video specified.
+ params.push_back("-vcodec");
+ params.push_back(vcodec);
+
+ params.push_back("-b:v");
+ snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
+ params.push_back(tmp);
+
+ params.push_back("-r");
+ snprintf(tmp, sizeof(tmp), "%.2f", vfps);
+ params.push_back(tmp);
+
+ params.push_back("-s");
+ snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
+ params.push_back(tmp);
+
+ // TODO: add aspect if needed.
+ params.push_back("-aspect");
+ snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
+ params.push_back(tmp);
+
+ params.push_back("-threads");
+ snprintf(tmp, sizeof(tmp), "%d", vthreads);
+ params.push_back(tmp);
+
+ params.push_back("-profile:v");
+ params.push_back(vprofile);
+
+ params.push_back("-preset");
+ params.push_back(vpreset);
+
+ // vparams
+ if (!vparams.empty()) {
+ std::vector::iterator it;
+ for (it = vparams.begin(); it != vparams.end(); ++it) {
+ std::string p = *it;
+ if (!p.empty()) {
+ params.push_back(p);
+ }
+ }
+ }
+
+ // audio specified.
+ params.push_back("-acodec");
+ params.push_back(acodec);
+
+ params.push_back("-b:a");
+ snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
+ params.push_back(tmp);
+
+ params.push_back("-ar");
+ snprintf(tmp, sizeof(tmp), "%d", asample_rate);
+ params.push_back(tmp);
+
+ params.push_back("-ac");
+ snprintf(tmp, sizeof(tmp), "%d", achannels);
+ params.push_back(tmp);
+
+ // aparams
+ if (!aparams.empty()) {
+ std::vector::iterator it;
+ for (it = aparams.begin(); it != aparams.end(); ++it) {
+ std::string p = *it;
+ if (!p.empty()) {
+ params.push_back(p);
+ }
+ }
+ }
+
+ // output
+ params.push_back("-f");
+ params.push_back("flv");
+
+ params.push_back("-y");
+ params.push_back(output);
+
+ // TODO: fork or vfork?
+ if ((pid = fork()) < 0) {
+ ret = ERROR_ENCODER_FORK;
+ srs_error("vfork process failed. ret=%d", ret);
+ return ret;
+ }
+
+ // child process: ffmpeg encoder engine.
+ if (pid == 0) {
+ // memory leak in child process, it's ok.
+ char** charpv_params = new char*[params.size() + 1];
+ for (int i = 0; i < (int)params.size(); i++) {
+ std::string p = params[i];
+ charpv_params[i] = (char*)p.c_str();
+ }
+ // EOF: NULL
+ charpv_params[params.size()] = NULL;
+
+ // TODO: execv or execvp
+ ret = execv(ffmpeg.c_str(), charpv_params);
+ if (ret < 0) {
+ fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
+ errno, strerror(errno));
+ }
+ exit(ret);
+ }
+
+ // parent.
+ if (pid > 0) {
+ started = true;
+ srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
+ return ret;
+ }
+
+ return ret;
+}
+
+void SrsFFMPEG::stop()
+{
+ if (!started) {
+ return;
+ }
+}
+
+SrsEncoder::SrsEncoder()
+{
+ tid = NULL;
+ loop = false;
+}
+
+SrsEncoder::~SrsEncoder()
+{
+ on_unpublish();
+}
+
+int SrsEncoder::parse_scope_engines()
+{
+ int ret = ERROR_SUCCESS;
+
+ // parse all transcode engines.
+ SrsConfDirective* conf = NULL;
+
+ // parse vhost scope engines
+ std::string scope = "";
+ if ((conf = config->get_transcode(vhost, "")) != NULL) {
+ if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
+ srs_error("parse vhost scope=%s transcode engines failed. "
+ "ret=%d", scope.c_str(), ret);
+ return ret;
+ }
+ }
+ // parse app scope engines
+ scope = app;
+ if ((conf = config->get_transcode(vhost, app)) != NULL) {
+ if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
+ srs_error("parse app scope=%s transcode engines failed. "
+ "ret=%d", scope.c_str(), ret);
+ return ret;
+ }
+ }
+ // parse stream scope engines
+ scope += "/";
+ scope += stream;
+ if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) {
+ if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) {
+ srs_error("parse stream scope=%s transcode engines failed. "
+ "ret=%d", scope.c_str(), ret);
+ return ret;
+ }
+ }
+
+ return ret;
+}
+
+int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream)
+{
+ int ret = ERROR_SUCCESS;
+
+ vhost = _vhost;
+ port = _port;
+ app = _app;
+ stream = _stream;
+
+ ret = parse_scope_engines();
+
+ // ignore the loop encoder
+ if (ret == ERROR_ENCODER_LOOP) {
+ ret = ERROR_SUCCESS;
+ }
+
+ // return for error or no engine.
+ if (ret != ERROR_SUCCESS || ffmpegs.empty()) {
+ return ret;
+ }
+
+ // start thread to run all encoding engines.
+ srs_assert(!tid);
+ if((tid = st_thread_create(encoder_thread, this, 1, 0)) == NULL) {
+ ret = ERROR_ST_CREATE_FORWARD_THREAD;
+ srs_error("st_thread_create failed. ret=%d", ret);
+ return ret;
+ }
+
+ return ret;
+}
+
+void SrsEncoder::on_unpublish()
+{
+ if (tid) {
+ loop = false;
+ st_thread_interrupt(tid);
+ st_thread_join(tid, NULL);
+ tid = NULL;
+ }
+
+ clear_engines();
+}
+
+void SrsEncoder::clear_engines()
+{
+ std::vector::iterator it;
+ for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
+ SrsFFMPEG* ffmpeg = *it;
+ srs_freep(ffmpeg);
+ }
+ ffmpegs.clear();
+}
+
+SrsFFMPEG* SrsEncoder::at(int index)
+{
+ return ffmpegs[index];
+}
+
+int SrsEncoder::parse_transcode(SrsConfDirective* conf)
+{
+ int ret = ERROR_SUCCESS;
+
+ srs_assert(conf);
+
+ // enabled
+ if (!config->get_transcode_enabled(conf)) {
+ srs_trace("ignore the disabled transcode: %s",
+ conf->arg0().c_str());
+ return ret;
+ }
+
+ // ffmpeg
+ std::string ffmpeg_bin = config->get_transcode_ffmpeg(conf);
+ if (ffmpeg_bin.empty()) {
+ srs_trace("ignore the empty ffmpeg transcode: %s",
+ conf->arg0().c_str());
+ return ret;
+ }
+
+ // get all engines.
+ std::vector engines;
+ config->get_transcode_engines(conf, engines);
+ if (engines.empty()) {
+ srs_trace("ignore the empty transcode engine: %s",
+ conf->arg0().c_str());
+ return ret;
+ }
+
+ // create engine
+ for (int i = 0; i < (int)engines.size(); i++) {
+ SrsConfDirective* engine = engines[i];
+ if (!config->get_engine_enabled(engine)) {
+ srs_trace("ignore the diabled transcode engine: %s %s",
+ conf->arg0().c_str(), engine->arg0().c_str());
+ continue;
+ }
+
+ SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
+
+ if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) {
+ srs_freep(ffmpeg);
+
+ // if got a loop, donot transcode the whole stream.
+ if (ret == ERROR_ENCODER_LOOP) {
+ clear_engines();
+ break;
+ }
+
+ srs_error("invalid transcode engine: %s %s",
+ conf->arg0().c_str(), engine->arg0().c_str());
+ return ret;
+ }
+
+ ffmpegs.push_back(ffmpeg);
+ }
+
+ return ret;
+}
+
+int SrsEncoder::cycle()
+{
+ int ret = ERROR_SUCCESS;
+
+ // start all ffmpegs.
+ std::vector::iterator it;
+ for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
+ SrsFFMPEG* ffmpeg = *it;
+ if ((ret = ffmpeg->start()) != ERROR_SUCCESS) {
+ srs_error("ffmpeg start failed. ret=%d", ret);
+ return ret;
+ }
+ }
+
+ return ret;
+}
+
+void SrsEncoder::encoder_cycle()
+{
+ int ret = ERROR_SUCCESS;
+
+ log_context->generate_id();
+ srs_trace("encoder cycle start");
+
+ while (loop) {
+ if ((ret = cycle()) != ERROR_SUCCESS) {
+ srs_warn("encoder cycle failed, ignored and retry, ret=%d", ret);
+ } else {
+ srs_info("encoder cycle success, retry");
+ }
+
+ if (!loop) {
+ break;
+ }
+
+ st_usleep(SRS_ENCODER_SLEEP_MS * 1000);
+ }
+
+ // kill ffmpeg when finished and it alive
+ std::vector::iterator it;
+ for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
+ SrsFFMPEG* ffmpeg = *it;
+ ffmpeg->stop();
+ }
+
+ srs_trace("encoder cycle finished");
+}
+
+void* SrsEncoder::encoder_thread(void* arg)
+{
+ SrsEncoder* obj = (SrsEncoder*)arg;
+ srs_assert(obj != NULL);
+
+ obj->loop = true;
+ obj->encoder_cycle();
+
+ return NULL;
+}
+
diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp
old mode 100755
new mode 100644
index b706c43b8..f766f13bc
--- a/trunk/src/core/srs_core_forward.cpp
+++ b/trunk/src/core/srs_core_forward.cpp
@@ -1,353 +1,353 @@
-/*
-The MIT License (MIT)
-
-Copyright (c) 2013 winlin
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-*/
-
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-#define SRS_PULSE_TIMEOUT_MS 100
-#define SRS_FORWARDER_SLEEP_MS 2000
-#define SRS_SEND_TIMEOUT_US 3000000L
-#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
-
-SrsForwarder::SrsForwarder()
-{
- client = NULL;
- stfd = NULL;
- stream_id = 0;
-
- tid = NULL;
- loop = false;
-}
-
-SrsForwarder::~SrsForwarder()
-{
- on_unpublish();
-
- std::vector::iterator it;
- for (it = msgs.begin(); it != msgs.end(); ++it) {
- SrsSharedPtrMessage* msg = *it;
- srs_freep(msg);
- }
- msgs.clear();
-}
-
-int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
-{
- int ret = ERROR_SUCCESS;
-
- app = _app;
-
- tc_url = "rtmp://";
- tc_url += vhost;
- tc_url += "/";
- tc_url += app;
-
- stream_name = stream;
- server = forward_server;
- port = 1935;
-
- // TODO: dead loop check.
-
- size_t pos = forward_server.find(":");
- if (pos != std::string::npos) {
- port = ::atoi(forward_server.substr(pos + 1).c_str());
- server = forward_server.substr(0, pos);
- }
-
- if ((ret = open_socket()) != ERROR_SUCCESS) {
- return ret;
- }
-
- srs_assert(!tid);
- if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
- ret = ERROR_ST_CREATE_FORWARD_THREAD;
- srs_error("st_thread_create failed. ret=%d", ret);
- return ret;
- }
-
- return ret;
-}
-
-void SrsForwarder::on_unpublish()
-{
- if (tid) {
- loop = false;
- st_thread_interrupt(tid);
- st_thread_join(tid, NULL);
- tid = NULL;
- }
-
- if (stfd) {
- int fd = st_netfd_fileno(stfd);
- st_netfd_close(stfd);
- stfd = NULL;
-
- // st does not close it sometimes,
- // close it manually.
- close(fd);
- }
-
- srs_freep(client);
-}
-
-int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
-{
- int ret = ERROR_SUCCESS;
-
- msgs.push_back(metadata);
-
- return ret;
-}
-
-int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
-{
- int ret = ERROR_SUCCESS;
-
- msgs.push_back(msg);
-
- return ret;
-}
-
-int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
-{
- int ret = ERROR_SUCCESS;
-
- msgs.push_back(msg);
-
- return ret;
-}
-
-int SrsForwarder::open_socket()
-{
- int ret = ERROR_SUCCESS;
-
- srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
- stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
-
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if(sock == -1){
- ret = ERROR_SOCKET_CREATE;
- srs_error("create socket error. ret=%d", ret);
- return ret;
- }
-
- stfd = st_netfd_open_socket(sock);
- if(stfd == NULL){
- ret = ERROR_ST_OPEN_SOCKET;
- srs_error("st_netfd_open_socket failed. ret=%d", ret);
- return ret;
- }
-
- srs_freep(client);
- client = new SrsRtmpClient(stfd);
-
- return ret;
-}
-
-int SrsForwarder::connect_server()
-{
- int ret = ERROR_SUCCESS;
-
- std::string ip = parse_server(server);
- if (ip.empty()) {
- ret = ERROR_SYSTEM_IP_INVALID;
- srs_error("dns resolve server error, ip empty. ret=%d", ret);
- return ret;
- }
-
- sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = inet_addr(ip.c_str());
-
- if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
- ret = ERROR_ST_CONNECT;
- srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
-
- return ret;
-}
-
-std::string SrsForwarder::parse_server(std::string host)
-{
- if (inet_addr(host.c_str()) != INADDR_NONE) {
- return host;
- }
-
- hostent* answer = gethostbyname(host.c_str());
- if (answer == NULL) {
- srs_error("dns resolve host %s error.", host.c_str());
- return "";
- }
-
- char ipv4[16];
- memset(ipv4, 0, sizeof(ipv4));
- for (int i = 0; i < answer->h_length; i++) {
- inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
- srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
- break;
- }
-
- return ipv4;
-}
-
-int SrsForwarder::cycle()
-{
- int ret = ERROR_SUCCESS;
-
- client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
- client->set_send_timeout(SRS_SEND_TIMEOUT_US);
-
- if ((ret = connect_server()) != ERROR_SUCCESS) {
- return ret;
- }
- srs_assert(client);
-
- if ((ret = client->handshake()) != ERROR_SUCCESS) {
- srs_error("handshake with server failed. ret=%d", ret);
- return ret;
- }
- if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
- srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
- return ret;
- }
- if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
- srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
- return ret;
- }
- if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
- srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
- stream_name.c_str(), stream_id, ret);
- return ret;
- }
-
- if ((ret = forward()) != ERROR_SUCCESS) {
- return ret;
- }
-
- return ret;
-}
-
-int SrsForwarder::forward()
-{
- int ret = ERROR_SUCCESS;
-
- client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
-
- SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
-
- while (loop) {
- pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
-
- // switch to other st-threads.
- st_usleep(0);
-
- // read from client.
- if (true) {
- SrsCommonMessage* msg = NULL;
- ret = client->recv_message(&msg);
-
- srs_verbose("play loop recv message. ret=%d", ret);
- if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
- srs_error("recv server control message failed. ret=%d", ret);
- return ret;
- }
- }
-
- int count = (int)msgs.size();
-
- // reportable
- if (pithy_print.can_print()) {
- srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
- pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
- }
-
- // all msgs to forward.
- for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs[i];
- msgs[i] = NULL;
-
- if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
- srs_error("forwarder send message to server failed. ret=%d", ret);
- return ret;
- }
- }
- msgs.clear();
- }
-
- return ret;
-}
-
-void SrsForwarder::forward_cycle()
-{
- int ret = ERROR_SUCCESS;
-
- log_context->generate_id();
- srs_trace("forward cycle start");
-
- while (loop) {
- if ((ret = cycle()) != ERROR_SUCCESS) {
- srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
- } else {
- srs_info("forward cycle success, retry");
- }
-
- if (!loop) {
- break;
- }
-
- st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
-
- if ((ret = open_socket()) != ERROR_SUCCESS) {
- srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
- } else {
- srs_info("forward cycle reopen success");
- }
- }
- srs_trace("forward cycle finished");
-}
-
-void* SrsForwarder::forward_thread(void* arg)
-{
- SrsForwarder* obj = (SrsForwarder*)arg;
- srs_assert(obj != NULL);
-
- obj->loop = true;
- obj->forward_cycle();
-
- return NULL;
-}
-
+/*
+The MIT License (MIT)
+
+Copyright (c) 2013 winlin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#define SRS_PULSE_TIMEOUT_MS 100
+#define SRS_FORWARDER_SLEEP_MS 2000
+#define SRS_SEND_TIMEOUT_US 3000000L
+#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
+
+SrsForwarder::SrsForwarder()
+{
+ client = NULL;
+ stfd = NULL;
+ stream_id = 0;
+
+ tid = NULL;
+ loop = false;
+}
+
+SrsForwarder::~SrsForwarder()
+{
+ on_unpublish();
+
+ std::vector::iterator it;
+ for (it = msgs.begin(); it != msgs.end(); ++it) {
+ SrsSharedPtrMessage* msg = *it;
+ srs_freep(msg);
+ }
+ msgs.clear();
+}
+
+int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
+{
+ int ret = ERROR_SUCCESS;
+
+ app = _app;
+
+ tc_url = "rtmp://";
+ tc_url += vhost;
+ tc_url += "/";
+ tc_url += app;
+
+ stream_name = stream;
+ server = forward_server;
+ port = 1935;
+
+ // TODO: dead loop check.
+
+ size_t pos = forward_server.find(":");
+ if (pos != std::string::npos) {
+ port = ::atoi(forward_server.substr(pos + 1).c_str());
+ server = forward_server.substr(0, pos);
+ }
+
+ if ((ret = open_socket()) != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ srs_assert(!tid);
+ if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
+ ret = ERROR_ST_CREATE_FORWARD_THREAD;
+ srs_error("st_thread_create failed. ret=%d", ret);
+ return ret;
+ }
+
+ return ret;
+}
+
+void SrsForwarder::on_unpublish()
+{
+ if (tid) {
+ loop = false;
+ st_thread_interrupt(tid);
+ st_thread_join(tid, NULL);
+ tid = NULL;
+ }
+
+ if (stfd) {
+ int fd = st_netfd_fileno(stfd);
+ st_netfd_close(stfd);
+ stfd = NULL;
+
+ // st does not close it sometimes,
+ // close it manually.
+ close(fd);
+ }
+
+ srs_freep(client);
+}
+
+int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
+{
+ int ret = ERROR_SUCCESS;
+
+ msgs.push_back(metadata);
+
+ return ret;
+}
+
+int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
+{
+ int ret = ERROR_SUCCESS;
+
+ msgs.push_back(msg);
+
+ return ret;
+}
+
+int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
+{
+ int ret = ERROR_SUCCESS;
+
+ msgs.push_back(msg);
+
+ return ret;
+}
+
+int SrsForwarder::open_socket()
+{
+ int ret = ERROR_SUCCESS;
+
+ srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
+ stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
+
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if(sock == -1){
+ ret = ERROR_SOCKET_CREATE;
+ srs_error("create socket error. ret=%d", ret);
+ return ret;
+ }
+
+ stfd = st_netfd_open_socket(sock);
+ if(stfd == NULL){
+ ret = ERROR_ST_OPEN_SOCKET;
+ srs_error("st_netfd_open_socket failed. ret=%d", ret);
+ return ret;
+ }
+
+ srs_freep(client);
+ client = new SrsRtmpClient(stfd);
+
+ return ret;
+}
+
+int SrsForwarder::connect_server()
+{
+ int ret = ERROR_SUCCESS;
+
+ std::string ip = parse_server(server);
+ if (ip.empty()) {
+ ret = ERROR_SYSTEM_IP_INVALID;
+ srs_error("dns resolve server error, ip empty. ret=%d", ret);
+ return ret;
+ }
+
+ sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = inet_addr(ip.c_str());
+
+ if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
+ ret = ERROR_ST_CONNECT;
+ srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
+ return ret;
+ }
+ srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
+
+ return ret;
+}
+
+std::string SrsForwarder::parse_server(std::string host)
+{
+ if (inet_addr(host.c_str()) != INADDR_NONE) {
+ return host;
+ }
+
+ hostent* answer = gethostbyname(host.c_str());
+ if (answer == NULL) {
+ srs_error("dns resolve host %s error.", host.c_str());
+ return "";
+ }
+
+ char ipv4[16];
+ memset(ipv4, 0, sizeof(ipv4));
+ for (int i = 0; i < answer->h_length; i++) {
+ inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
+ srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
+ break;
+ }
+
+ return ipv4;
+}
+
+int SrsForwarder::cycle()
+{
+ int ret = ERROR_SUCCESS;
+
+ client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
+ client->set_send_timeout(SRS_SEND_TIMEOUT_US);
+
+ if ((ret = connect_server()) != ERROR_SUCCESS) {
+ return ret;
+ }
+ srs_assert(client);
+
+ if ((ret = client->handshake()) != ERROR_SUCCESS) {
+ srs_error("handshake with server failed. ret=%d", ret);
+ return ret;
+ }
+ if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
+ srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
+ return ret;
+ }
+ if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
+ srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
+ return ret;
+ }
+ if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
+ srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
+ stream_name.c_str(), stream_id, ret);
+ return ret;
+ }
+
+ if ((ret = forward()) != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ return ret;
+}
+
+int SrsForwarder::forward()
+{
+ int ret = ERROR_SUCCESS;
+
+ client->set_recv_timeout(SRS_PULSE_TIMEOUT_MS * 1000);
+
+ SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
+
+ while (loop) {
+ pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
+
+ // switch to other st-threads.
+ st_usleep(0);
+
+ // read from client.
+ if (true) {
+ SrsCommonMessage* msg = NULL;
+ ret = client->recv_message(&msg);
+
+ srs_verbose("play loop recv message. ret=%d", ret);
+ if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
+ srs_error("recv server control message failed. ret=%d", ret);
+ return ret;
+ }
+ }
+
+ int count = (int)msgs.size();
+
+ // reportable
+ if (pithy_print.can_print()) {
+ srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
+ pithy_print.get_age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
+ }
+
+ // all msgs to forward.
+ for (int i = 0; i < count; i++) {
+ SrsSharedPtrMessage* msg = msgs[i];
+ msgs[i] = NULL;
+
+ if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
+ srs_error("forwarder send message to server failed. ret=%d", ret);
+ return ret;
+ }
+ }
+ msgs.clear();
+ }
+
+ return ret;
+}
+
+void SrsForwarder::forward_cycle()
+{
+ int ret = ERROR_SUCCESS;
+
+ log_context->generate_id();
+ srs_trace("forward cycle start");
+
+ while (loop) {
+ if ((ret = cycle()) != ERROR_SUCCESS) {
+ srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
+ } else {
+ srs_info("forward cycle success, retry");
+ }
+
+ if (!loop) {
+ break;
+ }
+
+ st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
+
+ if ((ret = open_socket()) != ERROR_SUCCESS) {
+ srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
+ } else {
+ srs_info("forward cycle reopen success");
+ }
+ }
+ srs_trace("forward cycle finished");
+}
+
+void* SrsForwarder::forward_thread(void* arg)
+{
+ SrsForwarder* obj = (SrsForwarder*)arg;
+ srs_assert(obj != NULL);
+
+ obj->loop = true;
+ obj->forward_cycle();
+
+ return NULL;
+}
+