diff --git a/README.md b/README.md index 4258e5bd6..0e6c10606 100755 --- a/README.md +++ b/README.md @@ -184,6 +184,7 @@ Please select according to languages: ### V3 changes +* v3.0, 2018-08-05, Refine HTTP-FLV latency, support realtime mode.3.0.38 * v3.0, 2018-08-05, Fix [#1087][bug #1087], Ignore iface without address. 3.0.37 * v3.0, 2018-08-04, For [#1110][bug #1110], Support params in http callback. 3.0.36 * v3.0, 2018-08-02, Always use vhost in stream query, the unify uri. 3.0.35 diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 721bd64fe..597a851ac 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -23,6 +23,7 @@ #include +#include using namespace std; #include @@ -95,6 +96,89 @@ srs_error_t SrsConnection::start() return err; } +srs_error_t SrsConnection::set_tcp_nodelay(bool v) +{ + srs_error_t err = srs_success; + + int r0 = 0; + socklen_t nb_v = sizeof(int); + int fd = srs_netfd_fileno(stfd); + + int ov = 0; + if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); + } + +#ifndef SRS_PERF_TCP_NODELAY + srs_warn("ignore TCP_NODELAY, fd=%d, ov=%d", fd, ov); + return err; +#endif + + int iv = (v? 1:0); + if ((r0 = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "setsockopt fd=%d, r0=%v", fd, r0); + } + if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); + } + + srs_trace("set fd=%d TCP_NODELAY %d=>%d", fd, ov, iv); + + return err; +} + +srs_error_t SrsConnection::set_socket_buffer(int buffer_ms) +{ + srs_error_t err = srs_success; + + int r0 = 0; + int fd = srs_netfd_fileno(stfd); + socklen_t nb_v = sizeof(int); + + int ov = 0; + if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ov, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); + } + +#ifndef SRS_PERF_MW_SO_SNDBUF + srs_warn("ignore SO_SNDBUF, fd=%d, ov=%d", fd, ov); + return err; +#endif + + // the bytes: + // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, + // 128KB=131072, 256KB=262144, 512KB=524288 + // the buffer should set to sleep*kbps/8, + // for example, your system delivery stream in 1000kbps, + // sleep 800ms for small bytes, the buffer should set to: + // 800*1000/8=100000B(about 128KB). + // other examples: + // 2000*3000/8=750000B(about 732KB). + // 2000*5000/8=1250000B(about 1220KB). + int kbps = 4000; + int iv = buffer_ms * kbps / 8; + + // socket send buffer, system will double it. + iv = iv / 2; + + // override the send buffer by macro. +#ifdef SRS_PERF_SO_SNDBUF_SIZE + iv = SRS_PERF_SO_SNDBUF_SIZE / 2; +#endif + + // set the socket send buffer when required larger buffer + if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, nb_v) < 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "setsockopt fd=%d, r0=%v", fd, r0); + } + if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); + } + + srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, buffer_ms); + + return err; +} + srs_error_t SrsConnection::cycle() { srs_error_t err = do_cycle(); diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 16c8bd9b2..58115fede 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -100,6 +100,10 @@ public: * to remove the client by server->remove(this). */ virtual srs_error_t start(); + // Set socket option TCP_NODELAY. + virtual srs_error_t set_tcp_nodelay(bool v); + // Set socket option SO_SNDBUF in ms. + virtual srs_error_t set_socket_buffer(int buffer_ms); // interface ISrsOneCycleThreadHandler public: /** diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 00c4a9f3f..7c16da63e 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -487,6 +487,7 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage { srs_error_t err = srs_success; + string enc_desc; ISrsBufferEncoder* enc = NULL; srs_assert(entry); @@ -495,21 +496,27 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage #ifdef SRS_PERF_FAST_FLV_ENCODER bool realtime = _srs_config->get_realtime_enabled(req->vhost); if (realtime) { + enc_desc = "FLV"; enc = new SrsFlvStreamEncoder(); } else { + enc_desc = "FastFLV"; enc = new SrsFastFlvStreamEncoder(); } #else + enc_desc = "FLV"; enc = new SrsFlvStreamEncoder(); #endif } else if (srs_string_ends_with(entry->pattern, ".aac")) { w->header()->set_content_type("audio/x-aac"); + enc_desc = "AAC"; enc = new SrsAacStreamEncoder(); } else if (srs_string_ends_with(entry->pattern, ".mp3")) { w->header()->set_content_type("audio/mpeg"); + enc_desc = "MP3"; enc = new SrsMp3StreamEncoder(); } else if (srs_string_ends_with(entry->pattern, ".ts")) { w->header()->set_content_type("video/MP2T"); + enc_desc = "TS"; enc = new SrsTsStreamEncoder(); } else { return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str()); @@ -557,7 +564,18 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage SrsHttpMessage* hr = dynamic_cast(r); SrsResponseOnlyHttpConn* hc = dynamic_cast(hr->connection()); + // Set the socket options for transport. + bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost); + if (tcp_nodelay) { + if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) { + return srs_error_wrap(err, "set tcp nodelay"); + } + } + int mw_sleep = _srs_config->get_mw_sleep_ms(req->vhost); + if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) { + return srs_error_wrap(err, "set mw_sleep"); + } SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); SrsAutoFree(SrsHttpRecvThread, trd); @@ -565,6 +583,9 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start recv thread"); } + + srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d", + entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, mw_sleep, enc->has_cache(), msgs.max); // TODO: free and erase the disabled entry after all related connections is closed. while (entry->enabled) { @@ -583,16 +604,14 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage } if (count <= 0) { - srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS); - // directly use sleep, donot use consumer wait. - srs_usleep(mw_sleep); - + // Directly use sleep, donot use consumer wait, because we couldn't awake consumer. + srs_usleep(mw_sleep * 1000); // ignore when nothing got. continue; } if (pprint->can_print()) { - srs_trace("-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d", + srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d", count, pprint->age(), SRS_PERF_MW_MIN_MSGS, mw_sleep); } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index db8e67757..cfdc6da76 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1136,48 +1136,7 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms) return; } - // get the sock buffer size. - int fd = srs_netfd_fileno(stfd); - int onb_sbuf = 0; - socklen_t sock_buf_size = sizeof(int); - getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size); - -#ifdef SRS_PERF_MW_SO_SNDBUF - // the bytes: - // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, - // 128KB=131072, 256KB=262144, 512KB=524288 - // the buffer should set to sleep*kbps/8, - // for example, your system delivery stream in 1000kbps, - // sleep 800ms for small bytes, the buffer should set to: - // 800*1000/8=100000B(about 128KB). - // other examples: - // 2000*3000/8=750000B(about 732KB). - // 2000*5000/8=1250000B(about 1220KB). - int kbps = 5000; - int socket_buffer_size = sleep_ms * kbps / 8; - - // socket send buffer, system will double it. - int nb_sbuf = socket_buffer_size / 2; - - // override the send buffer by macro. -#ifdef SRS_PERF_SO_SNDBUF_SIZE - nb_sbuf = SRS_PERF_SO_SNDBUF_SIZE / 2; -#endif - - // set the socket send buffer when required larger buffer - if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, sock_buf_size) < 0) { - srs_warn("set sock SO_SENDBUF=%d failed.", nb_sbuf); - } - getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size); - - srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d, realtime=%d", - mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size, - onb_sbuf, nb_sbuf, realtime); -#else - srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d, realtime=%d", - mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf, realtime); -#endif - + set_socket_buffer(sleep_ms); mw_sleep = sleep_ms; } @@ -1188,25 +1147,12 @@ void SrsRtmpConn::set_sock_options() bool nvalue = _srs_config->get_tcp_nodelay(req->vhost); if (nvalue != tcp_nodelay) { tcp_nodelay = nvalue; -#ifdef SRS_PERF_TCP_NODELAY - int fd = srs_netfd_fileno(stfd); - socklen_t nb_v = sizeof(int); - - int ov = 0; - getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v); - - int v = tcp_nodelay; - // set the socket send buffer when required larger buffer - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, nb_v) < 0) { - srs_warn("set sock TCP_NODELAY=%d failed.", v); + srs_error_t err = set_tcp_nodelay(tcp_nodelay); + if (err != srs_success) { + srs_warn("ignore err %s", srs_error_desc(err).c_str()); + srs_freep(err); } - getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, &nb_v); - - srs_trace("set TCP_NODELAY %d=>%d", ov, v); -#else - srs_warn("SRS_PERF_TCP_NODELAY is disabled but tcp_nodelay configed."); -#endif } } diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 237cf0aa8..dd60634c7 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -27,7 +27,7 @@ // current release version #define VERSION_MAJOR 3 #define VERSION_MINOR 0 -#define VERSION_REVISION 37 +#define VERSION_REVISION 38 // generated by configure, only macros. #include diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 9ffc1c557..3666f096a 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -112,6 +112,8 @@ #define ERROR_ASPROCESS_PPID 1073 #define ERROR_EXCEED_CONNECTIONS 1074 #define ERROR_SOCKET_SETKEEPALIVE 1075 +#define ERROR_SOCKET_NO_NODELAY 1076 +#define ERROR_SOCKET_SNDBUF 1077 /////////////////////////////////////////////////////// // RTMP protocol error.