From 84b3981d47d300726890a7c63a4317dad4bcae26 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 22 Oct 2015 11:47:38 +0800 Subject: [PATCH] refs #1670: support decode the metadata response. --- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/protocol/srs_kafka_stack.cpp | 219 ++++++++++++++++++++++++- trunk/src/protocol/srs_kafka_stack.hpp | 129 ++++++++++++++- 3 files changed, 344 insertions(+), 5 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 354dcd3cd..eb5149b06 100755 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -273,6 +273,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_KAFKA_CODEC_REQUEST 4032 #define ERROR_KAFKA_CODEC_RESPONSE 4033 #define ERROR_KAFKA_CODEC_ARRAY 4034 +#define ERROR_KAFKA_CODEC_METADATA 4035 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 73ddec2cd..0ae7384e9 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -601,6 +601,200 @@ int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf) return ret; } +SrsKafkaBroker::SrsKafkaBroker() +{ + node_id = port = 0; +} + +SrsKafkaBroker::~SrsKafkaBroker() +{ +} + +int SrsKafkaBroker::size() +{ + return 4 + host.size() + 4; +} + +int SrsKafkaBroker::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka encode broker node_id failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(node_id); + + if ((ret = host.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode broker host failed. ret=%d", ret); + return ret; + } + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka encode broker port failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(port); + + return ret; +} + +int SrsKafkaBroker::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka decode broker node_id failed. ret=%d", ret); + return ret; + } + node_id = buf->read_4bytes(); + + if ((ret = host.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode broker host failed. ret=%d", ret); + return ret; + } + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka decode broker port failed. ret=%d", ret); + return ret; + } + port = buf->read_4bytes(); + + return ret; +} + +SrsKafkaPartitionMetadata::SrsKafkaPartitionMetadata() +{ + error_code = 0; + partition_id = 0; + leader = 0; +} + +SrsKafkaPartitionMetadata::~SrsKafkaPartitionMetadata() +{ +} + +int SrsKafkaPartitionMetadata::size() +{ + return 2 + 4 + 4 + replicas.size() + isr.size(); +} + +int SrsKafkaPartitionMetadata::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2 + 4 + 4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka encode partition metadata failed. ret=%d", ret); + return ret; + } + buf->write_2bytes(error_code); + buf->write_4bytes(partition_id); + buf->write_4bytes(leader); + + if ((ret = replicas.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode partition metadata replicas failed. ret=%d", ret); + return ret; + } + if ((ret = isr.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode partition metadata isr failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsKafkaPartitionMetadata::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2 + 4 + 4)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka decode partition metadata failed. ret=%d", ret); + return ret; + } + error_code = buf->read_2bytes(); + partition_id = buf->read_4bytes(); + leader = buf->read_4bytes(); + + if ((ret = replicas.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode partition metadata replicas failed. ret=%d", ret); + return ret; + } + if ((ret = isr.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode partition metadata isr failed. ret=%d", ret); + return ret; + } + + return ret; +} + +SrsKafkaTopicMetadata::SrsKafkaTopicMetadata() +{ + error_code = 0; +} + +SrsKafkaTopicMetadata::~SrsKafkaTopicMetadata() +{ +} + +int SrsKafkaTopicMetadata::size() +{ + return 2 + name.size() + metadatas.size(); +} + +int SrsKafkaTopicMetadata::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka encode topic metadata failed. ret=%d", ret); + return ret; + } + buf->write_2bytes(error_code); + + if ((ret = name.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode topic name failed. ret=%d", ret); + return ret; + } + + if ((ret = metadatas.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode topic metadatas failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsKafkaTopicMetadata::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2)) { + ret = ERROR_KAFKA_CODEC_METADATA; + srs_error("kafka decode topic metadata failed. ret=%d", ret); + return ret; + } + error_code = buf->read_2bytes(); + + if ((ret = name.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode topic name failed. ret=%d", ret); + return ret; + } + + if ((ret = metadatas.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode topic metadatas failed. ret=%d", ret); + return ret; + } + + return ret; +} + SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse() { } @@ -611,8 +805,7 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse() int SrsKafkaTopicMetadataResponse::size() { - // TODO: FIXME: implements it. - return SrsKafkaResponse::size(); + return SrsKafkaResponse::size() + brokers.size() + metadatas.size(); } int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) @@ -624,7 +817,16 @@ int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) return ret; } - // TODO: FIXME: implements it. + if ((ret = brokers.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode metadata brokers failed. ret=%d", ret); + return ret; + } + + if ((ret = metadatas.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode metadatas failed. ret=%d", ret); + return ret; + } + return ret; } @@ -637,7 +839,16 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) return ret; } - // TODO: FIXME: implements it. + if ((ret = brokers.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode metadata brokers failed. ret=%d", ret); + return ret; + } + + if ((ret = metadatas.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode metadatas failed. ret=%d", ret); + return ret; + } + return ret; } diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 36c5e7fe7..714f8220d 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -190,7 +190,7 @@ public: srs_error("kafka decode array failed. ret=%d", ret); return ret; } - length = buf->read_2bytes(); + length = buf->read_4bytes(); for (int i = 0; i < length; i++) { T* elem = new T(); @@ -206,6 +206,78 @@ public: return ret; } }; +template<> +class SrsKafkaArray : public ISrsCodec +{ +private: + int32_t length; + std::vector elems; + typedef std::vector::iterator SrsIterator; +public: + SrsKafkaArray() + { + length = 0; + } + virtual ~SrsKafkaArray() + { + elems.clear(); + } +public: + virtual void append(int32_t elem) + { + length++; + elems.push_back(elem); + } + // interface ISrsCodec +public: + virtual int size() + { + return 4 + sizeof(int32_t) * (int)elems.size(); + } + virtual int encode(SrsBuffer* buf) + { + int ret = ERROR_SUCCESS; + + if (!buf->require(4 + sizeof(int32_t) * (int)elems.size())) { + ret = ERROR_KAFKA_CODEC_ARRAY; + srs_error("kafka encode array failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(length); + + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { + int32_t elem = *it; + buf->write_4bytes(elem); + } + + return ret; + } + virtual int decode(SrsBuffer* buf) + { + int ret = ERROR_SUCCESS; + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_ARRAY; + srs_error("kafka decode array failed. ret=%d", ret); + return ret; + } + length = buf->read_4bytes(); + + for (int i = 0; i < length; i++) { + if (!buf->require(sizeof(int32_t))) { + ret = ERROR_KAFKA_CODEC_ARRAY; + srs_error("kafka decode array elem failed. ret=%d", ret); + return ret; + + } + + int32_t elem = buf->read_4bytes(); + elems.push_back(elem); + } + + return ret; + } +}; /** * the header of request, includes the size of request. @@ -532,6 +604,58 @@ public: virtual int decode(SrsBuffer* buf); }; +/** + * the metadata response data. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse + */ +struct SrsKafkaBroker : public ISrsCodec +{ +public: + int32_t node_id; + SrsKafkaString host; + int32_t port; +public: + SrsKafkaBroker(); + virtual ~SrsKafkaBroker(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; +struct SrsKafkaPartitionMetadata : public ISrsCodec +{ +public: + int16_t error_code; + int32_t partition_id; + int32_t leader; + SrsKafkaArray replicas; + SrsKafkaArray isr; +public: + SrsKafkaPartitionMetadata(); + virtual ~SrsKafkaPartitionMetadata(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; +struct SrsKafkaTopicMetadata : public ISrsCodec +{ +public: + int16_t error_code; + SrsKafkaString name; + SrsKafkaArray metadatas; +public: + SrsKafkaTopicMetadata(); + virtual ~SrsKafkaTopicMetadata(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; + /** * response for the metadata request from broker. * The response contains metadata for each partition, @@ -542,6 +666,9 @@ public: */ class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse { +private: + SrsKafkaArray brokers; + SrsKafkaArray metadatas; public: SrsKafkaTopicMetadataResponse(); virtual ~SrsKafkaTopicMetadataResponse();