From cbe4252b4d08f3e1fdf0fa14ed9ffe275d4780dd Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Oct 2015 14:37:34 +0800 Subject: [PATCH] refine code. --- trunk/src/app/srs_app_kafka.cpp | 60 ++++++++++++++++----------------- trunk/src/app/srs_app_kafka.hpp | 10 +++--- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index b4c96129a..ed63d9748 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -381,25 +381,37 @@ void SrsKafkaProducer::stop() worker->stop(); } -int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) +int SrsKafkaProducer::send(int key, SrsJsonObject* obj) { int ret = ERROR_SUCCESS; - bool enabled = _srs_config->get_kafka_enabled(); - if (!enabled) { + // cache the json object. + cache->append(key, obj); + + // too few messages, ignore. + if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { return ret; } - SrsJsonObject* obj = SrsJsonAny::object(); + // too many messages, warn user. + if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) { + srs_warn("kafka cache too many messages: %d", cache->size()); + } - obj->set("msg", SrsJsonAny::str("accept")); - obj->set("type", SrsJsonAny::integer(type)); - obj->set("ip", SrsJsonAny::str(ip.c_str())); + // sync with backgound metadata worker. + st_mutex_lock(lock); - return worker->execute(new SrsKafkaMessage(this, key, obj)); + // flush message when metadata is ok. + if (metadata_ok) { + ret = flush(); + } + + st_mutex_unlock(lock); + + return ret; } -int SrsKafkaProducer::on_close(int key) +int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) { int ret = ERROR_SUCCESS; @@ -410,39 +422,27 @@ int SrsKafkaProducer::on_close(int key) SrsJsonObject* obj = SrsJsonAny::object(); - obj->set("msg", SrsJsonAny::str("close")); + obj->set("msg", SrsJsonAny::str("accept")); + obj->set("type", SrsJsonAny::integer(type)); + obj->set("ip", SrsJsonAny::str(ip.c_str())); return worker->execute(new SrsKafkaMessage(this, key, obj)); } -int SrsKafkaProducer::send(int key, SrsJsonObject* obj) +int SrsKafkaProducer::on_close(int key) { int ret = ERROR_SUCCESS; - // cache the json object. - cache->append(key, obj); - - // too few messages, ignore. - if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { + bool enabled = _srs_config->get_kafka_enabled(); + if (!enabled) { return ret; } - // too many messages, warn user. - if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) { - srs_warn("kafka cache too many messages: %d", cache->size()); - } - - // sync with backgound metadata worker. - st_mutex_lock(lock); - - // flush message when metadata is ok. - if (metadata_ok) { - ret = flush(); - } + SrsJsonObject* obj = SrsJsonAny::object(); - st_mutex_unlock(lock); + obj->set("msg", SrsJsonAny::str("close")); - return ret; + return worker->execute(new SrsKafkaMessage(this, key, obj)); } int SrsKafkaProducer::cycle() diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index b42f1c9d4..3b4129262 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -173,11 +173,7 @@ public: virtual int initialize(); virtual int start(); virtual void stop(); -// interface ISrsKafkaCluster -public: - virtual int on_client(int key, SrsListenerType type, std::string ip); - virtual int on_close(int key); -// for worker to call task to send object. +// internal: for worker to call task to send object. public: /** * send json object to kafka cluster. @@ -186,6 +182,10 @@ public: * @param obj the json object; user must never free it again. */ virtual int send(int key, SrsJsonObject* obj); +// interface ISrsKafkaCluster +public: + virtual int on_client(int key, SrsListenerType type, std::string ip); + virtual int on_close(int key); // interface ISrsReusableThreadHandler public: virtual int cycle();