From ab93506b018a1837453b26657ccc80c9f33c5f87 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 12 Nov 2014 17:59:32 +0800 Subject: [PATCH] for bug #199, refine the api of send message. --- trunk/src/rtmp/srs_protocol_stack.cpp | 240 ++++++++++++++------------ trunk/src/rtmp/srs_protocol_stack.hpp | 22 ++- 2 files changed, 148 insertions(+), 114 deletions(-) diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 397095b5f..f065401e9 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -532,137 +532,149 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) return ret; } -int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) +int SrsProtocol::do_send_message(SrsMessage* msg) { int ret = ERROR_SUCCESS; - // always not NULL msg. - srs_assert(msg); + // ignore empty message. + if (!msg->payload || msg->size <= 0) { + srs_info("ignore empty message."); + return ret; + } // we donot use the complex basic header, // ensure the basic header is 1bytes. if (msg->header.perfer_cid < 2) { - srs_warn("change the chunk_id=%d to default=%d", msg->header.perfer_cid, RTMP_CID_ProtocolControl); + srs_warn("change the chunk_id=%d to default=%d", + msg->header.perfer_cid, RTMP_CID_ProtocolControl); msg->header.perfer_cid = RTMP_CID_ProtocolControl; } // p set to current write position, // it's ok when payload is NULL and size is 0. char* p = msg->payload; - // to directly set the field. - char* pp = NULL; + char* pend = msg->payload + msg->size; // always write the header event payload is empty. - do { - // generate the header. - char* pheader = out_header_cache; + while (p < pend) { + // always has header + int nbh = 0; + char* header = NULL; + generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header); + srs_assert(nbh > 0); - if (p == msg->payload) { - // write new chunk stream header, fmt is 0 - *pheader++ = 0x00 | (msg->header.perfer_cid & 0x3F); - - // chunk message header, 11 bytes - // timestamp, 3bytes, big-endian - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; - if (timestamp < RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } else { - *pheader++ = 0xFF; - *pheader++ = 0xFF; - *pheader++ = 0xFF; - } - - // message_length, 3bytes, big-endian - pp = (char*)&msg->header.payload_length; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - - // message_type, 1bytes - *pheader++ = msg->header.message_type; - - // message_length, 3bytes, little-endian - pp = (char*)&msg->header.stream_id; - *pheader++ = pp[0]; - *pheader++ = pp[1]; - *pheader++ = pp[2]; - *pheader++ = pp[3]; - - // chunk extended timestamp header, 0 or 4 bytes, big-endian - if(timestamp >= RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *pheader++ = pp[3]; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } - } else { - // write no message header chunk stream, fmt is 3 - // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, - // SRS will rollback to 1B chunk header. - *pheader++ = 0xC0 | (msg->header.perfer_cid & 0x3F); - - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // 6.1.3. Extended Timestamp - // This field is transmitted only when the normal time stamp in the - // chunk message header is set to 0x00ffffff. If normal time stamp is - // set to any value less than 0x00ffffff, this field MUST NOT be - // present. This field MUST NOT be present if the timestamp field is not - // present. Type 3 chunks MUST NOT have this field. - // adobe changed for Type3 chunk: - // FMLE always sendout the extended-timestamp, - // must send the extended-timestamp to FMS, - // must send the extended-timestamp to flash-player. - // @see: ngx_rtmp_prepare_message - // @see: http://blog.csdn.net/win_lin/article/details/13363699 - u_int32_t timestamp = (u_int32_t)msg->header.timestamp; - if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *pheader++ = pp[3]; - *pheader++ = pp[2]; - *pheader++ = pp[1]; - *pheader++ = pp[0]; - } + // header iov + iov[0].iov_base = header; + iov[0].iov_len = nbh; + + // payload iov + int payload_size = pend - p; + if (payload_size > out_chunk_size) { + payload_size = out_chunk_size; } + iov[1].iov_base = p; + iov[1].iov_len = payload_size; + // send by writev // sendout header and payload by writev. // decrease the sys invoke count to get higher performance. - int payload_size = msg->size - (p - msg->payload); - payload_size = srs_min(payload_size, out_chunk_size); + if ((ret = skt->writev(iov, 2, NULL)) != ERROR_SUCCESS) { + srs_error("send with writev failed. ret=%d", ret); + return ret; + } - // always has header - int header_size = pheader - out_header_cache; - srs_assert(header_size > 0); + // consume sendout bytes. + p += payload_size; + } + + return ret; +} + +void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) +{ + char* cache = out_c0_cache; + + // to directly set the field. + char* pp = NULL; + + // generate the header. + char* p = cache; + + if (c0) { + // write new chunk stream header, fmt is 0 + *p++ = 0x00 | (mh->perfer_cid & 0x3F); - // send by writev - iovec iov[2]; - iov[0].iov_base = out_header_cache; - iov[0].iov_len = header_size; - iov[1].iov_base = p; - iov[1].iov_len = payload_size; + // chunk message header, 11 bytes + // timestamp, 3bytes, big-endian + u_int32_t timestamp = (u_int32_t)mh->timestamp; + if (timestamp < RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } else { + *p++ = 0xFF; + *p++ = 0xFF; + *p++ = 0xFF; + } - ssize_t nwrite; - if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) { - srs_error("send with writev failed. ret=%d", ret); - return ret; + // message_length, 3bytes, big-endian + pp = (char*)&mh->payload_length; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + + // message_type, 1bytes + *p++ = mh->message_type; + + // message_length, 3bytes, little-endian + pp = (char*)&mh->stream_id; + *p++ = pp[0]; + *p++ = pp[1]; + *p++ = pp[2]; + *p++ = pp[3]; + + // chunk extended timestamp header, 0 or 4 bytes, big-endian + if(timestamp >= RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; } + } else { + // write no message header chunk stream, fmt is 3 + // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, + // SRS will rollback to 1B chunk header. + *p++ = 0xC0 | (mh->perfer_cid & 0x3F); - // consume sendout bytes when not empty packet. - if (msg->payload && msg->size > 0) { - p += payload_size; + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // 6.1.3. Extended Timestamp + // This field is transmitted only when the normal time stamp in the + // chunk message header is set to 0x00ffffff. If normal time stamp is + // set to any value less than 0x00ffffff, this field MUST NOT be + // present. This field MUST NOT be present if the timestamp field is not + // present. Type 3 chunks MUST NOT have this field. + // adobe changed for Type3 chunk: + // FMLE always sendout the extended-timestamp, + // must send the extended-timestamp to FMS, + // must send the extended-timestamp to flash-player. + // @see: ngx_rtmp_prepare_message + // @see: http://blog.csdn.net/win_lin/article/details/13363699 + // TODO: FIXME: extract to outer. + u_int32_t timestamp = (u_int32_t)mh->timestamp; + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; } - } while (p < msg->payload + msg->size); - - // only process the callback event when with packet - if (packet && (ret = on_send_packet(msg, packet)) != ERROR_SUCCESS) { - srs_error("hook the send message failed. ret=%d", ret); - return ret; } - return ret; + // always has header + *pnbh = p - cache; + *ph = cache; } int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) @@ -834,14 +846,17 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) { - if (msg) { - msg->header.stream_id = stream_id; - } + // always not NULL msg. + srs_assert(msg); + + // update the stream id in header. + msg->header.stream_id = stream_id; // donot use the auto free to free the msg, // for performance issue. - int ret = do_send_message(msg, NULL); + int ret = do_send_message(msg); srs_freep(msg); + return ret; } @@ -878,7 +893,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) // donot use the auto free to free the msg, // for performance issue. - ret = do_send_message(msg, packet); + ret = do_send_message(msg); + if (ret == ERROR_SUCCESS) { + ret = on_send_packet(msg, packet); + } srs_freep(msg); return ret; @@ -1535,8 +1553,10 @@ int SrsProtocol::on_send_packet(SrsMessage* msg, SrsPacket* packet) { int ret = ERROR_SUCCESS; - // should never be raw bytes oriented RTMP message. - srs_assert(packet); + // ignore raw bytes oriented RTMP message. + if (packet == NULL) { + return ret; + } switch (msg->header.message_type) { case RTMP_MSG_SetChunkSize: { diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index e8e4e4456..949b9508a 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include #include @@ -214,7 +215,11 @@ private: * used for type0, 11bytes(or 15bytes with extended timestamp) header. * or for type3, 1bytes(or 5bytes with extended timestamp) header. */ - char out_header_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; + char out_c0_cache[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; + /** + * output iovec cache. + */ + iovec iov[2]; /** * output chunk size, default to 128, set by config. */ @@ -339,10 +344,19 @@ public: } private: /** - * send out the message, donot free it, the caller must free the param msg. - * @param packet the packet of message, NULL for raw message. + * send out the message, donot free it, + * the caller must free the param msg. + */ + virtual int do_send_message(SrsMessage* msg); + /** + * generate the chunk header for msg. + * @param mh, the header of msg to send. + * @param c0, whether the first chunk, the c0 chunk. + * @param pnbh, output the size of header. + * @param ph, output the header cache. + * user should never free it, it's cached header. */ - virtual int do_send_message(SrsMessage* msg, SrsPacket* packet); + virtual void generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); /** * imp for decode_message */