From 5a3fd1e68a537065b3e2f700ef7245a7c710f498 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 1 Dec 2019 19:24:17 +0800 Subject: [PATCH] Release v3.0-a2, 3.0.67 --- README.md | 2 +- trunk/src/app/srs_app_coworkers.cpp | 17 +++++++++++++---- trunk/src/app/srs_app_coworkers.hpp | 2 +- trunk/src/app/srs_app_edge.cpp | 18 +++++++++++++++++- trunk/src/app/srs_app_edge.hpp | 6 ++++++ trunk/src/app/srs_app_http_api.cpp | 3 ++- trunk/src/app/srs_app_rtmp_conn.cpp | 3 ++- trunk/src/core/srs_core.hpp | 2 +- 8 files changed, 43 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index bfdd86d69..cd44f982c 100755 --- a/README.md +++ b/README.md @@ -153,7 +153,7 @@ Please select according to languages: ### V3 changes * v3.0, 2019-11-30, [3.0 alpha2(3.0.66)][r3.0a2] released. 110831 lines. -* v3.0, 2019-11-30, Fix [#1501][bug #1501], use request ip for origin cluster. 3.0.66 +* v3.0, 2019-11-30, Fix [#1501][bug #1501], use request coworker for origin cluster. 3.0.67 * v3.0, 2019-11-30, Random tid for docker. 3.0.65 * v3.0, 2019-11-30, Refine debug info for edge. 3.0.64 * v3.0, 2019-10-30, Cover protocol stack RTMP. 3.0.63 diff --git a/trunk/src/app/srs_app_coworkers.cpp b/trunk/src/app/srs_app_coworkers.cpp index 48338f6a0..2dae87871 100644 --- a/trunk/src/app/srs_app_coworkers.cpp +++ b/trunk/src/app/srs_app_coworkers.cpp @@ -58,7 +58,7 @@ SrsCoWorkers* SrsCoWorkers::instance() return _instance; } -SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string stream) +SrsJsonAny* SrsCoWorkers::dumps(string vhost, string coworker, string app, string stream) { SrsRequest* r = find_stream_info(vhost, app, stream); if (!r) { @@ -80,7 +80,7 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st } } - // The ip of server, we use the request host as ip, if listen host is localhost or loopback. + // The ip of server, we use the request coworker-host as ip, if listen host is localhost or loopback. // For example, the server may behind a NAT(192.x.x.x), while its ip is a docker ip(172.x.x.x), // we should use the NAT(192.x.x.x) address as it's the exposed ip. // @see https://github.com/ossrs/srs/issues/1501 @@ -89,7 +89,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st service_ip = listen_host; } if (service_ip.empty()) { - service_ip = host; + int coworker_port; + string coworker_host = coworker; + if (coworker.find(":") != string::npos) { + srs_parse_hostport(coworker, coworker_host, coworker_port); + } + + service_ip = coworker_host; } if (service_ip.empty()) { service_ip = srs_get_public_internet_address(); @@ -103,7 +109,10 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st // The routers to detect loop and identify path. SrsJsonArray* routers = SrsJsonAny::array()->append(SrsJsonAny::str(backend.c_str())); - + + srs_trace("Redirect vhost=%s, path=%s/%s to ip=%s, port=%d, api=%s", + vhost.c_str(), app.c_str(), stream.c_str(), service_ip.c_str(), listen_port, backend.c_str()); + return SrsJsonAny::object() ->set("ip", SrsJsonAny::str(service_ip.c_str())) ->set("port", SrsJsonAny::integer(listen_port)) diff --git a/trunk/src/app/srs_app_coworkers.hpp b/trunk/src/app/srs_app_coworkers.hpp index 23094ddb6..c25f1f300 100644 --- a/trunk/src/app/srs_app_coworkers.hpp +++ b/trunk/src/app/srs_app_coworkers.hpp @@ -46,7 +46,7 @@ private: public: static SrsCoWorkers* instance(); public: - virtual SrsJsonAny* dumps(std::string vhost, std::string host, std::string app, std::string stream); + virtual SrsJsonAny* dumps(std::string vhost, std::string coworker, std::string app, std::string stream); private: virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream); public: diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 86bcd5fdf..a2d95e706 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -100,10 +100,13 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) string _schema, _vhost, _app, _stream, _param, _host; srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param); - srs_warn("RTMP redirect %s:%d to %s:%d stream=%s", server.c_str(), port, _host.c_str(), _port, _stream.c_str()); server = _host; port = _port; } + + // Remember the current selected server. + selected_ip = server; + selected_port = port; // support vhost tranform for edge, // @see https://github.com/ossrs/srs/issues/372 @@ -144,6 +147,12 @@ void SrsEdgeRtmpUpstream::close() srs_freep(sdk); } +void SrsEdgeRtmpUpstream::selected(string& server, int& port) +{ + server = selected_ip; + port = selected_port; +} + void SrsEdgeRtmpUpstream::set_recv_timeout(srs_utime_t tm) { sdk->set_recv_timeout(tm); @@ -272,6 +281,13 @@ srs_error_t SrsEdgeIngester::do_cycle() // retry for rtmp 302 immediately. if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) { + int port; + string server; + upstream->selected(server, port); + + string url = req->get_stream_url(); + srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str()); + srs_error_reset(err); continue; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 122e0f928..f9ead1f63 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -80,6 +80,7 @@ public: virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0; virtual void close() = 0; public: + virtual void selected(std::string& server, int& port) = 0; virtual void set_recv_timeout(srs_utime_t tm) = 0; virtual void kbps_sample(const char* label, int64_t age) = 0; }; @@ -91,6 +92,10 @@ private: // use this as upstream. std::string redirect; SrsSimpleRtmpClient* sdk; +private: + // Current selected server, the ip:port. + std::string selected_ip; + int selected_port; public: // @param rediect, override the server. ignore if empty. SrsEdgeRtmpUpstream(std::string r); @@ -101,6 +106,7 @@ public: virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); virtual void close(); public: + virtual void selected(std::string& server, int& port); virtual void set_recv_timeout(srs_utime_t tm); virtual void kbps_sample(const char* label, int64_t age); }; diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 9eb7ed2ca..bd6c01ced 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1290,6 +1290,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess string vhost = r->query_get("vhost"); string app = r->query_get("app"); string stream = r->query_get("stream"); + string coworker = r->query_get("coworker"); data->set("query", SrsJsonAny::object() ->set("ip", SrsJsonAny::str(ip.c_str())) ->set("vhost", SrsJsonAny::str(vhost.c_str())) @@ -1297,7 +1298,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess ->set("stream", SrsJsonAny::str(stream.c_str()))); SrsCoWorkers* coworkers = SrsCoWorkers::instance(); - data->set("origin", coworkers->dumps(vhost, ip, app, stream)); + data->set("origin", coworkers->dumps(vhost, coworker, app, stream)); return srs_api_response(w, r, obj->dumps()); } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d91417715..867c4aea1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -610,7 +610,8 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) int port; string host; string url = "http://" + coworkers.at(i) + "/api/v1/clusters?" - + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream; + + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream + + "&coworker=" + coworkers.at(i); if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) { return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str()); } diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index a25f6b096..385fca95c 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -27,7 +27,7 @@ // The version config. #define VERSION_MAJOR 3 #define VERSION_MINOR 0 -#define VERSION_REVISION 66 +#define VERSION_REVISION 67 // The macros generated by configure script. #include