diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 2bcc0f2c7..89da50e5f 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -37,10 +37,18 @@ using namespace std; #ifdef SRS_AUTO_KAFKA +#define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 + SrsKafkaProducer::SrsKafkaProducer() { - lb = new SrsLbRoundRobin(); + meatadata_ok = false; + metadata_expired = st_cond_new(); + + lock = st_mutex_new(); + pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000); worker = new SrsAsyncCallWorker(); + + lb = new SrsLbRoundRobin(); transport = new SrsTcpClient(); kafka = new SrsKafkaClient(transport); } @@ -48,21 +56,21 @@ SrsKafkaProducer::SrsKafkaProducer() SrsKafkaProducer::~SrsKafkaProducer() { srs_freep(lb); - srs_freep(worker); srs_freep(kafka); srs_freep(transport); + + srs_freep(worker); + srs_freep(pthread); + + st_mutex_destroy(lock); + st_cond_destroy(metadata_expired); } int SrsKafkaProducer::initialize() { int ret = ERROR_SUCCESS; - // when kafka enabled, request metadata when startup. - if ((ret = request_metadata()) != ERROR_SUCCESS) { - srs_error("request kafka metadata failed. ret=%d", ret); - return ret; - } - + meatadata_ok = false; srs_info("initialize kafka producer ok."); return ret; @@ -73,20 +81,78 @@ int SrsKafkaProducer::start() int ret = ERROR_SUCCESS; if ((ret = worker->start()) != ERROR_SUCCESS) { - srs_error("start kafka failed. ret=%d", ret); + srs_error("start kafka worker failed. ret=%d", ret); return ret; } - srs_info("kafka worker ok"); + if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_error("start kafka thread failed. ret=%d", ret); + } + + meatadata_ok = false; + st_cond_signal(metadata_expired); + srs_trace("kafka work in background"); return ret; } void SrsKafkaProducer::stop() { + pthread->stop(); worker->stop(); } +int SrsKafkaProducer::cycle() +{ + int ret = ERROR_SUCCESS; + + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("ignore kafka error. ret=%d", ret); + } + + return ret; +} + +int SrsKafkaProducer::on_before_cycle() +{ + // wait for the metadata expired. + // when metadata is ok, wait for it expired. + if (meatadata_ok) { + st_cond_wait(metadata_expired); + } + + // request to lock to acquire the socket. + st_mutex_lock(lock); + + return ERROR_SUCCESS; +} + +int SrsKafkaProducer::on_end_cycle() +{ + st_mutex_unlock(lock); + + return ERROR_SUCCESS; +} + +int SrsKafkaProducer::do_cycle() +{ + int ret = ERROR_SUCCESS; + + // ignore when disabled. + bool enabled = _srs_config->get_kafka_enabled(); + if (!enabled) { + return ret; + } + + // when kafka enabled, request metadata when startup. + if ((ret = request_metadata()) != ERROR_SUCCESS) { + srs_error("request kafka metadata failed. ret=%d", ret); + return ret; + } + + return ret; +} + int SrsKafkaProducer::request_metadata() { int ret = ERROR_SUCCESS; @@ -130,9 +196,11 @@ int SrsKafkaProducer::request_metadata() std::string senabled = srs_bool2switch(enabled); std::string sbrokers = srs_join_vector_string(brokers->args, ","); srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s", - senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); + senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); } + meatadata_ok = true; + return ret; } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 81251317f..e3ec46453 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -34,13 +34,21 @@ class SrsAsyncCallWorker; class SrsTcpClient; class SrsKafkaClient; +#include + #ifdef SRS_AUTO_KAFKA /** * the kafka producer used to save log to kafka cluster. */ -class SrsKafkaProducer +class SrsKafkaProducer : public ISrsReusableThreadHandler { +private: + st_mutex_t lock; + SrsReusableThread* pthread; +private: + bool meatadata_ok; + st_cond_t metadata_expired; private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; @@ -53,7 +61,13 @@ public: virtual int initialize(); virtual int start(); virtual void stop(); +// interface ISrsReusableThreadHandler +public: + virtual int cycle(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); private: + virtual int do_cycle(); virtual int request_metadata(); };