For #464, refine result of origin cluster api

pull/1080/head
winlin 7 years ago
parent 92f2bcd878
commit 2f09ec4353

@ -31,6 +31,7 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_service_utility.hpp>
#include <srs_kernel_utility.hpp>
SrsCoWorkers* SrsCoWorkers::_instance = NULL;
@ -64,20 +65,32 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
return SrsJsonAny::null();
}
vector<string>& ips = srs_get_local_ips();
if (ips.empty()) {
vector<string> service_ports = _srs_config->get_listens();
if (service_ports.empty()) {
return SrsJsonAny::null();
}
SrsJsonArray* arr = SrsJsonAny::array();
for (int i = 0; i < (int)ips.size(); i++) {
arr->append(SrsJsonAny::object()
->set("ip", SrsJsonAny::str(ips.at(i).c_str()))
->set("vhost", SrsJsonAny::str(r->vhost.c_str()))
->set("self", SrsJsonAny::boolean(true)));
string service_ip = srs_get_public_internet_address();
string service_hostport = service_ports.at(0);
string service_host;
int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(service_hostport, service_host, service_port);
string backend = _srs_config->get_http_api_listen();
if (backend.find(":") == string::npos) {
backend = service_ip + ":" + backend;
}
return arr;
// The routers to detect loop and identify path.
SrsJsonArray* routers = SrsJsonAny::array()->append(SrsJsonAny::str(backend.c_str()));
return SrsJsonAny::object()
->set("ip", SrsJsonAny::str(service_ip.c_str()))
->set("port", SrsJsonAny::integer(service_port))
->set("vhost", SrsJsonAny::str(r->vhost.c_str()))
->set("api", SrsJsonAny::str(backend.c_str()))
->set("routers", routers);
}
SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stream)

@ -616,31 +616,40 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
{
srs_error_t err = srs_success;
// create consumer of souce.
// Check page referer of player.
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
// Set the socket options for transport.
set_sock_options();
// Create a consumer of source.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(this, consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
// use isolate thread to recv,
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
// start isolate recv thread.
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
}
// delivery messages for clients playing stream.
// Deliver packets to peer.
wakable = consumer;
err = do_playing(source, consumer, &trd);
wakable = NULL;
// stop isolate recv thread
trd.stop();
// warn for the message is dropped.
// Drop all packets in receiving thread.
if (!trd.empty()) {
srs_warn("drop the received %d messages", trd.size());
}
@ -652,14 +661,9 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
{
srs_error_t err = srs_success;
srs_assert(consumer != NULL);
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
srs_assert(req);
srs_assert(consumer);
// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
@ -678,9 +682,6 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
// set the sock options.
set_sock_options();
srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);
@ -800,6 +801,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source)
return srs_error_wrap(err, "rtmp: callback on publish");
}
// TODO: FIXME: Should refine the state of publishing.
if ((err = acquire_publish(source)) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237

@ -103,6 +103,7 @@ private:
SrsBandwidth* bandwidth;
SrsSecurity* security;
// the wakable handler, maybe NULL.
// TODO: FIXME: Should refine the state for receiving thread.
ISrsWakable* wakable;
// elapse duration in ms
// for live play duration, for instance, rtmpdump to record.

@ -260,6 +260,12 @@ string srs_get_public_internet_address()
return ip;
}
// Finally, use first whatever kind of address.
if (!ips.empty()) {
_public_internet_address = ips.at(0);
return _public_internet_address;
}
return "";
}

Loading…
Cancel
Save