From 12e013142d2b67e8d5fa4989efcad4b3eb3d339b Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 14 Oct 2015 15:08:55 +0800 Subject: [PATCH] refine code, use simple rtmp client. --- trunk/src/app/srs_app_caster_flv.cpp | 6 +- trunk/src/app/srs_app_edge.cpp | 216 ++++++------------------- trunk/src/app/srs_app_edge.hpp | 17 +- trunk/src/app/srs_app_rtmp_conn.cpp | 55 ++++--- trunk/src/app/srs_app_rtmp_conn.hpp | 8 +- trunk/src/kernel/srs_kernel_consts.hpp | 2 +- 6 files changed, 102 insertions(+), 202 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 400060188..5da82e8fe 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -179,8 +179,10 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) { int ret = ERROR_SUCCESS; - if ((ret = sdk->connect(output, SRS_CONSTS_RTMP_RECV_TIMEOUT_US)) != ERROR_SUCCESS) { - srs_error("flv: connect %s failed. ret=%d", output.c_str(), ret); + int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; + if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { + srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); return ret; } diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index e942dca44..d859e91dd 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -125,7 +125,7 @@ int SrsEdgeIngester::cycle() source->on_source_id_changed(_srs_context->get_id()); - std::string url, vhost; + std::string url; if (true) { SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); @@ -139,22 +139,22 @@ int SrsEdgeIngester::cycle() } // select the origin. - if (true) { - std::string server = lb->select(conf->args); - int port = SRS_CONSTS_RTMP_DEFAULT_PORT; - srs_parse_hostport(server, server, port); - - url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); - } + std::string server = lb->select(conf->args); + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; + srs_parse_hostport(server, server, port); // support vhost tranform for edge, // @see https://github.com/simple-rtmp-server/srs/issues/372 - vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); + std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost); + + url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); } - if ((ret = sdk->connect(url, vhost, SRS_CONSTS_RTMP_TIMEOUT_US)) != ERROR_SUCCESS) { - srs_error("edge pull %s failed. ret=%d", url.c_str(), ret); + int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US; + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } @@ -180,8 +180,6 @@ int SrsEdgeIngester::ingest() { int ret = ERROR_SUCCESS; - sdk->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US); - SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); @@ -271,27 +269,24 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) SrsEdgeForwarder::SrsEdgeForwarder() { - transport = new SrsTcpClient(); - kbps = new SrsKbps(); - client = NULL; - _edge = NULL; - _req = NULL; + edge = NULL; + req = NULL; + send_error_code = ERROR_SUCCESS; + + sdk = new SrsSimpleRtmpClient(); lb = new SrsLbRoundRobin(); - stream_id = 0; pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); - send_error_code = ERROR_SUCCESS; } SrsEdgeForwarder::~SrsEdgeForwarder() { stop(); - srs_freep(transport); + srs_freep(sdk); srs_freep(lb); srs_freep(pthread); srs_freep(queue); - srs_freep(kbps); } void SrsEdgeForwarder::set_queue_size(double queue_size) @@ -299,13 +294,13 @@ void SrsEdgeForwarder::set_queue_size(double queue_size) return queue->set_queue_size(queue_size); } -int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) +int SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r) { int ret = ERROR_SUCCESS; - _source = source; - _edge = edge; - _req = req; + source = s; + edge = e; + req = r; return ret; } @@ -314,36 +309,37 @@ int SrsEdgeForwarder::start() { int ret = ERROR_SUCCESS; + // reset the error code. send_error_code = ERROR_SUCCESS; - std::string ep_server; - int ep_port; - if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { - return ret; + std::string url; + if (true) { + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); + srs_assert(conf); + + // select the origin. + std::string server = lb->select(conf->args); + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; + srs_parse_hostport(server, server, port); + + // support vhost tranform for edge, + // @see https://github.com/simple-rtmp-server/srs/issues/372 + std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); + vhost = srs_string_replace(vhost, "[vhost]", req->vhost); + + url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); } - srs_assert(client); - - client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); - client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); - - SrsRequest* req = _req; - if ((ret = client->handshake()) != ERROR_SUCCESS) { - srs_error("handshake with server failed. ret=%d", ret); - return ret; - } - if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) { - srs_error("connect with server failed. ret=%d", ret); - return ret; - } - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + // open socket. + int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US; + int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { - srs_error("publish failed, stream=%s, stream_id=%d. ret=%d", - req->stream.c_str(), stream_id, ret); + if ((ret = sdk->publish()) != ERROR_SUCCESS) { + srs_error("edge push publish failed. ret=%d", ret); return ret; } @@ -353,12 +349,8 @@ int SrsEdgeForwarder::start() void SrsEdgeForwarder::stop() { pthread->stop(); - transport->close(); - + sdk->close(); queue->clear(); - - srs_freep(client); - kbps->set_io(NULL, NULL); } #define SYS_MAX_EDGE_SEND_MSGS 128 @@ -366,7 +358,7 @@ int SrsEdgeForwarder::cycle() { int ret = ERROR_SUCCESS; - client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); @@ -382,7 +374,7 @@ int SrsEdgeForwarder::cycle() // read from client. if (true) { SrsCommonMessage* msg = NULL; - ret = client->recv_message(&msg); + ret = sdk->recv_message(&msg); srs_verbose("edge loop recv message. ret=%d", ret); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { @@ -406,22 +398,17 @@ int SrsEdgeForwarder::cycle() // pithy print if (pprint->can_print()) { - kbps->sample(); - srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", - pprint->age(), count, - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); + sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count); } // ignore when no messages. if (count <= 0) { - srs_verbose("no packets to push."); + srs_verbose("edge no packets to push."); continue; } // sendout messages, all messages are freed by send_and_free_messages(). - if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) { + if ((ret = sdk->send_and_free_messages(msgs.msgs, count)) != ERROR_SUCCESS) { srs_error("edge publish push message to server failed. ret=%d", ret); return ret; } @@ -456,7 +443,7 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) } srs_verbose("initialize shared ptr msg success."); - copy.stream_id = stream_id; + copy.stream_id = sdk->sid(); if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) { srs_error("enqueue edge publish msg failed. ret=%d", ret); } @@ -464,105 +451,6 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) return ret; } -int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port) -{ - int ret = ERROR_SUCCESS; - - // reopen - transport->close(); - - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); - srs_assert(conf); - - // select the origin. - if (true) { - std::string server = lb->select(conf->args); - ep_port = SRS_CONSTS_RTMP_DEFAULT_PORT; - srs_parse_hostport(server, ep_server, ep_port); - } - - // open socket. - int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; - if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { - srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); - return ret; - } - - srs_freep(client); - client = new SrsRtmpClient(transport); - - kbps->set_io(transport, transport); - - // open socket. - srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d", - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port); - - return ret; -} - -// TODO: FIXME: refine the connect_app. -int SrsEdgeForwarder::connect_app(string ep_server, int ep_port) -{ - int ret = ERROR_SUCCESS; - - SrsRequest* req = _req; - - // args of request takes the srs info. - if (req->args == NULL) { - req->args = SrsAmf0Any::object(); - } - - // notify server the edge identity, - // @see https://github.com/simple-rtmp-server/srs/issues/147 - SrsAmf0Object* data = req->args; - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); - // for edge to directly get the id of client. - data->set("srs_pid", SrsAmf0Any::number(getpid())); - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); - - // local ip of edge - std::vector ips = srs_get_local_ipv4_ips(); - assert(_srs_config->get_stats_network() < (int)ips.size()); - std::string local_ip = ips[_srs_config->get_stats_network()]; - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); - - // support vhost tranform for edge, - // @see https://github.com/simple-rtmp-server/srs/issues/372 - std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); - vhost = srs_string_replace(vhost, "[vhost]", req->vhost); - // generate the tcUrl - std::string param = ""; - std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param); - srs_trace("edge forward to %s:%d at %s", ep_server.c_str(), ep_port, tc_url.c_str()); - - // replace the tcUrl in request, - // which will replace the tc_url in client.connect_app(). - req->tcUrl = tc_url; - - // upnode server identity will show in the connect_app of client. - // @see https://github.com/simple-rtmp-server/srs/issues/160 - // the debug_srs_upnode is config in vhost and default to true. - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { - srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", - tc_url.c_str(), debug_srs_upnode, ret); - return ret; - } - - return ret; -} - SrsPlayEdge::SrsPlayEdge() { state = SrsEdgeStateInit; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index de3b4891f..708a83f84 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -109,15 +109,11 @@ private: class SrsEdgeForwarder : public ISrsReusableThread2Handler { private: - int stream_id; -private: - SrsSource* _source; - SrsPublishEdge* _edge; - SrsRequest* _req; + SrsSource* source; + SrsPublishEdge* edge; + SrsRequest* req; SrsReusableThread2* pthread; - SrsTcpClient* transport; - SrsKbps* kbps; - SrsRtmpClient* client; + SrsSimpleRtmpClient* sdk; SrsLbRoundRobin* lb; /** * we must ensure one thread one fd principle, @@ -136,7 +132,7 @@ public: public: virtual void set_queue_size(double queue_size); public: - virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); + virtual int initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); virtual int start(); virtual void stop(); // interface ISrsReusableThread2Handler @@ -144,9 +140,6 @@ public: virtual int cycle(); public: virtual int proxy(SrsCommonMessage* msg); -private: - virtual int connect_server(std::string& ep_server, int& ep_port); - virtual int connect_app(std::string ep_server, int ep_port); }; /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e45f2e8db..52865a4a0 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -97,12 +97,7 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient() kbps->set_io(NULL, NULL); } -int SrsSimpleRtmpClient::connect(string url, int64_t timeout) -{ - return connect(url, "", timeout); -} - -int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) +int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t stream_timeout) { int ret = ERROR_SUCCESS; @@ -113,14 +108,13 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) } // parse uri - if (!req) { - req = new SrsRequest(); - srs_parse_rtmp_url(url, req->tcUrl, req->stream); - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); - } + srs_freep(req); + req = new SrsRequest(); + srs_parse_rtmp_url(url, req->tcUrl, req->stream); + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); // connect host. - if ((ret = transport->connect(req->host, req->port, timeout)) != ERROR_SUCCESS) { + if ((ret = transport->connect(req->host, req->port, connect_timeout)) != ERROR_SUCCESS) { return ret; } @@ -129,15 +123,15 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) kbps->set_io(transport, transport); - client->set_recv_timeout(timeout); - client->set_send_timeout(timeout); + client->set_recv_timeout(stream_timeout); + client->set_send_timeout(stream_timeout); // connect to vhost/app if ((ret = client->handshake()) != ERROR_SUCCESS) { srs_error("sdk: handshake with server failed. ret=%d", ret); return ret; } - if ((ret = connect_app(vhost)) != ERROR_SUCCESS) { + if ((ret = connect_app()) != ERROR_SUCCESS) { srs_error("sdk: connect with server failed. ret=%d", ret); return ret; } @@ -149,7 +143,7 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) return ret; } -int SrsSimpleRtmpClient::connect_app(string vhost) +int SrsSimpleRtmpClient::connect_app() { int ret = ERROR_SUCCESS; @@ -185,10 +179,7 @@ int SrsSimpleRtmpClient::connect_app(string vhost) // generate the tcUrl std::string param = ""; std::string target_vhost = req->vhost; - if (vhost.empty()) { - target_vhost = vhost; - } - std::string tc_url = srs_generate_tc_url(req->host, target_vhost, req->app, req->port, param); + std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port, param); // replace the tcUrl in request, // which will replace the tc_url in client.connect_app(). @@ -256,6 +247,25 @@ void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age) srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", age, sr, sr30s, sr5m, rr, rr30s, rr5m); } +void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age, int msgs) +{ + kbps->sample(); + + int sr = kbps->get_send_kbps(); + int sr30s = kbps->get_send_kbps_30s(); + int sr5m = kbps->get_send_kbps_5m(); + int rr = kbps->get_recv_kbps(); + int rr30s = kbps->get_recv_kbps_30s(); + int rr5m = kbps->get_recv_kbps_5m(); + + srs_trace("<- %s time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m); +} + +int SrsSimpleRtmpClient::sid() +{ + return stream_id; +} + int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) { int ret = ERROR_SUCCESS; @@ -286,6 +296,11 @@ int SrsSimpleRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppack return client->decode_message(msg, ppacket); } +int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs) +{ + return client->send_and_free_messages(msgs, nb_msgs, stream_id); +} + void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) { transport->set_recv_timeout(timeout); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index e274e471a..dab29ebfb 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -74,20 +74,22 @@ public: SrsSimpleRtmpClient(); virtual ~SrsSimpleRtmpClient(); public: - virtual int connect(std::string url, int64_t timeout); - virtual int connect(std::string url, std::string vhost, int64_t timeout); + virtual int connect(std::string url, int64_t connect_timeout, int64_t stream_timeout); private: - virtual int connect_app(std::string vhost); + virtual int connect_app(); public: virtual void close(); public: virtual int publish(); virtual int play(); virtual void kbps_sample(const char* label, int64_t age); + virtual void kbps_sample(const char* label, int64_t age, int msgs); + virtual int sid(); public: virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); virtual int recv_message(SrsCommonMessage** pmsg); virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); + virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs); public: virtual void set_recv_timeout(int64_t timeout); }; diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 54788cecf..7a7bb8d3d 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -71,10 +71,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // the common io timeout, for both recv and send. #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL) +// TODO: FIXME: remove following two macros. // the timeout to send data to client, // if timeout, close the connection. #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL) - // the timeout to wait client data, // if timeout, close the connection. #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL)