extract ISrsCodec for code and decode object.

pull/510/head
winlin 9 years ago
parent 45755fd1e7
commit 9117e1e918

@ -29,6 +29,14 @@ using namespace std;
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
ISrsCodec::ISrsCodec()
{
}
ISrsCodec::~ISrsCodec()
{
}
SrsBuffer::SrsBuffer() SrsBuffer::SrsBuffer()
{ {
p = bytes = NULL; p = bytes = NULL;

@ -33,11 +33,60 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <sys/types.h> #include <sys/types.h>
#include <string> #include <string>
class SrsBuffer;
/** /**
* bytes utility, used to: * the srs codec, to code and decode object with bytes:
* convert basic types to bytes, * code: to encode/serialize object to bytes in buffer,
* build basic types from bytes. * decode: to decode/deserialize object from bytes in buffer.
*/ * we use SrsBuffer as bytes helper utility,
* for example, to code:
* ISrsCodec* obj = ...
* char* bytes = new char[obj->size()];
*
* SrsBuffer* buf = new SrsBuffer();
* buf->initialize(bytes, obj->size())
*
* obj->encode(buf);
* for example, to decode:
* int nb_bytes = ...
* char* bytes = ...
*
* SrsBuffer* buf = new Srsbuffer();
* buf->initialize(bytes, nb_bytes);
*
* ISrsCodec* obj = ...
* obj->decode(buf);
* @remark protocol or amf0 or json should implements this interface.
*/
// TODO: FIXME: protocol, amf0, json should implements it.
class ISrsCodec
{
public:
ISrsCodec();
virtual ~ISrsCodec();
public:
/**
* get the size of object to encode object to bytes.
*/
virtual int size() = 0;
/**
* encode object to bytes in SrsBuffer.
*/
virtual int encode(SrsBuffer* buf) = 0;
public:
/**
* decode object from bytes in SrsBuffer.
*/
virtual int decode(SrsBuffer* buf) = 0;
};
/**
* bytes utility, used to:
* convert basic types to bytes,
* build basic types from bytes.
* @remark the buffer never mange the bytes, user must manage it.
*/
class SrsBuffer class SrsBuffer
{ {
private: private:
@ -157,7 +206,8 @@ public:
}; };
/** /**
* the bit stream. * the bit stream, base on SrsBuffer,
* for exmaple, the h.264 avc stream is bit stream.
*/ */
class SrsBitBuffer class SrsBitBuffer
{ {

@ -28,6 +28,7 @@ using namespace std;
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_core_autofree.hpp> #include <srs_core_autofree.hpp>
#include <srs_kernel_log.hpp>
#ifdef SRS_AUTO_KAFKA #ifdef SRS_AUTO_KAFKA
@ -103,7 +104,7 @@ int SrsKafkaBytes::total_size()
SrsKafkaRequestHeader::SrsKafkaRequestHeader() SrsKafkaRequestHeader::SrsKafkaRequestHeader()
{ {
size = 0; _size = 0;
api_key = api_version = 0; api_key = api_version = 0;
correlation_id = 0; correlation_id = 0;
client_id = new SrsKafkaString(); client_id = new SrsKafkaString();
@ -121,12 +122,12 @@ int SrsKafkaRequestHeader::header_size()
int SrsKafkaRequestHeader::message_size() int SrsKafkaRequestHeader::message_size()
{ {
return size - header_size(); return _size - header_size();
} }
int SrsKafkaRequestHeader::total_size() int SrsKafkaRequestHeader::total_size()
{ {
return 4 + size; return 4 + _size;
} }
bool SrsKafkaRequestHeader::is_producer_request() bool SrsKafkaRequestHeader::is_producer_request()
@ -169,9 +170,28 @@ void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key)
api_key = (int16_t)key; api_key = (int16_t)key;
} }
int SrsKafkaRequestHeader::size()
{
return 4 + _size;
}
int SrsKafkaRequestHeader::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaRequestHeader::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
SrsKafkaResponseHeader::SrsKafkaResponseHeader() SrsKafkaResponseHeader::SrsKafkaResponseHeader()
{ {
size = 0; _size = 0;
correlation_id = 0; correlation_id = 0;
} }
@ -186,12 +206,31 @@ int SrsKafkaResponseHeader::header_size()
int SrsKafkaResponseHeader::message_size() int SrsKafkaResponseHeader::message_size()
{ {
return size - header_size(); return _size - header_size();
} }
int SrsKafkaResponseHeader::total_size() int SrsKafkaResponseHeader::total_size()
{ {
return 4 + size; return 4 + _size;
}
int SrsKafkaResponseHeader::size()
{
return 4 + _size;
}
int SrsKafkaResponseHeader::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaResponseHeader::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
} }
SrsKafkaRawMessage::SrsKafkaRawMessage() SrsKafkaRawMessage::SrsKafkaRawMessage()
@ -233,6 +272,21 @@ SrsKafkaRequest::~SrsKafkaRequest()
{ {
} }
int SrsKafkaRequest::size()
{
return header.size();
}
int SrsKafkaRequest::encode(SrsBuffer* buf)
{
return header.encode(buf);
}
int SrsKafkaRequest::decode(SrsBuffer* buf)
{
return header.decode(buf);
}
SrsKafkaResponse::SrsKafkaResponse() SrsKafkaResponse::SrsKafkaResponse()
{ {
} }
@ -241,6 +295,21 @@ SrsKafkaResponse::~SrsKafkaResponse()
{ {
} }
int SrsKafkaResponse::size()
{
return header.size();
}
int SrsKafkaResponse::encode(SrsBuffer* buf)
{
return header.encode(buf);
}
int SrsKafkaResponse::decode(SrsBuffer* buf)
{
return header.decode(buf);
}
SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest()
{ {
header.set_api_key(SrsKafkaApiKeyMetadataRequest); header.set_api_key(SrsKafkaApiKeyMetadataRequest);
@ -255,6 +324,27 @@ void SrsKafkaTopicMetadataRequest::add_topic(string topic)
topics.append(new SrsKafkaString(topic)); topics.append(new SrsKafkaString(topic));
} }
int SrsKafkaTopicMetadataRequest::size()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse() SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse()
{ {
} }
@ -263,6 +353,27 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
{ {
} }
int SrsKafkaTopicMetadataResponse::size()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io)
{ {
skt = io; skt = io;
@ -296,10 +407,14 @@ int SrsKafkaClient::fetch_metadata(string topic)
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest(); SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest();
SrsAutoFree(SrsKafkaTopicMetadataRequest, req);
req->add_topic(topic); req->add_topic(topic);
if ((ret = protocol->send_and_free_message(req)) != ERROR_SUCCESS) {
srs_error("kafka send message failed. ret=%d", ret);
return ret;
}
// TODO: FIXME: implements it. // TODO: FIXME: implements it.
return ret; return ret;

@ -32,6 +32,8 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include <srs_kernel_buffer.hpp>
class ISrsProtocolReaderWriter; class ISrsProtocolReaderWriter;
#ifdef SRS_AUTO_KAFKA #ifdef SRS_AUTO_KAFKA
@ -136,7 +138,7 @@ public:
* the header of request, includes the size of request. * the header of request, includes the size of request.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
*/ */
class SrsKafkaRequestHeader class SrsKafkaRequestHeader : public ISrsCodec
{ {
private: private:
/** /**
@ -145,7 +147,7 @@ private:
* size as an integer N, and then reading and parsing the subsequent N bytes * size as an integer N, and then reading and parsing the subsequent N bytes
* of the request. * of the request.
*/ */
int32_t size; int32_t _size;
private: private:
/** /**
* This is a numeric id for the API being invoked (i.e. is it * This is a numeric id for the API being invoked (i.e. is it
@ -179,13 +181,13 @@ private:
public: public:
SrsKafkaRequestHeader(); SrsKafkaRequestHeader();
virtual ~SrsKafkaRequestHeader(); virtual ~SrsKafkaRequestHeader();
public: private:
/** /**
* the layout of request: * the layout of request:
* +-----------+----------------------------------+ * +-----------+----------------------------------+
* | 4B size | [size] bytes | * | 4B _size | [_size] bytes |
* +-----------+------------+---------------------+ * +-----------+------------+---------------------+
* | 4B size | header | message | * | 4B _size | header | message |
* +-----------+------------+---------------------+ * +-----------+------------+---------------------+
* | total size = 4 + header + message | * | total size = 4 + header + message |
* +----------------------------------------------+ * +----------------------------------------------+
@ -215,6 +217,11 @@ public:
virtual bool is_consumer_metadata_request(); virtual bool is_consumer_metadata_request();
// set the api key. // set the api key.
virtual void set_api_key(SrsKafkaApiKey key); virtual void set_api_key(SrsKafkaApiKey key);
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**
@ -223,7 +230,7 @@ public:
* send a MetadataResponse in return to a MetadataRequest). * send a MetadataResponse in return to a MetadataRequest).
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses
*/ */
class SrsKafkaResponseHeader class SrsKafkaResponseHeader : public ISrsCodec
{ {
private: private:
/** /**
@ -232,7 +239,7 @@ private:
* size as an integer N, and then reading and parsing the subsequent N bytes * size as an integer N, and then reading and parsing the subsequent N bytes
* of the request. * of the request.
*/ */
int32_t size; int32_t _size;
private: private:
/** /**
* This is a user-supplied integer. It will be passed back in * This is a user-supplied integer. It will be passed back in
@ -243,13 +250,13 @@ private:
public: public:
SrsKafkaResponseHeader(); SrsKafkaResponseHeader();
virtual ~SrsKafkaResponseHeader(); virtual ~SrsKafkaResponseHeader();
public: private:
/** /**
* the layout of response: * the layout of response:
* +-----------+----------------------------------+ * +-----------+----------------------------------+
* | 4B size | [size] bytes | * | 4B _size | [_size] bytes |
* +-----------+------------+---------------------+ * +-----------+------------+---------------------+
* | 4B size | 4B header | message | * | 4B _size | 4B header | message |
* +-----------+------------+---------------------+ * +-----------+------------+---------------------+
* | total size = 4 + 4 + message | * | total size = 4 + 4 + message |
* +----------------------------------------------+ * +----------------------------------------------+
@ -265,6 +272,11 @@ public:
* the total size of the request, includes the 4B size. * the total size of the request, includes the 4B size.
*/ */
virtual int total_size(); virtual int total_size();
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**
@ -335,21 +347,35 @@ public:
/** /**
* the kafka request message, for protocol to send. * the kafka request message, for protocol to send.
*/ */
class SrsKafkaRequest class SrsKafkaRequest : public ISrsCodec
{ {
protected:
SrsKafkaRequestHeader header;
public: public:
SrsKafkaRequest(); SrsKafkaRequest();
virtual ~SrsKafkaRequest(); virtual ~SrsKafkaRequest();
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**
* the kafka response message, for protocol to recv. * the kafka response message, for protocol to recv.
*/ */
class SrsKafkaResponse class SrsKafkaResponse : public ISrsCodec
{ {
protected:
SrsKafkaResponseHeader header;
public: public:
SrsKafkaResponse(); SrsKafkaResponse();
virtual ~SrsKafkaResponse(); virtual ~SrsKafkaResponse();
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**
@ -369,13 +395,17 @@ public:
class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
{ {
private: private:
SrsKafkaRequestHeader header;
SrsKafkaArray<SrsKafkaString*> topics; SrsKafkaArray<SrsKafkaString*> topics;
public: public:
SrsKafkaTopicMetadataRequest(); SrsKafkaTopicMetadataRequest();
virtual ~SrsKafkaTopicMetadataRequest(); virtual ~SrsKafkaTopicMetadataRequest();
public: public:
virtual void add_topic(std::string topic); virtual void add_topic(std::string topic);
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**
@ -388,11 +418,14 @@ public:
*/ */
class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
{ {
private:
SrsKafkaResponseHeader header;
public: public:
SrsKafkaTopicMetadataResponse(); SrsKafkaTopicMetadataResponse();
virtual ~SrsKafkaTopicMetadataResponse(); virtual ~SrsKafkaTopicMetadataResponse();
// interface ISrsCodec
public:
virtual int size();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
}; };
/** /**

@ -654,12 +654,12 @@ int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
int nbh = 0; int nbh = 0;
if (p == payload) { if (p == payload) {
nbh = srs_chunk_header_c0( nbh = srs_chunk_header_c0(
mh->perfer_cid, mh->timestamp, mh->payload_length, mh->perfer_cid, (u_int32_t)mh->timestamp, mh->payload_length,
mh->message_type, mh->stream_id, mh->message_type, mh->stream_id,
c0c3, sizeof(c0c3)); c0c3, sizeof(c0c3));
} else { } else {
nbh = srs_chunk_header_c3( nbh = srs_chunk_header_c3(
mh->perfer_cid, mh->timestamp, mh->perfer_cid, (u_int32_t)mh->timestamp,
c0c3, sizeof(c0c3)); c0c3, sizeof(c0c3));
} }
srs_assert(nbh > 0);; srs_assert(nbh > 0);;
@ -668,7 +668,7 @@ int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
iovs[0].iov_base = c0c3; iovs[0].iov_base = c0c3;
iovs[0].iov_len = nbh; iovs[0].iov_len = nbh;
int payload_size = srs_min(end - p, out_chunk_size); int payload_size = srs_min((int)(end - p), out_chunk_size);
iovs[1].iov_base = p; iovs[1].iov_base = p;
iovs[1].iov_len = payload_size; iovs[1].iov_len = payload_size;
p += payload_size; p += payload_size;

Loading…
Cancel
Save