diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 8edb04815..6b9e88d3f 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -42,6 +42,7 @@ using namespace std; #include #include #include +#include #define SRS_HTTP_FLV_STREAM_BUFFER 4096 @@ -117,20 +118,13 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) : SrsHttpConn(cm, fd, m) { - - req = NULL; - transport = new SrsTcpClient(); - client = NULL; - stream_id = 0; - + sdk = new SrsSimpleRtmpClient(); pprint = SrsPithyPrint::create_caster(); } SrsDynamicHttpConn::~SrsDynamicHttpConn() { - close(); - - srs_freep(transport); + srs_freep(sdk); srs_freep(pprint); } @@ -176,7 +170,7 @@ int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std } ret = do_proxy(rr, &dec); - close(); + sdk->close(); return ret; } @@ -189,7 +183,7 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) while (!rr->eof()) { pprint->elapse(); - if ((ret = connect()) != ERROR_SUCCESS) { + if ((ret = sdk->connect(output)) != ERROR_SUCCESS) { return ret; } @@ -212,13 +206,17 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) return ret; } - if ((ret = rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) { + if ((ret = sdk->rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("flv: proxy rtmp packet failed. ret=%d", ret); } return ret; } + if (pprint->can_print()) { + srs_trace("flv: send msg %d age=%d, dts=%d, size=%d", type, pprint->age(), time, size); + } + if ((ret = dec->read_previous_tag_size(pps)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("flv: proxy tag header pps failed. ret=%d", ret); @@ -230,142 +228,6 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) return ret; } -int SrsDynamicHttpConn::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) -{ - int ret = ERROR_SUCCESS; - - SrsSharedPtrMessage* msg = NULL; - - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { - srs_error("flv: create shared ptr msg failed. ret=%d", ret); - return ret; - } - srs_assert(msg); - - if (pprint->can_print()) { - srs_trace("flv: send msg %s age=%d, dts=%"PRId64", size=%d", - msg->is_audio()? "A":msg->is_video()? "V":"N", pprint->age(), msg->timestamp, msg->size); - } - - // send out encoded msg. - if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsDynamicHttpConn::connect() -{ - int ret = ERROR_SUCCESS; - - // when ok, ignore. - // TODO: FIXME: should reconnect when disconnected. - if (transport->connected()) { - return ret; - } - - // parse uri - if (!req) { - req = new SrsRequest(); - srs_parse_rtmp_url(output, req->tcUrl, req->stream); - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); - } - - // connect host. - if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { - return ret; - } - - srs_freep(client); - client = new SrsRtmpClient(transport); - - client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); - client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); - - // connect to vhost/app - if ((ret = client->handshake()) != ERROR_SUCCESS) { - srs_error("mpegts: handshake with server failed. ret=%d", ret); - return ret; - } - if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) { - srs_error("mpegts: connect with server failed. ret=%d", ret); - return ret; - } - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { - srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); - return ret; - } - - // publish. - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { - srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d", - req->stream.c_str(), stream_id, ret); - return ret; - } - - return ret; -} - -// TODO: FIXME: refine the connect_app. -int SrsDynamicHttpConn::connect_app(string ep_server, int ep_port) -{ - int ret = ERROR_SUCCESS; - - // args of request takes the srs info. - if (req->args == NULL) { - req->args = SrsAmf0Any::object(); - } - - // notify server the edge identity, - // @see https://github.com/simple-rtmp-server/srs/issues/147 - SrsAmf0Object* data = req->args; - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); - // for edge to directly get the id of client. - data->set("srs_pid", SrsAmf0Any::number(getpid())); - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); - - // local ip of edge - std::vector ips = srs_get_local_ipv4_ips(); - assert(_srs_config->get_stats_network() < (int)ips.size()); - std::string local_ip = ips[_srs_config->get_stats_network()]; - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); - - // generate the tcUrl - std::string param = ""; - std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); - - // upnode server identity will show in the connect_app of client. - // @see https://github.com/simple-rtmp-server/srs/issues/160 - // the debug_srs_upnode is config in vhost and default to true. - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { - srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d", - tc_url.c_str(), debug_srs_upnode, ret); - return ret; - } - - return ret; -} - -void SrsDynamicHttpConn::close() -{ - transport->close(); - - srs_freep(client); - srs_freep(req); -} - SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) { http = h; diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 7020fde41..9dd0f8550 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -44,6 +44,7 @@ class SrsPithyPrint; class ISrsHttpResponseReader; class SrsFlvDecoder; class SrsTcpClient; +class SrsSimpleRtmpClient; #include #include @@ -85,11 +86,7 @@ class SrsDynamicHttpConn : public SrsHttpConn private: std::string output; SrsPithyPrint* pprint; -private: - SrsRequest* req; - SrsTcpClient* transport; - SrsRtmpClient* client; - int stream_id; + SrsSimpleRtmpClient* sdk; public: SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); virtual ~SrsDynamicHttpConn(); @@ -99,14 +96,6 @@ public: virtual int proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o); private: virtual int do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec); - virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); -private: - // connect to rtmp output url. - // @remark ignore when not connected, reconnect when disconnected. - virtual int connect(); - virtual int connect_app(std::string ep_server, int ep_port); - // close the connected io and rtmp to ready to be re-connect. - virtual void close(); }; /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index c02dd8b75..d612af6e2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -76,6 +76,152 @@ using namespace std; // when edge timeout, retry next. #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) +SrsSimpleRtmpClient::SrsSimpleRtmpClient() +{ + req = NULL; + client = NULL; + + transport = new SrsTcpClient(); + stream_id = 0; +} + +SrsSimpleRtmpClient::~SrsSimpleRtmpClient() +{ + close(); + + srs_freep(transport); +} + +int SrsSimpleRtmpClient::connect(string url) +{ + int ret = ERROR_SUCCESS; + + // when ok, ignore. + // TODO: FIXME: should reconnect when disconnected. + if (transport->connected()) { + return ret; + } + + // parse uri + if (!req) { + req = new SrsRequest(); + srs_parse_rtmp_url(url, req->tcUrl, req->stream); + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); + } + + // connect host. + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { + return ret; + } + + srs_freep(client); + client = new SrsRtmpClient(transport); + + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); + + // connect to vhost/app + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("mpegts: handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + // publish. + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + return ret; +} + +void SrsSimpleRtmpClient::close() +{ + transport->close(); + + srs_freep(client); + srs_freep(req); +} + +int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = NULL; + + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { + srs_error("flv: create shared ptr msg failed. ret=%d", ret); + return ret; + } + srs_assert(msg); + + // send out encoded msg. + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsSimpleRtmpClient::connect_app(string ep_server, int ep_port) +{ + int ret = ERROR_SUCCESS; + + // args of request takes the srs info. + if (req->args == NULL) { + req->args = SrsAmf0Any::object(); + } + + // notify server the edge identity, + // @see https://github.com/simple-rtmp-server/srs/issues/147 + SrsAmf0Object* data = req->args; + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); + // for edge to directly get the id of client. + data->set("srs_pid", SrsAmf0Any::number(getpid())); + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); + + // local ip of edge + std::vector ips = srs_get_local_ipv4_ips(); + assert(_srs_config->get_stats_network() < (int)ips.size()); + std::string local_ip = ips[_srs_config->get_stats_network()]; + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); + + // generate the tcUrl + std::string param = ""; + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); + + // upnode server identity will show in the connect_app of client. + // @see https://github.com/simple-rtmp-server/srs/issues/160 + // the debug_srs_upnode is config in vhost and default to true. + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d", + tc_url.c_str(), debug_srs_upnode, ret); + return ret; + } + + return ret; +} + SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) : SrsConnection(svr, c) { diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 5ae160537..be8192781 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -58,8 +58,30 @@ class SrsSecurity; class ISrsWakable; /** -* the client provides the main logic control for RTMP clients. -*/ + * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. + */ +class SrsSimpleRtmpClient +{ +private: + SrsRequest* req; + SrsTcpClient* transport; + SrsRtmpClient* client; + int stream_id; +public: + SrsSimpleRtmpClient(); + virtual ~SrsSimpleRtmpClient(); +public: + virtual int connect(std::string url); + virtual void close(); +public: + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); +private: + virtual int connect_app(std::string ep_server, int ep_port); +}; + +/** + * the client provides the main logic control for RTMP clients. + */ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler { // for the thread to directly access any field of connection.