diff --git a/README.md b/README.md index 224abf2d3..aea022769 100755 --- a/README.md +++ b/README.md @@ -192,6 +192,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v1.0, 2014-04-27, support basic edge(play/publish) RTMP server. 0.9.78 * v1.0, 2014-04-25, add donation page. 0.9.76 * v1.0, 2014-04-24, support live flashP2P(integrated by chnvideo VDN). 0.9.75 * v1.0, 2014-04-21, support android app to start srs for internal edge. 0.9.72 diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index a9042b580..490a09681 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -49,6 +49,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // when edge timeout, retry next. #define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL) +// when error, edge ingester sleep for a while and retry. +#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(1*1000*1000LL) + +// when edge timeout, retry next. +#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL) + SrsEdgeIngester::SrsEdgeIngester() { io = NULL; @@ -316,6 +322,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() origin_index = 0; stream_id = 0; stfd = NULL; + pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US); } SrsEdgeForwarder::~SrsEdgeForwarder() @@ -367,17 +374,54 @@ int SrsEdgeForwarder::start() return ret; } - return ret; + return pthread->start(); } void SrsEdgeForwarder::stop() { + pthread->stop(); + close_underlayer_socket(); srs_freep(client); srs_freep(io); } +int SrsEdgeForwarder::cycle() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_EDGE_FORWARDER_TIMEOUT_US); + + SrsPithyPrint pithy_print(SRS_STAGE_EDGE); + + while (pthread->can_loop()) { + // switch to other st-threads. + st_usleep(0); + + pithy_print.elapse(); + + // pithy print + if (pithy_print.can_print()) { + srs_trace("-> time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + } + + // read from client. + SrsCommonMessage* msg = NULL; + if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { + srs_info("ignore forwarder recv origin server message failed. ret=%d", ret); + continue; + } + srs_verbose("edge loop recv message. ret=%d", ret); + + srs_assert(msg); + SrsAutoFree(SrsCommonMessage, msg, false); + } + + return ret; +} + int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 76874491e..21da18932 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -104,7 +104,7 @@ private: /** * edge used to forward stream to origin. */ -class SrsEdgeForwarder +class SrsEdgeForwarder : public ISrsThreadHandler { private: int stream_id; @@ -112,6 +112,7 @@ private: SrsSource* _source; SrsPublishEdge* _edge; SrsRequest* _req; + SrsThread* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsRtmpClient* client; @@ -123,6 +124,9 @@ public: virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); +// interface ISrsThreadHandler +public: + virtual int cycle(); public: virtual int proxy(SrsCommonMessage* msg); private: @@ -134,6 +138,7 @@ private: * play edge control service. * downloading edge speed-up. */ +// TODO: FIXME: support reload class SrsPlayEdge { private: @@ -164,6 +169,7 @@ public: * publish edge control service. * uploading edge speed-up. */ +// TODO: FIXME: support reload class SrsPublishEdge { private: diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 2824e58f3..a1118a63f 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "77" +#define VERSION_REVISION "78" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs"