diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 03f832429..5c15f1604 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -36,10 +36,12 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 +#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 10 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata) { @@ -144,6 +146,33 @@ string SrsKafkaPartition::hostport() return ep; } +SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, string i) +{ + producer = p; + type = t; + ip = i; +} + +SrsKafkaMessageOnClient::~SrsKafkaMessageOnClient() +{ +} + +int SrsKafkaMessageOnClient::call() +{ + SrsJsonObject* obj = SrsJsonAny::object(); + + obj->set("msg", SrsJsonAny::str("accept")); + obj->set("type", SrsJsonAny::integer(type)); + obj->set("ip", SrsJsonAny::str(ip.c_str())); + + return producer->send(obj); +} + +string SrsKafkaMessageOnClient::to_string() +{ + return ip; +} + SrsKafkaProducer::SrsKafkaProducer() { metadata_ok = false; @@ -211,6 +240,41 @@ void SrsKafkaProducer::stop() worker->stop(); } +int SrsKafkaProducer::on_client(SrsListenerType type, st_netfd_t stfd) +{ + return worker->execute(new SrsKafkaMessageOnClient(this, type, srs_get_peer_ip(st_netfd_fileno(stfd)))); +} + +int SrsKafkaProducer::send(SrsJsonObject* obj) +{ + int ret = ERROR_SUCCESS; + + // cache the json object. + objects.push_back(obj); + + // too few messages, ignore. + if (objects.size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { + return ret; + } + + // too many messages, warn user. + if (objects.size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) { + srs_warn("kafka cache too many messages: %d", objects.size()); + } + + // sync with backgound metadata worker. + st_mutex_lock(lock); + + // flush message when metadata is ok. + if (metadata_ok) { + ret = flush(); + } + + st_mutex_unlock(lock); + + return ret; +} + int SrsKafkaProducer::cycle() { int ret = ERROR_SUCCESS; @@ -329,5 +393,12 @@ void SrsKafkaProducer::refresh_metadata() srs_trace("kafka async refresh metadata in background"); } +int SrsKafkaProducer::flush() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index f7edaa47f..f4ba4231d 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -35,8 +35,12 @@ class SrsLbRoundRobin; class SrsAsyncCallWorker; class SrsTcpClient; class SrsKafkaClient; +class SrsJsonObject; +class SrsKafkaProducer; #include +#include +#include #ifdef SRS_AUTO_KAFKA @@ -60,6 +64,24 @@ public: virtual std::string hostport(); }; +/** + * the following is all types of kafka messages. + */ +struct SrsKafkaMessageOnClient : public ISrsAsyncCallTask +{ +public: + SrsKafkaProducer* producer; + SrsListenerType type; + std::string ip; +public: + SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, std::string i); + virtual ~SrsKafkaMessageOnClient(); +// interface ISrsAsyncCallTask +public: + virtual int call(); + virtual std::string to_string(); +}; + /** * the kafka producer used to save log to kafka cluster. */ @@ -73,6 +95,7 @@ private: st_cond_t metadata_expired; public: std::vector partitions; + std::vector objects; private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; @@ -85,6 +108,17 @@ public: virtual int initialize(); virtual int start(); virtual void stop(); +public: + /** + * when got any client connect to SRS, notify kafka. + */ + virtual int on_client(SrsListenerType type, st_netfd_t stfd); + /** + * send json object to kafka cluster. + * the producer will aggregate message and send in kafka message set. + * @param obj the json object; user must never free it again. + */ + virtual int send(SrsJsonObject* obj); // interface ISrsReusableThreadHandler public: virtual int cycle(); @@ -95,6 +129,7 @@ private: virtual int request_metadata(); // set the metadata to invalid and refresh it. virtual void refresh_metadata(); + virtual int flush(); }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 89d568c20..6fc8aec49 100755 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1289,6 +1289,14 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) } srs_assert(conn); +#ifdef SRS_AUTO_KAFKA + // notify kafka cluster. + if ((ret = kafka->on_client(type, client_stfd)) != ERROR_SUCCESS) { + srs_error("kafka handler on_client failed. ret=%d", ret); + return ret; + } +#endif + // directly enqueue, the cycle thread will remove the client. conns.push_back(conn); srs_verbose("add conn to vector.");