diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index df5eb5625..b28e71a50 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -37,19 +37,27 @@ using namespace std; // the max small bytes to group #define SRS_MR_SMALL_BYTES 4096 -ISrsMessageHandler::ISrsMessageHandler() +ISrsMessageConsumer::ISrsMessageConsumer() { } -ISrsMessageHandler::~ISrsMessageHandler() +ISrsMessageConsumer::~ISrsMessageConsumer() { } -SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm) +ISrsMessagePumper::ISrsMessagePumper() { +} + +ISrsMessagePumper::~ISrsMessagePumper() +{ +} + +SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm) +{ + rtmp = r; + pumper = p; timeout = tm; - handler = msg_handler; - rtmp = rtmp_sdk; trd = new SrsReusableThread2("recv", this); } @@ -87,29 +95,29 @@ int SrsRecvThread::cycle() int ret = ERROR_SUCCESS; while (!trd->interrupted()) { - if (!handler->can_handle()) { + // When the pumper is interrupted, wait then retry. + if (pumper->interrupted()) { st_usleep(timeout * 1000); continue; } SrsCommonMessage* msg = NULL; - // recv and handle message - ret = rtmp->recv_message(&msg); - if (ret == ERROR_SUCCESS) { - ret = handler->handle(msg); + // Process the received message. + if ((ret = rtmp->recv_message(&msg)) == ERROR_SUCCESS) { + ret = pumper->consume(msg); } if (ret != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { - srs_error("thread process message failed. ret=%d", ret); + srs_error("recv thread error. ret=%d", ret); } - // we use no timeout to recv, should never got any error. + // Interrupt the receive thread for any error. trd->interrupt(); - // notice the handler got a recv error. - handler->on_recv_error(ret); + // Notify the pumper to quit for error. + pumper->interrupt(ret); return ret; } @@ -128,7 +136,7 @@ void SrsRecvThread::on_thread_start() // @see: https://github.com/ossrs/srs/issues/217 rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS); - handler->on_thread_start(); + pumper->on_start(); } void SrsRecvThread::on_thread_stop() @@ -136,7 +144,7 @@ void SrsRecvThread::on_thread_stop() // reset the timeout to pulse mode. rtmp->set_recv_timeout(timeout * 1000); - handler->on_thread_stop(); + pumper->on_stop(); } SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms) @@ -196,16 +204,7 @@ int SrsQueueRecvThread::error_code() return recv_error_code; } -bool SrsQueueRecvThread::can_handle() -{ - // we only recv one message and then process it, - // for the message may cause the thread to stop, - // when stop, the thread is freed, so the messages - // are dropped. - return empty(); -} - -int SrsQueueRecvThread::handle(SrsCommonMessage* msg) +int SrsQueueRecvThread::consume(SrsCommonMessage* msg) { // put into queue, the send thread will get and process it, // @see SrsRtmpConn::process_play_control_msg @@ -218,9 +217,19 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) return ERROR_SUCCESS; } -void SrsQueueRecvThread::on_recv_error(int ret) +bool SrsQueueRecvThread::interrupted() +{ + // we only recv one message and then process it, + // for the message may cause the thread to stop, + // when stop, the thread is freed, so the messages + // are dropped. + return !empty(); +} + +void SrsQueueRecvThread::interrupt(int ret) { recv_error_code = ret; + #ifdef SRS_PERF_QUEUE_COND_WAIT if (_consumer) { _consumer->wakeup(); @@ -228,14 +237,14 @@ void SrsQueueRecvThread::on_recv_error(int ret) #endif } -void SrsQueueRecvThread::on_thread_start() +void SrsQueueRecvThread::on_start() { // disable the protocol auto response, // for the isolate recv thread should never send any messages. rtmp->set_auto_response(false); } -void SrsQueueRecvThread::on_thread_stop() +void SrsQueueRecvThread::on_stop() { // enable the protocol auto response, // for the isolate recv thread terminated. @@ -325,7 +334,48 @@ void SrsPublishRecvThread::stop() trd.stop(); } -void SrsPublishRecvThread::on_thread_start() +int SrsPublishRecvThread::consume(SrsCommonMessage* msg) +{ + int ret = ERROR_SUCCESS; + + // when cid changed, change it. + if (ncid != cid) { + _srs_context->set_id(ncid); + cid = ncid; + } + + _nb_msgs++; + + // log to show the time of recv thread. + srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d", + srs_update_system_time_ms(), msg->header.timestamp, msg->size); + + // the rtmp connection will handle this message + ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge); + + // must always free it, + // the source will copy it if need to use. + srs_freep(msg); + + return ret; +} + +bool SrsPublishRecvThread::interrupted() +{ + // Never interrupted, always can handle message. + return false; +} + +void SrsPublishRecvThread::interrupt(int ret) +{ + recv_error_code = ret; + + // when recv thread error, signal the conn thread to process it. + // @see https://github.com/ossrs/srs/issues/244 + st_cond_signal(error); +} + +void SrsPublishRecvThread::on_start() { // we donot set the auto response to false, // for the main thread never send message. @@ -342,7 +392,7 @@ void SrsPublishRecvThread::on_thread_start() #endif } -void SrsPublishRecvThread::on_thread_stop() +void SrsPublishRecvThread::on_stop() { // we donot set the auto response to true, // for we donot set to false yet. @@ -360,47 +410,6 @@ void SrsPublishRecvThread::on_thread_stop() #endif } -bool SrsPublishRecvThread::can_handle() -{ - // publish thread always can handle message. - return true; -} - -int SrsPublishRecvThread::handle(SrsCommonMessage* msg) -{ - int ret = ERROR_SUCCESS; - - // when cid changed, change it. - if (ncid != cid) { - _srs_context->set_id(ncid); - cid = ncid; - } - - _nb_msgs++; - - // log to show the time of recv thread. - srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d", - srs_update_system_time_ms(), msg->header.timestamp, msg->size); - - // the rtmp connection will handle this message - ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge); - - // must always free it, - // the source will copy it if need to use. - srs_freep(msg); - - return ret; -} - -void SrsPublishRecvThread::on_recv_error(int ret) -{ - recv_error_code = ret; - - // when recv thread error, signal the conn thread to process it. - // @see https://github.com/ossrs/srs/issues/244 - st_cond_signal(error); -} - #ifdef SRS_PERF_MERGED_READ void SrsPublishRecvThread::on_read(ssize_t nread) { diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 2f713c4f7..b97dbbc83 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -45,36 +45,48 @@ class SrsRequest; class SrsConsumer; /** - * for the recv thread to handle the message. + * The message consumer which consume a message. */ -class ISrsMessageHandler +class ISrsMessageConsumer { public: - ISrsMessageHandler(); - virtual ~ISrsMessageHandler(); + ISrsMessageConsumer(); + virtual ~ISrsMessageConsumer(); public: /** - * whether the handler can handle, - * for example, when queue recv handler got an message, - * it wait the user to process it, then the recv thread - * never recv message util the handler is ok. - */ - virtual bool can_handle() = 0; - /** - * process the received message. + * Consume the received message. * @remark user must free this message. */ - virtual int handle(SrsCommonMessage* msg) = 0; + virtual int consume(SrsCommonMessage* msg) = 0; +}; + +/** + * The message pumper to pump messages to processer. + */ +class ISrsMessagePumper : public ISrsMessageConsumer +{ +public: + ISrsMessagePumper(); + virtual ~ISrsMessagePumper(); +public: /** - * when recv message error. - */ - virtual void on_recv_error(int ret) = 0; + * Whether the pumper is interrupted. + * For example, when pumpter is busy, it's interrupted, + * please wait for a while then try to feed the pumper. + */ + virtual bool interrupted() = 0; /** - * when thread start or stop, - * for example, the message handler can set whether auto response. - */ - virtual void on_thread_start() = 0; - virtual void on_thread_stop() = 0; + * Interrupt the pumper for a error. + */ + virtual void interrupt(int error) = 0; + /** + * When start the pumper. + */ + virtual void on_start() = 0; + /** + * When stop the pumper. + */ + virtual void on_stop() = 0; }; /** @@ -84,14 +96,14 @@ class SrsRecvThread : public ISrsReusableThread2Handler { protected: SrsReusableThread2* trd; - ISrsMessageHandler* handler; + ISrsMessagePumper* pumper; SrsRtmpServer* rtmp; // The recv timeout in ms. int timeout; public: // Constructor. // @param tm The receive timeout in ms. - SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm); + SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm); virtual ~SrsRecvThread(); public: virtual int cid(); @@ -112,7 +124,7 @@ public: * @see: SrsRtmpConn::playing * @see: https://github.com/ossrs/srs/issues/217 */ -class SrsQueueRecvThread : public ISrsMessageHandler +class SrsQueueRecvThread : public ISrsMessagePumper { private: std::vector queue; @@ -132,24 +144,23 @@ public: virtual int size(); virtual SrsCommonMessage* pump(); virtual int error_code(); +// interface ISrsMessagePumper public: - virtual bool can_handle(); - virtual int handle(SrsCommonMessage* msg); - virtual void on_recv_error(int ret); -public: - virtual void on_thread_start(); - virtual void on_thread_stop(); + virtual int consume(SrsCommonMessage* msg); + virtual bool interrupted(); + virtual void interrupt(int ret); + virtual void on_start(); + virtual void on_stop(); }; /** * the publish recv thread got message and callback the source method to process message. * @see: https://github.com/ossrs/srs/issues/237 */ -class SrsPublishRecvThread : virtual public ISrsMessageHandler +class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler #ifdef SRS_PERF_MERGED_READ , virtual public IMergeReadHandler #endif - , virtual public ISrsReloadHandler { private: SrsRecvThread trd; @@ -195,13 +206,13 @@ public: public: virtual int start(); virtual void stop(); - virtual void on_thread_start(); - virtual void on_thread_stop(); -// interface ISrsMessageHandler +// interface ISrsMessagePumper public: - virtual bool can_handle(); - virtual int handle(SrsCommonMessage* msg); - virtual void on_recv_error(int ret); + virtual int consume(SrsCommonMessage* msg); + virtual bool interrupted(); + virtual void interrupt(int ret); + virtual void on_start(); + virtual void on_stop(); // interface IMergeReadHandler public: #ifdef SRS_PERF_MERGED_READ