Refine redirect flow for origin cluster

pull/1568/head
winlin 5 years ago
parent e5285ecabf
commit 517974d451

@ -152,7 +152,7 @@ Please select according to languages:
### V3 changes
* v3.0, 2019-11-30, Fix #1501, use request ip for origin cluster. 3.0.66
* v3.0, 2019-11-30, Fix [#1501][bug #1501], use request ip for origin cluster. 3.0.66
* v3.0, 2019-11-30, Random tid for docker. 3.0.65
* v3.0, 2019-11-30, Refine debug info for edge. 3.0.64
* v3.0, 2019-10-30, Cover protocol stack RTMP. 3.0.63
@ -1492,6 +1492,7 @@ Winlin
[bug #1087]: https://github.com/ossrs/srs/issues/1087
[bug #1051]: https://github.com/ossrs/srs/issues/1051
[bug #1093]: https://github.com/ossrs/srs/issues/1093
[bug #1501]: https://github.com/ossrs/srs/issues/1501
[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx
[exo #828]: https://github.com/google/ExoPlayer/pull/828

@ -160,7 +160,7 @@ SrsEdgeIngester::SrsEdgeIngester()
edge = NULL;
req = NULL;
upstream = new SrsEdgeRtmpUpstream(redirect);
upstream = new SrsEdgeRtmpUpstream("");
lb = new SrsLbRoundRobin();
trd = new SrsDummyCoroutine();
}
@ -243,7 +243,8 @@ srs_error_t SrsEdgeIngester::cycle()
srs_error_t SrsEdgeIngester::do_cycle()
{
srs_error_t err = srs_success;
std::string redirect;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "do cycle pull");
@ -252,10 +253,6 @@ srs_error_t SrsEdgeIngester::do_cycle()
srs_freep(upstream);
upstream = new SrsEdgeRtmpUpstream(redirect);
// we only use the redict once.
// reset the redirect to empty, for maybe the origin changed.
redirect = "";
if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "on source id changed");
}
@ -267,8 +264,11 @@ srs_error_t SrsEdgeIngester::do_cycle()
if ((err = edge->on_ingest_play()) != srs_success) {
return srs_error_wrap(err, "notify edge play");
}
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
err = ingest();
err = ingest(redirect);
// retry for rtmp 302 immediately.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
@ -286,15 +286,16 @@ srs_error_t SrsEdgeIngester::do_cycle()
return err;
}
srs_error_t SrsEdgeIngester::ingest()
srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
srs_error_t err = srs_success;
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
// we only use the redict once.
// reset the redirect to empty, for maybe the origin changed.
redirect = "";
while (true) {
srs_error_t err = srs_success;
@ -318,7 +319,7 @@ srs_error_t SrsEdgeIngester::ingest()
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg);
if ((err = process_publish_message(msg)) != srs_success) {
if ((err = process_publish_message(msg, redirect)) != srs_success) {
return srs_error_wrap(err, "process message");
}
}
@ -326,7 +327,7 @@ srs_error_t SrsEdgeIngester::ingest()
return err;
}
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
srs_error_t err = srs_success;

@ -115,8 +115,6 @@ private:
SrsCoroutine* trd;
SrsLbRoundRobin* lb;
SrsEdgeUpstream* upstream;
// For RTMP 302 redirect.
std::string redirect;
public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
@ -131,8 +129,8 @@ public:
private:
virtual srs_error_t do_cycle();
private:
virtual srs_error_t ingest();
virtual srs_error_t process_publish_message(SrsCommonMessage* msg);
virtual srs_error_t ingest(std::string& redirect);
virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
};
// The edge used to forward stream to origin.

Loading…
Cancel
Save