From 133cc62b51c7eef01c2e6642aad9354fa12e874b Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 17:54:02 +0800 Subject: [PATCH 1/4] for bug #194, use play fd poll, create the singleton poll --- trunk/configure | 3 +- trunk/src/app/srs_app_poll.cpp | 75 ++++++++++++++++++++++++ trunk/src/app/srs_app_poll.hpp | 88 +++++++++++++++++++++++++++++ trunk/src/app/srs_app_rtmp_conn.cpp | 6 ++ trunk/src/app/srs_app_server.cpp | 9 +++ trunk/src/srs/srs.upp | 2 + 6 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 trunk/src/app/srs_app_poll.cpp create mode 100644 trunk/src/app/srs_app_poll.hpp diff --git a/trunk/configure b/trunk/configure index a380aba72..c6daeedde 100755 --- a/trunk/configure +++ b/trunk/configure @@ -388,7 +388,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" - "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac") + "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac" + "srs_app_poll") APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh APP_OBJS="${MODULE_OBJS[@]}" fi diff --git a/trunk/src/app/srs_app_poll.cpp b/trunk/src/app/srs_app_poll.cpp new file mode 100644 index 000000000..fd7546643 --- /dev/null +++ b/trunk/src/app/srs_app_poll.cpp @@ -0,0 +1,75 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include + +SrsPoll::SrsPoll() +{ + pthread = new SrsThread(this, 0, false); +} + +SrsPoll::~SrsPoll() +{ + srs_freep(pthread); + fds.clear(); +} + +int SrsPoll::start() +{ + return pthread->start(); +} + +int SrsPoll::cycle() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +SrsPoll* SrsPoll::_instance = new SrsPoll(); + +SrsPoll* SrsPoll::instance() +{ + return _instance; +} + +SrsPollFD::SrsPollFD() +{ + _stfd = NULL; +} + +SrsPollFD::~SrsPollFD() +{ +} + +int SrsPollFD::initialize(st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + _stfd = stfd; + + return ret; +} + diff --git a/trunk/src/app/srs_app_poll.hpp b/trunk/src/app/srs_app_poll.hpp new file mode 100644 index 000000000..8ad59d10b --- /dev/null +++ b/trunk/src/app/srs_app_poll.hpp @@ -0,0 +1,88 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_POLL_HPP +#define SRS_APP_POLL_HPP + +/* +#include +*/ + +#include + +#include + +#include +#include + +class SrsPollFD; + +/** +* the poll for all play clients to finger the active fd out. +* for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 +* the poll is shared by all SrsPollFD, and we start an isolate thread to finger the active fds. +*/ +class SrsPoll : public ISrsThreadHandler +{ +private: + SrsThread* pthread; + std::map fds; +public: + SrsPoll(); + virtual ~SrsPoll(); +public: + /** + * start the poll thread. + */ + virtual int start(); + /** + * start an cycle thread. + */ + virtual int cycle(); +// singleton +private: + static SrsPoll* _instance; +public: + static SrsPoll* instance(); +}; + +/** +* the poll fd to check whether the specified fd is active. +*/ +class SrsPollFD +{ +private: + st_netfd_t _stfd; +public: + SrsPollFD(); + virtual ~SrsPollFD(); +public: + /** + * initialize the poll. + * @param stfd the fd to poll. + */ + virtual int initialize(st_netfd_t stfd); +}; + +#endif + diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index df543c7a3..88a186d93 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -48,6 +48,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -525,6 +526,11 @@ int SrsRtmpConn::playing(SrsSource* source) bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; + SrsPollFD poll; + if ((ret = poll.initialize(stfd)) != ERROR_SUCCESS) { + return ret; + } + while (true) { // collect elapse for pithy print. pithy_print.elapse(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 49df43998..2678f44eb 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // signal defines. #define SIGNAL_RELOAD SIGHUP @@ -664,6 +665,14 @@ int SrsServer::do_cycle() { int ret = ERROR_SUCCESS; + // start the poll for play clients. + // performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + SrsPoll* poll = SrsPoll::instance(); + if ((ret = poll->start()) != ERROR_SUCCESS) { + srs_error("start poll failed. ret=%d", ret); + return ret; + } + // find the max loop int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES); diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 5fa0bfbf9..c73673d53 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -92,6 +92,8 @@ file ..\app\srs_app_kbps.cpp, ..\app\srs_app_log.hpp, ..\app\srs_app_log.cpp, + ..\app\srs_app_poll.hpp, + ..\app\srs_app_poll.cpp, ..\app\srs_app_refer.hpp, ..\app\srs_app_refer.cpp, ..\app\srs_app_reload.hpp, From 164f632b62eb976e21b506d43df0e0e4498ae67c Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 18:35:46 +0800 Subject: [PATCH 2/4] for bug #194, add fds poll, just sleep to send without recv. --- trunk/src/app/srs_app_poll.cpp | 100 +++++++++++++++++++++++++- trunk/src/app/srs_app_poll.hpp | 23 +++++- trunk/src/app/srs_app_rtmp_conn.cpp | 30 +++++--- trunk/src/kernel/srs_kernel_error.hpp | 1 + 4 files changed, 143 insertions(+), 11 deletions(-) diff --git a/trunk/src/app/srs_app_poll.cpp b/trunk/src/app/srs_app_poll.cpp index fd7546643..6ead87078 100644 --- a/trunk/src/app/srs_app_poll.cpp +++ b/trunk/src/app/srs_app_poll.cpp @@ -24,14 +24,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include SrsPoll::SrsPoll() { + _pds = NULL; pthread = new SrsThread(this, 0, false); } SrsPoll::~SrsPoll() { + srs_freep(_pds); srs_freep(pthread); fds.clear(); } @@ -44,10 +47,84 @@ int SrsPoll::start() int SrsPoll::cycle() { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (fds.size() == 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + return ret; + } + + int nb_pds = (int)fds.size(); + +st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); +return ret; + + srs_freep(_pds); + _pds = new pollfd[nb_pds]; + + if (true) { + int index = 0; + + std::map::iterator it; + for (it = fds.begin(); it != fds.end(); ++it) { + int fd = it->first; + + pollfd& pfd = _pds[index++]; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + } + + srs_assert(index == (int)fds.size()); + } + + if (st_poll(_pds, nb_pds, ST_UTIME_NO_TIMEOUT) <= 0) { + srs_warn("ignore st_poll failed, size=%d", nb_pds); + return ret; + } + + for (int i = 0; i < nb_pds; i++) { + if (!(_pds[i].revents & POLLIN)) { + continue; + } + + int fd = _pds[i].fd; + if (fds.find(fd) == fds.end()) { + continue; + } + + SrsPollFD* owner = fds[fd]; + owner->set_active(true); + } + + return ret; +} + +int SrsPoll::add(st_netfd_t stfd, SrsPollFD* owner) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + if (fds.find(fd) != fds.end()) { + ret = ERROR_RTMP_POLL_FD_DUPLICATED; + srs_error("fd exists, fd=%d, ret=%d", fd, ret); + return ret; + } + + fds[fd] = owner; + return ret; } +void SrsPoll::remove(st_netfd_t stfd, SrsPollFD* owner) +{ + std::map::iterator it; + + int fd = st_netfd_fileno(stfd); + if ((it = fds.find(fd)) != fds.end()) { + fds.erase(it); + } +} + SrsPoll* SrsPoll::_instance = new SrsPoll(); SrsPoll* SrsPoll::instance() @@ -58,10 +135,15 @@ SrsPoll* SrsPoll::instance() SrsPollFD::SrsPollFD() { _stfd = NULL; + _active = false; } SrsPollFD::~SrsPollFD() { + if (_stfd) { + SrsPoll* poll = SrsPoll::instance(); + poll->remove(_stfd, this); + } } int SrsPollFD::initialize(st_netfd_t stfd) @@ -70,6 +152,22 @@ int SrsPollFD::initialize(st_netfd_t stfd) _stfd = stfd; + SrsPoll* poll = SrsPoll::instance(); + if ((ret = poll->add(stfd, this)) != ERROR_SUCCESS) { + srs_error("add fd to poll failed. ret=%d", ret); + return ret; + } + return ret; } +bool SrsPollFD::active() +{ + return _active; +} + +void SrsPollFD::set_active(bool v) +{ + _active = v; +} + diff --git a/trunk/src/app/srs_app_poll.hpp b/trunk/src/app/srs_app_poll.hpp index 8ad59d10b..7a962da69 100644 --- a/trunk/src/app/srs_app_poll.hpp +++ b/trunk/src/app/srs_app_poll.hpp @@ -46,7 +46,8 @@ class SrsPoll : public ISrsThreadHandler { private: SrsThread* pthread; - std::map fds; + pollfd* _pds; + std::map fds; public: SrsPoll(); virtual ~SrsPoll(); @@ -59,6 +60,15 @@ public: * start an cycle thread. */ virtual int cycle(); +public: + /** + * add the fd to poll. + */ + virtual int add(st_netfd_t stfd, SrsPollFD* owner); + /** + * remove the fd to poll, ignore any error. + */ + virtual void remove(st_netfd_t stfd, SrsPollFD* owner); // singleton private: static SrsPoll* _instance; @@ -73,6 +83,8 @@ class SrsPollFD { private: st_netfd_t _stfd; + // whether current fd is active. + bool _active; public: SrsPollFD(); virtual ~SrsPollFD(); @@ -82,6 +94,15 @@ public: * @param stfd the fd to poll. */ virtual int initialize(st_netfd_t stfd); + /** + * whether fd is active. + */ + virtual bool active(); + /** + * the poll will set to fd active when got data to read, + * the connection will set to deactive when data read. + */ + virtual void set_active(bool v); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 88a186d93..7c91d06a1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -517,26 +517,31 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + // use poll fd to manage the connection, read when active. + SrsPollFD poll_fd; + if ((ret = poll_fd.initialize(stfd)) != ERROR_SUCCESS) { + return ret; + } - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); + // TODO: FIXME: remove following. + //rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT); + // initialize other components + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); - bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; - SrsPollFD poll; - if ((ret = poll.initialize(stfd)) != ERROR_SUCCESS) { - return ret; - } - while (true) { // collect elapse for pithy print. pithy_print.elapse(); // read from client. - if (true) { + if (poll_fd.active()) { + poll_fd.set_active(false); + SrsMessage* msg = NULL; ret = rtmp->recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); @@ -565,6 +570,13 @@ int SrsRtmpConn::playing(SrsSource* source) srs_error("get messages from consumer failed. ret=%d", ret); return ret; } + + // no data, sleep a while. + // for the poll_fd maybe not active, and no message. + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + if (count <= 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + } // reportable if (pithy_print.can_print()) { diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 21ca885de..1bf74e940 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_EDGE_VHOST_REMOVED 3039 #define ERROR_HLS_AVC_TRY_OTHERS 3040 #define ERROR_H264_API_NO_PREFIXED 3041 +#define ERROR_RTMP_POLL_FD_DUPLICATED 3042 /** * whether the error code is an system control error. From 21f16f3a83b63ffb7d868bf0ddcf9e5a655ddecc Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 18:46:35 +0800 Subject: [PATCH 3/4] for bug #194, use play fd poll to improve performance, it works. --- trunk/src/app/srs_app_poll.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/trunk/src/app/srs_app_poll.cpp b/trunk/src/app/srs_app_poll.cpp index 6ead87078..859e49e11 100644 --- a/trunk/src/app/srs_app_poll.cpp +++ b/trunk/src/app/srs_app_poll.cpp @@ -26,6 +26,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +// the interval in us to refresh the poll for all fds. +// for performance refine, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 +#define SRS_POLL_CYCLE_INTERVAL 10 * 1000 * 1000 + SrsPoll::SrsPoll() { _pds = NULL; @@ -54,10 +58,8 @@ int SrsPoll::cycle() } int nb_pds = (int)fds.size(); - -st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); -return ret; + // TODO: FIXME: use more efficient way for the poll. srs_freep(_pds); _pds = new pollfd[nb_pds]; @@ -77,7 +79,10 @@ return ret; srs_assert(index == (int)fds.size()); } - if (st_poll(_pds, nb_pds, ST_UTIME_NO_TIMEOUT) <= 0) { + // Upon successful completion, a non-negative value is returned. + // A positive value indicates the total number of OS file descriptors in pds that have events. + // A value of 0 indicates that the call timed out. + if (st_poll(_pds, nb_pds, SRS_POLL_CYCLE_INTERVAL) < 0) { srs_warn("ignore st_poll failed, size=%d", nb_pds); return ret; } From 4f21e92ae06c948a46fda1e862a6c18c7ffd5830 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 18:49:27 +0800 Subject: [PATCH 4/4] fix the message of config for max_connections. --- trunk/src/app/srs_app_config.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b455459bd..0513cfada 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1502,9 +1502,9 @@ int SrsConfig::check_config() "total=%d(max_connections=%d, nb_consumed_fds=%d), ret=%d. " "you can change max_connections from %d to %d, or " "you can login as root and set the limit: ulimit -HSn %d", - nb_connections, nb_total, max_open_files, + nb_connections, nb_total + 1, max_open_files, nb_total, nb_connections, nb_consumed_fds, - ret, nb_connections, nb_canbe, nb_total); + ret, nb_connections, nb_canbe, nb_total + 1); return ret; } }