diff --git a/trunk/configure b/trunk/configure index a380aba72..c6daeedde 100755 --- a/trunk/configure +++ b/trunk/configure @@ -388,7 +388,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" - "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac") + "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac" + "srs_app_poll") APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh APP_OBJS="${MODULE_OBJS[@]}" fi diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b455459bd..0513cfada 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1502,9 +1502,9 @@ int SrsConfig::check_config() "total=%d(max_connections=%d, nb_consumed_fds=%d), ret=%d. " "you can change max_connections from %d to %d, or " "you can login as root and set the limit: ulimit -HSn %d", - nb_connections, nb_total, max_open_files, + nb_connections, nb_total + 1, max_open_files, nb_total, nb_connections, nb_consumed_fds, - ret, nb_connections, nb_canbe, nb_total); + ret, nb_connections, nb_canbe, nb_total + 1); return ret; } } diff --git a/trunk/src/app/srs_app_poll.cpp b/trunk/src/app/srs_app_poll.cpp new file mode 100644 index 000000000..859e49e11 --- /dev/null +++ b/trunk/src/app/srs_app_poll.cpp @@ -0,0 +1,178 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include + +// the interval in us to refresh the poll for all fds. +// for performance refine, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 +#define SRS_POLL_CYCLE_INTERVAL 10 * 1000 * 1000 + +SrsPoll::SrsPoll() +{ + _pds = NULL; + pthread = new SrsThread(this, 0, false); +} + +SrsPoll::~SrsPoll() +{ + srs_freep(_pds); + srs_freep(pthread); + fds.clear(); +} + +int SrsPoll::start() +{ + return pthread->start(); +} + +int SrsPoll::cycle() +{ + int ret = ERROR_SUCCESS; + + if (fds.size() == 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + return ret; + } + + int nb_pds = (int)fds.size(); + + // TODO: FIXME: use more efficient way for the poll. + srs_freep(_pds); + _pds = new pollfd[nb_pds]; + + if (true) { + int index = 0; + + std::map::iterator it; + for (it = fds.begin(); it != fds.end(); ++it) { + int fd = it->first; + + pollfd& pfd = _pds[index++]; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + } + + srs_assert(index == (int)fds.size()); + } + + // Upon successful completion, a non-negative value is returned. + // A positive value indicates the total number of OS file descriptors in pds that have events. + // A value of 0 indicates that the call timed out. + if (st_poll(_pds, nb_pds, SRS_POLL_CYCLE_INTERVAL) < 0) { + srs_warn("ignore st_poll failed, size=%d", nb_pds); + return ret; + } + + for (int i = 0; i < nb_pds; i++) { + if (!(_pds[i].revents & POLLIN)) { + continue; + } + + int fd = _pds[i].fd; + if (fds.find(fd) == fds.end()) { + continue; + } + + SrsPollFD* owner = fds[fd]; + owner->set_active(true); + } + + return ret; +} + +int SrsPoll::add(st_netfd_t stfd, SrsPollFD* owner) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + if (fds.find(fd) != fds.end()) { + ret = ERROR_RTMP_POLL_FD_DUPLICATED; + srs_error("fd exists, fd=%d, ret=%d", fd, ret); + return ret; + } + + fds[fd] = owner; + + return ret; +} + +void SrsPoll::remove(st_netfd_t stfd, SrsPollFD* owner) +{ + std::map::iterator it; + + int fd = st_netfd_fileno(stfd); + if ((it = fds.find(fd)) != fds.end()) { + fds.erase(it); + } +} + +SrsPoll* SrsPoll::_instance = new SrsPoll(); + +SrsPoll* SrsPoll::instance() +{ + return _instance; +} + +SrsPollFD::SrsPollFD() +{ + _stfd = NULL; + _active = false; +} + +SrsPollFD::~SrsPollFD() +{ + if (_stfd) { + SrsPoll* poll = SrsPoll::instance(); + poll->remove(_stfd, this); + } +} + +int SrsPollFD::initialize(st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + _stfd = stfd; + + SrsPoll* poll = SrsPoll::instance(); + if ((ret = poll->add(stfd, this)) != ERROR_SUCCESS) { + srs_error("add fd to poll failed. ret=%d", ret); + return ret; + } + + return ret; +} + +bool SrsPollFD::active() +{ + return _active; +} + +void SrsPollFD::set_active(bool v) +{ + _active = v; +} + diff --git a/trunk/src/app/srs_app_poll.hpp b/trunk/src/app/srs_app_poll.hpp new file mode 100644 index 000000000..7a962da69 --- /dev/null +++ b/trunk/src/app/srs_app_poll.hpp @@ -0,0 +1,109 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_POLL_HPP +#define SRS_APP_POLL_HPP + +/* +#include +*/ + +#include + +#include + +#include +#include + +class SrsPollFD; + +/** +* the poll for all play clients to finger the active fd out. +* for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 +* the poll is shared by all SrsPollFD, and we start an isolate thread to finger the active fds. +*/ +class SrsPoll : public ISrsThreadHandler +{ +private: + SrsThread* pthread; + pollfd* _pds; + std::map fds; +public: + SrsPoll(); + virtual ~SrsPoll(); +public: + /** + * start the poll thread. + */ + virtual int start(); + /** + * start an cycle thread. + */ + virtual int cycle(); +public: + /** + * add the fd to poll. + */ + virtual int add(st_netfd_t stfd, SrsPollFD* owner); + /** + * remove the fd to poll, ignore any error. + */ + virtual void remove(st_netfd_t stfd, SrsPollFD* owner); +// singleton +private: + static SrsPoll* _instance; +public: + static SrsPoll* instance(); +}; + +/** +* the poll fd to check whether the specified fd is active. +*/ +class SrsPollFD +{ +private: + st_netfd_t _stfd; + // whether current fd is active. + bool _active; +public: + SrsPollFD(); + virtual ~SrsPollFD(); +public: + /** + * initialize the poll. + * @param stfd the fd to poll. + */ + virtual int initialize(st_netfd_t stfd); + /** + * whether fd is active. + */ + virtual bool active(); + /** + * the poll will set to fd active when got data to read, + * the connection will set to deactive when data read. + */ + virtual void set_active(bool v); +}; + +#endif + diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index df543c7a3..7c91d06a1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -48,6 +48,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -516,12 +517,20 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + // use poll fd to manage the connection, read when active. + SrsPollFD poll_fd; + if ((ret = poll_fd.initialize(stfd)) != ERROR_SUCCESS) { + return ret; + } - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); + // TODO: FIXME: remove following. + //rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT); + // initialize other components + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); - bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; @@ -530,7 +539,9 @@ int SrsRtmpConn::playing(SrsSource* source) pithy_print.elapse(); // read from client. - if (true) { + if (poll_fd.active()) { + poll_fd.set_active(false); + SrsMessage* msg = NULL; ret = rtmp->recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); @@ -559,6 +570,13 @@ int SrsRtmpConn::playing(SrsSource* source) srs_error("get messages from consumer failed. ret=%d", ret); return ret; } + + // no data, sleep a while. + // for the poll_fd maybe not active, and no message. + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + if (count <= 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + } // reportable if (pithy_print.can_print()) { diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 49df43998..2678f44eb 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // signal defines. #define SIGNAL_RELOAD SIGHUP @@ -664,6 +665,14 @@ int SrsServer::do_cycle() { int ret = ERROR_SUCCESS; + // start the poll for play clients. + // performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + SrsPoll* poll = SrsPoll::instance(); + if ((ret = poll->start()) != ERROR_SUCCESS) { + srs_error("start poll failed. ret=%d", ret); + return ret; + } + // find the max loop int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES); diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 21ca885de..1bf74e940 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_EDGE_VHOST_REMOVED 3039 #define ERROR_HLS_AVC_TRY_OTHERS 3040 #define ERROR_H264_API_NO_PREFIXED 3041 +#define ERROR_RTMP_POLL_FD_DUPLICATED 3042 /** * whether the error code is an system control error. diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 5fa0bfbf9..c73673d53 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -92,6 +92,8 @@ file ..\app\srs_app_kbps.cpp, ..\app\srs_app_log.hpp, ..\app\srs_app_log.cpp, + ..\app\srs_app_poll.hpp, + ..\app\srs_app_poll.cpp, ..\app\srs_app_refer.hpp, ..\app\srs_app_refer.cpp, ..\app\srs_app_reload.hpp,