From 31a77a83edac6414a791c210e52098d46ad87caa Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 22 Oct 2015 14:22:10 +0800 Subject: [PATCH] convert metadata to partitions --- trunk/src/app/srs_app_kafka.cpp | 137 ++++++++++++++++++++++++-------- trunk/src/app/srs_app_kafka.hpp | 24 ++++++ 2 files changed, 128 insertions(+), 33 deletions(-) diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 3c7b01c99..c59a1dce1 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -41,6 +41,83 @@ using namespace std; #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000 +std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata) +{ + vector bs; + for (int i = 0; i < metadata->brokers.size(); i++) { + SrsKafkaBroker* broker = metadata->brokers.at(i); + + string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str(); + if (broker->port > 0) { + hostport += ":" + srs_int2str(broker->port); + } + + bs.push_back(hostport); + } + + vector ps; + for (int i = 0; i < metadata->metadatas.size(); i++) { + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); + + for (int j = 0; j < topic->metadatas.size(); j++) { + string desc = "topic=" + topic->name.to_str(); + + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); + + desc += "?partition=" + srs_int2str(partition->partition_id); + desc += "&leader=" + srs_int2str(partition->leader); + + vector replicas = srs_kafka_array2vector(&partition->replicas); + desc += "&replicas=" + srs_join_vector_string(replicas, ","); + + ps.push_back(desc); + } + } + + std::stringstream ss; + ss << "brokers=" << srs_join_vector_string(bs, ","); + ss << ", " << srs_join_vector_string(ps, ", "); + + return ss.str(); +} + +std::string srs_kafka_summary_partitions(const vector& partitions) +{ + vector ret; + + vector::const_iterator it; + for (it = partitions.begin(); it != partitions.end(); ++it) { + SrsKafkaPartition* partition = *it; + + string desc = "tcp://"; + desc += partition->host + ":" + srs_int2str(partition->port); + desc += "?broker=" + srs_int2str(partition->broker); + desc += "&partition=" + srs_int2str(partition->id); + ret.push_back(desc); + } + + return srs_join_vector_string(ret, ", "); +} + +SrsKafkaPartition::SrsKafkaPartition() +{ + id = broker = 0; + port = SRS_CONSTS_KAFKA_DEFAULT_PORT; +} + +SrsKafkaPartition::~SrsKafkaPartition() +{ +} + +string SrsKafkaPartition::hostport() +{ + if (ep.empty()) { + ep = host + ":" + srs_int2str(port); + } + + return ep; +} + SrsKafkaProducer::SrsKafkaProducer() { meatadata_ok = false; @@ -57,6 +134,13 @@ SrsKafkaProducer::SrsKafkaProducer() SrsKafkaProducer::~SrsKafkaProducer() { + vector::iterator it; + for (it = partitions.begin(); it != partitions.end(); ++it) { + SrsKafkaPartition* partition = *it; + srs_freep(partition); + } + partitions.clear(); + srs_freep(lb); srs_freep(kafka); srs_freep(transport); @@ -203,46 +287,33 @@ int SrsKafkaProducer::request_metadata() SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); // show kafka metadata. - string summary; - if (true) { - vector bs; - for (int i = 0; i < metadata->brokers.size(); i++) { - SrsKafkaBroker* broker = metadata->brokers.at(i); - - string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str(); - if (broker->port > 0) { - hostport += ":" + srs_int2str(broker->port); - } - - bs.push_back(hostport); - } + string summary = srs_kafka_metadata_summary(metadata); + srs_trace("kafka metadata: %s", summary.c_str()); + + // generate the partition info. + for (int i = 0; i < metadata->metadatas.size(); i++) { + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); - vector ps; - for (int i = 0; i < metadata->metadatas.size(); i++) { - SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); + for (int j = 0; j < topic->metadatas.size(); j++) { + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); - string desc = "topic=" + topic->name.to_str(); + SrsKafkaPartition* p = new SrsKafkaPartition(); + p->id = partition->partition_id; + p->broker = partition->leader; - for (int j = 0; j < topic->metadatas.size(); j++) { - SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); - - desc += ", partition" + srs_int2str(partition->partition_id) +"="; - desc += srs_int2str(partition->leader) + "/"; - - vector replicas = srs_kafka_array2vector(&partition->replicas); - desc += srs_join_vector_string(replicas, ","); + for (int i = 0; i < metadata->brokers.size(); i++) { + SrsKafkaBroker* broker = metadata->brokers.at(i); + if (broker->node_id == p->broker) { + p->host = broker->host.to_str(); + p->port = broker->port; + break; + } } - ps.push_back(desc); + partitions.push_back(p); } - - std::stringstream ss; - ss << "brokers=" << srs_join_vector_string(bs, ","); - ss << ", " << srs_join_vector_string(ps, ","); - - summary = ss.str(); } - srs_trace("kafka metadata: %s", summary.c_str()); + srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str()); meatadata_ok = true; diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index e3ec46453..dfd28f954 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -29,6 +29,8 @@ */ #include +#include + class SrsLbRoundRobin; class SrsAsyncCallWorker; class SrsTcpClient; @@ -38,6 +40,26 @@ class SrsKafkaClient; #ifdef SRS_AUTO_KAFKA +/** + * the kafka partition info. + */ +struct SrsKafkaPartition +{ +private: + std::string ep; +public: + int id; + // leader. + int broker; + std::string host; + int port; +public: + SrsKafkaPartition(); + virtual ~SrsKafkaPartition(); +public: + virtual std::string hostport(); +}; + /** * the kafka producer used to save log to kafka cluster. */ @@ -49,6 +71,8 @@ private: private: bool meatadata_ok; st_cond_t metadata_expired; +public: + std::vector partitions; private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker;