From acba4cfdc6e5f7fcc6dcdd15b934034cd0c98ad6 Mon Sep 17 00:00:00 2001
From: winlin <winlin@vip.126.com>
Date: Sat, 26 Apr 2014 18:08:21 +0800
Subject: [PATCH] add edge framework

---
 trunk/src/app/srs_app_edge.cpp        | 58 ++++++++++++++++++++++++++-
 trunk/src/app/srs_app_edge.hpp        | 25 +++++++++++-
 trunk/src/kernel/srs_kernel_error.hpp |  3 ++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index e6528e448..726449a7f 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -25,21 +25,63 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 #include <srs_kernel_error.hpp>
 #include <srs_protocol_rtmp.hpp>
+#include <srs_kernel_log.hpp>
+
+// when error, edge ingester sleep for a while and retry.
+#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
+
+SrsEdgeIngester::SrsEdgeIngester()
+{
+    _edge = NULL;
+    _req = NULL;
+    pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US);
+}
+
+SrsEdgeIngester::~SrsEdgeIngester()
+{
+}
+
+int SrsEdgeIngester::initialize(SrsEdge* edge, SrsRequest* req)
+{
+    int ret = ERROR_SUCCESS;
+    
+    _edge = edge;
+    _req = req;
+    
+    return ret;
+}
+
+int SrsEdgeIngester::start()
+{
+    int ret = ERROR_SUCCESS;
+    return ret;
+    //return pthread->start();
+}
+
+int SrsEdgeIngester::cycle()
+{
+    int ret = ERROR_SUCCESS;
+    return ret;
+}
 
 SrsEdge::SrsEdge()
 {
     state = SrsEdgeStateInit;
+    ingester = new SrsEdgeIngester();
 }
 
 SrsEdge::~SrsEdge()
 {
+    srs_freep(ingester);
 }
 
 int SrsEdge::initialize(SrsRequest* req)
 {
     int ret = ERROR_SUCCESS;
     
-    _req = req;
+    if ((ret = ingester->initialize(this, req)) != ERROR_SUCCESS) {
+        return ret;
+    }
     
     return ret;
 }
@@ -47,6 +89,20 @@ int SrsEdge::initialize(SrsRequest* req)
 int SrsEdge::on_client_play()
 {
     int ret = ERROR_SUCCESS;
+    
+    // error state.
+    if (state == SrsEdgeStateAborting || state == SrsEdgeStateReloading) {
+        ret = ERROR_RTMP_EDGE_PLAY_STATE;
+        srs_error("invalid state for client to play stream on edge. state=%d, ret=%d", state, ret);
+        return ret;
+    }
+    
+    // start ingest when init state.
+    if (state == SrsEdgeStateInit) {
+        state = SrsEdgeStatePlay;
+        return ingester->start();
+    }
+
     return ret;
 }
 
diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp
index 37d610973..130c0ef56 100644
--- a/trunk/src/app/srs_app_edge.hpp
+++ b/trunk/src/app/srs_app_edge.hpp
@@ -30,6 +30,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 #include <srs_core.hpp>
 
+#include <srs_app_thread.hpp>
+
+class SrsEdge;
 class SrsRequest;
 
 /**
@@ -45,14 +48,34 @@ enum SrsEdgeState
     SrsEdgeStateReloading,
 };
 
+/**
+* edge used to ingest stream from origin.
+*/
+class SrsEdgeIngester : public ISrsThreadHandler
+{
+private:
+    SrsEdge* _edge;
+    SrsRequest* _req;
+    SrsThread* pthread;
+public:
+    SrsEdgeIngester();
+    virtual ~SrsEdgeIngester();
+public:
+    virtual int initialize(SrsEdge* edge, SrsRequest* req);
+    virtual int start();
+// interface ISrsThreadHandler
+public:
+    virtual int cycle();
+};
+
 /**
 * edge control service.
 */
 class SrsEdge
 {
 private:
-    SrsRequest* _req;
     SrsEdgeState state;
+    SrsEdgeIngester* ingester;
 public:
     SrsEdge();
     virtual ~SrsEdge();
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index 81894bc75..6472dba74 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -76,6 +76,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 // 2. srs-librtmp return error, to terminate the program.
 #define ERROR_RTMP_HS_SSL_REQUIRE           318
 #define ERROR_RTMP_DURATION_EXCEED          319
+// edge specified errors
+// invalid state for client to play edge stream.
+#define ERROR_RTMP_EDGE_PLAY_STATE          320
 
 #define ERROR_SYSTEM_STREAM_INIT            400
 #define ERROR_SYSTEM_PACKET_INVALID         401