diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index cd4603b05..b68b3864e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -830,6 +830,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload mr success.", vhost.c_str()); } + // chunk_size, only one per vhost. + if (!srs_directive_equals(new_vhost->get("chunk_size"), old_vhost->get("chunk_size"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_chunk_size(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes chunk_size failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload chunk_size success.", vhost.c_str()); + } // mw, only one per vhost if (!srs_directive_equals(new_vhost->get("mw_latency"), old_vhost->get("mw_latency"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index cc69a9c44..b1f81f5ec 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -150,6 +150,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 878518b09..b9377bff5 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -67,6 +67,7 @@ public: virtual int on_reload_vhost_dvr(std::string vhost); virtual int on_reload_vhost_mr(std::string vhost); virtual int on_reload_vhost_mw(std::string vhost); + virtual int on_reload_vhost_chunk_size(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 523dafc5a..7daf92852 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1173,7 +1173,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio) std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_audio(msg.copy())) != ERROR_SUCCESS) { + if ((ret = forwarder->on_audio(&msg)) != ERROR_SUCCESS) { srs_error("forwarder process audio message failed. ret=%d", ret); return ret; } diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 26a2d585d..2ea3bdaf9 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -162,6 +162,8 @@ void check_macro_features() srs_warn("MR(merged-read) is disabled, hurts read performance. @see %s", RTMP_SIG_SRS_ISSUES(241)); #endif + srs_trace("writev limits write %d iovs a time", sysconf(_SC_IOV_MAX)); + #if VERSION_MAJOR > 1 #warning "using develop SRS, please use release instead." srs_warn("SRS %s is develop branch, please use %s instead", RTMP_SIG_SRS_VERSION, RTMP_SIG_SRS_RELEASE); diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index 128253474..dbf5df229 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #include using namespace std; @@ -163,22 +164,6 @@ messages. // the same as the timestamp of Type 0 chunk. #define RTMP_FMT_TYPE3 3 -/**************************************************************************** -***************************************************************************** -****************************************************************************/ -/** -* 6.1. Chunk Format -* Extended timestamp: 0 or 4 bytes -* This field MUST be sent when the normal timsestamp is set to -* 0xffffff, it MUST NOT be sent if the normal timestamp is set to -* anything else. So for values less than 0xffffff the normal -* timestamp field SHOULD be used in which case the extended timestamp -* MUST NOT be present. For values greater than or equal to 0xffffff -* the normal timestamp field MUST NOT be used and MUST be set to -* 0xffffff and the extended timestamp MUST be sent. -*/ -#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF - /**************************************************************************** ***************************************************************************** ****************************************************************************/ @@ -756,13 +741,11 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) // always write the header event payload is empty. while (p < pend) { // always has header - int nbh = 0; - char* header = NULL; - generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header); + int nbh = srs_chunk_header(c0c3_cache, &msg->header, p == msg->payload); srs_assert(nbh > 0); // header iov - iov[0].iov_base = header; + iov[0].iov_base = c0c3_cache; iov[0].iov_len = nbh; // payload iov @@ -813,7 +796,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) // sendout all messages and reset the cache, then send again. if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { - srs_error("send with writev failed. ret=%d", ret); + srs_error("send msgs with writev failed. ret=%d", ret); } return ret; } @@ -834,31 +817,47 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs) if (iov_index <= 0) { return ret; } - - // calc the bytes of iovs, for debug. #if 0 + // calc the bytes of iovs, for debug. int nb_bytes = 0; for (int i = 0; i < iov_index; i++) { iovec* iov = out_iovs + i; nb_bytes += iov->iov_len; } - srs_warn("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d", + srs_info("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d", nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); #else srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d", nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); #endif - // send by writev - // sendout header and payload by writev. - // decrease the sys invoke count to get higher performance. - if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("send with writev failed. ret=%d", ret); + // the limits of writev iovs. + static int limits = sysconf(_SC_IOV_MAX); + + // send in a time. + if (iov_index < limits) { + if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send with writev failed. ret=%d", ret); + } + return ret; } return ret; } + // send in multiple times. + int cur_iov = 0; + while (cur_iov < iov_index) { + int cur_count = srs_min(limits, iov_index - cur_iov); + if ((ret = skt->writev(out_iovs + cur_iov, cur_count, NULL)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send with writev failed. ret=%d", ret); + } + return ret; + } + cur_iov += cur_count; + } + return ret; } @@ -888,102 +887,46 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) header.message_type = packet->get_message_type(); header.stream_id = stream_id; header.perfer_cid = packet->get_prefer_cid(); - - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); - ret = msg->create(&header, payload, size); + + ret = do_simple_send(&header, payload, size); if (ret == ERROR_SUCCESS) { - ret = do_send_messages(&msg, 1); - if (ret == ERROR_SUCCESS) { - ret = on_send_packet(msg, packet); - } + ret = on_send_packet(&header, packet); } - - // donot use the auto free to free the msg, - // for performance issue. - srs_freep(msg); return ret; } -void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) +int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size) { - // to directly set the field. - char* pp = NULL; - - // generate the header. - char* p = cache; - - // timestamp for c0/c3 - u_int32_t timestamp = (u_int32_t)mh->timestamp; + int ret = ERROR_SUCCESS; - if (c0) { - // write new chunk stream header, fmt is 0 - *p++ = 0x00 | (mh->perfer_cid & 0x3F); - - // chunk message header, 11 bytes - // timestamp, 3bytes, big-endian - if (timestamp < RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - } else { - *p++ = 0xFF; - *p++ = 0xFF; - *p++ = 0xFF; - } + // we directly send out the packet, + // use very simple algorithm, not very fast, + // but it's ok. + char* p = payload; + char* end = p + size; + char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE]; + while (p < end) { + int nbh = srs_chunk_header(c0c3, mh, p == payload); - // message_length, 3bytes, big-endian - pp = (char*)&mh->payload_length; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; + iovec iovs[2]; + iovs[0].iov_base = c0c3; + iovs[0].iov_len = nbh; - // message_type, 1bytes - *p++ = mh->message_type; + int payload_size = srs_min(end - p, out_chunk_size); + iovs[1].iov_base = p; + iovs[1].iov_len = payload_size; + p += payload_size; - // message_length, 3bytes, little-endian - pp = (char*)&mh->stream_id; - *p++ = pp[0]; - *p++ = pp[1]; - *p++ = pp[2]; - *p++ = pp[3]; - } else { - // write no message header chunk stream, fmt is 3 - // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, - // SRS will rollback to 1B chunk header. - *p++ = 0xC0 | (mh->perfer_cid & 0x3F); + if ((ret = skt->writev(iovs, 2, NULL)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send packet with writev failed. ret=%d", ret); + } + return ret; + } } - // for c0 - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // - // for c3: - // chunk extended timestamp header, 0 or 4 bytes, big-endian - // 6.1.3. Extended Timestamp - // This field is transmitted only when the normal time stamp in the - // chunk message header is set to 0x00ffffff. If normal time stamp is - // set to any value less than 0x00ffffff, this field MUST NOT be - // present. This field MUST NOT be present if the timestamp field is not - // present. Type 3 chunks MUST NOT have this field. - // adobe changed for Type3 chunk: - // FMLE always sendout the extended-timestamp, - // must send the extended-timestamp to FMS, - // must send the extended-timestamp to flash-player. - // @see: ngx_rtmp_prepare_message - // @see: http://blog.csdn.net/win_lin/article/details/13363699 - // TODO: FIXME: extract to outer. - if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { - pp = (char*)×tamp; - *p++ = pp[3]; - *p++ = pp[2]; - *p++ = pp[1]; - *p++ = pp[0]; - } - - // always has header - *pnbh = p - cache; - *ph = cache; + return ret; } int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) @@ -1842,7 +1785,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) return ret; } -int SrsProtocol::on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet) +int SrsProtocol::on_send_packet(SrsMessageHeader* mh, SrsPacket* packet) { int ret = ERROR_SUCCESS; @@ -1851,7 +1794,7 @@ int SrsProtocol::on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet) return ret; } - switch (msg->header.message_type) { + switch (mh->message_type) { case RTMP_MSG_SetChunkSize: { SrsSetChunkSizePacket* pkt = dynamic_cast(packet); srs_assert(pkt != NULL); diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index b99f359c3..753787c11 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -56,6 +56,22 @@ class SrsChunkStream; class SrsSharedPtrMessage; class IMergeReadHandler; +/**************************************************************************** +***************************************************************************** +****************************************************************************/ +/** +* 6.1. Chunk Format +* Extended timestamp: 0 or 4 bytes +* This field MUST be sent when the normal timsestamp is set to +* 0xffffff, it MUST NOT be sent if the normal timestamp is set to +* anything else. So for values less than 0xffffff the normal +* timestamp field SHOULD be used in which case the extended timestamp +* MUST NOT be present. For values greater than or equal to 0xffffff +* the normal timestamp field MUST NOT be used and MUST be set to +* 0xffffff and the extended timestamp MUST be sent. +*/ +#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF + /** * 4.1. Message Header */ @@ -493,14 +509,10 @@ private: */ virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id); /** - * generate the chunk header for msg. - * @param mh, the header of msg to send. - * @param c0, whether the first chunk, the c0 chunk. - * @param pnbh, output the size of header. - * @param ph, output the header cache. - * user should never free it, it's cached header. + * use simple algorithm to send the header and bytes. + * @remark, for do_send_and_free_packet to send. */ - virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); + virtual int do_simple_send(SrsMessageHeader* mh, char* payload, int size); /** * imp for decode_message */ @@ -534,7 +546,7 @@ private: /** * when message sentout, update the context. */ - virtual int on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet); + virtual int on_send_packet(SrsMessageHeader* mh, SrsPacket* packet); private: /** * auto response the ack message. diff --git a/trunk/src/rtmp/srs_protocol_utility.cpp b/trunk/src/rtmp/srs_protocol_utility.cpp index b79e45829..ac4ea8ad6 100644 --- a/trunk/src/rtmp/srs_protocol_utility.cpp +++ b/trunk/src/rtmp/srs_protocol_utility.cpp @@ -29,6 +29,7 @@ using namespace std; #include #include #include +#include void srs_discovery_tc_url( string tcUrl, @@ -203,3 +204,83 @@ bool srs_aac_startswith_adts(SrsStream* stream) return true; } +int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0) +{ + // to directly set the field. + char* pp = NULL; + + // generate the header. + char* p = cache; + + // timestamp for c0/c3 + u_int32_t timestamp = (u_int32_t)mh->timestamp; + + if (c0) { + // write new chunk stream header, fmt is 0 + *p++ = 0x00 | (mh->perfer_cid & 0x3F); + + // chunk message header, 11 bytes + // timestamp, 3bytes, big-endian + if (timestamp < RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } else { + *p++ = 0xFF; + *p++ = 0xFF; + *p++ = 0xFF; + } + + // message_length, 3bytes, big-endian + pp = (char*)&mh->payload_length; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + + // message_type, 1bytes + *p++ = mh->message_type; + + // stream_id, 4bytes, little-endian + pp = (char*)&mh->stream_id; + *p++ = pp[0]; + *p++ = pp[1]; + *p++ = pp[2]; + *p++ = pp[3]; + } else { + // write no message header chunk stream, fmt is 3 + // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, + // SRS will rollback to 1B chunk header. + *p++ = 0xC0 | (mh->perfer_cid & 0x3F); + } + + // for c0 + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // + // for c3: + // chunk extended timestamp header, 0 or 4 bytes, big-endian + // 6.1.3. Extended Timestamp + // This field is transmitted only when the normal time stamp in the + // chunk message header is set to 0x00ffffff. If normal time stamp is + // set to any value less than 0x00ffffff, this field MUST NOT be + // present. This field MUST NOT be present if the timestamp field is not + // present. Type 3 chunks MUST NOT have this field. + // adobe changed for Type3 chunk: + // FMLE always sendout the extended-timestamp, + // must send the extended-timestamp to FMS, + // must send the extended-timestamp to flash-player. + // @see: ngx_rtmp_prepare_message + // @see: http://blog.csdn.net/win_lin/article/details/13363699 + // TODO: FIXME: extract to outer. + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } + + // always has header + return p - cache; +} + diff --git a/trunk/src/rtmp/srs_protocol_utility.hpp b/trunk/src/rtmp/srs_protocol_utility.hpp index 2ec909133..e1deedd93 100644 --- a/trunk/src/rtmp/srs_protocol_utility.hpp +++ b/trunk/src/rtmp/srs_protocol_utility.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsStream; +class SrsMessageHeader; /** * parse the tcUrl, output the schema, host, vhost, app and port. @@ -103,5 +104,13 @@ extern bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code = N */ extern bool srs_aac_startswith_adts(SrsStream* stream); +/** +* generate the chunk header for msg. +* @param mh, the header of msg to send. +* @param c0, whether the first chunk, the c0 chunk. +* @return the size of header. +*/ +extern int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0); + #endif