diff --git a/trunk/src/kernel/srs_kernel_buffer.cpp b/trunk/src/kernel/srs_kernel_buffer.cpp index 2c998b198..a4b58e197 100644 --- a/trunk/src/kernel/srs_kernel_buffer.cpp +++ b/trunk/src/kernel/srs_kernel_buffer.cpp @@ -29,6 +29,14 @@ using namespace std; #include #include +ISrsCodec::ISrsCodec() +{ +} + +ISrsCodec::~ISrsCodec() +{ +} + SrsBuffer::SrsBuffer() { p = bytes = NULL; diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index f20d57933..f79e08f33 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -33,11 +33,60 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsBuffer; + /** -* bytes utility, used to: -* convert basic types to bytes, -* build basic types from bytes. -*/ + * the srs codec, to code and decode object with bytes: + * code: to encode/serialize object to bytes in buffer, + * decode: to decode/deserialize object from bytes in buffer. + * we use SrsBuffer as bytes helper utility, + * for example, to code: + * ISrsCodec* obj = ... + * char* bytes = new char[obj->size()]; + * + * SrsBuffer* buf = new SrsBuffer(); + * buf->initialize(bytes, obj->size()) + * + * obj->encode(buf); + * for example, to decode: + * int nb_bytes = ... + * char* bytes = ... + * + * SrsBuffer* buf = new Srsbuffer(); + * buf->initialize(bytes, nb_bytes); + * + * ISrsCodec* obj = ... + * obj->decode(buf); + * @remark protocol or amf0 or json should implements this interface. + */ +// TODO: FIXME: protocol, amf0, json should implements it. +class ISrsCodec +{ +public: + ISrsCodec(); + virtual ~ISrsCodec(); +public: + /** + * get the size of object to encode object to bytes. + */ + virtual int size() = 0; + /** + * encode object to bytes in SrsBuffer. + */ + virtual int encode(SrsBuffer* buf) = 0; +public: + /** + * decode object from bytes in SrsBuffer. + */ + virtual int decode(SrsBuffer* buf) = 0; +}; + +/** + * bytes utility, used to: + * convert basic types to bytes, + * build basic types from bytes. + * @remark the buffer never mange the bytes, user must manage it. + */ class SrsBuffer { private: @@ -157,7 +206,8 @@ public: }; /** - * the bit stream. + * the bit stream, base on SrsBuffer, + * for exmaple, the h.264 avc stream is bit stream. */ class SrsBitBuffer { diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index efc8e89f3..67ee28907 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -28,6 +28,7 @@ using namespace std; #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -103,7 +104,7 @@ int SrsKafkaBytes::total_size() SrsKafkaRequestHeader::SrsKafkaRequestHeader() { - size = 0; + _size = 0; api_key = api_version = 0; correlation_id = 0; client_id = new SrsKafkaString(); @@ -121,12 +122,12 @@ int SrsKafkaRequestHeader::header_size() int SrsKafkaRequestHeader::message_size() { - return size - header_size(); + return _size - header_size(); } int SrsKafkaRequestHeader::total_size() { - return 4 + size; + return 4 + _size; } bool SrsKafkaRequestHeader::is_producer_request() @@ -169,9 +170,28 @@ void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key) api_key = (int16_t)key; } +int SrsKafkaRequestHeader::size() +{ + return 4 + _size; +} + +int SrsKafkaRequestHeader::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaRequestHeader::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + SrsKafkaResponseHeader::SrsKafkaResponseHeader() { - size = 0; + _size = 0; correlation_id = 0; } @@ -186,12 +206,31 @@ int SrsKafkaResponseHeader::header_size() int SrsKafkaResponseHeader::message_size() { - return size - header_size(); + return _size - header_size(); } int SrsKafkaResponseHeader::total_size() { - return 4 + size; + return 4 + _size; +} + +int SrsKafkaResponseHeader::size() +{ + return 4 + _size; +} + +int SrsKafkaResponseHeader::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaResponseHeader::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; } SrsKafkaRawMessage::SrsKafkaRawMessage() @@ -233,6 +272,21 @@ SrsKafkaRequest::~SrsKafkaRequest() { } +int SrsKafkaRequest::size() +{ + return header.size(); +} + +int SrsKafkaRequest::encode(SrsBuffer* buf) +{ + return header.encode(buf); +} + +int SrsKafkaRequest::decode(SrsBuffer* buf) +{ + return header.decode(buf); +} + SrsKafkaResponse::SrsKafkaResponse() { } @@ -241,6 +295,21 @@ SrsKafkaResponse::~SrsKafkaResponse() { } +int SrsKafkaResponse::size() +{ + return header.size(); +} + +int SrsKafkaResponse::encode(SrsBuffer* buf) +{ + return header.encode(buf); +} + +int SrsKafkaResponse::decode(SrsBuffer* buf) +{ + return header.decode(buf); +} + SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() { header.set_api_key(SrsKafkaApiKeyMetadataRequest); @@ -255,6 +324,27 @@ void SrsKafkaTopicMetadataRequest::add_topic(string topic) topics.append(new SrsKafkaString(topic)); } +int SrsKafkaTopicMetadataRequest::size() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse() { } @@ -263,6 +353,27 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse() { } +int SrsKafkaTopicMetadataResponse::size() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) { skt = io; @@ -296,10 +407,14 @@ int SrsKafkaClient::fetch_metadata(string topic) int ret = ERROR_SUCCESS; SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest(); - SrsAutoFree(SrsKafkaTopicMetadataRequest, req); req->add_topic(topic); + if ((ret = protocol->send_and_free_message(req)) != ERROR_SUCCESS) { + srs_error("kafka send message failed. ret=%d", ret); + return ret; + } + // TODO: FIXME: implements it. return ret; diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 92d30d50e..871403aab 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -32,6 +32,8 @@ #include #include +#include + class ISrsProtocolReaderWriter; #ifdef SRS_AUTO_KAFKA @@ -136,7 +138,7 @@ public: * the header of request, includes the size of request. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests */ -class SrsKafkaRequestHeader +class SrsKafkaRequestHeader : public ISrsCodec { private: /** @@ -145,7 +147,7 @@ private: * size as an integer N, and then reading and parsing the subsequent N bytes * of the request. */ - int32_t size; + int32_t _size; private: /** * This is a numeric id for the API being invoked (i.e. is it @@ -179,13 +181,13 @@ private: public: SrsKafkaRequestHeader(); virtual ~SrsKafkaRequestHeader(); -public: +private: /** * the layout of request: * +-----------+----------------------------------+ - * | 4B size | [size] bytes | + * | 4B _size | [_size] bytes | * +-----------+------------+---------------------+ - * | 4B size | header | message | + * | 4B _size | header | message | * +-----------+------------+---------------------+ * | total size = 4 + header + message | * +----------------------------------------------+ @@ -215,6 +217,11 @@ public: virtual bool is_consumer_metadata_request(); // set the api key. virtual void set_api_key(SrsKafkaApiKey key); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -223,7 +230,7 @@ public: * send a MetadataResponse in return to a MetadataRequest). * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses */ -class SrsKafkaResponseHeader +class SrsKafkaResponseHeader : public ISrsCodec { private: /** @@ -232,7 +239,7 @@ private: * size as an integer N, and then reading and parsing the subsequent N bytes * of the request. */ - int32_t size; + int32_t _size; private: /** * This is a user-supplied integer. It will be passed back in @@ -243,13 +250,13 @@ private: public: SrsKafkaResponseHeader(); virtual ~SrsKafkaResponseHeader(); -public: +private: /** * the layout of response: * +-----------+----------------------------------+ - * | 4B size | [size] bytes | + * | 4B _size | [_size] bytes | * +-----------+------------+---------------------+ - * | 4B size | 4B header | message | + * | 4B _size | 4B header | message | * +-----------+------------+---------------------+ * | total size = 4 + 4 + message | * +----------------------------------------------+ @@ -265,6 +272,11 @@ public: * the total size of the request, includes the 4B size. */ virtual int total_size(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -335,21 +347,35 @@ public: /** * the kafka request message, for protocol to send. */ -class SrsKafkaRequest +class SrsKafkaRequest : public ISrsCodec { +protected: + SrsKafkaRequestHeader header; public: SrsKafkaRequest(); virtual ~SrsKafkaRequest(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** * the kafka response message, for protocol to recv. */ -class SrsKafkaResponse +class SrsKafkaResponse : public ISrsCodec { +protected: + SrsKafkaResponseHeader header; public: SrsKafkaResponse(); virtual ~SrsKafkaResponse(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -369,13 +395,17 @@ public: class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest { private: - SrsKafkaRequestHeader header; SrsKafkaArray topics; public: SrsKafkaTopicMetadataRequest(); virtual ~SrsKafkaTopicMetadataRequest(); public: virtual void add_topic(std::string topic); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -388,11 +418,14 @@ public: */ class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse { -private: - SrsKafkaResponseHeader header; public: SrsKafkaTopicMetadataResponse(); virtual ~SrsKafkaTopicMetadataResponse(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index 0336f748e..8fc391d77 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -654,12 +654,12 @@ int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size) int nbh = 0; if (p == payload) { nbh = srs_chunk_header_c0( - mh->perfer_cid, mh->timestamp, mh->payload_length, + mh->perfer_cid, (u_int32_t)mh->timestamp, mh->payload_length, mh->message_type, mh->stream_id, c0c3, sizeof(c0c3)); } else { nbh = srs_chunk_header_c3( - mh->perfer_cid, mh->timestamp, + mh->perfer_cid, (u_int32_t)mh->timestamp, c0c3, sizeof(c0c3)); } srs_assert(nbh > 0);; @@ -668,7 +668,7 @@ int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size) iovs[0].iov_base = c0c3; iovs[0].iov_len = nbh; - int payload_size = srs_min(end - p, out_chunk_size); + int payload_size = srs_min((int)(end - p), out_chunk_size); iovs[1].iov_base = p; iovs[1].iov_len = payload_size; p += payload_size;