diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index c59a1dce1..03f832429 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -99,6 +99,32 @@ std::string srs_kafka_summary_partitions(const vector& parti return srs_join_vector_string(ret, ", "); } +void srs_kafka_metadata2connector(SrsKafkaTopicMetadataResponse* metadata, vector& partitions) +{ + for (int i = 0; i < metadata->metadatas.size(); i++) { + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); + + for (int j = 0; j < topic->metadatas.size(); j++) { + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); + + SrsKafkaPartition* p = new SrsKafkaPartition(); + p->id = partition->partition_id; + p->broker = partition->leader; + + for (int i = 0; i < metadata->brokers.size(); i++) { + SrsKafkaBroker* broker = metadata->brokers.at(i); + if (broker->node_id == p->broker) { + p->host = broker->host.to_str(); + p->port = broker->port; + break; + } + } + + partitions.push_back(p); + } + } +} + SrsKafkaPartition::SrsKafkaPartition() { id = broker = 0; @@ -120,7 +146,7 @@ string SrsKafkaPartition::hostport() SrsKafkaProducer::SrsKafkaProducer() { - meatadata_ok = false; + metadata_ok = false; metadata_expired = st_cond_new(); lock = st_mutex_new(); @@ -156,7 +182,6 @@ int SrsKafkaProducer::initialize() { int ret = ERROR_SUCCESS; - meatadata_ok = false; srs_info("initialize kafka producer ok."); return ret; @@ -175,9 +200,7 @@ int SrsKafkaProducer::start() srs_error("start kafka thread failed. ret=%d", ret); } - meatadata_ok = false; - st_cond_signal(metadata_expired); - srs_trace("kafka work in background"); + refresh_metadata(); return ret; } @@ -203,7 +226,7 @@ int SrsKafkaProducer::on_before_cycle() { // wait for the metadata expired. // when metadata is ok, wait for it expired. - if (meatadata_ok) { + if (metadata_ok) { st_cond_wait(metadata_expired); } @@ -291,34 +314,20 @@ int SrsKafkaProducer::request_metadata() srs_trace("kafka metadata: %s", summary.c_str()); // generate the partition info. - for (int i = 0; i < metadata->metadatas.size(); i++) { - SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); - - for (int j = 0; j < topic->metadatas.size(); j++) { - SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); - - SrsKafkaPartition* p = new SrsKafkaPartition(); - p->id = partition->partition_id; - p->broker = partition->leader; - - for (int i = 0; i < metadata->brokers.size(); i++) { - SrsKafkaBroker* broker = metadata->brokers.at(i); - if (broker->node_id == p->broker) { - p->host = broker->host.to_str(); - p->port = broker->port; - break; - } - } - - partitions.push_back(p); - } - } + srs_kafka_metadata2connector(metadata, partitions); srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str()); - meatadata_ok = true; + metadata_ok = true; return ret; } +void SrsKafkaProducer::refresh_metadata() +{ + metadata_ok = false; + st_cond_signal(metadata_expired); + srs_trace("kafka async refresh metadata in background"); +} + #endif diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index dfd28f954..f7edaa47f 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -69,7 +69,7 @@ private: st_mutex_t lock; SrsReusableThread* pthread; private: - bool meatadata_ok; + bool metadata_ok; st_cond_t metadata_expired; public: std::vector partitions; @@ -93,6 +93,8 @@ public: private: virtual int do_cycle(); virtual int request_metadata(); + // set the metadata to invalid and refresh it. + virtual void refresh_metadata(); }; #endif diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index ed66d76f5..c0b0eaa16 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -1010,7 +1010,7 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg) // fetch cached api key. SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance(); SrsKafkaApiKey key = pool->unset(header.correlation_id()); - srs_trace("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id()); + srs_info("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id()); // create message by cached api key. SrsKafkaResponse* res = NULL;