diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 0f75217f0..287751585 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -178,41 +178,31 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) return kafka->write_messages(topic, id, *pc); } -SrsKafkaMessage::SrsKafkaMessage(int k) +SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j) { + producer = p; key = k; + obj = j; } SrsKafkaMessage::~SrsKafkaMessage() { + srs_freep(obj); } -SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, string i) - : SrsKafkaMessage(k) -{ - producer = p; - type = t; - ip = i; -} - -SrsKafkaMessageOnClient::~SrsKafkaMessageOnClient() -{ -} - -int SrsKafkaMessageOnClient::call() +int SrsKafkaMessage::call() { - SrsJsonObject* obj = SrsJsonAny::object(); + int ret = producer->send(key, obj); - obj->set("msg", SrsJsonAny::str("accept")); - obj->set("type", SrsJsonAny::integer(type)); - obj->set("ip", SrsJsonAny::str(ip.c_str())); + // the obj is manged by producer now. + obj = NULL; - return producer->send(key, obj); + return ret; } -string SrsKafkaMessageOnClient::to_string() +string SrsKafkaMessage::to_string() { - return ip; + return "kafka"; } SrsKafkaCache::SrsKafkaCache() @@ -393,7 +383,13 @@ void SrsKafkaProducer::stop() int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) { - return worker->execute(new SrsKafkaMessageOnClient(this, key, type, ip)); + SrsJsonObject* obj = SrsJsonAny::object(); + + obj->set("msg", SrsJsonAny::str("accept")); + obj->set("type", SrsJsonAny::integer(type)); + obj->set("ip", SrsJsonAny::str(ip.c_str())); + + return worker->execute(new SrsKafkaMessage(this, key, obj)); } int SrsKafkaProducer::send(int key, SrsJsonObject* obj) diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 457470f0e..77a4c217c 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -80,21 +80,13 @@ public: */ class SrsKafkaMessage : public ISrsAsyncCallTask { -protected: +private: + SrsKafkaProducer* producer; int key; + SrsJsonObject* obj; public: - SrsKafkaMessage(int k); + SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j); virtual ~SrsKafkaMessage(); -}; -struct SrsKafkaMessageOnClient : public SrsKafkaMessage -{ -public: - SrsKafkaProducer* producer; - SrsListenerType type; - std::string ip; -public: - SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, std::string i); - virtual ~SrsKafkaMessageOnClient(); // interface ISrsAsyncCallTask public: virtual int call();