From cba667556026c0bd3b7eef9e8c5975be413659e8 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 4 May 2015 19:06:38 +0800 Subject: [PATCH] refine the http message, set the connection if required. --- trunk/src/app/srs_app_caster_flv.cpp | 173 ++++++++++++++++++++++++-- trunk/src/app/srs_app_caster_flv.hpp | 20 +++ trunk/src/app/srs_app_http.cpp | 15 ++- trunk/src/app/srs_app_http.hpp | 8 +- trunk/src/app/srs_app_http_api.cpp | 2 +- trunk/src/app/srs_app_http_client.cpp | 4 +- trunk/src/app/srs_app_http_conn.cpp | 2 +- 7 files changed, 206 insertions(+), 18 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 8be72aec1..3a7493403 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -37,6 +37,12 @@ using namespace std; #include #include #include +#include +#include +#include +#include +#include +#include #define SRS_HTTP_FLV_STREAM_BUFFER 4096 @@ -84,10 +90,54 @@ void SrsAppCasterFlv::remove(SrsConnection* c) } int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) +{ + SrsDynamicHttpConn* conn = dynamic_cast(r->connection()); + srs_assert(conn); + + std::string app = srs_path_dirname(r->path()); + std::string stream = srs_path_basename(r->path()); + + std::string o = output; + if (!app.empty() && app != "/") { + o = srs_string_replace(o, "[app]", app); + } + o = srs_string_replace(o, "[stream]", stream); + + // remove the extension. + if (srs_string_ends_with(o, ".flv")) { + o = o.substr(0, o.length() - 4); + } + + return conn->proxy(w, r, o); +} + +SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) + : SrsHttpConn(cm, fd, m) +{ + + req = NULL; + io = NULL; + client = NULL; + stfd = NULL; + stream_id = 0; +} + +SrsDynamicHttpConn::~SrsDynamicHttpConn() +{ +} + +int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o) { int ret = ERROR_SUCCESS; - srs_info("flv: handle request at %s", r->path().c_str()); + output = o; + srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str()); char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER]; SrsAutoFree(char, buffer); @@ -111,19 +161,126 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) return ret; } -SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) - : SrsHttpConn(cm, fd, m) +int SrsDynamicHttpConn::connect() { + int ret = ERROR_SUCCESS; + + // when ok, ignore. + // TODO: FIXME: should reconnect when disconnected. + if (io || client) { + return ret; + } + + // parse uri + if (!req) { + req = new SrsRequest(); + + size_t pos = string::npos; + string uri = req->tcUrl = output; + + // tcUrl, stream + if ((pos = uri.rfind("/")) != string::npos) { + req->stream = uri.substr(pos + 1); + req->tcUrl = uri = uri.substr(0, pos); + } + + srs_discovery_tc_url(req->tcUrl, + req->schema, req->host, req->vhost, req->app, req->port, + req->param); + } + + // connect host. + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret); + return ret; + } + io = new SrsStSocket(stfd); + client = new SrsRtmpClient(io); + + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); + + // connect to vhost/app + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("mpegts: handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + // publish. + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + return ret; } -SrsDynamicHttpConn::~SrsDynamicHttpConn() +// TODO: FIXME: refine the connect_app. +int SrsDynamicHttpConn::connect_app(string ep_server, string ep_port) { + int ret = ERROR_SUCCESS; + + // args of request takes the srs info. + if (req->args == NULL) { + req->args = SrsAmf0Any::object(); + } + + // notify server the edge identity, + // @see https://github.com/simple-rtmp-server/srs/issues/147 + SrsAmf0Object* data = req->args; + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); + // for edge to directly get the id of client. + data->set("srs_pid", SrsAmf0Any::number(getpid())); + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); + + // local ip of edge + std::vector ips = srs_get_local_ipv4_ips(); + assert(_srs_config->get_stats_network() < (int)ips.size()); + std::string local_ip = ips[_srs_config->get_stats_network()]; + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); + + // generate the tcUrl + std::string param = ""; + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); + + // upnode server identity will show in the connect_app of client. + // @see https://github.com/simple-rtmp-server/srs/issues/160 + // the debug_srs_upnode is config in vhost and default to true. + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d", + tc_url.c_str(), debug_srs_upnode, ret); + return ret; + } + + return ret; } -int SrsDynamicHttpConn::on_got_http_message(SrsHttpMessage* msg) +void SrsDynamicHttpConn::close() { - int ret = ERROR_SUCCESS; - return ret; + srs_freep(client); + srs_freep(io); + srs_freep(req); + srs_close_stfd(stfd); } SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) @@ -146,7 +303,7 @@ void SrsHttpFileReader::close() bool SrsHttpFileReader::is_open() { - return false; + return true; } int64_t SrsHttpFileReader::tellg() diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 61899f713..79ee4e370 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -38,6 +38,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsConfDirective; class SrsHttpServeMux; class SrsHttpConn; +class SrsRtmpClient; +class SrsStSocket; +class SrsRequest; #include #include @@ -77,11 +80,28 @@ public: */ class SrsDynamicHttpConn : public SrsHttpConn { +private: + std::string output; +private: + SrsRequest* req; + st_netfd_t stfd; + SrsStSocket* io; + SrsRtmpClient* client; + int stream_id; public: SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); virtual ~SrsDynamicHttpConn(); public: virtual int on_got_http_message(SrsHttpMessage* msg); +public: + virtual int proxy(ISrsHttpResponseWriter* w, SrsHttpMessage* r, std::string o); +private: + // connect to rtmp output url. + // @remark ignore when not connected, reconnect when disconnected. + virtual int connect(); + virtual int connect_app(std::string ep_server, std::string ep_port); + // close the connected io and rtmp to ready to be re-connect. + virtual void close(); }; /** diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 27979de4f..4ed1453f2 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -1078,8 +1078,9 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) return ret; } -SrsHttpMessage::SrsHttpMessage(SrsStSocket* io) +SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c) { + conn = c; chunked = false; _uri = new SrsHttpUri(); _body = new SrsHttpResponseReader(this, io); @@ -1165,6 +1166,11 @@ char* SrsHttpMessage::http_ts_send_buffer() return _http_ts_send_buffer; } +SrsConnection* SrsHttpMessage::connection() +{ + return conn; +} + u_int8_t SrsHttpMessage::method() { return (u_int8_t)_header.method; @@ -1230,8 +1236,9 @@ string SrsHttpMessage::uri() { std::string uri = _uri->get_schema(); if (uri.empty()) { - uri += "http://"; + uri += "http"; } + uri += "://"; uri += host(); uri += path(); @@ -1393,7 +1400,7 @@ int SrsHttpParser::initialize(enum http_parser_type type) return ret; } -int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) +int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg) { *ppmsg = NULL; @@ -1418,7 +1425,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) } // create msg - SrsHttpMessage* msg = new SrsHttpMessage(skt); + SrsHttpMessage* msg = new SrsHttpMessage(skt, conn); // initalize http msg, parse url. if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index 90344751b..190999b50 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -50,6 +50,7 @@ class SrsSimpleBuffer; class SrsHttpMuxEntry; class ISrsHttpResponseWriter; class SrsFastBuffer; +class SrsConnection; // http specification // CR = @@ -506,8 +507,10 @@ private: std::vector _headers; // the query map std::map _query; + // the transport connection, can be NULL. + SrsConnection* conn; public: - SrsHttpMessage(SrsStSocket* io); + SrsHttpMessage(SrsStSocket* io, SrsConnection* c); virtual ~SrsHttpMessage(); public: /** @@ -518,6 +521,7 @@ public: ); public: virtual char* http_ts_send_buffer(); + virtual SrsConnection* connection(); public: virtual u_int8_t method(); virtual u_int16_t status_code(); @@ -617,7 +621,7 @@ public: * or error and *ppmsg must be NULL. * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). */ - virtual int parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg); + virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, SrsHttpMessage** ppmsg); private: /** * parse the HTTP message to member field: msg. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 0f3aac193..d4e3ea5fc 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -528,7 +528,7 @@ int SrsHttpApi::do_cycle() SrsHttpMessage* req = NULL; // get a http message - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index a36ef69e1..377f9a79b 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -105,7 +105,7 @@ int SrsHttpClient::post(string path, string req, SrsHttpMessage** ppmsg) } SrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } @@ -151,7 +151,7 @@ int SrsHttpClient::get(string path, std::string req, SrsHttpMessage** ppmsg) } SrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 21eb95511..bdf8e2dc0 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1388,7 +1388,7 @@ int SrsHttpConn::do_cycle() SrsHttpMessage* req = NULL; // get a http message - if ((ret = parser->parse_message(&skt, &req)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) { return ret; }