From 45755fd1e71cb9c3f8a7b43019ebe9e4a79d4559 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 16 Oct 2015 15:34:55 +0800 Subject: [PATCH] refine code for kakfa request/response, string/bytes. --- trunk/src/protocol/srs_kafka_stack.cpp | 63 ++++++++++++++++++++++++-- trunk/src/protocol/srs_kafka_stack.hpp | 50 ++++++++++++++++---- 2 files changed, 99 insertions(+), 14 deletions(-) diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 1137f4add..efc8e89f3 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -27,6 +27,7 @@ using namespace std; #include +#include #ifdef SRS_AUTO_KAFKA @@ -36,6 +37,15 @@ SrsKafkaString::SrsKafkaString() data = NULL; } +SrsKafkaString::SrsKafkaString(string v) +{ + size = (int16_t)v.length(); + + srs_assert(size > 0); + data = new char[size]; + memcpy(data, v.data(), size); +} + SrsKafkaString::~SrsKafkaString() { srs_freep(data); @@ -62,6 +72,15 @@ SrsKafkaBytes::SrsKafkaBytes() data = NULL; } +SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) +{ + size = (int16_t)nb_v; + + srs_assert(size > 0); + data = new char[size]; + memcpy(data, v, size); +} + SrsKafkaBytes::~SrsKafkaBytes() { srs_freep(data); @@ -175,7 +194,7 @@ int SrsKafkaResponseHeader::total_size() return 4 + size; } -SrsKafkaMessage::SrsKafkaMessage() +SrsKafkaRawMessage::SrsKafkaRawMessage() { offset = 0; message_size = 0; @@ -186,7 +205,7 @@ SrsKafkaMessage::SrsKafkaMessage() value = new SrsKafkaBytes(); } -SrsKafkaMessage::~SrsKafkaMessage() +SrsKafkaRawMessage::~SrsKafkaRawMessage() { srs_freep(key); srs_freep(value); @@ -198,14 +217,30 @@ SrsKafkaMessageSet::SrsKafkaMessageSet() SrsKafkaMessageSet::~SrsKafkaMessageSet() { - vector::iterator it; + vector::iterator it; for (it = messages.begin(); it != messages.end(); ++it) { - SrsKafkaMessage* message = *it; + SrsKafkaRawMessage* message = *it; srs_freep(message); } messages.clear(); } +SrsKafkaRequest::SrsKafkaRequest() +{ +} + +SrsKafkaRequest::~SrsKafkaRequest() +{ +} + +SrsKafkaResponse::SrsKafkaResponse() +{ +} + +SrsKafkaResponse::~SrsKafkaResponse() +{ +} + SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() { header.set_api_key(SrsKafkaApiKeyMetadataRequest); @@ -215,6 +250,19 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() { } +void SrsKafkaTopicMetadataRequest::add_topic(string topic) +{ + topics.append(new SrsKafkaString(topic)); +} + +SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse() +{ +} + +SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse() +{ +} + SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) { skt = io; @@ -224,7 +272,7 @@ SrsKafkaProtocol::~SrsKafkaProtocol() { } -int SrsKafkaProtocol::send_and_free_message(SrsKafkaMessage* msg) +int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) { int ret = ERROR_SUCCESS; @@ -247,6 +295,11 @@ int SrsKafkaClient::fetch_metadata(string topic) { int ret = ERROR_SUCCESS; + SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest(); + SrsAutoFree(SrsKafkaTopicMetadataRequest, req); + + req->add_topic(topic); + // TODO: FIXME: implements it. return ret; diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index bcb0b7ffa..92d30d50e 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -64,6 +64,7 @@ private: char* data; public: SrsKafkaString(); + SrsKafkaString(std::string v); virtual ~SrsKafkaString(); public: virtual bool null(); @@ -83,6 +84,7 @@ private: char* data; public: SrsKafkaBytes(); + SrsKafkaBytes(const char* v, int nb_v); virtual ~SrsKafkaBytes(); public: virtual bool null(); @@ -269,7 +271,7 @@ public: * the kafka message in message set. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets */ -struct SrsKafkaMessage +struct SrsKafkaRawMessage { // metadata. public: @@ -313,8 +315,8 @@ public: */ SrsKafkaBytes* value; public: - SrsKafkaMessage(); - virtual ~SrsKafkaMessage(); + SrsKafkaRawMessage(); + virtual ~SrsKafkaRawMessage(); }; /** @@ -324,12 +326,32 @@ public: class SrsKafkaMessageSet { private: - std::vector messages; + std::vector messages; public: SrsKafkaMessageSet(); virtual ~SrsKafkaMessageSet(); }; +/** + * the kafka request message, for protocol to send. + */ +class SrsKafkaRequest +{ +public: + SrsKafkaRequest(); + virtual ~SrsKafkaRequest(); +}; + +/** + * the kafka response message, for protocol to recv. + */ +class SrsKafkaResponse +{ +public: + SrsKafkaResponse(); + virtual ~SrsKafkaResponse(); +}; + /** * request the metadata from broker. * This API answers the following questions: @@ -344,20 +366,30 @@ public: * * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI */ -class SrsKafkaTopicMetadataRequest +class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest { private: SrsKafkaRequestHeader header; - SrsKafkaArray request; + SrsKafkaArray topics; public: SrsKafkaTopicMetadataRequest(); virtual ~SrsKafkaTopicMetadataRequest(); +public: + virtual void add_topic(std::string topic); }; -class SrsKafkaTopicMetadataResponse +/** + * response for the metadata request from broker. + * The response contains metadata for each partition, + * with partitions grouped together by topic. This + * metadata refers to brokers by their broker id. + * The brokers each have a host and port. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse + */ +class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse { private: - SrsKafkaRequestHeader header; + SrsKafkaResponseHeader header; public: SrsKafkaTopicMetadataResponse(); virtual ~SrsKafkaTopicMetadataResponse(); @@ -378,7 +410,7 @@ public: * write the message to kafka server. * @param msg the msg to send. user must not free it again. */ - virtual int send_and_free_message(SrsKafkaMessage* msg); + virtual int send_and_free_message(SrsKafkaRequest* msg); }; /**