diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 144737913..73ddec2cd 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -371,7 +371,7 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf) SrsKafkaResponseHeader::SrsKafkaResponseHeader() { _size = 0; - correlation_id = 0; + _correlation_id = 0; } SrsKafkaResponseHeader::~SrsKafkaResponseHeader() @@ -398,6 +398,11 @@ void SrsKafkaResponseHeader::set_total_size(int s) _size = s - 4; } +int32_t SrsKafkaResponseHeader::correlation_id() +{ + return _correlation_id; +} + int SrsKafkaResponseHeader::size() { return 4 + header_size(); @@ -414,7 +419,7 @@ int SrsKafkaResponseHeader::encode(SrsBuffer* buf) } buf->write_4bytes(_size); - buf->write_4bytes(correlation_id); + buf->write_4bytes(_correlation_id); return ret; } @@ -440,7 +445,7 @@ int SrsKafkaResponseHeader::decode(SrsBuffer* buf) srs_error("kafka decode response message failed. ret=%d", ret); return ret; } - correlation_id = buf->read_4bytes(); + _correlation_id = buf->read_4bytes(); return ret; } @@ -658,17 +663,31 @@ int32_t SrsKafkaCorrelationPool::generate_correlation_id() return cid++; } -void SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request) +SrsKafkaApiKey SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request) { + SrsKafkaApiKey previous = SrsKafkaApiKeyUnknown; + + std::map::iterator it = correlation_ids.find(correlation_id); + if (it != correlation_ids.end()) { + previous = it->second; + } + correlation_ids[correlation_id] = request; + + return previous; } -void SrsKafkaCorrelationPool::unset(int32_t correlation_id) +SrsKafkaApiKey SrsKafkaCorrelationPool::unset(int32_t correlation_id) { std::map::iterator it = correlation_ids.find(correlation_id); + if (it != correlation_ids.end()) { + SrsKafkaApiKey key = it->second; correlation_ids.erase(it); + return key; } + + return SrsKafkaApiKeyUnknown; } SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id) @@ -742,27 +761,72 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg) int ret = ERROR_SUCCESS; - SrsKafkaResponseHeader header; - while (reader->size() < header.size()) { + while (true) { + SrsKafkaResponseHeader header; + + // ensure enough bytes for response header. if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) { srs_error("kafka recv message failed. ret=%d", ret); return ret; } + + // decode response header. + SrsBuffer buffer; + if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) { + return ret; + } + + SrsBuffer* buf = &buffer; + if ((ret = header.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode response header failed. ret=%d", ret); + return ret; + } + + // skip the used buffer for header. + buf->skip(-1 * buf->pos()); + + // fetch cached api key. + SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance(); + SrsKafkaApiKey key = pool->unset(header.correlation_id()); + srs_trace("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id()); + + // create message by cached api key. + SrsKafkaResponse* res = NULL; + switch (key) { + case SrsKafkaApiKeyMetadataRequest: + srs_info("kafka got metadata response"); + res = new SrsKafkaTopicMetadataResponse(); + break; + case SrsKafkaApiKeyUnknown: + default: + break; + } + + // ensure enough bytes to decode message. + if ((ret = reader->grow(skt, header.total_size())) != ERROR_SUCCESS) { + srs_freep(res); + srs_error("kafka recv message body failed. ret=%d", ret); + return ret; + } + + // dropped message, fetch next. + if (!res) { + reader->skip(header.total_size()); + srs_warn("kafka ignore unknown message, size=%d.", header.total_size()); + continue; + } + + // parse the whole message. + if ((ret = res->decode(buf)) != ERROR_SUCCESS) { + srs_freep(res); + srs_error("kafka decode message failed. ret=%d", ret); + return ret; + } + + *pmsg = res; + break; } - SrsBuffer buffer; - if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) { - return ret; - } - - SrsBuffer* buf = &buffer; - if ((ret = header.decode(buf)) != ERROR_SUCCESS) { - srs_error("kafka decode response header failed. ret=%d", ret); - return ret; - } - - // TODO: FIXME: decode message. - return ret; } diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 986f03b31..36c5e7fe7 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -339,7 +339,7 @@ private: * the response by the server, unmodified. It is useful for matching * request and response between the client and server. */ - int32_t correlation_id; + int32_t _correlation_id; public: SrsKafkaResponseHeader(); virtual ~SrsKafkaResponseHeader(); @@ -361,8 +361,9 @@ private: * the size of message, the bytes left after the header. */ virtual int message_size(); +public: /** - * the total size of the request, includes the 4B size. + * the total size of the request, includes the 4B size and message body. */ virtual int total_size(); public: @@ -371,6 +372,10 @@ public: * @param s the whole message, including the 4 bytes size size. */ virtual void set_total_size(int s); + /** + * get the correlation id of response message. + */ + virtual int32_t correlation_id(); // interface ISrsCodec public: virtual int size(); @@ -567,9 +572,23 @@ private: public: virtual ~SrsKafkaCorrelationPool(); public: + /** + * generate a global correlation id. + */ virtual int32_t generate_correlation_id(); - virtual void set(int32_t correlation_id, SrsKafkaApiKey request); - virtual void unset(int32_t correlation_id); + /** + * set the correlation id to specified request key. + */ + virtual SrsKafkaApiKey set(int32_t correlation_id, SrsKafkaApiKey request); + /** + * unset the correlation id. + * @return the previous api key; unknown if not set. + */ + virtual SrsKafkaApiKey unset(int32_t correlation_id); + /** + * get the key by specified correlation id. + * @return the specified api key; unknown if no correlation id. + */ virtual SrsKafkaApiKey get(int32_t correlation_id); };