diff --git a/trunk/src/app/srs_app_poll.cpp b/trunk/src/app/srs_app_poll.cpp index fd7546643..6ead87078 100644 --- a/trunk/src/app/srs_app_poll.cpp +++ b/trunk/src/app/srs_app_poll.cpp @@ -24,14 +24,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include SrsPoll::SrsPoll() { + _pds = NULL; pthread = new SrsThread(this, 0, false); } SrsPoll::~SrsPoll() { + srs_freep(_pds); srs_freep(pthread); fds.clear(); } @@ -44,10 +47,84 @@ int SrsPoll::start() int SrsPoll::cycle() { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (fds.size() == 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + return ret; + } + + int nb_pds = (int)fds.size(); + +st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); +return ret; + + srs_freep(_pds); + _pds = new pollfd[nb_pds]; + + if (true) { + int index = 0; + + std::map::iterator it; + for (it = fds.begin(); it != fds.end(); ++it) { + int fd = it->first; + + pollfd& pfd = _pds[index++]; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + } + + srs_assert(index == (int)fds.size()); + } + + if (st_poll(_pds, nb_pds, ST_UTIME_NO_TIMEOUT) <= 0) { + srs_warn("ignore st_poll failed, size=%d", nb_pds); + return ret; + } + + for (int i = 0; i < nb_pds; i++) { + if (!(_pds[i].revents & POLLIN)) { + continue; + } + + int fd = _pds[i].fd; + if (fds.find(fd) == fds.end()) { + continue; + } + + SrsPollFD* owner = fds[fd]; + owner->set_active(true); + } + + return ret; +} + +int SrsPoll::add(st_netfd_t stfd, SrsPollFD* owner) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + if (fds.find(fd) != fds.end()) { + ret = ERROR_RTMP_POLL_FD_DUPLICATED; + srs_error("fd exists, fd=%d, ret=%d", fd, ret); + return ret; + } + + fds[fd] = owner; + return ret; } +void SrsPoll::remove(st_netfd_t stfd, SrsPollFD* owner) +{ + std::map::iterator it; + + int fd = st_netfd_fileno(stfd); + if ((it = fds.find(fd)) != fds.end()) { + fds.erase(it); + } +} + SrsPoll* SrsPoll::_instance = new SrsPoll(); SrsPoll* SrsPoll::instance() @@ -58,10 +135,15 @@ SrsPoll* SrsPoll::instance() SrsPollFD::SrsPollFD() { _stfd = NULL; + _active = false; } SrsPollFD::~SrsPollFD() { + if (_stfd) { + SrsPoll* poll = SrsPoll::instance(); + poll->remove(_stfd, this); + } } int SrsPollFD::initialize(st_netfd_t stfd) @@ -70,6 +152,22 @@ int SrsPollFD::initialize(st_netfd_t stfd) _stfd = stfd; + SrsPoll* poll = SrsPoll::instance(); + if ((ret = poll->add(stfd, this)) != ERROR_SUCCESS) { + srs_error("add fd to poll failed. ret=%d", ret); + return ret; + } + return ret; } +bool SrsPollFD::active() +{ + return _active; +} + +void SrsPollFD::set_active(bool v) +{ + _active = v; +} + diff --git a/trunk/src/app/srs_app_poll.hpp b/trunk/src/app/srs_app_poll.hpp index 8ad59d10b..7a962da69 100644 --- a/trunk/src/app/srs_app_poll.hpp +++ b/trunk/src/app/srs_app_poll.hpp @@ -46,7 +46,8 @@ class SrsPoll : public ISrsThreadHandler { private: SrsThread* pthread; - std::map fds; + pollfd* _pds; + std::map fds; public: SrsPoll(); virtual ~SrsPoll(); @@ -59,6 +60,15 @@ public: * start an cycle thread. */ virtual int cycle(); +public: + /** + * add the fd to poll. + */ + virtual int add(st_netfd_t stfd, SrsPollFD* owner); + /** + * remove the fd to poll, ignore any error. + */ + virtual void remove(st_netfd_t stfd, SrsPollFD* owner); // singleton private: static SrsPoll* _instance; @@ -73,6 +83,8 @@ class SrsPollFD { private: st_netfd_t _stfd; + // whether current fd is active. + bool _active; public: SrsPollFD(); virtual ~SrsPollFD(); @@ -82,6 +94,15 @@ public: * @param stfd the fd to poll. */ virtual int initialize(st_netfd_t stfd); + /** + * whether fd is active. + */ + virtual bool active(); + /** + * the poll will set to fd active when got data to read, + * the connection will set to deactive when data read. + */ + virtual void set_active(bool v); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 88a186d93..7c91d06a1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -517,26 +517,31 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + // use poll fd to manage the connection, read when active. + SrsPollFD poll_fd; + if ((ret = poll_fd.initialize(stfd)) != ERROR_SUCCESS) { + return ret; + } - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); + // TODO: FIXME: remove following. + //rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT); + // initialize other components + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); - bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; - SrsPollFD poll; - if ((ret = poll.initialize(stfd)) != ERROR_SUCCESS) { - return ret; - } - while (true) { // collect elapse for pithy print. pithy_print.elapse(); // read from client. - if (true) { + if (poll_fd.active()) { + poll_fd.set_active(false); + SrsMessage* msg = NULL; ret = rtmp->recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); @@ -565,6 +570,13 @@ int SrsRtmpConn::playing(SrsSource* source) srs_error("get messages from consumer failed. ret=%d", ret); return ret; } + + // no data, sleep a while. + // for the poll_fd maybe not active, and no message. + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + if (count <= 0) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + } // reportable if (pithy_print.can_print()) { diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 21ca885de..1bf74e940 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -185,6 +185,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_EDGE_VHOST_REMOVED 3039 #define ERROR_HLS_AVC_TRY_OTHERS 3040 #define ERROR_H264_API_NO_PREFIXED 3041 +#define ERROR_RTMP_POLL_FD_DUPLICATED 3042 /** * whether the error code is an system control error.