Merge branch 'feature/rtc' into develop

pull/1882/head
winlin 5 years ago
commit 51abd55b16

@ -77,12 +77,15 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
srs_error_t err = srs_success;
string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
int port = srs_get_peer_port(fd);
if (ip.empty() && !_srs_config->empty_ip_ok()) {
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
}
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip);
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port);
conns.push_back(conn);
if ((err = conn->start()) != srs_success) {
@ -138,8 +141,7 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return err;
}
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
{
sdk = NULL;
pprint = SrsPithyPrint::create_caster();
@ -161,7 +163,7 @@ srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage
srs_error_t err = srs_success;
output = o;
srs_trace("flv: proxy %s to %s", r->uri().c_str(), output.c_str());
srs_trace("flv: proxy %s:%d %s to %s", ip.c_str(), port, r->uri().c_str(), output.c_str());
char* buffer = new char[SRS_HTTP_FLV_STREAM_BUFFER];
SrsAutoFreeA(char, buffer);

@ -79,7 +79,7 @@ private:
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
public:
SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsDynamicHttpConn();
public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);

@ -31,11 +31,12 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip)
SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport)
{
manager = cm;
stfd = c;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
skt = new SrsStSocket();
@ -205,7 +206,8 @@ string SrsConnection::srs_id()
return trd->cid();
}
string SrsConnection::remote_ip() {
string SrsConnection::remote_ip()
{
return ip;
}

@ -50,8 +50,9 @@ protected:
IConnectionManager* manager;
// The underlayer st fd handler.
srs_netfd_t stfd;
// The ip of client.
// The ip and port of client.
std::string ip;
int port;
// The underlayer socket.
SrsStSocket* skt;
// The connection total kbps.
@ -64,7 +65,7 @@ protected:
// for current connection to log self create time and calculate the living time.
int64_t create_time;
public:
SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip);
SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip, int cport);
virtual ~SrsConnection();
// Interface ISrsKbpsDelta
public:

@ -1674,8 +1674,7 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
#endif
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsConnection(cm, fd, cip)
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsConnection(cm, fd, cip, port)
{
mux = m;
cors = new SrsHttpCorsMux();
@ -1701,7 +1700,7 @@ srs_error_t SrsHttpApi::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("API server client, ip=%s", ip.c_str());
srs_trace("API server client, ip=%s:%d", ip.c_str(), port);
// initialize parser
if ((err = parser->initialize(HTTP_REQUEST, true)) != srs_success) {
@ -1717,7 +1716,7 @@ srs_error_t SrsHttpApi::do_cycle()
if ((err = cors->initialize(mux, crossdomain_enabled)) != srs_success) {
return srs_error_wrap(err, "init cors");
}
// process http messages.
while ((err = trd->pull()) == srs_success) {
ISrsHttpMessage* req = NULL;
@ -1773,8 +1772,8 @@ srs_error_t SrsHttpApi::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessa
SrsHttpMessage* hm = dynamic_cast<SrsHttpMessage*>(r);
srs_assert(hm);
srs_trace("HTTP API %s %s, content-length=%" PRId64 ", chunked=%d", r->method_str().c_str(), r->url().c_str(),
r->content_length(), hm->is_chunked());
srs_trace("HTTP API %s:%d %s %s, content-length=%" PRId64 ", chunked=%d", ip.c_str(), port, r->method_str().c_str(),
r->url().c_str(), r->content_length(), hm->is_chunked());
// use cors server mux to serve http request, which will proxy to mux.
if ((err = cors->serve_http(w, r)) != srs_success) {

@ -261,7 +261,7 @@ private:
SrsHttpCorsMux* cors;
SrsHttpServeMux* mux;
public:
SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpApi();
// Interface ISrsKbpsDelta
public:

@ -59,7 +59,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_app_st.hpp>
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip) : SrsConnection(cm, fd, cip)
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsConnection(cm, fd, cip, port)
{
parser = new SrsHttpParser();
cors = new SrsHttpCorsMux();
@ -102,7 +102,7 @@ srs_error_t SrsHttpConn::do_cycle()
// process http messages.
for (int req_id = 0; (err = trd->pull()) == srs_success; req_id++) {
// Try to receive a message from http.
srs_trace("HTTP client ip=%s, request=%d, to=%dms", ip.c_str(), req_id, srsu2ms(SRS_HTTP_RECV_TIMEOUT));
srs_trace("HTTP client ip=%s:%d, request=%d, to=%dms", ip.c_str(), port, req_id, srsu2ms(SRS_HTTP_RECV_TIMEOUT));
// get a http message
ISrsHttpMessage* req = NULL;
@ -154,8 +154,8 @@ srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMess
{
srs_error_t err = srs_success;
srs_trace("HTTP %s %s, content-length=%" PRId64 "",
r->method_str().c_str(), r->url().c_str(), r->content_length());
srs_trace("HTTP %s:%d %s %s, content-length=%" PRId64 "",
ip.c_str(), port, r->method_str().c_str(), r->url().c_str(), r->content_length());
// use cors server mux to serve http request, which will proxy to http_remux.
if ((err = cors->serve_http(w, r)) != srs_success) {
@ -184,8 +184,7 @@ srs_error_t SrsHttpConn::on_reload_http_stream_crossdomain()
return err;
}
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
{
}

@ -63,7 +63,7 @@ protected:
ISrsHttpServeMux* http_mux;
SrsHttpCorsMux* cors;
public:
SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpConn();
// Interface ISrsKbpsDelta
public:
@ -90,7 +90,7 @@ public:
class SrsResponseOnlyHttpConn : public SrsHttpConn
{
public:
SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsResponseOnlyHttpConn();
public:
// Directly read a HTTP request message.

@ -104,7 +104,7 @@ SrsClientInfo::~SrsClientInfo()
srs_freep(res);
}
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnection(svr, c, cip)
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port) : SrsConnection(svr, c, cip, port)
{
server = svr;
@ -151,7 +151,7 @@ srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);

@ -117,7 +117,7 @@ private:
// About the rtmp client.
SrsClientInfo* info;
public:
SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip);
SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip, int port);
virtual ~SrsRtmpConn();
public:
virtual void dispose();

@ -253,11 +253,14 @@ srs_error_t SrsRtspConn::do_cycle()
srs_error_t err = srs_success;
// retrieve ip of client.
std::string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
int fd = srs_netfd_fileno(stfd);
std::string ip = srs_get_peer_ip(fd);
int port = srs_get_peer_port(fd);
if (ip.empty() && !_srs_config->empty_ip_ok()) {
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
}
srs_trace("rtsp: serve %s", ip.c_str());
srs_trace("rtsp: serve %s:%d", ip.c_str(), port);
// consume all rtsp messages.
while (true) {

@ -1526,6 +1526,7 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnec
int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
int port = srs_get_peer_port(fd);
// for some keep alive application, for example, the keepalived,
// will send some tcp packet which we cann't got the ip,
@ -1537,13 +1538,12 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnec
// check connection limitation.
int max_connections = _srs_config->get_max_connections();
if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) {
return srs_error_wrap(err, "drop client fd=%d, max=%d, cur=%d for err: %s",
fd, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s",
fd, ip.c_str(), port, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
}
if ((int)conns.size() >= max_connections) {
return srs_error_new(ERROR_EXCEED_CONNECTIONS,
"drop fd=%d, max=%d, cur=%d for exceed connection limits",
fd, max_connections, (int)conns.size());
return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits",
fd, ip.c_str(), port, max_connections, (int)conns.size());
}
// avoid fd leak when fork.
@ -1560,13 +1560,13 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnec
}
if (type == SrsListenerRtmpStream) {
*pconn = new SrsRtmpConn(this, stfd, ip);
*pconn = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) {
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip);
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) {
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd);
return err;
}

@ -1143,6 +1143,28 @@ string srs_get_peer_ip(int fd)
return std::string(saddr);
}
int srs_get_peer_port(int fd)
{
// discovery client information
sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
if (getpeername(fd, (sockaddr*)&addr, &addrlen) == -1) {
return 0;
}
int port = 0;
switch(addr.ss_family) {
case AF_INET:
port = ntohs(((sockaddr_in*)&addr)->sin_port);
break;
case AF_INET6:
port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
break;
}
return port;
}
bool srs_is_boolean(string str)
{
return str == "true" || str == "false";

@ -640,6 +640,7 @@ extern std::string srs_get_local_ip(int fd);
extern int srs_get_local_port(int fd);
// Where peer ip is the client public ip which connected to server.
extern std::string srs_get_peer_ip(int fd);
extern int srs_get_peer_port(int fd);
// Whether string is boolean
// is_bool("true") == true

@ -43,6 +43,7 @@ SrsHttpParser::SrsHttpParser()
header = NULL;
p_body_start = p_header_tail = NULL;
type_ = HTTP_REQUEST;
}
SrsHttpParser::~SrsHttpParser()
@ -54,8 +55,13 @@ SrsHttpParser::~SrsHttpParser()
srs_error_t SrsHttpParser::initialize(enum http_parser_type type, bool allow_jsonp)
{
srs_error_t err = srs_success;
jsonp = allow_jsonp;
type_ = type;
// Initialize the parser, however it's not necessary.
http_parser_init(&parser, type_);
parser.data = (void*)this;
memset(&settings, 0, sizeof(settings));
settings.on_message_begin = on_message_begin;
@ -66,10 +72,6 @@ srs_error_t SrsHttpParser::initialize(enum http_parser_type type, bool allow_jso
settings.on_body = on_body;
settings.on_message_complete = on_message_complete;
http_parser_init(&parser, type);
// callback object ptr.
parser.data = (void*)this;
return err;
}
@ -81,7 +83,7 @@ srs_error_t SrsHttpParser::parse_message(ISrsReader* reader, ISrsHttpMessage** p
// Reset request data.
state = SrsHttpParseStateInit;
hp_header = http_parser();
memset(&hp_header, 0, sizeof(http_parser));
// The body that we have read from cache.
p_body_start = p_header_tail = NULL;
// We must reset the field name and value, because we may get a partial value in on_header_value.
@ -89,6 +91,19 @@ srs_error_t SrsHttpParser::parse_message(ISrsReader* reader, ISrsHttpMessage** p
// The header of the request.
srs_freep(header);
header = new SrsHttpHeader();
// Reset parser for each message.
// If the request is large, such as the fifth message at @utest ProtocolHTTPTest.ParsingLargeMessages,
// we got header and part of body, so the parser will stay at SrsHttpParseStateBody:
// ***MESSAGE BEGIN***
// ***HEADERS COMPLETE***
// Body: xxx
// when got next message, the whole next message is parsed as the body of previous one,
// and the message fail.
// @note You can comment the bellow line, the utest will fail.
http_parser_init(&parser, type_);
// callback object ptr.
parser.data = (void*)this;
// do parse
if ((err = parse_message_imp(reader)) != srs_success) {

@ -55,6 +55,7 @@ private:
http_parser hp_header;
std::string url;
SrsHttpHeader* header;
enum http_parser_type type_;
private:
// Point to the start of body.
const char* p_body_start;

@ -358,6 +358,11 @@ int srs_cond_signal(srs_cond_t cond)
return st_cond_signal((st_cond_t)cond);
}
int srs_cond_broadcast(srs_cond_t cond)
{
return st_cond_broadcast((st_cond_t)cond);
}
srs_mutex_t srs_mutex_new()
{
return (srs_mutex_t)st_mutex_new();

@ -74,6 +74,7 @@ extern int srs_cond_destroy(srs_cond_t cond);
extern int srs_cond_wait(srs_cond_t cond);
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
extern int srs_cond_signal(srs_cond_t cond);
extern int srs_cond_broadcast(srs_cond_t cond);
extern srs_mutex_t srs_mutex_new();
extern int srs_mutex_destroy(srs_mutex_t mutex);

File diff suppressed because one or more lines are too long

@ -723,8 +723,8 @@ VOID TEST(HTTPServerTest, OPTIONSRead)
{
srs_error_t err;
// If OPTIONS, it has no content-length, not chunkted, but not infinite chunked,
// instead, it has no body.
// If request, it has no content-length, not chunked, it's not infinite chunked,
// actually, it has no body.
if (true) {
MockBufferIO io;
io.append("OPTIONS /rtc/v1/play HTTP/1.1\r\n\r\n");
@ -737,6 +737,19 @@ VOID TEST(HTTPServerTest, OPTIONSRead)
EXPECT_TRUE(br->eof());
}
// If response, it has no content-length, not chunked, it's infinite chunked,
if (true) {
MockBufferIO io;
io.append("HTTP/1.1 200 OK\r\n\r\n");
SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false));
ISrsHttpMessage* req = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &req));
SrsAutoFree(ISrsHttpMessage, req);
ISrsHttpResponseReader* br = req->body_reader();
EXPECT_FALSE(br->eof());
}
// So if OPTIONS has body, with chunked or content-length, it's ok to parsing it.
if (true) {
MockBufferIO io;

Loading…
Cancel
Save