From 7a0aaf5900604de8e600b29b8de270a72fb18f57 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 22 Oct 2015 17:43:15 +0800 Subject: [PATCH] kafka refine code --- trunk/src/app/srs_app_kafka.cpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index ac4e96041..01e99fab8 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -281,28 +281,32 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio // ensure the key exists. srs_assert (cache.find(key) != cache.end()); + // the cache is vector, which is continous store. + // we remember the messages we have written and clear it when completed. + int nb_msgs = (int)pc->size(); + if (pc->empty()) { + return ret; + } + // connect transport. if ((ret = partition->connect()) != ERROR_SUCCESS) { srs_error("connect to partition failed. ret=%d", ret); return ret; } - // copy the messages to a temp cache. - SrsKafkaPartitionCache tpc(*pc); - // TODO: FIXME: implements it. // free all wrote messages. - for (vector::iterator it = tpc.begin(); it != tpc.end(); ++it) { + for (vector::iterator it = pc->begin(); it != pc->end(); ++it) { SrsJsonObject* obj = *it; srs_freep(obj); } // remove the messages from cache. - if (pc->size() == tpc.size()) { + if (pc->size() == nb_msgs) { pc->clear(); } else { - pc->erase(pc->begin(), pc->begin() + tpc.size()); + pc->erase(pc->begin(), pc->begin() + nb_msgs); } return ret;