From 4583a63789276ca113cd5dae62a2849aef8d70fe Mon Sep 17 00:00:00 2001
From: winlin <winlin@vip.126.com>
Date: Mon, 23 Jan 2017 16:38:23 +0800
Subject: [PATCH] for #742, refine code for recv thread.

---
 trunk/src/app/srs_app_recv_thread.cpp | 153 ++++++++++++++------------
 trunk/src/app/srs_app_recv_thread.hpp |  87 ++++++++-------
 2 files changed, 130 insertions(+), 110 deletions(-)

diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp
index df5eb5625..b28e71a50 100644
--- a/trunk/src/app/srs_app_recv_thread.cpp
+++ b/trunk/src/app/srs_app_recv_thread.cpp
@@ -37,19 +37,27 @@ using namespace std;
 // the max small bytes to group
 #define SRS_MR_SMALL_BYTES 4096
 
-ISrsMessageHandler::ISrsMessageHandler()
+ISrsMessageConsumer::ISrsMessageConsumer()
 {
 }
 
-ISrsMessageHandler::~ISrsMessageHandler()
+ISrsMessageConsumer::~ISrsMessageConsumer()
 {
 }
 
-SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm)
+ISrsMessagePumper::ISrsMessagePumper()
 {
+}
+
+ISrsMessagePumper::~ISrsMessagePumper()
+{
+}
+
+SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
+{
+    rtmp = r;
+    pumper = p;
     timeout = tm;
-    handler = msg_handler;
-    rtmp = rtmp_sdk;
     trd = new SrsReusableThread2("recv", this);
 }
 
@@ -87,29 +95,29 @@ int SrsRecvThread::cycle()
     int ret = ERROR_SUCCESS;
 
     while (!trd->interrupted()) {
-        if (!handler->can_handle()) {
+        // When the pumper is interrupted, wait then retry.
+        if (pumper->interrupted()) {
             st_usleep(timeout * 1000);
             continue;
         }
     
         SrsCommonMessage* msg = NULL;
         
-        // recv and handle message
-        ret = rtmp->recv_message(&msg);
-        if (ret == ERROR_SUCCESS) {
-            ret = handler->handle(msg);
+        // Process the received message.
+        if ((ret = rtmp->recv_message(&msg)) == ERROR_SUCCESS) {
+            ret = pumper->consume(msg);
         }
     
         if (ret != ERROR_SUCCESS) {
             if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
-                srs_error("thread process message failed. ret=%d", ret);
+                srs_error("recv thread error. ret=%d", ret);
             }
     
-            // we use no timeout to recv, should never got any error.
+            // Interrupt the receive thread for any error.
             trd->interrupt();
             
-            // notice the handler got a recv error.
-            handler->on_recv_error(ret);
+            // Notify the pumper to quit for error.
+            pumper->interrupt(ret);
     
             return ret;
         }
@@ -128,7 +136,7 @@ void SrsRecvThread::on_thread_start()
     // @see: https://github.com/ossrs/srs/issues/217
     rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);
     
-    handler->on_thread_start();
+    pumper->on_start();
 }
 
 void SrsRecvThread::on_thread_stop()
@@ -136,7 +144,7 @@ void SrsRecvThread::on_thread_stop()
     // reset the timeout to pulse mode.
     rtmp->set_recv_timeout(timeout * 1000);
     
-    handler->on_thread_stop();
+    pumper->on_stop();
 }
 
 SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
@@ -196,16 +204,7 @@ int SrsQueueRecvThread::error_code()
     return recv_error_code;
 }
 
-bool SrsQueueRecvThread::can_handle()
-{
-    // we only recv one message and then process it,
-    // for the message may cause the thread to stop,
-    // when stop, the thread is freed, so the messages
-    // are dropped.
-    return empty();
-}
-
-int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
+int SrsQueueRecvThread::consume(SrsCommonMessage* msg)
 {
     // put into queue, the send thread will get and process it,
     // @see SrsRtmpConn::process_play_control_msg
@@ -218,9 +217,19 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg)
     return ERROR_SUCCESS;
 }
 
-void SrsQueueRecvThread::on_recv_error(int ret)
+bool SrsQueueRecvThread::interrupted()
+{
+    // we only recv one message and then process it,
+    // for the message may cause the thread to stop,
+    // when stop, the thread is freed, so the messages
+    // are dropped.
+    return !empty();
+}
+
+void SrsQueueRecvThread::interrupt(int ret)
 {
     recv_error_code = ret;
+    
 #ifdef SRS_PERF_QUEUE_COND_WAIT
     if (_consumer) {
         _consumer->wakeup();
@@ -228,14 +237,14 @@ void SrsQueueRecvThread::on_recv_error(int ret)
 #endif
 }
 
-void SrsQueueRecvThread::on_thread_start()
+void SrsQueueRecvThread::on_start()
 {
     // disable the protocol auto response,
     // for the isolate recv thread should never send any messages.
     rtmp->set_auto_response(false);
 }
 
-void SrsQueueRecvThread::on_thread_stop()
+void SrsQueueRecvThread::on_stop()
 {
     // enable the protocol auto response,
     // for the isolate recv thread terminated.
@@ -325,7 +334,48 @@ void SrsPublishRecvThread::stop()
     trd.stop();
 }
 
-void SrsPublishRecvThread::on_thread_start()
+int SrsPublishRecvThread::consume(SrsCommonMessage* msg)
+{
+    int ret = ERROR_SUCCESS;
+    
+    // when cid changed, change it.
+    if (ncid != cid) {
+        _srs_context->set_id(ncid);
+        cid = ncid;
+    }
+    
+    _nb_msgs++;
+    
+    // log to show the time of recv thread.
+    srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
+                srs_update_system_time_ms(), msg->header.timestamp, msg->size);
+    
+    // the rtmp connection will handle this message
+    ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);
+    
+    // must always free it,
+    // the source will copy it if need to use.
+    srs_freep(msg);
+    
+    return ret;
+}
+
+bool SrsPublishRecvThread::interrupted()
+{
+    // Never interrupted, always can handle message.
+    return false;
+}
+
+void SrsPublishRecvThread::interrupt(int ret)
+{
+    recv_error_code = ret;
+    
+    // when recv thread error, signal the conn thread to process it.
+    // @see https://github.com/ossrs/srs/issues/244
+    st_cond_signal(error);
+}
+
+void SrsPublishRecvThread::on_start()
 {
     // we donot set the auto response to false,
     // for the main thread never send message.
@@ -342,7 +392,7 @@ void SrsPublishRecvThread::on_thread_start()
 #endif
 }
 
-void SrsPublishRecvThread::on_thread_stop()
+void SrsPublishRecvThread::on_stop()
 {
     // we donot set the auto response to true,
     // for we donot set to false yet.
@@ -360,47 +410,6 @@ void SrsPublishRecvThread::on_thread_stop()
 #endif
 }
 
-bool SrsPublishRecvThread::can_handle()
-{
-    // publish thread always can handle message.
-    return true;
-}
-
-int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
-{
-    int ret = ERROR_SUCCESS;
-    
-    // when cid changed, change it.
-    if (ncid != cid) {
-        _srs_context->set_id(ncid);
-        cid = ncid;
-    }
-
-    _nb_msgs++;
-    
-    // log to show the time of recv thread.
-    srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
-        srs_update_system_time_ms(), msg->header.timestamp, msg->size);
-
-    // the rtmp connection will handle this message
-    ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);
-
-    // must always free it,
-    // the source will copy it if need to use.
-    srs_freep(msg);
-    
-    return ret;
-}
-
-void SrsPublishRecvThread::on_recv_error(int ret)
-{
-    recv_error_code = ret;
-
-    // when recv thread error, signal the conn thread to process it.
-    // @see https://github.com/ossrs/srs/issues/244
-    st_cond_signal(error);
-}
-
 #ifdef SRS_PERF_MERGED_READ
 void SrsPublishRecvThread::on_read(ssize_t nread)
 {
diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp
index 2f713c4f7..b97dbbc83 100644
--- a/trunk/src/app/srs_app_recv_thread.hpp
+++ b/trunk/src/app/srs_app_recv_thread.hpp
@@ -45,36 +45,48 @@ class SrsRequest;
 class SrsConsumer;
 
 /**
- * for the recv thread to handle the message.
+ * The message consumer which consume a message.
  */
-class ISrsMessageHandler
+class ISrsMessageConsumer
 {
 public:
-    ISrsMessageHandler();
-    virtual ~ISrsMessageHandler();
+    ISrsMessageConsumer();
+    virtual ~ISrsMessageConsumer();
 public:
     /**
-    * whether the handler can handle,
-    * for example, when queue recv handler got an message,
-    * it wait the user to process it, then the recv thread
-    * never recv message util the handler is ok.
-    */
-    virtual bool can_handle() = 0;
-    /**
-     * process the received message.
+     * Consume the received message.
      * @remark user must free this message.
      */
-    virtual int handle(SrsCommonMessage* msg) = 0;
+    virtual int consume(SrsCommonMessage* msg) = 0;
+};
+
+/**
+ * The message pumper to pump messages to processer.
+ */
+class ISrsMessagePumper : public ISrsMessageConsumer
+{
+public:
+    ISrsMessagePumper();
+    virtual ~ISrsMessagePumper();
+public:
     /**
-    * when recv message error.
-    */
-    virtual void on_recv_error(int ret) = 0;
+     * Whether the pumper is interrupted.
+     * For example, when pumpter is busy, it's interrupted,
+     * please wait for a while then try to feed the pumper.
+     */
+    virtual bool interrupted() = 0;
     /**
-    * when thread start or stop, 
-    * for example, the message handler can set whether auto response.
-    */
-    virtual void on_thread_start() = 0;
-    virtual void on_thread_stop() = 0;
+     * Interrupt the pumper for a error.
+     */
+    virtual void interrupt(int error) = 0;
+    /**
+     * When start the pumper.
+     */
+    virtual void on_start() = 0;
+    /**
+     * When stop the pumper.
+     */
+    virtual void on_stop() = 0;
 };
 
 /**
@@ -84,14 +96,14 @@ class SrsRecvThread : public ISrsReusableThread2Handler
 {
 protected:
     SrsReusableThread2* trd;
-    ISrsMessageHandler* handler;
+    ISrsMessagePumper* pumper;
     SrsRtmpServer* rtmp;
     // The recv timeout in ms.
     int timeout;
 public:
     // Constructor.
     // @param tm The receive timeout in ms.
-    SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int tm);
+    SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm);
     virtual ~SrsRecvThread();
 public:
     virtual int cid();
@@ -112,7 +124,7 @@ public:
 * @see: SrsRtmpConn::playing
 * @see: https://github.com/ossrs/srs/issues/217
 */
-class SrsQueueRecvThread : public ISrsMessageHandler
+class SrsQueueRecvThread : public ISrsMessagePumper
 {
 private:
     std::vector<SrsCommonMessage*> queue;
@@ -132,24 +144,23 @@ public:
     virtual int size();
     virtual SrsCommonMessage* pump();
     virtual int error_code();
+// interface ISrsMessagePumper
 public:
-    virtual bool can_handle();
-    virtual int handle(SrsCommonMessage* msg);
-    virtual void on_recv_error(int ret);
-public:
-    virtual void on_thread_start();
-    virtual void on_thread_stop();
+    virtual int consume(SrsCommonMessage* msg);
+    virtual bool interrupted();
+    virtual void interrupt(int ret);
+    virtual void on_start();
+    virtual void on_stop();
 };
 
 /**
 * the publish recv thread got message and callback the source method to process message.
 * @see: https://github.com/ossrs/srs/issues/237
 */
-class SrsPublishRecvThread : virtual public ISrsMessageHandler
+class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler
 #ifdef SRS_PERF_MERGED_READ
     , virtual public IMergeReadHandler
 #endif
-    , virtual public ISrsReloadHandler
 {
 private:
     SrsRecvThread trd;
@@ -195,13 +206,13 @@ public:
 public:
     virtual int start();
     virtual void stop();
-    virtual void on_thread_start();
-    virtual void on_thread_stop();
-// interface ISrsMessageHandler    
+// interface ISrsMessagePumper
 public:
-    virtual bool can_handle();
-    virtual int handle(SrsCommonMessage* msg);
-    virtual void on_recv_error(int ret);
+    virtual int consume(SrsCommonMessage* msg);
+    virtual bool interrupted();
+    virtual void interrupt(int ret);
+    virtual void on_start();
+    virtual void on_stop();
 // interface IMergeReadHandler
 public:
 #ifdef SRS_PERF_MERGED_READ