diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 2ac197ebc..8fcb06c05 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -481,7 +481,7 @@ vhost rtc.vhost.srs.com { # the streamer cast stream from other protocol to SRS over RTMP. # @see https://github.com/ossrs/srs/tree/develop#stream-architecture -# MPEGTS over UDP +# Push MPEGTS over UDP, see https://github.com/ossrs/srs/issues/250 stream_caster { # whether stream caster is enabled. # default: off @@ -498,7 +498,7 @@ stream_caster { listen 8935; } -# FLV +# Push HTTP-FLV stream, see https://github.com/ossrs/srs/issues/2611 stream_caster { # whether stream caster is enabled. # default: off diff --git a/trunk/conf/push.flv.conf b/trunk/conf/push.flv.conf index d3079b7d5..985f7a784 100644 --- a/trunk/conf/push.flv.conf +++ b/trunk/conf/push.flv.conf @@ -18,12 +18,8 @@ stream_caster { listen 8936; } vhost __defaultVhost__ { - hls { - enabled on; - hls_fragment 10; - hls_window 60; - hls_path ./objs/nginx/html; - hls_m3u8_file [app]/[stream].m3u8; - hls_ts_file [app]/[stream]-[seq].ts; + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; } } diff --git a/trunk/conf/push.mpegts.over.udp.conf b/trunk/conf/push.mpegts.over.udp.conf index 2e02753ab..594347923 100644 --- a/trunk/conf/push.mpegts.over.udp.conf +++ b/trunk/conf/push.mpegts.over.udp.conf @@ -11,7 +11,16 @@ stream_caster { enabled on; caster mpegts_over_udp; output rtmp://127.0.0.1/live/livestream; - listen 1935; + listen 8935; +} +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; } vhost __defaultVhost__ { + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + } } diff --git a/trunk/conf/push.rtsp.conf b/trunk/conf/push.rtsp.conf deleted file mode 100644 index ea17332e4..000000000 --- a/trunk/conf/push.rtsp.conf +++ /dev/null @@ -1,2 +0,0 @@ -# push MPEG-TS over UDP to SRS. -# @note Removed for https://github.com/ossrs/srs/issues/2304#issuecomment-826009290 diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 8c0762136..80d3e8534 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -28,10 +28,66 @@ using namespace std; #define SRS_HTTP_FLV_STREAM_BUFFER 4096 -SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c) +SrsHttpFlvListener::SrsHttpFlvListener() +{ + listener_ = new SrsTcpListener(this); + caster_ = new SrsAppCasterFlv(); +} + +SrsHttpFlvListener::~SrsHttpFlvListener() +{ + srs_freep(caster_); + srs_freep(listener_); +} + +srs_error_t SrsHttpFlvListener::initialize(SrsConfDirective* c) +{ + srs_error_t err = srs_success; + + int port = _srs_config->get_stream_caster_listen(c); + if (port <= 0) { + return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port); + } + + listener_->set_endpoint(srs_any_address_for_listener(), port)->set_label("PUSH-FLV"); + + if ((err = caster_->initialize(c)) != srs_success) { + return srs_error_wrap(err, "init caster port=%d", port); + } + + return err; +} + +srs_error_t SrsHttpFlvListener::listen() +{ + srs_error_t err = srs_success; + + if ((err = listener_->listen()) != srs_success) { + return srs_error_wrap(err, "listen"); + } + + return err; +} + +void SrsHttpFlvListener::close() +{ + listener_->close(); +} + +srs_error_t SrsHttpFlvListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) +{ + srs_error_t err = caster_->on_tcp_client(listener, stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + return err; +} + +SrsAppCasterFlv::SrsAppCasterFlv() { http_mux = new SrsHttpServeMux(); - output = _srs_config->get_stream_caster_output(c); manager = new SrsResourceManager("CFLV"); } @@ -41,9 +97,11 @@ SrsAppCasterFlv::~SrsAppCasterFlv() srs_freep(manager); } -srs_error_t SrsAppCasterFlv::initialize() +srs_error_t SrsAppCasterFlv::initialize(SrsConfDirective* c) { srs_error_t err = srs_success; + + output = _srs_config->get_stream_caster_output(c); if ((err = http_mux->handle("/", this)) != srs_success) { return srs_error_wrap(err, "handle root"); @@ -56,7 +114,7 @@ srs_error_t SrsAppCasterFlv::initialize() return err; } -srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsAppCasterFlv::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 54d61cf08..b0378e788 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -22,6 +22,7 @@ class ISrsHttpResponseReader; class SrsFlvDecoder; class SrsTcpClient; class SrsSimpleRtmpClient; +class SrsAppCasterFlv; #include #include @@ -29,6 +30,24 @@ class SrsSimpleRtmpClient; #include #include +// A TCP listener, for flv stream server. +class SrsHttpFlvListener : public ISrsTcpHandler, public ISrsListener +{ +private: + SrsTcpListener* listener_; + SrsAppCasterFlv* caster_; +public: + SrsHttpFlvListener(); + virtual ~SrsHttpFlvListener(); +public: + srs_error_t initialize(SrsConfDirective* c); + virtual srs_error_t listen(); + void close(); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd); +}; + // The stream caster for flv stream over HTTP POST. class SrsAppCasterFlv : public ISrsTcpHandler, public ISrsResourceManager, public ISrsHttpHandler { @@ -38,13 +57,13 @@ private: std::vector conns; SrsResourceManager* manager; public: - SrsAppCasterFlv(SrsConfDirective* c); + SrsAppCasterFlv(); virtual ~SrsAppCasterFlv(); public: - virtual srs_error_t initialize(); + virtual srs_error_t initialize(SrsConfDirective* c); // Interface ISrsTcpHandler public: - virtual srs_error_t on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd); // Interface ISrsResourceManager public: virtual void remove(ISrsResource* c); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 07f5d2e3f..05d1a310e 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -49,11 +49,6 @@ ISrsUdpHandler::~ISrsUdpHandler() { } -srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) -{ - return srs_success; -} - ISrsUdpMuxHandler::ISrsUdpMuxHandler() { } @@ -62,12 +57,11 @@ ISrsUdpMuxHandler::~ISrsUdpMuxHandler() { } -srs_error_t ISrsUdpMuxHandler::on_stfd_change(srs_netfd_t /*fd*/) +ISrsListener::ISrsListener() { - return srs_success; } -void ISrsUdpHandler::set_stfd(srs_netfd_t /*fd*/) +ISrsListener::~ISrsListener() { } @@ -79,12 +73,12 @@ ISrsTcpHandler::~ISrsTcpHandler() { } -SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) +SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h) { handler = h; - ip = i; - port = p; lfd = NULL; + port = 0; + label_ = "UDP"; nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; @@ -99,6 +93,19 @@ SrsUdpListener::~SrsUdpListener() srs_freepa(buf); } +SrsUdpListener* SrsUdpListener::set_label(const std::string& label) +{ + label_ = label; + return this; +} + +SrsUdpListener* SrsUdpListener::set_endpoint(const std::string& i, int p) +{ + ip = i; + port = p; + return this; +} + int SrsUdpListener::fd() { return srs_netfd_fileno(lfd); @@ -163,8 +170,6 @@ srs_error_t SrsUdpListener::listen() } set_socket_buffer(); - - handler->set_stfd(lfd); srs_freep(trd); trd = new SrsSTCoroutine("udp", this, _srs_context->get_id()); @@ -175,6 +180,11 @@ srs_error_t SrsUdpListener::listen() return err; } +void SrsUdpListener::close() +{ + trd->stop(); +} + srs_error_t SrsUdpListener::cycle() { srs_error_t err = srs_success; @@ -211,14 +221,12 @@ srs_error_t SrsUdpListener::cycle() return err; } -SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) +SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h) { handler = h; - ip = i; - port = p; - + port = 0; lfd = NULL; - + label_ = "TCP"; trd = new SrsDummyCoroutine(); } @@ -228,15 +236,31 @@ SrsTcpListener::~SrsTcpListener() srs_close_stfd(lfd); } -int SrsTcpListener::fd() +SrsTcpListener* SrsTcpListener::set_label(const std::string& label) { - return srs_netfd_fileno(lfd);; + label_ = label; + return this; +} + +SrsTcpListener* SrsTcpListener::set_endpoint(const std::string& i, int p) +{ + ip = i; + port = p; + return this; +} + +SrsTcpListener* SrsTcpListener::set_endpoint(const std::string& endpoint) +{ + std::string ip; int port; + srs_parse_endpoint(endpoint, ip, port); + return set_endpoint(ip, port); } srs_error_t SrsTcpListener::listen() { srs_error_t err = srs_success; + srs_close_stfd(lfd); if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port); } @@ -246,10 +270,19 @@ srs_error_t SrsTcpListener::listen() if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start coroutine"); } + + int fd = srs_netfd_fileno(lfd); + srs_trace("%s listen at tcp://%s:%d, fd=%d", label_.c_str(), ip.c_str(), port, fd); return err; } +void SrsTcpListener::close() +{ + trd->stop(); + srs_close_stfd(lfd); +} + srs_error_t SrsTcpListener::cycle() { srs_error_t err = srs_success; @@ -268,7 +301,7 @@ srs_error_t SrsTcpListener::cycle() return srs_error_wrap(err, "set closeexec"); } - if ((err = handler->on_tcp_client(fd)) != srs_success) { + if ((err = handler->on_tcp_client(this, fd)) != srs_success) { return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd)); } } @@ -276,6 +309,71 @@ srs_error_t SrsTcpListener::cycle() return err; } +SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler* h) +{ + handler_ = h; +} + +SrsMultipleTcpListeners::~SrsMultipleTcpListeners() +{ + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + SrsTcpListener* l = *it; + srs_freep(l); + } +} + +SrsMultipleTcpListeners* SrsMultipleTcpListeners::set_label(const std::string& label) +{ + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + SrsTcpListener* l = *it; + l->set_label(label); + } + + return this; +} + +SrsMultipleTcpListeners* SrsMultipleTcpListeners::add(const std::vector& endpoints) +{ + for (int i = 0; i < (int) endpoints.size(); i++) { + string ip; int port; + srs_parse_endpoint(endpoints[i], ip, port); + + SrsTcpListener* l = new SrsTcpListener(this); + listeners_.push_back(l->set_endpoint(ip, port)); + } + + return this; +} + +srs_error_t SrsMultipleTcpListeners::listen() +{ + srs_error_t err = srs_success; + + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + SrsTcpListener* l = *it; + + if ((err = l->listen()) != srs_success) { + return srs_error_wrap(err, "listen"); + } + } + + return err; +} + +void SrsMultipleTcpListeners::close() +{ + for (vector::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { + SrsTcpListener* l = *it; + srs_freep(l); + } + listeners_.clear(); +} + +srs_error_t SrsMultipleTcpListeners::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) +{ + return handler_->on_tcp_client(this, stfd); +} + SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) { nn_msgs_for_yield_ = 0; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index ca35b8d60..c70976180 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -14,6 +14,7 @@ #include #include +#include #include @@ -21,6 +22,7 @@ struct sockaddr; class SrsBuffer; class SrsUdpMuxSocket; +class ISrsListener; // The udp packet handler. class ISrsUdpHandler @@ -28,12 +30,6 @@ class ISrsUdpHandler public: ISrsUdpHandler(); virtual ~ISrsUdpHandler(); -public: - // When fd changed, for instance, reload the listen port, - // notify the handler and user can do something. - virtual srs_error_t on_stfd_change(srs_netfd_t fd); - - virtual void set_stfd(srs_netfd_t fd); public: // When udp listener got a udp packet, notice server to process it. // @param type, the client type, used to create concrete connection, @@ -45,17 +41,26 @@ public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0; }; -// TODO: FIXME: Add comments? +// The UDP packet handler. TODO: FIXME: Merge with ISrsUdpHandler class ISrsUdpMuxHandler { public: ISrsUdpMuxHandler(); virtual ~ISrsUdpMuxHandler(); public: - virtual srs_error_t on_stfd_change(srs_netfd_t fd); virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt) = 0; }; +// All listener should support listen method. +class ISrsListener +{ +public: + ISrsListener(); + virtual ~ISrsListener(); +public: + virtual srs_error_t listen() = 0; +}; + // The tcp connection handler. class ISrsTcpHandler { @@ -64,13 +69,14 @@ public: virtual ~ISrsTcpHandler(); public: // When got tcp client. - virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0; + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) = 0; }; // Bind udp port, start thread to recv packet and handler it. class SrsUdpListener : public ISrsCoroutineHandler { protected: + std::string label_; srs_netfd_t lfd; SrsCoroutine* trd; protected: @@ -81,24 +87,29 @@ protected: std::string ip; int port; public: - SrsUdpListener(ISrsUdpHandler* h, std::string i, int p); + SrsUdpListener(ISrsUdpHandler* h); virtual ~SrsUdpListener(); public: + SrsUdpListener* set_label(const std::string& label); + SrsUdpListener* set_endpoint(const std::string& i, int p); +private: virtual int fd(); virtual srs_netfd_t stfd(); private: void set_socket_buffer(); public: virtual srs_error_t listen(); + void close(); // Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); }; // Bind and listen tcp port, use handler to process the client. -class SrsTcpListener : public ISrsCoroutineHandler +class SrsTcpListener : public ISrsCoroutineHandler, public ISrsListener { private: + std::string label_; srs_netfd_t lfd; SrsCoroutine* trd; private: @@ -106,17 +117,40 @@ private: std::string ip; int port; public: - SrsTcpListener(ISrsTcpHandler* h, std::string i, int p); + SrsTcpListener(ISrsTcpHandler* h); virtual ~SrsTcpListener(); public: - virtual int fd(); + SrsTcpListener* set_label(const std::string& label); + SrsTcpListener* set_endpoint(const std::string& i, int p); + SrsTcpListener* set_endpoint(const std::string& endpoint); public: virtual srs_error_t listen(); + void close(); // Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); }; +// Bind and listen tcp port, use handler to process the client. +class SrsMultipleTcpListeners : public ISrsListener, public ISrsTcpHandler +{ +private: + ISrsTcpHandler* handler_; + std::vector listeners_; +public: + SrsMultipleTcpListeners(ISrsTcpHandler* h); + virtual ~SrsMultipleTcpListeners(); +public: + SrsMultipleTcpListeners* set_label(const std::string& label); + SrsMultipleTcpListeners* add(const std::vector& endpoints); +public: + srs_error_t listen(); + void close(); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd); +}; + // TODO: FIXME: Rename it. Refine it for performance issue. class SrsUdpMuxSocket { diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 17be018f7..f92617f1f 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -34,6 +34,52 @@ using namespace std; #include #include +SrsUdpCasterListener::SrsUdpCasterListener() +{ + caster_ = new SrsMpegtsOverUdp(); + listener_ = new SrsUdpListener(caster_); +} + +SrsUdpCasterListener::~SrsUdpCasterListener() +{ + srs_freep(listener_); + srs_freep(caster_); +} + +srs_error_t SrsUdpCasterListener::initialize(SrsConfDirective* conf) +{ + srs_error_t err = srs_success; + + int port = _srs_config->get_stream_caster_listen(conf); + if (port <= 0) { + return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port); + } + + listener_->set_endpoint(srs_any_address_for_listener(), port)->set_label("MPEGTS"); + + if ((err = caster_->initialize(conf)) != srs_success) { + return srs_error_wrap(err, "init caster port=%d", port); + } + + return err; +} + +srs_error_t SrsUdpCasterListener::listen() +{ + srs_error_t err = srs_success; + + if ((err = listener_->listen()) != srs_success) { + return srs_error_wrap(err, "listen"); + } + + return err; +} + +void SrsUdpCasterListener::close() +{ + listener_->close(); +} + SrsMpegtsQueue::SrsMpegtsQueue() { nb_audios = nb_videos = 0; @@ -62,7 +108,7 @@ srs_error_t SrsMpegtsQueue::push(SrsSharedPtrMessage* msg) // adjust the ts, add 1ms. msg->timestamp += 1; - if (i >= 5) { + if (i >= 100) { srs_warn("mpegts: free the msg for dts exists, dts=%" PRId64, msg->timestamp); srs_freep(msg); return err; @@ -108,11 +154,10 @@ SrsSharedPtrMessage* SrsMpegtsQueue::dequeue() return NULL; } -SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) +SrsMpegtsOverUdp::SrsMpegtsOverUdp() { context = new SrsTsContext(); buffer = new SrsSimpleStream(); - output = _srs_config->get_stream_caster_output(c); sdk = NULL; @@ -137,6 +182,12 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() srs_freep(pprint); } +srs_error_t SrsMpegtsOverUdp::initialize(SrsConfDirective* c) +{ + output = _srs_config->get_stream_caster_output(c); + return srs_success; +} + srs_error_t SrsMpegtsOverUdp::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { char address_string[64]; @@ -223,7 +274,7 @@ srs_error_t SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int // process each ts packet if ((err = context->decode(stream, this)) != srs_success) { - srs_warn("parse ts packet err=%s", srs_error_desc(err).c_str()); + srs_info("parse ts packet err=%s", srs_error_desc(err).c_str()); srs_error_reset(err); continue; } diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index b930ffe11..e475d4b72 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -26,11 +26,27 @@ class SrsRawAacStream; struct SrsRawAacStreamCodec; class SrsPithyPrint; class SrsSimpleRtmpClient; +class SrsMpegtsOverUdp; #include #include #include +// A UDP listener, for udp stream caster server. +class SrsUdpCasterListener : public ISrsListener +{ +private: + SrsUdpListener* listener_; + SrsMpegtsOverUdp* caster_; +public: + SrsUdpCasterListener(); + virtual ~SrsUdpCasterListener(); +public: + srs_error_t initialize(SrsConfDirective* conf); + srs_error_t listen(); + void close(); +}; + // The queue for mpegts over udp to send packets. // For the aac in mpegts contains many flv packets in a pes packet, // we must recalc the timestamp. @@ -72,8 +88,10 @@ private: SrsMpegtsQueue* queue; SrsPithyPrint* pprint; public: - SrsMpegtsOverUdp(SrsConfDirective* c); + SrsMpegtsOverUdp(); virtual ~SrsMpegtsOverUdp(); +public: + srs_error_t initialize(SrsConfDirective* c); // Interface ISrsUdpHandler public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c7369d33b..84385eaea 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -41,199 +41,6 @@ using namespace std; #include #endif -std::string srs_listener_type2string(SrsListenerType type) -{ - switch (type) { - case SrsListenerRtmpStream: - return "RTMP"; - case SrsListenerHttpApi: - return "HTTP-API"; - case SrsListenerHttpsApi: - return "HTTPS-API"; - case SrsListenerHttpStream: - return "HTTP-Server"; - case SrsListenerHttpsStream: - return "HTTPS-Server"; - case SrsListenerMpegTsOverUdp: - return "MPEG-TS over UDP"; - case SrsListenerFlv: - return "HTTP-FLV"; - case SrsListenerTcp: - return "TCP"; - default: - return "UNKONWN"; - } -} - -SrsListener::SrsListener(SrsServer* svr, SrsListenerType t) -{ - port = 0; - server = svr; - type = t; -} - -SrsListener::~SrsListener() -{ -} - -SrsListenerType SrsListener::listen_type() -{ - return type; -} - -SrsBufferListener::SrsBufferListener(SrsServer* svr, SrsListenerType t) : SrsListener(svr, t) -{ - listener = NULL; -} - -SrsBufferListener::~SrsBufferListener() -{ - srs_freep(listener); -} - -srs_error_t SrsBufferListener::listen(string i, int p) -{ - srs_error_t err = srs_success; - - ip = i; - port = p; - - srs_freep(listener); - listener = new SrsTcpListener(this, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "buffered tcp listen"); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - -srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd) -{ - srs_error_t err = server->accept_client(type, stfd); - if (err != srs_success) { - srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - - return srs_success; -} - -SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) -{ - listener = NULL; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerFlv); - if (type == SrsListenerFlv) { - caster = new SrsAppCasterFlv(c); - } -} - -SrsHttpFlvListener::~SrsHttpFlvListener() -{ - srs_freep(caster); - srs_freep(listener); -} - -srs_error_t SrsHttpFlvListener::listen(string i, int p) -{ - srs_error_t err = srs_success; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerFlv); - - ip = i; - port = p; - - if ((err = caster->initialize()) != srs_success) { - return srs_error_wrap(err, "init caster %s:%d", ip.c_str(), port); - } - - srs_freep(listener); - listener = new SrsTcpListener(this, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "listen"); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - -srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) -{ - srs_error_t err = caster->on_tcp_client(stfd); - if (err != srs_success) { - srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - - return err; -} - -SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t) -{ - listener = NULL; - caster = c; -} - -SrsUdpStreamListener::~SrsUdpStreamListener() -{ - srs_freep(listener); -} - -srs_error_t SrsUdpStreamListener::listen(string i, int p) -{ - srs_error_t err = srs_success; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerMpegTsOverUdp); - - ip = i; - port = p; - - srs_freep(listener); - listener = new SrsUdpListener(caster, ip, port); - - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); - } - - // notify the handler the fd changed. - if ((err = caster->on_stfd_change(listener->stfd())) != srs_success) { - return srs_error_wrap(err, "notify fd change failed"); - } - - string v = srs_listener_type2string(type); - srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - - return err; -} - -SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL) -{ - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerMpegTsOverUdp); - if (type == SrsListenerMpegTsOverUdp) { - caster = new SrsMpegtsOverUdp(c); - } -} - -SrsUdpCasterListener::~SrsUdpCasterListener() -{ - srs_freep(caster); -} - SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager::SrsSignalManager(SrsServer* s) @@ -506,14 +313,6 @@ srs_error_t SrsInotifyWorker::cycle() return err; } -ISrsServerCycle::ISrsServerCycle() -{ -} - -ISrsServerCycle::~ISrsServerCycle() -{ -} - SrsServer::SrsServer() { signal_reload = false; @@ -526,8 +325,14 @@ SrsServer::SrsServer() signal_manager = new SrsSignalManager(this); conn_manager = new SrsResourceManager("TCP", true); latest_version_ = new SrsLatestVersion(); - - handler = NULL; + rtmp_listener_ = new SrsMultipleTcpListeners(this); + api_listener_ = new SrsTcpListener(this); + apis_listener_ = new SrsTcpListener(this); + http_listener_ = new SrsTcpListener(this); + https_listener_ = new SrsTcpListener(this); + webrtc_listener_ = new SrsTcpListener(this); + stream_caster_flv_listener_ = new SrsHttpFlvListener(); + stream_caster_mpegts_ = new SrsUdpCasterListener(); ppid = ::getppid(); // donot new object in constructor, @@ -576,22 +381,30 @@ void SrsServer::destroy() srs_freep(signal_manager); srs_freep(latest_version_); srs_freep(conn_manager); + srs_freep(rtmp_listener_); + srs_freep(api_listener_); + srs_freep(apis_listener_); + srs_freep(http_listener_); + srs_freep(https_listener_); + srs_freep(webrtc_listener_); + srs_freep(stream_caster_flv_listener_); + srs_freep(stream_caster_mpegts_); } void SrsServer::dispose() { _srs_config->unsubscribe(this); - // prevent fresh clients. - close_listeners(SrsListenerRtmpStream); - close_listeners(SrsListenerHttpApi); - close_listeners(SrsListenerHttpsApi); - close_listeners(SrsListenerHttpStream); - close_listeners(SrsListenerHttpsStream); - close_listeners(SrsListenerMpegTsOverUdp); - close_listeners(SrsListenerFlv); - close_listeners(SrsListenerTcp); - + // Destroy all listeners. + rtmp_listener_->close(); + api_listener_->close(); + apis_listener_->close(); + http_listener_->close(); + https_listener_->close(); + webrtc_listener_->close(); + stream_caster_flv_listener_->close(); + stream_caster_mpegts_->close(); + // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. ingester->dispose(); @@ -609,15 +422,15 @@ void SrsServer::gracefully_dispose() srs_usleep(_srs_config->get_grace_start_wait()); srs_trace("start wait for %dms", srsu2msi(_srs_config->get_grace_start_wait())); - // prevent fresh clients. - close_listeners(SrsListenerRtmpStream); - close_listeners(SrsListenerHttpApi); - close_listeners(SrsListenerHttpsApi); - close_listeners(SrsListenerHttpStream); - close_listeners(SrsListenerHttpsStream); - close_listeners(SrsListenerMpegTsOverUdp); - close_listeners(SrsListenerFlv); - close_listeners(SrsListenerTcp); + // Destroy all listeners. + rtmp_listener_->close(); + api_listener_->close(); + apis_listener_->close(); + http_listener_->close(); + https_listener_->close(); + webrtc_listener_->close(); + stream_caster_flv_listener_->close(); + stream_caster_mpegts_->close(); srs_trace("listeners closed"); // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. @@ -644,7 +457,7 @@ void SrsServer::gracefully_dispose() srs_trace("final wait for %dms", srsu2msi(_srs_config->get_grace_final_wait())); } -srs_error_t SrsServer::initialize(ISrsServerCycle* ch) +srs_error_t SrsServer::initialize() { srs_error_t err = srs_success; @@ -653,11 +466,6 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) // instead, subscribe handler in initialize method. srs_assert(_srs_config); _srs_config->subscribe(this); - - handler = ch; - if(handler && (err = handler->initialize()) != srs_success){ - return srs_error_wrap(err, "handler initialize"); - } bool stream = _srs_config->get_http_stream_enabled(); string http_listen = _srs_config->get_http_stream_listen(); @@ -742,51 +550,92 @@ srs_error_t SrsServer::initialize_signal() srs_error_t SrsServer::listen() { srs_error_t err = srs_success; - - if ((err = listen_rtmp()) != srs_success) { + + // Create RTMP listeners. + rtmp_listener_->add(_srs_config->get_listens())->set_label("RTMP"); + if ((err = rtmp_listener_->listen()) != srs_success) { return srs_error_wrap(err, "rtmp listen"); } - - if ((err = listen_http_api()) != srs_success) { - return srs_error_wrap(err, "http api listen"); - } - if ((err = listen_https_api()) != srs_success) { - return srs_error_wrap(err, "https api listen"); + // Create HTTP API listener. + if (_srs_config->get_http_api_enabled()) { + if (reuse_api_over_server_) { + srs_trace("HTTP-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str()); + } else { + api_listener_->set_endpoint(_srs_config->get_http_api_listen())->set_label("HTTP-API"); + if ((err = api_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "http api listen"); + } + } } - - if ((err = listen_http_stream()) != srs_success) { - return srs_error_wrap(err, "http stream listen"); + + // Create HTTPS API listener. + if (_srs_config->get_https_api_enabled()) { + if (reuse_api_over_server_) { + srs_trace("HTTPS-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str()); + } else { + apis_listener_->set_endpoint(_srs_config->get_https_api_listen())->set_label("HTTPS-API"); + if ((err = apis_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "https api listen"); + } + } } - if ((err = listen_https_stream()) != srs_success) { - return srs_error_wrap(err, "https stream listen"); + // Create HTTP server listener. + if (_srs_config->get_http_stream_enabled()) { + http_listener_->set_endpoint(_srs_config->get_http_stream_listen())->set_label("HTTP-Server"); + if ((err = http_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "http server listen"); + } } - - if ((err = listen_stream_caster()) != srs_success) { - return srs_error_wrap(err, "stream caster listen"); + + // Create HTTP server listener. + if (_srs_config->get_https_stream_enabled()) { + https_listener_->set_endpoint(_srs_config->get_https_stream_listen())->set_label("HTTPS-Server"); + if ((err = https_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "https server listen"); + } } + // Start WebRTC over TCP listener. #ifdef SRS_RTC - if (!reuse_rtc_over_server_) { - // TODO: FIXME: Refine the listeners. - close_listeners(SrsListenerTcp); - if (_srs_config->get_rtc_server_tcp_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); - listeners.push_back(listener); - - std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); + if (!reuse_rtc_over_server_ && _srs_config->get_rtc_server_tcp_enabled()) { + webrtc_listener_->set_endpoint(srs_int2str(_srs_config->get_rtc_server_tcp_listen()))->set_label("WebRTC"); + if ((err = webrtc_listener_->listen()) != srs_success) { + return srs_error_wrap(err, "webrtc tcp listen"); + } + } +#endif - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); + // Start all listeners for stream caster. + std::vector confs = _srs_config->get_stream_casters(); + for (vector::iterator it = confs.begin(); it != confs.end(); ++it) { + SrsConfDirective* conf = *it; + if (!_srs_config->get_stream_caster_enabled(conf)) { + continue; + } - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + ISrsListener* listener = NULL; + std::string caster = _srs_config->get_stream_caster_engine(conf); + if (srs_stream_caster_is_udp(caster)) { + listener = stream_caster_mpegts_; + if ((err = stream_caster_mpegts_->initialize(conf)) != srs_success) { + return srs_error_wrap(err, "initialize"); } + } else if (srs_stream_caster_is_flv(caster)) { + listener = stream_caster_flv_listener_; + if ((err = stream_caster_flv_listener_->initialize(conf)) != srs_success) { + return srs_error_wrap(err, "initialize"); + } + } else { + return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str()); + } + + srs_assert(listener); + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen"); } } -#endif if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); @@ -1011,10 +860,6 @@ void SrsServer::on_signal(int signo) if (signo == SRS_SIGNAL_REOPEN_LOG) { _srs_log->reopen(); - if (handler) { - handler->on_logrotate(); - } - srs_warn("reopen log file, signo=%d", signo); return; } @@ -1071,10 +916,6 @@ srs_error_t SrsServer::do_cycle() if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } - - if (handler && (err = handler->on_cycle()) != srs_success) { - return srs_error_wrap(err, "handle callback"); - } // asprocess check. if (asprocess && ::getppid() != ppid) { @@ -1191,202 +1032,6 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) return err; } -srs_error_t SrsServer::listen_rtmp() -{ - srs_error_t err = srs_success; - - // stream service port. - std::vector ip_ports = _srs_config->get_listens(); - srs_assert((int)ip_ports.size() > 0); - - close_listeners(SrsListenerRtmpStream); - - for (int i = 0; i < (int)ip_ports.size(); i++) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream); - listeners.push_back(listener); - - int port; string ip; - srs_parse_endpoint(ip_ports[i], ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port); - } - } - - return err; -} - -srs_error_t SrsServer::listen_http_api() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpApi); - - // Ignore if not enabled. - if (!_srs_config->get_http_api_enabled()) { - return err; - } - - // Ignore if reuse same port to http server. - if (reuse_api_over_server_) { - srs_trace("HTTP-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str()); - return err; - } - - // Listen at a dedicated HTTP API endpoint. - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi); - listeners.push_back(listener); - - std::string ep = _srs_config->get_http_api_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port); - } - - return err; -} - -srs_error_t SrsServer::listen_https_api() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpsApi); - - // Ignore if not enabled. - if (!_srs_config->get_https_api_enabled()) { - return err; - } - - // Ignore if reuse same port to https server. - if (reuse_api_over_server_) { - srs_trace("HTTPS-API: Reuse listen to https server %s", _srs_config->get_https_stream_listen().c_str()); - return err; - } - - // Listen at a dedicated HTTPS API endpoint. - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsApi); - listeners.push_back(listener); - - std::string ep = _srs_config->get_https_api_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port); - } - - return err; -} - -srs_error_t SrsServer::listen_http_stream() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpStream); - if (_srs_config->get_http_stream_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpStream); - listeners.push_back(listener); - - std::string ep = _srs_config->get_http_stream_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "http stream listen %s:%d", ip.c_str(), port); - } - } - - return err; -} - -srs_error_t SrsServer::listen_https_stream() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpsStream); - if (_srs_config->get_https_stream_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsStream); - listeners.push_back(listener); - - std::string ep = _srs_config->get_https_stream_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "https stream listen %s:%d", ip.c_str(), port); - } - } - - return err; -} - -srs_error_t SrsServer::listen_stream_caster() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerMpegTsOverUdp); - - std::vector::iterator it; - std::vector stream_casters = _srs_config->get_stream_casters(); - - for (it = stream_casters.begin(); it != stream_casters.end(); ++it) { - SrsConfDirective* stream_caster = *it; - if (!_srs_config->get_stream_caster_enabled(stream_caster)) { - continue; - } - - SrsListener* listener = NULL; - - std::string caster = _srs_config->get_stream_caster_engine(stream_caster); - if (srs_stream_caster_is_udp(caster)) { - listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, stream_caster); - } else if (srs_stream_caster_is_flv(caster)) { - listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster); - } else { - return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str()); - } - srs_assert(listener != NULL); - - listeners.push_back(listener); - int port = _srs_config->get_stream_caster_listen(stream_caster); - if (port <= 0) { - return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port); - } - // TODO: support listen at <[ip:]port> - if ((err = listener->listen(srs_any_address_for_listener(), port)) != srs_success) { - return srs_error_wrap(err, "listen at %d", port); - } - } - - return err; -} - -void SrsServer::close_listeners(SrsListenerType type) -{ - std::vector::iterator it; - for (it = listeners.begin(); it != listeners.end();) { - SrsListener* listener = *it; - - if (listener->listen_type() != type) { - ++it; - continue; - } - - srs_freep(listener); - it = listeners.erase(it); - } -} - void SrsServer::resample_kbps() { SrsStatistic* stat = SrsStatistic::instance(); @@ -1423,94 +1068,57 @@ void SrsServer::resample_kbps() stat->kbps_sample(); } -srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) +ISrsHttpServeMux* SrsServer::api_server() { - srs_error_t err = srs_success; - - ISrsResource* resource = NULL; - - if ((err = fd_to_resource(type, stfd, &resource)) != srs_success) { - srs_close_stfd(stfd); - if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { - srs_error_reset(err); - return srs_success; - } - return srs_error_wrap(err, "fd to resource"); - } - - // Ignore if no resource found. - if (!resource) { - return err; - } - - // directly enqueue, the cycle thread will remove the client. - conn_manager->add(resource); - - ISrsStartable* conn = dynamic_cast(resource); - if ((err = conn->start()) != srs_success) { - return srs_error_wrap(err, "start conn coroutine"); - } - - return err; + return http_api_mux; } -ISrsHttpServeMux* SrsServer::api_server() +srs_error_t SrsServer::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) { - return http_api_mux; + srs_error_t err = do_on_tcp_client(listener, stfd); + + // We always try to close the stfd, because it should be NULL if it has been handled or closed. + srs_close_stfd(stfd); + + return err; } -srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr) +srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stfd) { srs_error_t err = srs_success; - + int fd = srs_netfd_fileno(stfd); string ip = srs_get_peer_ip(fd); int port = srs_get_peer_port(fd); - - // for some keep alive application, for example, the keepalived, - // will send some tcp packet which we cann't got the ip, - // we just ignore it. + + // Ignore if ip is empty, for example, load balancer keepalive. if (ip.empty()) { + if (_srs_config->empty_ip_ok()) return err; return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd); } - - // check connection limitation. - int max_connections = _srs_config->get_max_connections(); - if (handler && (err = handler->on_accept_client(max_connections, (int)conn_manager->size())) != srs_success) { - return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s", - fd, ip.c_str(), port, max_connections, (int)conn_manager->size(), srs_error_desc(err).c_str()); - } - if ((int)conn_manager->size() >= max_connections) { - return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits", - fd, ip.c_str(), port, max_connections, (int)conn_manager->size()); - } - - // avoid fd leak when fork. - // @see https://github.com/ossrs/srs/issues/518 - if (true) { - int val; - if ((val = fcntl(fd, F_GETFD, 0)) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd); - } - val |= FD_CLOEXEC; - if (fcntl(fd, F_SETFD, val) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd); - } + + // Security or system flow control check. + if ((err = on_before_connection(stfd, ip, port)) != srs_success) { + return srs_error_wrap(err, "check"); } - // We will free the stfd from now on. - srs_netfd_t fd2 = stfd; - stfd = NULL; + // Covert handler to resource. + ISrsResource* resource = NULL; // The context id may change during creating the bellow objects. SrsContextRestore(_srs_context->get_id()); + // From now on, we always handle the stfd, so we set the original one to NULL. + srs_netfd_t stfd2 = stfd; + stfd = NULL; + #ifdef SRS_RTC // If reuse HTTP server with WebRTC TCP, peek to detect the client. - if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) { - SrsTcpConnection* skt = new SrsTcpConnection(fd2); + if (reuse_rtc_over_server_ && (listener == http_listener_ || listener == https_listener_)) { + SrsTcpConnection* skt = new SrsTcpConnection(stfd2); SrsBufferedReadWriter* io = new SrsBufferedReadWriter(skt); + // Peek first N bytes to finger out the real client type. uint8_t b[10]; int nn = sizeof(b); if ((err = io->peek((char*)b, &nn)) != srs_success) { srs_freep(io); srs_freep(skt); @@ -1527,30 +1135,71 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, I if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20 && b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42 ) { - *pr = new SrsRtcTcpConn(io, ip, port, this); + resource = new SrsRtcTcpConn(io, ip, port, this); } else { - *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, io, http_server, ip, port); + resource = new SrsHttpxConn(listener == http_listener_, this, io, http_server, ip, port); } - return err; } #endif - - if (type == SrsListenerRtmpStream) { - *pr = new SrsRtmpConn(this, fd2, ip, port); - } else if (type == SrsListenerHttpApi || type == SrsListenerHttpsApi) { - *pr = new SrsHttpxConn(type == SrsListenerHttpsApi, this, new SrsTcpConnection(fd2), http_api_mux, ip, port); - } else if (type == SrsListenerHttpStream || type == SrsListenerHttpsStream) { - *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, new SrsTcpConnection(fd2), http_server, ip, port); -#ifdef SRS_RTC - } else if (type == SrsListenerTcp) { - *pr = new SrsRtcTcpConn(new SrsTcpConnection(fd2), ip, port, this); -#endif - } else { - srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); - srs_close_stfd(fd2); - return err; + + // Create resource by normal listeners. + if (!resource) { + if (listener == rtmp_listener_) { + resource = new SrsRtmpConn(this, stfd2, ip, port); + } else if (listener == api_listener_ || listener == apis_listener_) { + bool is_https = listener == apis_listener_; + resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_api_mux, ip, port); + } else if (listener == http_listener_ || listener == https_listener_) { + bool is_https = listener == https_listener_; + resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_server, ip, port); + } else if (listener == webrtc_listener_) { + resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port, this); + } else { + srs_close_stfd(stfd2); + srs_warn("Close for invalid fd=%d, ip=%s:%d", fd, ip.c_str(), port); + return err; + } } - + + // Use connection manager to manage all the resources. + conn_manager->add(resource); + + // If connection is a resource to start, start a coroutine to handle it. + ISrsStartable* conn = dynamic_cast(resource); + if ((err = conn->start()) != srs_success) { + return srs_error_wrap(err, "start conn coroutine"); + } + + return err; +} + +srs_error_t SrsServer::on_before_connection(srs_netfd_t& stfd, const std::string& ip, int port) +{ + srs_error_t err = srs_success; + + int fd = srs_netfd_fileno(stfd); + + // Failed if exceed the connection limitation. + int max_connections = _srs_config->get_max_connections(); + + if ((int)conn_manager->size() >= max_connections) { + return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits", + fd, ip.c_str(), port, max_connections, (int)conn_manager->size()); + } + + // Set to close the fd when forking, to avoid fd leak when start a process. + // See https://github.com/ossrs/srs/issues/518 + if (true) { + int val; + if ((val = fcntl(fd, F_GETFD, 0)) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd); + } + val |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, val) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd); + } + } + return err; } @@ -1616,7 +1265,7 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg) srs_error_t err = srs_success; // Initialize the whole system, set hooks to handle server level events. - if ((err = srs->initialize(NULL)) != srs_success) { + if ((err = srs->initialize()) != srs_success) { return srs_error_wrap(err, "server initialize"); } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index e3df09a67..4718ceb2a 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -37,97 +37,9 @@ class SrsAppCasterFlv; class SrsResourceManager; class SrsLatestVersion; class SrsWaitGroup; - -// The listener type for server to identify the connection, -// that is, use different type to process the connection. -enum SrsListenerType -{ - // RTMP client, - SrsListenerRtmpStream = 0, - // HTTP api, - SrsListenerHttpApi = 1, - // HTTP stream, HDS/HLS/DASH - SrsListenerHttpStream = 2, - // UDP stream, MPEG-TS over udp. - SrsListenerMpegTsOverUdp = 3, - // TCP stream, FLV stream over HTTP. - SrsListenerFlv = 5, - // HTTPS api, - SrsListenerHttpsApi = 8, - // HTTPS stream, - SrsListenerHttpsStream = 9, - // WebRTC over TCP, - SrsListenerTcp = 10, -}; - -// A common tcp listener, for RTMP/HTTP server. -class SrsListener -{ -protected: - SrsListenerType type; -protected: - std::string ip; - int port; - SrsServer* server; -public: - SrsListener(SrsServer* svr, SrsListenerType t); - virtual ~SrsListener(); -public: - virtual SrsListenerType listen_type(); - virtual srs_error_t listen(std::string i, int p) = 0; -}; - -// A buffered TCP listener. -class SrsBufferListener : public SrsListener, public ISrsTcpHandler -{ -private: - SrsTcpListener* listener; -public: - SrsBufferListener(SrsServer* server, SrsListenerType type); - virtual ~SrsBufferListener(); -public: - virtual srs_error_t listen(std::string ip, int port); -// Interface ISrsTcpHandler -public: - virtual srs_error_t on_tcp_client(srs_netfd_t stfd); -}; - -// A TCP listener, for flv stream server. -class SrsHttpFlvListener : public SrsListener, public ISrsTcpHandler -{ -private: - SrsTcpListener* listener; - SrsAppCasterFlv* caster; -public: - SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); - virtual ~SrsHttpFlvListener(); -public: - virtual srs_error_t listen(std::string i, int p); -// Interface ISrsTcpHandler -public: - virtual srs_error_t on_tcp_client(srs_netfd_t stfd); -}; - -// A UDP listener, for udp server. -class SrsUdpStreamListener : public SrsListener -{ -protected: - SrsUdpListener* listener; - ISrsUdpHandler* caster; -public: - SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c); - virtual ~SrsUdpStreamListener(); -public: - virtual srs_error_t listen(std::string i, int p); -}; - -// A UDP listener, for udp stream caster server. -class SrsUdpCasterListener : public SrsUdpStreamListener -{ -public: - SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); - virtual ~SrsUdpCasterListener(); -}; +class SrsMultipleTcpListeners; +class SrsHttpFlvListener; +class SrsUdpCasterListener; // Convert signal to io, // @see: st-1.9/docs/notes.html @@ -176,37 +88,15 @@ public: virtual srs_error_t cycle(); }; -// A handler to the handle cycle in SRS RTMP server. -class ISrsServerCycle -{ -public: - ISrsServerCycle(); - virtual ~ISrsServerCycle(); -public: - // Initialize the cycle handler. - virtual srs_error_t initialize() = 0; - // Do on_cycle while server doing cycle. - virtual srs_error_t on_cycle() = 0; - // Callback the handler when got client. - virtual srs_error_t on_accept_client(int max, int cur) = 0; - // Callback for logrotate. - virtual void on_logrotate() = 0; -}; - // TODO: FIXME: Rename to SrsLiveServer. // SRS RTMP server, initialize and listen, start connection service thread, destroy client. -class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler - , public ISrsResourceManager, public ISrsCoroutineHandler - , public ISrsHourGlass +class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler, public ISrsTcpHandler + , public ISrsResourceManager, public ISrsCoroutineHandler, public ISrsHourGlass { private: // TODO: FIXME: Extract an HttpApiServer. ISrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; - // If reusing, HTTP API use the same port of HTTP server. - bool reuse_api_over_server_; - // If reusing, WebRTC TCP use the same port of HTTP server. - bool reuse_rtc_over_server_; private: SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; @@ -220,14 +110,34 @@ private: // for the server never delete the file; when system startup, the pid in pid file // maybe valid but the process is not SRS, the init.d script will never start server. int pid_fd; - // All listners, listener manager. - std::vector listeners; +private: + // If reusing, HTTP API use the same port of HTTP server. + bool reuse_api_over_server_; + // If reusing, WebRTC TCP use the same port of HTTP server. + bool reuse_rtc_over_server_; + // RTMP stream listeners, over TCP. + SrsMultipleTcpListeners* rtmp_listener_; + // HTTP API listener, over TCP. Please note that it might reuse with stream listener. + SrsTcpListener* api_listener_; + // HTTPS API listener, over TCP. Please note that it might reuse with stream listener. + SrsTcpListener* apis_listener_; + // HTTP server listener, over TCP. Please note that request of both HTTP static and stream are served by this + // listener, and it might be reused by HTTP API and WebRTC TCP. + SrsTcpListener* http_listener_; + // HTTPS server listener, over TCP. Please note that request of both HTTP static and stream are served by this + // listener, and it might be reused by HTTP API and WebRTC TCP. + SrsTcpListener* https_listener_; + // WebRTC over TCP listener. Please note that there is always a UDP listener by RTC server. + SrsTcpListener* webrtc_listener_; + // Stream Caster for push over HTTP-FLV. + SrsHttpFlvListener* stream_caster_flv_listener_; + // Stream Caster for push over MPEGTS-UDP + SrsUdpCasterListener* stream_caster_mpegts_; +private: // Signal manager which convert gignal to io message. SrsSignalManager* signal_manager; // To query the latest available version of SRS. SrsLatestVersion* latest_version_; - // Handle in server cycle. - ISrsServerCycle* handler; // User send the signal, convert to variable. bool signal_reload; bool signal_persistence_config; @@ -254,7 +164,7 @@ private: public: // Initialize server with callback handler ch. // @remark user must free the handler. - virtual srs_error_t initialize(ISrsServerCycle* ch); + virtual srs_error_t initialize(); virtual srs_error_t initialize_st(); virtual srs_error_t initialize_signal(); virtual srs_error_t listen(); @@ -293,29 +203,18 @@ private: virtual srs_error_t setup_ticks(); virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); private: - // listen at specified protocol. - virtual srs_error_t listen_rtmp(); - virtual srs_error_t listen_http_api(); - virtual srs_error_t listen_https_api(); - virtual srs_error_t listen_http_stream(); - virtual srs_error_t listen_https_stream(); - virtual srs_error_t listen_stream_caster(); - // Close the listeners for specified type, - // Remove the listen object from manager. - virtual void close_listeners(SrsListenerType type); // Resample the server kbs. virtual void resample_kbps(); // For internal only public: - // When listener got a fd, notice server to accept it. - // @param type, the client type, used to create concrete connection, - // for instance RTMP connection to serve client. - // @param stfd, the client fd in st boxed, the underlayer fd. - virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd); // TODO: FIXME: Fetch from hybrid server manager. virtual ISrsHttpServeMux* api_server(); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd); private: - virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr); + virtual srs_error_t do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stfd); + virtual srs_error_t on_before_connection(srs_netfd_t& stfd, const std::string& ip, int port); // Interface ISrsResourceManager public: // A callback for connection to remove itself. diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 3b564dbe1..081368865 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -76,7 +76,7 @@ public: srs_close_stfd(fd); } public: - virtual srs_error_t on_tcp_client(srs_netfd_t stfd) { + virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd) { fd = stfd; return srs_success; } @@ -87,15 +87,16 @@ VOID TEST(TCPServerTest, PingPong) srs_error_t err; if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); - EXPECT_TRUE(l.fd() > 0); } if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -107,7 +108,8 @@ VOID TEST(TCPServerTest, PingPong) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -128,7 +130,8 @@ VOID TEST(TCPServerTest, PingPong) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -151,7 +154,8 @@ VOID TEST(TCPServerTest, PingPong) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -185,7 +189,8 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -206,7 +211,8 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -227,7 +233,8 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -406,7 +413,8 @@ VOID TEST(TCPServerTest, WritevIOVC) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -435,7 +443,8 @@ VOID TEST(TCPServerTest, WritevIOVC) if (true) { MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); @@ -1516,7 +1525,8 @@ VOID TEST(ThreadCriticalTest, FailIfCloseActiveFD) srs_error_t err; MockTcpHandler h; - SrsTcpListener l(&h, _srs_tmp_host, _srs_tmp_port); + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); HELPER_EXPECT_SUCCESS(l.listen()); SrsTcpClient c0(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);