diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 251e27f11..c347295d6 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -44,6 +44,7 @@ using namespace std; #include #include #include +#include // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) @@ -67,10 +68,9 @@ SrsEdgeIngester::SrsEdgeIngester() client = NULL; _edge = NULL; _req = NULL; - origin_index = 0; stream_id = 0; stfd = NULL; - curr_origin_server = ""; + lb = new SrsLbRoundRobin(); pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } @@ -78,6 +78,7 @@ SrsEdgeIngester::~SrsEdgeIngester() { stop(); + srs_freep(lb); srs_freep(pthread); srs_freep(kbps); } @@ -121,7 +122,7 @@ void SrsEdgeIngester::stop() string SrsEdgeIngester::get_curr_origin() { - return curr_origin_server; + return lb->selected(); } int SrsEdgeIngester::cycle() @@ -130,7 +131,8 @@ int SrsEdgeIngester::cycle() _source->on_source_id_changed(_srs_context->get_id()); - std::string ep_server, ep_port; + std::string ep_server; + int ep_port; if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { return ret; } @@ -216,7 +218,7 @@ int SrsEdgeIngester::ingest() } // TODO: FIXME: refine the connect_app. -int SrsEdgeIngester::connect_app(string ep_server, string ep_port) +int SrsEdgeIngester::connect_app(string ep_server, int ep_port) { int ret = ERROR_SUCCESS; @@ -258,7 +260,7 @@ int SrsEdgeIngester::connect_app(string ep_server, string ep_port) // generate the tcUrl std::string param = ""; std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param); - srs_trace("edge ingest from %s:%s at %s", ep_server.c_str(), ep_port.c_str(), tc_url.c_str()); + srs_trace("edge ingest from %s:%d at %s", ep_server.c_str(), ep_port, tc_url.c_str()); // replace the tcUrl in request, // which will replace the tc_url in client.connect_app(). @@ -339,7 +341,7 @@ void SrsEdgeIngester::close_underlayer_socket() srs_close_stfd(stfd); } -int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port) +int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) { int ret = ERROR_SUCCESS; @@ -358,21 +360,13 @@ int SrsEdgeIngester::connect_server(string& ep_server, string& ep_port) } // select the origin. - std::string server = curr_origin_server = conf->args.at(origin_index % conf->args.size()); - origin_index = (origin_index + 1) % conf->args.size(); - - std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT; + std::string server = lb->select(conf->args); int port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT); - size_t pos = server.find(":"); - if (pos != std::string::npos) { - s_port = server.substr(pos + 1); - server = server.substr(0, pos); - port = ::atoi(s_port.c_str()); - } + srs_parse_hostport(server, server, port); // output the connected server and port. ep_server = server; - ep_port = s_port; + ep_port = port; // open socket. int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 8efb70999..a8b555eb7 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -46,6 +46,7 @@ class SrsCommonMessage; class SrsMessageQueue; class ISrsProtocolReaderWriter; class SrsKbps; +class SrsLbRoundRobin; /** * the state of edge, auto machine @@ -88,9 +89,7 @@ private: ISrsProtocolReaderWriter* io; SrsKbps* kbps; SrsRtmpClient* client; - int origin_index; - // current origin server of current source. - std::string curr_origin_server; + SrsLbRoundRobin* lb; public: SrsEdgeIngester(); virtual ~SrsEdgeIngester(); @@ -105,8 +104,8 @@ public: private: virtual int ingest(); virtual void close_underlayer_socket(); - virtual int connect_server(std::string& ep_server, std::string& ep_port); - virtual int connect_app(std::string ep_server, std::string ep_port); + virtual int connect_server(std::string& ep_server, int& ep_port); + virtual int connect_app(std::string ep_server, int ep_port); virtual int process_publish_message(SrsCommonMessage* msg); }; diff --git a/trunk/src/app/srs_app_ffmpeg.cpp b/trunk/src/app/srs_app_ffmpeg.cpp index ca1f8850a..e7843057d 100644 --- a/trunk/src/app/srs_app_ffmpeg.cpp +++ b/trunk/src/app/srs_app_ffmpeg.cpp @@ -43,6 +43,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_FFMPEG_STUB @@ -248,12 +249,6 @@ int SrsFFMPEG::start() return ret; } - // prepare exec params - // @remark we should never use stack variable, use heap to alloc to make lldb happy. - #define SRS_TMP_SIZE 512 - char* tmp = new char[SRS_TMP_SIZE]; - SrsAutoFree(char, tmp); - // the argv for process. params.clear(); @@ -300,33 +295,28 @@ int SrsFFMPEG::start() if (vcodec != SRS_RTMP_ENCODER_COPY && vcodec != SRS_RTMP_ENCODER_NO_VIDEO) { if (vbitrate > 0) { params.push_back("-b:v"); - snprintf(tmp, SRS_TMP_SIZE, "%d", vbitrate * 1000); - params.push_back(tmp); + params.push_back(srs_int2str(vbitrate * 1000)); } if (vfps > 0) { params.push_back("-r"); - snprintf(tmp, SRS_TMP_SIZE, "%.2f", vfps); - params.push_back(tmp); + params.push_back(srs_float2str(vfps)); } if (vwidth > 0 && vheight > 0) { params.push_back("-s"); - snprintf(tmp, SRS_TMP_SIZE, "%dx%d", vwidth, vheight); - params.push_back(tmp); + params.push_back(srs_int2str(vwidth) + "x" + srs_int2str(vheight)); } // TODO: add aspect if needed. if (vwidth > 0 && vheight > 0) { params.push_back("-aspect"); - snprintf(tmp, SRS_TMP_SIZE, "%d:%d", vwidth, vheight); - params.push_back(tmp); + params.push_back(srs_int2str(vwidth) + ":" + srs_int2str(vheight)); } if (vthreads > 0) { params.push_back("-threads"); - snprintf(tmp, SRS_TMP_SIZE, "%d", vthreads); - params.push_back(tmp); + params.push_back(srs_int2str(vthreads)); } params.push_back("-profile:v"); @@ -360,20 +350,17 @@ int SrsFFMPEG::start() if (acodec != SRS_RTMP_ENCODER_COPY) { if (abitrate > 0) { params.push_back("-b:a"); - snprintf(tmp, SRS_TMP_SIZE, "%d", abitrate * 1000); - params.push_back(tmp); + params.push_back(srs_int2str(abitrate * 1000)); } if (asample_rate > 0) { params.push_back("-ar"); - snprintf(tmp, SRS_TMP_SIZE, "%d", asample_rate); - params.push_back(tmp); + params.push_back(srs_int2str(asample_rate)); } if (achannels > 0) { params.push_back("-ac"); - snprintf(tmp, SRS_TMP_SIZE, "%d", achannels); - params.push_back(tmp); + params.push_back(srs_int2str(achannels)); } // aparams diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 175a50e3c..9eb821941 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -98,7 +98,7 @@ int SrsKafkaProducer::request_metadata() } srs_assert(!brokers->args.empty()); - std::string broker = lb->select(brokers->args); + std::string broker = lb->select(brokers->args); if (true) { std::string senabled = srs_bool2switch(enabled); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c4af37664..89d568c20 100755 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -720,15 +720,12 @@ int SrsServer::acquire_pid_file() srs_error("truncate pid file %s error! ret=%#x", pid_file.c_str(), ret); return ret; } - - int pid = (int)getpid(); // write the pid - char buf[512]; - snprintf(buf, sizeof(buf), "%d", pid); - if (write(fd, buf, strlen(buf)) != (int)strlen(buf)) { + string pid = srs_int2str(getpid()); + if (write(fd, pid.c_str(), pid.length()) != pid.length()) { ret = ERROR_SYSTEM_PID_WRITE_FILE; - srs_error("write our pid error! pid=%d file=%s ret=%#x", pid, pid_file.c_str(), ret); + srs_error("write our pid error! pid=%s file=%s ret=%#x", pid.c_str(), pid_file.c_str(), ret); return ret; } @@ -746,7 +743,7 @@ int SrsServer::acquire_pid_file() return ret; } - srs_trace("write pid=%d to %s success!", pid, pid_file.c_str()); + srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str()); pid_fd = fd; return ret; diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index e06d83b30..7f25b82fe 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -204,8 +204,7 @@ string srs_path_build_timestamp(string template_path) // [timestamp],replace this const to current UNIX timestamp in ms. if (true) { int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec; - snprintf(buf, sizeof(buf), "%"PRId64, now_us / 1000); - path = srs_string_replace(path, "[timestamp]", buf); + path = srs_string_replace(path, "[timestamp]", srs_int2str(now_us / 1000)); } return path; diff --git a/trunk/src/kernel/srs_kernel_balance.cpp b/trunk/src/kernel/srs_kernel_balance.cpp index b29ab3382..78f2808f4 100644 --- a/trunk/src/kernel/srs_kernel_balance.cpp +++ b/trunk/src/kernel/srs_kernel_balance.cpp @@ -23,6 +23,8 @@ #include +using namespace std; + SrsLbRoundRobin::SrsLbRoundRobin() { index = -1; @@ -38,3 +40,18 @@ u_int32_t SrsLbRoundRobin::current() return index; } +string SrsLbRoundRobin::selected() +{ + return elem; +} + +string SrsLbRoundRobin::select(const vector& servers) +{ + srs_assert(!servers.empty()); + + index = (int)(count++ % servers.size()); + elem = servers.at(index); + + return elem; +} + diff --git a/trunk/src/kernel/srs_kernel_balance.hpp b/trunk/src/kernel/srs_kernel_balance.hpp index 7bf3be5e3..057a6aec6 100644 --- a/trunk/src/kernel/srs_kernel_balance.hpp +++ b/trunk/src/kernel/srs_kernel_balance.hpp @@ -30,6 +30,7 @@ #include #include +#include /** * the round-robin load balance algorithm, @@ -42,21 +43,15 @@ private: int index; // total scheduled count. u_int32_t count; + // current selected server. + std::string elem; public: SrsLbRoundRobin(); virtual ~SrsLbRoundRobin(); public: virtual u_int32_t current(); -public: - template - const T& select(const std::vector& servers) - { - srs_assert(!servers.empty()); - - index = (int)(count++ % servers.size()); - - return servers.at(index); - } + virtual std::string selected(); + virtual std::string select(const std::vector& servers); }; #endif diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index ae93f7a84..3e06ac379 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -169,14 +169,43 @@ string srs_dns_resolve(string host) char ipv4[16]; memset(ipv4, 0, sizeof(ipv4)); - for (int i = 0; i < answer->h_length; i++) { - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); - break; + + // covert the first entry to ip. + if (answer->h_length > 0) { + inet_ntop(AF_INET, answer->h_addr_list[0], ipv4, sizeof(ipv4)); } return ipv4; } +void srs_parse_hostport(const string& hostport, string& host, int& port) +{ + host = hostport; + + size_t pos = hostport.find(":"); + if (pos != std::string::npos) { + string p = hostport.substr(pos + 1); + host = hostport.substr(0, pos); + port = ::atoi(p.c_str()); + } +} + +string srs_int2str(int64_t value) +{ + // len(max int64_t) is 20, plus one "+-." + char tmp[22]; + snprintf(tmp, 22, "%"PRId64, value); + return tmp; +} + +string srs_float2str(double value) +{ + // len(max int64_t) is 20, plus one "+-." + char tmp[22]; + snprintf(tmp, 22, "%.2f", value); + return tmp; +} + bool srs_is_little_endian() { // convert to network(big-endian) order, if not equals, diff --git a/trunk/src/kernel/srs_kernel_utility.hpp b/trunk/src/kernel/srs_kernel_utility.hpp index 628833826..759bc383b 100644 --- a/trunk/src/kernel/srs_kernel_utility.hpp +++ b/trunk/src/kernel/srs_kernel_utility.hpp @@ -52,6 +52,13 @@ extern int64_t srs_update_system_time_ms(); // dns resolve utility, return the resolved ip address. extern std::string srs_dns_resolve(std::string host); +// split the host:port to host and port. +extern void srs_parse_hostport(const std::string& hostport, std::string& host, int& port); + +// parse the int64 value to string. +extern std::string srs_int2str(int64_t value); +// parse the float value to string, precise is 2. +extern std::string srs_float2str(double value); // whether system is little endian extern bool srs_is_little_endian(); diff --git a/trunk/src/protocol/srs_http_stack.cpp b/trunk/src/protocol/srs_http_stack.cpp index 33f89e7af..e1fd3a310 100755 --- a/trunk/src/protocol/srs_http_stack.cpp +++ b/trunk/src/protocol/srs_http_stack.cpp @@ -174,9 +174,7 @@ int64_t SrsHttpHeader::content_length() void SrsHttpHeader::set_content_length(int64_t size) { - char buf[64]; - snprintf(buf, sizeof(buf), "%"PRId64, size); - set("Content-Length", buf); + set("Content-Length", srs_int2str(size)); } string SrsHttpHeader::content_type() diff --git a/trunk/src/protocol/srs_protocol_json.cpp b/trunk/src/protocol/srs_protocol_json.cpp index cf8369b71..60987626a 100644 --- a/trunk/src/protocol/srs_protocol_json.cpp +++ b/trunk/src/protocol/srs_protocol_json.cpp @@ -28,6 +28,7 @@ using namespace std; #include #include +#include /* json encode cout<< SRS_JOBJECT_START @@ -328,10 +329,7 @@ string SrsJsonAny::dumps() return to_boolean()? "true":"false"; } case SRS_JSON_Integer: { - // len(max int64_t) is 20, plus one "+-." - char tmp[22]; - snprintf(tmp, 22, "%"PRId64, to_integer()); - return tmp; + return srs_int2str(to_integer()); } case SRS_JSON_Number: { // len(max int64_t) is 20, plus one "+-." diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 9d9cd1999..60be8e561 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -125,6 +125,11 @@ void srs_random_generate(char* bytes, int size) } } +string srs_generate_tc_url(string ip, string vhost, string app, int port, string param) +{ + return srs_generate_tc_url(ip, vhost, app, srs_int2str(port), param); +} + string srs_generate_tc_url(string ip, string vhost, string app, string port, string param) { string tcUrl = "rtmp://"; diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index 05c2b7f27..beecf51a9 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -89,6 +89,10 @@ extern std::string srs_generate_tc_url( std::string ip, std::string vhost, std::string app, std::string port, std::string param ); +extern std::string srs_generate_tc_url( + std::string ip, std::string vhost, std::string app, int port, + std::string param +); /** * compare the memory in bytes.