|
|
|
@ -64,7 +64,7 @@ bool SrsKafkaString::empty()
|
|
|
|
|
return _size <= 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaString::size()
|
|
|
|
|
int SrsKafkaString::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return _size == -1? 2 : 2 + _size;
|
|
|
|
|
}
|
|
|
|
@ -159,7 +159,7 @@ bool SrsKafkaBytes::empty()
|
|
|
|
|
return _size <= 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaBytes::size()
|
|
|
|
|
int SrsKafkaBytes::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 4 + (_size == -1? 0 : _size);
|
|
|
|
|
}
|
|
|
|
@ -234,7 +234,7 @@ SrsKafkaRequestHeader::~SrsKafkaRequestHeader()
|
|
|
|
|
|
|
|
|
|
int SrsKafkaRequestHeader::header_size()
|
|
|
|
|
{
|
|
|
|
|
return 2 + 2 + 4 + client_id->size();
|
|
|
|
|
return 2 + 2 + 4 + client_id->nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaRequestHeader::message_size()
|
|
|
|
@ -307,7 +307,7 @@ bool SrsKafkaRequestHeader::is_consumer_metadata_request()
|
|
|
|
|
return _api_key == SrsKafkaApiKeyConsumerMetadataRequest;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaRequestHeader::size()
|
|
|
|
|
int SrsKafkaRequestHeader::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 4 + header_size();
|
|
|
|
|
}
|
|
|
|
@ -403,7 +403,7 @@ int32_t SrsKafkaResponseHeader::correlation_id()
|
|
|
|
|
return _correlation_id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaResponseHeader::size()
|
|
|
|
|
int SrsKafkaResponseHeader::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 4 + header_size();
|
|
|
|
|
}
|
|
|
|
@ -505,9 +505,9 @@ SrsKafkaApiKey SrsKafkaRequest::api_key()
|
|
|
|
|
return header.api_key();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaRequest::size()
|
|
|
|
|
int SrsKafkaRequest::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return header.size();
|
|
|
|
|
return header.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaRequest::encode(SrsBuffer* buf)
|
|
|
|
@ -533,9 +533,9 @@ void SrsKafkaResponse::update_header(int s)
|
|
|
|
|
header.set_total_size(s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaResponse::size()
|
|
|
|
|
int SrsKafkaResponse::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return header.size();
|
|
|
|
|
return header.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaResponse::encode(SrsBuffer* buf)
|
|
|
|
@ -562,9 +562,9 @@ void SrsKafkaTopicMetadataRequest::add_topic(string topic)
|
|
|
|
|
topics.append(new SrsKafkaString(topic));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadataRequest::size()
|
|
|
|
|
int SrsKafkaTopicMetadataRequest::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return SrsKafkaRequest::size() + topics.size();
|
|
|
|
|
return SrsKafkaRequest::nb_bytes() + topics.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf)
|
|
|
|
@ -610,9 +610,9 @@ SrsKafkaBroker::~SrsKafkaBroker()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaBroker::size()
|
|
|
|
|
int SrsKafkaBroker::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 4 + host.size() + 4;
|
|
|
|
|
return 4 + host.nb_bytes() + 4;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaBroker::encode(SrsBuffer* buf)
|
|
|
|
@ -678,9 +678,9 @@ SrsKafkaPartitionMetadata::~SrsKafkaPartitionMetadata()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaPartitionMetadata::size()
|
|
|
|
|
int SrsKafkaPartitionMetadata::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 2 + 4 + 4 + replicas.size() + isr.size();
|
|
|
|
|
return 2 + 4 + 4 + replicas.nb_bytes() + isr.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaPartitionMetadata::encode(SrsBuffer* buf)
|
|
|
|
@ -742,9 +742,9 @@ SrsKafkaTopicMetadata::~SrsKafkaTopicMetadata()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadata::size()
|
|
|
|
|
int SrsKafkaTopicMetadata::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return 2 + name.size() + metadatas.size();
|
|
|
|
|
return 2 + name.nb_bytes() + metadatas.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadata::encode(SrsBuffer* buf)
|
|
|
|
@ -803,9 +803,9 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadataResponse::size()
|
|
|
|
|
int SrsKafkaTopicMetadataResponse::nb_bytes()
|
|
|
|
|
{
|
|
|
|
|
return SrsKafkaResponse::size() + brokers.size() + metadatas.size();
|
|
|
|
|
return SrsKafkaResponse::nb_bytes() + brokers.nb_bytes() + metadatas.nb_bytes();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
|
|
|
|
@ -928,7 +928,7 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
|
|
|
|
|
// TODO: FIXME: refine for performance issue.
|
|
|
|
|
SrsAutoFree(SrsKafkaRequest, msg);
|
|
|
|
|
|
|
|
|
|
int size = msg->size();
|
|
|
|
|
int size = msg->nb_bytes();
|
|
|
|
|
if (size <= 0) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -976,7 +976,7 @@ int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
|
|
|
|
|
SrsKafkaResponseHeader header;
|
|
|
|
|
|
|
|
|
|
// ensure enough bytes for response header.
|
|
|
|
|
if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = reader->grow(skt, header.nb_bytes())) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("kafka recv message failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1077,7 +1077,7 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse**
|
|
|
|
|
vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
|
|
|
|
|
{
|
|
|
|
|
vector<string> strs;
|
|
|
|
|
for (int i = 0; i < arr->size(); i++) {
|
|
|
|
|
for (int i = 0; i < arr->nb_bytes(); i++) {
|
|
|
|
|
}
|
|
|
|
|
return strs;
|
|
|
|
|
}
|
|
|
|
|