diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 9972b5d9b..21f7cd106 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -67,7 +67,7 @@ SrsEdgeIngester::~SrsEdgeIngester() srs_freep(pthread); } -int SrsEdgeIngester::initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req) +int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -306,19 +306,226 @@ int SrsEdgeIngester::connect_server() return ret; } -SrsEdge::SrsEdge() +SrsEdgeForwarder::SrsEdgeForwarder() +{ + io = NULL; + client = NULL; + _edge = NULL; + _req = NULL; + origin_index = 0; + stream_id = 0; + stfd = NULL; + pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US); +} + +SrsEdgeForwarder::~SrsEdgeForwarder() +{ + stop(); + + srs_freep(pthread); +} + +int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + _source = source; + _edge = edge; + _req = req; + + return ret; +} + +int SrsEdgeForwarder::start() +{ + return pthread->start(); +} + +void SrsEdgeForwarder::stop() +{ + pthread->stop(); + + close_underlayer_socket(); + + srs_freep(client); + srs_freep(io); +} + +int SrsEdgeForwarder::cycle() +{ + int ret = ERROR_SUCCESS; + + if ((ret = connect_server()) != ERROR_SUCCESS) { + return ret; + } + srs_assert(client); + + client->set_recv_timeout(SRS_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_SEND_TIMEOUT_US); + + SrsRequest* req = _req; + + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->connect_app(req->app, req->tcUrl)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + if ((ret = _source->on_publish()) != ERROR_SUCCESS) { + srs_error("edge ingester play stream then publish to edge failed. ret=%d", ret); + return ret; + } + + if ((ret = _edge->on_forward_publish()) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = forward()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsEdgeForwarder::forward() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_EDGE_TIMEOUT_US); + + SrsPithyPrint pithy_print(SRS_STAGE_EDGE); + + while (pthread->can_loop()) { + // switch to other st-threads. + st_usleep(0); + + pithy_print.elapse(); + + // pithy print + if (pithy_print.can_print()) { + srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + } + + // read from client. + SrsCommonMessage* msg = NULL; + if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv origin server message failed. ret=%d", ret); + return ret; + } + srs_verbose("edge loop recv message. ret=%d", ret); + + srs_assert(msg); + SrsAutoFree(SrsCommonMessage, msg, false); + } + + return ret; +} + +void SrsEdgeForwarder::close_underlayer_socket() +{ + srs_close_stfd(stfd); +} + +int SrsEdgeForwarder::connect_server() +{ + int ret = ERROR_SUCCESS; + + // reopen + close_underlayer_socket(); + + // TODO: FIXME: support reload + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); + srs_assert(conf); + + // select the origin. + std::string server = conf->args.at(origin_index % conf->args.size()); + origin_index = (origin_index + 1) % conf->args.size(); + + std::string s_port = RTMP_DEFAULT_PORT; + int port = ::atoi(RTMP_DEFAULT_PORT); + size_t pos = server.find(":"); + if (pos != std::string::npos) { + s_port = server.substr(pos + 1); + server = server.substr(0, pos); + port = ::atoi(s_port.c_str()); + } + + // open socket. + srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d", + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); + + // TODO: FIXME: extract utility method + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + srs_assert(!stfd); + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + srs_freep(client); + srs_freep(io); + + io = new SrsSocket(stfd); + client = new SrsRtmpClient(io); + + // connect to server. + std::string ip = srs_dns_resolve(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + + return ret; +} + +SrsPlayEdge::SrsPlayEdge() { state = SrsEdgeStateInit; user_state = SrsEdgeUserStateInit; ingester = new SrsEdgeIngester(); } -SrsEdge::~SrsEdge() +SrsPlayEdge::~SrsPlayEdge() { srs_freep(ingester); } -int SrsEdge::initialize(SrsSource* source, SrsRequest* req) +int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -329,7 +536,7 @@ int SrsEdge::initialize(SrsSource* source, SrsRequest* req) return ret; } -int SrsEdge::on_client_play() +int SrsPlayEdge::on_client_play() { int ret = ERROR_SUCCESS; @@ -350,7 +557,7 @@ int SrsEdge::on_client_play() return ret; } -void SrsEdge::on_all_client_stop() +void SrsPlayEdge::on_all_client_stop() { // when all client disconnected, // and edge is ingesting origin stream, abort it. @@ -365,7 +572,7 @@ void SrsEdge::on_all_client_stop() } } -int SrsEdge::on_ingest_play() +int SrsPlayEdge::on_ingest_play() { int ret = ERROR_SUCCESS; @@ -382,3 +589,52 @@ int SrsEdge::on_ingest_play() return ret; } + +SrsPublishEdge::SrsPublishEdge() +{ + state = SrsEdgeStateInit; + user_state = SrsEdgeUserStateInit; + forwarder = new SrsEdgeForwarder(); +} + +SrsPublishEdge::~SrsPublishEdge() +{ + srs_freep(forwarder); +} + +int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + if ((ret = forwarder->initialize(source, this, req)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsPublishEdge::on_client_publish() +{ + int ret = ERROR_SUCCESS; + + // error state. + if (user_state != SrsEdgeUserStateInit) { + ret = ERROR_RTMP_EDGE_PUBLISH_STATE; + srs_error("invalid state for client to play stream on edge. " + "state=%d, user_state=%d, ret=%d", state, user_state, ret); + return ret; + } + + // start ingest when init state. + if (state == SrsEdgeStateInit) { + state = SrsEdgeStatePublish; + } + + return ret; +} + +int SrsPublishEdge::on_forward_publish() +{ + int ret = ERROR_SUCCESS; + return ret; +} diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 769ab536f..e5d274771 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -33,9 +33,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -class SrsEdge; class SrsSource; class SrsRequest; +class SrsPlayEdge; +class SrsPublishEdge; class SrsRtmpClient; class SrsCommonMessage; class ISrsProtocolReaderWriter; @@ -46,10 +47,14 @@ class ISrsProtocolReaderWriter; enum SrsEdgeState { SrsEdgeStateInit = 0, + + // for play edge SrsEdgeStatePlay = 100, - SrsEdgeStatePublish, // play stream from origin, ingest stream SrsEdgeStateIngestConnected, + + // for publish edge + SrsEdgeStatePublish = 200, // publish stream to edge, forward to origin SrsEdgeStateForwardConnected, }; @@ -72,7 +77,7 @@ private: int stream_id; private: SrsSource* _source; - SrsEdge* _edge; + SrsPlayEdge* _edge; SrsRequest* _req; SrsThread* pthread; st_netfd_t stfd; @@ -83,7 +88,7 @@ public: SrsEdgeIngester(); virtual ~SrsEdgeIngester(); public: - virtual int initialize(SrsSource* source, SrsEdge* edge, SrsRequest* req); + virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); // interface ISrsThreadHandler @@ -97,17 +102,50 @@ private: }; /** -* edge control service. +* edge used to forward stream to origin. +*/ +class SrsEdgeForwarder : public ISrsThreadHandler +{ +private: + int stream_id; +private: + SrsSource* _source; + SrsPublishEdge* _edge; + SrsRequest* _req; + SrsThread* pthread; + st_netfd_t stfd; + ISrsProtocolReaderWriter* io; + SrsRtmpClient* client; + int origin_index; +public: + SrsEdgeForwarder(); + virtual ~SrsEdgeForwarder(); +public: + virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); + virtual int start(); + virtual void stop(); +// interface ISrsThreadHandler +public: + virtual int cycle(); +private: + virtual int forward(); + virtual void close_underlayer_socket(); + virtual int connect_server(); +}; + +/** +* play edge control service. +* downloading edge speed-up. */ -class SrsEdge +class SrsPlayEdge { private: SrsEdgeState state; SrsEdgeUserState user_state; SrsEdgeIngester* ingester; public: - SrsEdge(); - virtual ~SrsEdge(); + SrsPlayEdge(); + virtual ~SrsPlayEdge(); public: virtual int initialize(SrsSource* source, SrsRequest* req); /** @@ -125,4 +163,30 @@ public: virtual int on_ingest_play(); }; +/** +* publish edge control service. +* uploading edge speed-up. +*/ +class SrsPublishEdge +{ +private: + SrsEdgeState state; + SrsEdgeUserState user_state; + SrsEdgeForwarder* forwarder; +public: + SrsPublishEdge(); + virtual ~SrsPublishEdge(); +public: + virtual int initialize(SrsSource* source, SrsRequest* req); + /** + * when client publish stream on edge. + */ + virtual int on_client_publish(); +public: + /** + * when forwarder start to publish stream. + */ + virtual int on_forward_publish(); +}; + #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 6f6b80c50..3dc8de6d6 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -321,6 +321,13 @@ int SrsRtmpConn::stream_service_cycle() case SrsRtmpConnFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); + if (vhost_is_edge) { + if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { + srs_error("notice edge start publish stream failed. ret=%d", ret); + return ret; + } + } + if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 50c2941a5..b29d821fd 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -456,7 +456,8 @@ SrsSource::SrsSource(SrsRequest* req) frame_rate = sample_rate = 0; _can_publish = true; - edge = new SrsEdge(); + play_edge = new SrsPlayEdge(); + publish_edge = new SrsPublishEdge(); gop_cache = new SrsGopCache(); _srs_config->subscribe(this); @@ -489,7 +490,8 @@ SrsSource::~SrsSource() srs_freep(cache_sh_video); srs_freep(cache_sh_audio); - srs_freep(edge); + srs_freep(play_edge); + srs_freep(publish_edge); srs_freep(gop_cache); #ifdef SRS_AUTO_HLS @@ -515,7 +517,10 @@ int SrsSource::initialize() } #endif - if ((ret = edge->initialize(this, _req)) != ERROR_SUCCESS) { + if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } @@ -1170,7 +1175,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) srs_info("handle consumer destroy success."); if (consumers.empty()) { - edge->on_all_client_stop(); + play_edge->on_all_client_stop(); } } @@ -1186,7 +1191,12 @@ bool SrsSource::is_atc() int SrsSource::on_edge_start_play() { - return edge->on_client_play(); + return play_edge->on_client_play(); +} + +int SrsSource::on_edge_start_publish() +{ + return publish_edge->on_client_publish(); } int SrsSource::create_forwarders() diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index fb83df04f..a7c83217e 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -37,7 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -class SrsEdge; +class SrsPlayEdge; +class SrsPublishEdge; class SrsSource; class SrsCommonMessage; class SrsOnMetaDataPacket; @@ -236,7 +237,8 @@ private: SrsEncoder* encoder; #endif // edge control service - SrsEdge* edge; + SrsPlayEdge* play_edge; + SrsPublishEdge* publish_edge; // gop cache for client fast startup. SrsGopCache* gop_cache; // to forward stream to other servers @@ -314,6 +316,8 @@ public: virtual bool is_atc(); // for edge, when play edge stream, check the state virtual int on_edge_start_play(); + // for edge, when publish edge stream, check the state + virtual int on_edge_start_publish(); private: virtual int create_forwarders(); virtual void destroy_forwarders(); diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 6472dba74..c233d6fd4 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -79,6 +79,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // edge specified errors // invalid state for client to play edge stream. #define ERROR_RTMP_EDGE_PLAY_STATE 320 +// invalid state for client to publish edge stream. +#define ERROR_RTMP_EDGE_PUBLISH_STATE 321 #define ERROR_SYSTEM_STREAM_INIT 400 #define ERROR_SYSTEM_PACKET_INVALID 401