diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 0b7b51846..d1744e4f2 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "153" +#define VERSION_REVISION "154" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 2dbc6249e..253d090dd 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -297,8 +297,7 @@ SrsProtocol::AckWindowSize::AckWindowSize() SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) { - buffer = new SrsBuffer(); - decode_stream = new SrsStream(); + in_buffer = new SrsBuffer(); skt = io; in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE; @@ -317,8 +316,7 @@ SrsProtocol::~SrsProtocol() chunk_streams.clear(); } - srs_freep(decode_stream); - srs_freep(buffer); + srs_freep(in_buffer); } void SrsProtocol::set_recv_timeout(int64_t timeout_us) @@ -406,10 +404,12 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) srs_assert(msg != NULL); srs_assert(msg->payload != NULL); srs_assert(msg->size > 0); + + SrsStream stream; // initialize the decode stream for all message, // it's ok for the initialize if fast and without memory copy. - if ((ret = decode_stream->initialize((char*)(msg->payload), msg->size)) != ERROR_SUCCESS) { + if ((ret = stream.initialize((char*)(msg->payload), msg->size)) != ERROR_SUCCESS) { srs_error("initialize stream failed. ret=%d", ret); return ret; } @@ -417,7 +417,7 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) // decode the packet. SrsPacket* packet = NULL; - if ((ret = do_decode_message(msg->header, decode_stream, &packet)) != ERROR_SUCCESS) { + if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) { srs_freep(packet); return ret; } @@ -445,6 +445,8 @@ int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) // p set to current write position, // it's ok when payload is NULL and size is 0. char* p = (char*)msg->payload; + // to directly set the field. + char* pp = NULL; // always write the header event payload is empty. do { @@ -871,14 +873,14 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) int ret = ERROR_SUCCESS; int required_size = 1; - if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); } return ret; } - char* p = buffer->bytes(); + char* p = in_buffer->bytes(); fmt = (*p >> 6) & 0x03; cid = *p & 0x3f; @@ -891,7 +893,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) if (cid == 0) { required_size = 2; - if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); } @@ -904,7 +906,7 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid); } else if (cid == 1) { required_size = 3; - if ((ret = buffer->grow(skt, 3)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, 3)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); } @@ -997,13 +999,13 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); int required_size = bh_size + mh_size; - if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); } return ret; } - char* p = buffer->bytes() + bh_size; + char* p = in_buffer->bytes() + bh_size; /** * parse the message header. @@ -1107,7 +1109,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz mh_size += 4; required_size = bh_size + mh_size; srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); - if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); } @@ -1210,7 +1212,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // empty message if (chunk->header.payload_length <= 0) { // need erase the header in buffer. - buffer->erase(bh_size + mh_size); + in_buffer->erase(bh_size + mh_size); srs_trace("get an empty RTMP " "message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type, @@ -1238,14 +1240,14 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // read payload to buffer int required_size = bh_size + mh_size + payload_size; - if ((ret = buffer->grow(skt, required_size)) != ERROR_SUCCESS) { + if ((ret = in_buffer->grow(skt, required_size)) != ERROR_SUCCESS) { if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); } return ret; } - memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); - buffer->erase(bh_size + mh_size + payload_size); + memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->bytes() + bh_size + mh_size, payload_size); + in_buffer->erase(bh_size + mh_size + payload_size); chunk->msg->size += payload_size; srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index a17bc8fb6..50cc11642 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -97,8 +97,10 @@ private: }; // peer in/out private: + /** + * underlayer socket object, send/recv bytes. + */ ISrsProtocolReaderWriter* skt; - char* pp; /** * requests sent out, used to build the response. * key: transactionId @@ -107,14 +109,33 @@ private: std::map requests; // peer in private: + /** + * chunk stream to decode RTMP messages. + */ std::map chunk_streams; - SrsStream* decode_stream; - SrsBuffer* buffer; + /** + * bytes buffer cache, recv from skt, provide services for stream. + */ + SrsBuffer* in_buffer; + /** + * input chunk size, default to 128, set by peer packet. + */ int32_t in_chunk_size; + /** + * input ack size, when to send the acked packet. + */ AckWindowSize in_ack_size; // peer out private: + /** + * output header cache. + * used for type0, 11bytes(or 15bytes with extended timestamp) header. + * or for type3, 1bytes(or 5bytes with extended timestamp) header. + */ char out_header_cache[RTMP_MAX_FMT0_HEADER_SIZE]; + /** + * output chunk size, default to 128, set by config. + */ int32_t out_chunk_size; public: /**