diff --git a/README.md b/README.md
index bfdd86d69..cd44f982c 100755
--- a/README.md
+++ b/README.md
@@ -153,7 +153,7 @@ Please select according to languages:
### V3 changes
* v3.0, 2019-11-30, [3.0 alpha2(3.0.66)][r3.0a2] released. 110831 lines.
-* v3.0, 2019-11-30, Fix [#1501][bug #1501], use request ip for origin cluster. 3.0.66
+* v3.0, 2019-11-30, Fix [#1501][bug #1501], use request coworker for origin cluster. 3.0.67
* v3.0, 2019-11-30, Random tid for docker. 3.0.65
* v3.0, 2019-11-30, Refine debug info for edge. 3.0.64
* v3.0, 2019-10-30, Cover protocol stack RTMP. 3.0.63
diff --git a/trunk/src/app/srs_app_coworkers.cpp b/trunk/src/app/srs_app_coworkers.cpp
index 48338f6a0..2dae87871 100644
--- a/trunk/src/app/srs_app_coworkers.cpp
+++ b/trunk/src/app/srs_app_coworkers.cpp
@@ -58,7 +58,7 @@ SrsCoWorkers* SrsCoWorkers::instance()
return _instance;
}
-SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string stream)
+SrsJsonAny* SrsCoWorkers::dumps(string vhost, string coworker, string app, string stream)
{
SrsRequest* r = find_stream_info(vhost, app, stream);
if (!r) {
@@ -80,7 +80,7 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st
}
}
- // The ip of server, we use the request host as ip, if listen host is localhost or loopback.
+ // The ip of server, we use the request coworker-host as ip, if listen host is localhost or loopback.
// For example, the server may behind a NAT(192.x.x.x), while its ip is a docker ip(172.x.x.x),
// we should use the NAT(192.x.x.x) address as it's the exposed ip.
// @see https://github.com/ossrs/srs/issues/1501
@@ -89,7 +89,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st
service_ip = listen_host;
}
if (service_ip.empty()) {
- service_ip = host;
+ int coworker_port;
+ string coworker_host = coworker;
+ if (coworker.find(":") != string::npos) {
+ srs_parse_hostport(coworker, coworker_host, coworker_port);
+ }
+
+ service_ip = coworker_host;
}
if (service_ip.empty()) {
service_ip = srs_get_public_internet_address();
@@ -103,7 +109,10 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string host, string app, string st
// The routers to detect loop and identify path.
SrsJsonArray* routers = SrsJsonAny::array()->append(SrsJsonAny::str(backend.c_str()));
-
+
+ srs_trace("Redirect vhost=%s, path=%s/%s to ip=%s, port=%d, api=%s",
+ vhost.c_str(), app.c_str(), stream.c_str(), service_ip.c_str(), listen_port, backend.c_str());
+
return SrsJsonAny::object()
->set("ip", SrsJsonAny::str(service_ip.c_str()))
->set("port", SrsJsonAny::integer(listen_port))
diff --git a/trunk/src/app/srs_app_coworkers.hpp b/trunk/src/app/srs_app_coworkers.hpp
index 23094ddb6..c25f1f300 100644
--- a/trunk/src/app/srs_app_coworkers.hpp
+++ b/trunk/src/app/srs_app_coworkers.hpp
@@ -46,7 +46,7 @@ private:
public:
static SrsCoWorkers* instance();
public:
- virtual SrsJsonAny* dumps(std::string vhost, std::string host, std::string app, std::string stream);
+ virtual SrsJsonAny* dumps(std::string vhost, std::string coworker, std::string app, std::string stream);
private:
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
public:
diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index 86bcd5fdf..a2d95e706 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -100,10 +100,13 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
string _schema, _vhost, _app, _stream, _param, _host;
srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param);
- srs_warn("RTMP redirect %s:%d to %s:%d stream=%s", server.c_str(), port, _host.c_str(), _port, _stream.c_str());
server = _host;
port = _port;
}
+
+ // Remember the current selected server.
+ selected_ip = server;
+ selected_port = port;
// support vhost tranform for edge,
// @see https://github.com/ossrs/srs/issues/372
@@ -144,6 +147,12 @@ void SrsEdgeRtmpUpstream::close()
srs_freep(sdk);
}
+void SrsEdgeRtmpUpstream::selected(string& server, int& port)
+{
+ server = selected_ip;
+ port = selected_port;
+}
+
void SrsEdgeRtmpUpstream::set_recv_timeout(srs_utime_t tm)
{
sdk->set_recv_timeout(tm);
@@ -272,6 +281,13 @@ srs_error_t SrsEdgeIngester::do_cycle()
// retry for rtmp 302 immediately.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
+ int port;
+ string server;
+ upstream->selected(server, port);
+
+ string url = req->get_stream_url();
+ srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str());
+
srs_error_reset(err);
continue;
}
diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp
index 122e0f928..f9ead1f63 100644
--- a/trunk/src/app/srs_app_edge.hpp
+++ b/trunk/src/app/srs_app_edge.hpp
@@ -80,6 +80,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
virtual void close() = 0;
public:
+ virtual void selected(std::string& server, int& port) = 0;
virtual void set_recv_timeout(srs_utime_t tm) = 0;
virtual void kbps_sample(const char* label, int64_t age) = 0;
};
@@ -91,6 +92,10 @@ private:
// use this as upstream.
std::string redirect;
SrsSimpleRtmpClient* sdk;
+private:
+ // Current selected server, the ip:port.
+ std::string selected_ip;
+ int selected_port;
public:
// @param rediect, override the server. ignore if empty.
SrsEdgeRtmpUpstream(std::string r);
@@ -101,6 +106,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual void close();
public:
+ virtual void selected(std::string& server, int& port);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char* label, int64_t age);
};
diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp
index 9eb7ed2ca..bd6c01ced 100644
--- a/trunk/src/app/srs_app_http_api.cpp
+++ b/trunk/src/app/srs_app_http_api.cpp
@@ -1290,6 +1290,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
string vhost = r->query_get("vhost");
string app = r->query_get("app");
string stream = r->query_get("stream");
+ string coworker = r->query_get("coworker");
data->set("query", SrsJsonAny::object()
->set("ip", SrsJsonAny::str(ip.c_str()))
->set("vhost", SrsJsonAny::str(vhost.c_str()))
@@ -1297,7 +1298,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
->set("stream", SrsJsonAny::str(stream.c_str())));
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
- data->set("origin", coworkers->dumps(vhost, ip, app, stream));
+ data->set("origin", coworkers->dumps(vhost, coworker, app, stream));
return srs_api_response(w, r, obj->dumps());
}
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index d91417715..867c4aea1 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -610,7 +610,8 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
int port;
string host;
string url = "http://" + coworkers.at(i) + "/api/v1/clusters?"
- + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream;
+ + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream
+ + "&coworker=" + coworkers.at(i);
if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
}
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index a25f6b096..385fca95c 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -27,7 +27,7 @@
// The version config.
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
-#define VERSION_REVISION 66
+#define VERSION_REVISION 67
// The macros generated by configure script.
#include