diff --git a/trunk/configure b/trunk/configure index 624fdfea7..af3067d8e 100755 --- a/trunk/configure +++ b/trunk/configure @@ -152,7 +152,7 @@ ModuleLibIncs=(${SRS_OBJS_DIR}) MODULE_FILES=("srs_kernel_error" "srs_kernel_log" "srs_kernel_buffer" "srs_kernel_utility" "srs_kernel_flv" "srs_kernel_codec" "srs_kernel_file" "srs_kernel_consts" "srs_kernel_aac" "srs_kernel_mp3" "srs_kernel_ts" - "srs_kernel_stream") + "srs_kernel_stream" "srs_kernel_balance") KERNEL_INCS="src/kernel"; MODULE_DIR=${KERNEL_INCS} . auto/modules.sh KERNEL_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index 1bc10d1e3..7f04cce13 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -23,6 +23,8 @@ file kernel readonly separator, ../../src/kernel/srs_kernel_aac.hpp, ../../src/kernel/srs_kernel_aac.cpp, + ../../src/kernel/srs_kernel_balance.hpp, + ../../src/kernel/srs_kernel_balance.cpp, ../../src/kernel/srs_kernel_stream.hpp, ../../src/kernel/srs_kernel_stream.cpp, ../../src/kernel/srs_kernel_codec.hpp, @@ -113,6 +115,8 @@ file ../../src/app/srs_app_http_static.cpp, ../../src/app/srs_app_ingest.hpp, ../../src/app/srs_app_ingest.cpp, + ../../src/app/srs_app_kafka.hpp, + ../../src/app/srs_app_kafka.cpp, ../../src/app/srs_app_listener.hpp, ../../src/app/srs_app_listener.cpp, ../../src/app/srs_app_log.hpp, diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index 35f350df0..dab5aa4e6 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -111,6 +111,7 @@ 3CC52DDC1ACE4023006FEB01 /* srs_utest_protocol.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD21ACE4023006FEB01 /* srs_utest_protocol.cpp */; }; 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; }; 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; }; + 3CD247C31BB3F14100DC1922 /* srs_kernel_balance.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD247C11BB3F14000DC1922 /* srs_kernel_balance.cpp */; }; 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; }; 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */; }; /* End PBXBuildFile section */ @@ -389,6 +390,8 @@ 3CC52DD51ACE4023006FEB01 /* srs_utest_reload.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest_reload.hpp; path = ../../src/utest/srs_utest_reload.hpp; sourceTree = ""; }; 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_utest.cpp; path = ../../src/utest/srs_utest.cpp; sourceTree = ""; }; 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = ""; }; + 3CD247C11BB3F14000DC1922 /* srs_kernel_balance.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_kernel_balance.cpp; path = ../../../src/kernel/srs_kernel_balance.cpp; sourceTree = ""; }; + 3CD247C21BB3F14000DC1922 /* srs_kernel_balance.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_kernel_balance.hpp; path = ../../../src/kernel/srs_kernel_balance.hpp; sourceTree = ""; }; 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = ""; }; 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = ""; }; 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_main_ingest_hls.cpp; path = ../../../src/main/srs_main_ingest_hls.cpp; sourceTree = ""; }; @@ -486,6 +489,8 @@ children = ( 3C1232081AAE814D00CE8F6C /* srs_kernel_aac.cpp */, 3C1232091AAE814D00CE8F6C /* srs_kernel_aac.hpp */, + 3CD247C11BB3F14000DC1922 /* srs_kernel_balance.cpp */, + 3CD247C21BB3F14000DC1922 /* srs_kernel_balance.hpp */, 3C12321A1AAE814D00CE8F6C /* srs_kernel_buffer.cpp */, 3C12321B1AAE814D00CE8F6C /* srs_kernel_buffer.hpp */, 3C12320C1AAE814D00CE8F6C /* srs_kernel_codec.cpp */, @@ -900,6 +905,7 @@ 3C26E3C61BB146FF00D0F9DB /* srs_app_kafka.cpp in Sources */, 3C663F131AB0155100286D8B /* srs_flv_injecter.c in Sources */, 3C1232971AAE81D900CE8F6C /* srs_app_dvr.cpp in Sources */, + 3CD247C31BB3F14100DC1922 /* srs_kernel_balance.cpp in Sources */, 3C1232271AAE814D00CE8F6C /* srs_kernel_log.cpp in Sources */, 3C689F961AB6AAAC00C9CEEE /* event.c in Sources */, 3C1232A81AAE81D900CE8F6C /* srs_app_log.cpp in Sources */, @@ -970,7 +976,7 @@ 3C036B561B2D0AC10078E2E0 /* srs_app_http_stream.cpp in Sources */, 3C068D6D1B10175500AA722C /* srs_protocol_stream.cpp in Sources */, 3CB25C2A1BB269FD00C97A63 /* jmp_sp.cpp in Sources */, - 3C068D6D1B10175500AA722C /* srs_protocol_buffer.cpp in Sources */, + 3C068D6D1B10175500AA722C /* srs_protocol_stream.cpp in Sources */, 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */, 3C1232291AAE814D00CE8F6C /* srs_kernel_buffer.cpp in Sources */, 3C663F181AB0155100286D8B /* srs_play.c in Sources */, diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 83c63568e..f0de5ac7a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4290,7 +4290,7 @@ SrsConfDirective* SrsConfig::get_kafka_brokers() return NULL; } - conf->get("brokers"); + conf = conf->get("brokers"); if (!conf || conf->args.empty()) { return NULL; } diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index a1f100be4..175a50e3c 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -23,21 +23,28 @@ #include +#include +using namespace std; + #include #include #include #include #include +#include +#include #ifdef SRS_AUTO_KAFKA SrsKafkaProducer::SrsKafkaProducer() { + lb = new SrsLbRoundRobin(); worker = new SrsAsyncCallWorker(); } SrsKafkaProducer::~SrsKafkaProducer() { + srs_freep(lb); srs_freep(worker); } @@ -46,7 +53,7 @@ int SrsKafkaProducer::initialize() int ret = ERROR_SUCCESS; // when kafka enabled, request metadata when startup. - if (_srs_config->get_kafka_enabled() && (ret = request_metadata()) != ERROR_SUCCESS) { + if ((ret = request_metadata()) != ERROR_SUCCESS) { srs_error("request kafka metadata failed. ret=%d", ret); return ret; } @@ -65,8 +72,7 @@ int SrsKafkaProducer::start() return ret; } - std::string enabled = srs_bool2switch(_srs_config->get_kafka_enabled()); - srs_trace("kafka worker ok, enabled:%s", enabled.c_str()); + srs_info("kafka worker ok"); return ret; } @@ -80,7 +86,26 @@ int SrsKafkaProducer::request_metadata() { int ret = ERROR_SUCCESS; - srs_info("update kafka metadata ok"); + bool enabled = _srs_config->get_kafka_enabled(); + if (!enabled) { + return ret; + } + + SrsConfDirective* brokers = _srs_config->get_kafka_brokers(); + if (!brokers) { + srs_warn("ignore for empty brokers."); + return ret; + } + + srs_assert(!brokers->args.empty()); + std::string broker = lb->select(brokers->args); + + if (true) { + std::string senabled = srs_bool2switch(enabled); + std::string sbrokers = srs_join_vector_string(brokers->args, ","); + srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s", + senabled.c_str(), sbrokers.c_str(), lb->current(), broker.c_str()); + } return ret; } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 946b8e4a6..02dc39724 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -29,6 +29,7 @@ */ #include +class SrsLbRoundRobin; class SrsAsyncCallWorker; #ifdef SRS_AUTO_KAFKA @@ -39,6 +40,7 @@ class SrsAsyncCallWorker; class SrsKafkaProducer { private: + SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; public: SrsKafkaProducer(); diff --git a/trunk/src/kernel/srs_kernel_balance.cpp b/trunk/src/kernel/srs_kernel_balance.cpp new file mode 100644 index 000000000..b29ab3382 --- /dev/null +++ b/trunk/src/kernel/srs_kernel_balance.cpp @@ -0,0 +1,40 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +SrsLbRoundRobin::SrsLbRoundRobin() +{ + index = -1; + count = 0; +} + +SrsLbRoundRobin::~SrsLbRoundRobin() +{ +} + +u_int32_t SrsLbRoundRobin::current() +{ + return index; +} + diff --git a/trunk/src/kernel/srs_kernel_balance.hpp b/trunk/src/kernel/srs_kernel_balance.hpp new file mode 100644 index 000000000..7bf3be5e3 --- /dev/null +++ b/trunk/src/kernel/srs_kernel_balance.hpp @@ -0,0 +1,63 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_KERNEL_AAC_HPP +#define SRS_KERNEL_AAC_HPP + +/* +#include +*/ +#include + +#include + +/** + * the round-robin load balance algorithm, + * used for edge pull, kafka and other multiple server feature. + */ +class SrsLbRoundRobin +{ +private: + // current selected index. + int index; + // total scheduled count. + u_int32_t count; +public: + SrsLbRoundRobin(); + virtual ~SrsLbRoundRobin(); +public: + virtual u_int32_t current(); +public: + template + const T& select(const std::vector& servers) + { + srs_assert(!servers.empty()); + + index = (int)(count++ % servers.size()); + + return servers.at(index); + } +}; + +#endif +