for bug #251, refine code before mic.

pull/133/head
winlin 10 years ago
parent d827928eeb
commit 32d537b96b

@ -830,6 +830,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
} }
srs_trace("vhost %s reload mr success.", vhost.c_str()); srs_trace("vhost %s reload mr success.", vhost.c_str());
} }
// chunk_size, only one per vhost.
if (!srs_directive_equals(new_vhost->get("chunk_size"), old_vhost->get("chunk_size"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_chunk_size(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes chunk_size failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload chunk_size success.", vhost.c_str());
}
// mw, only one per vhost // mw, only one per vhost
if (!srs_directive_equals(new_vhost->get("mw_latency"), old_vhost->get("mw_latency"))) { if (!srs_directive_equals(new_vhost->get("mw_latency"), old_vhost->get("mw_latency"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) { for (it = subscribes.begin(); it != subscribes.end(); ++it) {

@ -150,6 +150,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/)
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/)
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/) int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/)
{ {
return ERROR_SUCCESS; return ERROR_SUCCESS;

@ -67,6 +67,7 @@ public:
virtual int on_reload_vhost_dvr(std::string vhost); virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_mr(std::string vhost); virtual int on_reload_vhost_mr(std::string vhost);
virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_chunk_size(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);

@ -1173,7 +1173,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio)
std::vector<SrsForwarder*>::iterator it; std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) { for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it; SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_audio(msg.copy())) != ERROR_SUCCESS) { if ((ret = forwarder->on_audio(&msg)) != ERROR_SUCCESS) {
srs_error("forwarder process audio message failed. ret=%d", ret); srs_error("forwarder process audio message failed. ret=%d", ret);
return ret; return ret;
} }

@ -162,6 +162,8 @@ void check_macro_features()
srs_warn("MR(merged-read) is disabled, hurts read performance. @see %s", RTMP_SIG_SRS_ISSUES(241)); srs_warn("MR(merged-read) is disabled, hurts read performance. @see %s", RTMP_SIG_SRS_ISSUES(241));
#endif #endif
srs_trace("writev limits write %d iovs a time", sysconf(_SC_IOV_MAX));
#if VERSION_MAJOR > 1 #if VERSION_MAJOR > 1
#warning "using develop SRS, please use release instead." #warning "using develop SRS, please use release instead."
srs_warn("SRS %s is develop branch, please use %s instead", RTMP_SIG_SRS_VERSION, RTMP_SIG_SRS_RELEASE); srs_warn("SRS %s is develop branch, please use %s instead", RTMP_SIG_SRS_VERSION, RTMP_SIG_SRS_RELEASE);

@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_autofree.hpp> #include <srs_core_autofree.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_protocol_buffer.hpp> #include <srs_protocol_buffer.hpp>
#include <srs_protocol_utility.hpp>
#include <stdlib.h> #include <stdlib.h>
using namespace std; using namespace std;
@ -163,22 +164,6 @@ messages.
// the same as the timestamp of Type 0 chunk. // the same as the timestamp of Type 0 chunk.
#define RTMP_FMT_TYPE3 3 #define RTMP_FMT_TYPE3 3
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* 6.1. Chunk Format
* Extended timestamp: 0 or 4 bytes
* This field MUST be sent when the normal timsestamp is set to
* 0xffffff, it MUST NOT be sent if the normal timestamp is set to
* anything else. So for values less than 0xffffff the normal
* timestamp field SHOULD be used in which case the extended timestamp
* MUST NOT be present. For values greater than or equal to 0xffffff
* the normal timestamp field MUST NOT be used and MUST be set to
* 0xffffff and the extended timestamp MUST be sent.
*/
#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
/**************************************************************************** /****************************************************************************
***************************************************************************** *****************************************************************************
****************************************************************************/ ****************************************************************************/
@ -756,13 +741,11 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
// always write the header event payload is empty. // always write the header event payload is empty.
while (p < pend) { while (p < pend) {
// always has header // always has header
int nbh = 0; int nbh = srs_chunk_header(c0c3_cache, &msg->header, p == msg->payload);
char* header = NULL;
generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
srs_assert(nbh > 0); srs_assert(nbh > 0);
// header iov // header iov
iov[0].iov_base = header; iov[0].iov_base = c0c3_cache;
iov[0].iov_len = nbh; iov[0].iov_len = nbh;
// payload iov // payload iov
@ -813,7 +796,7 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
// sendout all messages and reset the cache, then send again. // sendout all messages and reset the cache, then send again.
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
srs_error("send with writev failed. ret=%d", ret); srs_error("send msgs with writev failed. ret=%d", ret);
} }
return ret; return ret;
} }
@ -834,31 +817,47 @@ int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
if (iov_index <= 0) { if (iov_index <= 0) {
return ret; return ret;
} }
// calc the bytes of iovs, for debug.
#if 0 #if 0
// calc the bytes of iovs, for debug.
int nb_bytes = 0; int nb_bytes = 0;
for (int i = 0; i < iov_index; i++) { for (int i = 0; i < iov_index; i++) {
iovec* iov = out_iovs + i; iovec* iov = out_iovs + i;
nb_bytes += iov->iov_len; nb_bytes += iov->iov_len;
} }
srs_warn("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d", srs_info("mw %d msgs %dB in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); nb_msgs, nb_bytes, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#else #else
srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d", srs_info("mw %d msgs in %d iovs, max_msgs=%d, nb_out_iovs=%d",
nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs); nb_msgs, iov_index, SRS_PERF_MW_MSGS, nb_out_iovs);
#endif #endif
// send by writev // the limits of writev iovs.
// sendout header and payload by writev. static int limits = sysconf(_SC_IOV_MAX);
// decrease the sys invoke count to get higher performance.
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) { // send in a time.
if (!srs_is_client_gracefully_close(ret)) { if (iov_index < limits) {
srs_error("send with writev failed. ret=%d", ret); if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send with writev failed. ret=%d", ret);
}
return ret;
} }
return ret; return ret;
} }
// send in multiple times.
int cur_iov = 0;
while (cur_iov < iov_index) {
int cur_count = srs_min(limits, iov_index - cur_iov);
if ((ret = skt->writev(out_iovs + cur_iov, cur_count, NULL)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send with writev failed. ret=%d", ret);
}
return ret;
}
cur_iov += cur_count;
}
return ret; return ret;
} }
@ -888,102 +887,46 @@ int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id)
header.message_type = packet->get_message_type(); header.message_type = packet->get_message_type();
header.stream_id = stream_id; header.stream_id = stream_id;
header.perfer_cid = packet->get_prefer_cid(); header.perfer_cid = packet->get_prefer_cid();
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); ret = do_simple_send(&header, payload, size);
ret = msg->create(&header, payload, size);
if (ret == ERROR_SUCCESS) { if (ret == ERROR_SUCCESS) {
ret = do_send_messages(&msg, 1); ret = on_send_packet(&header, packet);
if (ret == ERROR_SUCCESS) {
ret = on_send_packet(msg, packet);
}
} }
// donot use the auto free to free the msg,
// for performance issue.
srs_freep(msg);
return ret; return ret;
} }
void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph) int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
{ {
// to directly set the field. int ret = ERROR_SUCCESS;
char* pp = NULL;
// generate the header.
char* p = cache;
// timestamp for c0/c3
u_int32_t timestamp = (u_int32_t)mh->timestamp;
if (c0) { // we directly send out the packet,
// write new chunk stream header, fmt is 0 // use very simple algorithm, not very fast,
*p++ = 0x00 | (mh->perfer_cid & 0x3F); // but it's ok.
char* p = payload;
// chunk message header, 11 bytes char* end = p + size;
// timestamp, 3bytes, big-endian char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
if (timestamp < RTMP_EXTENDED_TIMESTAMP) { while (p < end) {
pp = (char*)&timestamp; int nbh = srs_chunk_header(c0c3, mh, p == payload);
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
} else {
*p++ = 0xFF;
*p++ = 0xFF;
*p++ = 0xFF;
}
// message_length, 3bytes, big-endian iovec iovs[2];
pp = (char*)&mh->payload_length; iovs[0].iov_base = c0c3;
*p++ = pp[2]; iovs[0].iov_len = nbh;
*p++ = pp[1];
*p++ = pp[0];
// message_type, 1bytes int payload_size = srs_min(end - p, out_chunk_size);
*p++ = mh->message_type; iovs[1].iov_base = p;
iovs[1].iov_len = payload_size;
p += payload_size;
// message_length, 3bytes, little-endian if ((ret = skt->writev(iovs, 2, NULL)) != ERROR_SUCCESS) {
pp = (char*)&mh->stream_id; if (!srs_is_client_gracefully_close(ret)) {
*p++ = pp[0]; srs_error("send packet with writev failed. ret=%d", ret);
*p++ = pp[1]; }
*p++ = pp[2]; return ret;
*p++ = pp[3]; }
} else {
// write no message header chunk stream, fmt is 3
// @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
// SRS will rollback to 1B chunk header.
*p++ = 0xC0 | (mh->perfer_cid & 0x3F);
} }
// for c0 return ret;
// chunk extended timestamp header, 0 or 4 bytes, big-endian
//
// for c3:
// chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// adobe changed for Type3 chunk:
// FMLE always sendout the extended-timestamp,
// must send the extended-timestamp to FMS,
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
// TODO: FIXME: extract to outer.
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
// always has header
*pnbh = p - cache;
*ph = cache;
} }
int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket) int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, SrsPacket** ppacket)
@ -1842,7 +1785,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
return ret; return ret;
} }
int SrsProtocol::on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet) int SrsProtocol::on_send_packet(SrsMessageHeader* mh, SrsPacket* packet)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -1851,7 +1794,7 @@ int SrsProtocol::on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet)
return ret; return ret;
} }
switch (msg->header.message_type) { switch (mh->message_type) {
case RTMP_MSG_SetChunkSize: { case RTMP_MSG_SetChunkSize: {
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet); SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet);
srs_assert(pkt != NULL); srs_assert(pkt != NULL);

@ -56,6 +56,22 @@ class SrsChunkStream;
class SrsSharedPtrMessage; class SrsSharedPtrMessage;
class IMergeReadHandler; class IMergeReadHandler;
/****************************************************************************
*****************************************************************************
****************************************************************************/
/**
* 6.1. Chunk Format
* Extended timestamp: 0 or 4 bytes
* This field MUST be sent when the normal timsestamp is set to
* 0xffffff, it MUST NOT be sent if the normal timestamp is set to
* anything else. So for values less than 0xffffff the normal
* timestamp field SHOULD be used in which case the extended timestamp
* MUST NOT be present. For values greater than or equal to 0xffffff
* the normal timestamp field MUST NOT be used and MUST be set to
* 0xffffff and the extended timestamp MUST be sent.
*/
#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF
/** /**
* 4.1. Message Header * 4.1. Message Header
*/ */
@ -493,14 +509,10 @@ private:
*/ */
virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id); virtual int do_send_and_free_packet(SrsPacket* packet, int stream_id);
/** /**
* generate the chunk header for msg. * use simple algorithm to send the header and bytes.
* @param mh, the header of msg to send. * @remark, for do_send_and_free_packet to send.
* @param c0, whether the first chunk, the c0 chunk.
* @param pnbh, output the size of header.
* @param ph, output the header cache.
* user should never free it, it's cached header.
*/ */
virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph); virtual int do_simple_send(SrsMessageHeader* mh, char* payload, int size);
/** /**
* imp for decode_message * imp for decode_message
*/ */
@ -534,7 +546,7 @@ private:
/** /**
* when message sentout, update the context. * when message sentout, update the context.
*/ */
virtual int on_send_packet(SrsSharedPtrMessage* msg, SrsPacket* packet); virtual int on_send_packet(SrsMessageHeader* mh, SrsPacket* packet);
private: private:
/** /**
* auto response the ack message. * auto response the ack message.

@ -29,6 +29,7 @@ using namespace std;
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
#include <srs_kernel_stream.hpp> #include <srs_kernel_stream.hpp>
#include <srs_protocol_stack.hpp>
void srs_discovery_tc_url( void srs_discovery_tc_url(
string tcUrl, string tcUrl,
@ -203,3 +204,83 @@ bool srs_aac_startswith_adts(SrsStream* stream)
return true; return true;
} }
int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0)
{
// to directly set the field.
char* pp = NULL;
// generate the header.
char* p = cache;
// timestamp for c0/c3
u_int32_t timestamp = (u_int32_t)mh->timestamp;
if (c0) {
// write new chunk stream header, fmt is 0
*p++ = 0x00 | (mh->perfer_cid & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
} else {
*p++ = 0xFF;
*p++ = 0xFF;
*p++ = 0xFF;
}
// message_length, 3bytes, big-endian
pp = (char*)&mh->payload_length;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
// message_type, 1bytes
*p++ = mh->message_type;
// stream_id, 4bytes, little-endian
pp = (char*)&mh->stream_id;
*p++ = pp[0];
*p++ = pp[1];
*p++ = pp[2];
*p++ = pp[3];
} else {
// write no message header chunk stream, fmt is 3
// @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header,
// SRS will rollback to 1B chunk header.
*p++ = 0xC0 | (mh->perfer_cid & 0x3F);
}
// for c0
// chunk extended timestamp header, 0 or 4 bytes, big-endian
//
// for c3:
// chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// adobe changed for Type3 chunk:
// FMLE always sendout the extended-timestamp,
// must send the extended-timestamp to FMS,
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
// TODO: FIXME: extract to outer.
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
pp = (char*)&timestamp;
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
// always has header
return p - cache;
}

@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_consts.hpp> #include <srs_kernel_consts.hpp>
class SrsStream; class SrsStream;
class SrsMessageHeader;
/** /**
* parse the tcUrl, output the schema, host, vhost, app and port. * parse the tcUrl, output the schema, host, vhost, app and port.
@ -103,5 +104,13 @@ extern bool srs_avc_startswith_annexb(SrsStream* stream, int* pnb_start_code = N
*/ */
extern bool srs_aac_startswith_adts(SrsStream* stream); extern bool srs_aac_startswith_adts(SrsStream* stream);
/**
* generate the chunk header for msg.
* @param mh, the header of msg to send.
* @param c0, whether the first chunk, the c0 chunk.
* @return the size of header.
*/
extern int srs_chunk_header(char* cache, SrsMessageHeader* mh, bool c0);
#endif #endif

Loading…
Cancel
Save