diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 11a4f1a03..1f6b7a16c 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -25,23 +25,66 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_STREAM_CASTER +#include +using namespace std; + #include #include #include #include #include +#include +#include SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c) { + http_mux = new SrsHttpServeMux(); + output = _srs_config->get_stream_caster_output(c); } SrsAppCasterFlv::~SrsAppCasterFlv() { } +int SrsAppCasterFlv::initialize() +{ + int ret = ERROR_SUCCESS; + + if ((ret = http_mux->handle("/", this)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd) { int ret = ERROR_SUCCESS; + + SrsHttpConn* conn = new SrsHttpConn(this, stfd, http_mux); + conns.push_back(conn); + + if ((ret = conn->start()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +void SrsAppCasterFlv::remove(SrsConnection* c) +{ + std::vector::iterator it; + if ((it = std::find(conns.begin(), conns.end(), c)) != conns.end()) { + conns.erase(it); + } +} + +int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + srs_trace("flv: handle request at %s", r->path().c_str()); + return ret; } diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 6d0015e8c..fb092f5b2 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -30,21 +30,41 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +#include + #ifdef SRS_AUTO_STREAM_CASTER class SrsConfDirective; +class SrsHttpServeMux; +class SrsHttpConn; #include #include +#include +#include -class SrsAppCasterFlv : public ISrsTcpHandler +class SrsAppCasterFlv : virtual public ISrsTcpHandler + , virtual public IConnectionManager, virtual public ISrsHttpHandler { +private: + std::string output; + SrsHttpServeMux* http_mux; + std::vector conns; public: SrsAppCasterFlv(SrsConfDirective* c); virtual ~SrsAppCasterFlv(); +public: + virtual int initialize(); // ISrsTcpHandler public: virtual int on_tcp_client(st_netfd_t stfd); +// IConnectionManager +public: + virtual void remove(SrsConnection* c); +// ISrsHttpHandler +public: + virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r); }; #endif diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 656cbe031..4c483ba1c 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -25,14 +25,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include #include -SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd) +IConnectionManager::IConnectionManager() +{ +} + +IConnectionManager::~IConnectionManager() +{ +} + +SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) { id = 0; - server = srs_server; - stfd = client_stfd; + manager = cm; + stfd = c; // the client thread should reap itself, // so we never use joinable. @@ -86,7 +93,7 @@ int SrsConnection::cycle() void SrsConnection::on_thread_stop() { // TODO: FIXME: never remove itself, use isolate thread to do cleanup. - server->remove(this); + manager->remove(this); } int SrsConnection::srs_id() diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 8cff99a18..29a6eace7 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -36,7 +36,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -class SrsServer; +class SrsConnection; + +/** + * the manager for connection. + */ +class IConnectionManager +{ +public: + IConnectionManager(); + virtual ~IConnectionManager(); +public: + /** + * remove the specified connection. + */ + virtual void remove(SrsConnection* c) = 0; +}; /** * the basic connection of SRS, @@ -57,9 +72,9 @@ private: int id; protected: /** - * the server object to manage the connection. + * the manager object to manage the connection. */ - SrsServer* server; + IConnectionManager* manager; /** * the underlayer st fd handler. */ @@ -69,7 +84,7 @@ protected: */ std::string ip; public: - SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd); + SrsConnection(IConnectionManager* cm, st_netfd_t c); virtual ~SrsConnection(); public: /** diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 5c95af7b0..0f3aac193 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -473,8 +473,8 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r) return srs_go_http_response_json(w, ss.str()); } -SrsHttpApi::SrsHttpApi(SrsServer* svr, st_netfd_t fd, SrsHttpServeMux* m) - : SrsConnection(svr, fd) +SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) + : SrsConnection(cm, fd) { mux = m; parser = new SrsHttpParser(); diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 6251fa30c..66e971088 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -166,7 +166,7 @@ private: SrsHttpServeMux* mux; bool crossdomain_required; public: - SrsHttpApi(SrsServer* svr, st_netfd_t fd, SrsHttpServeMux* m); + SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); virtual ~SrsHttpApi(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 43fa69e0b..c6f4b94b5 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1334,11 +1334,11 @@ int SrsHttpServer::initialize_hls_streaming() return ret; } -SrsHttpConn::SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m) - : SrsConnection(svr, fd) +SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) + : SrsConnection(cm, fd) { parser = new SrsHttpParser(); - http_server = m; + http_mux = m; } SrsHttpConn::~SrsHttpConn() @@ -1424,7 +1424,7 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r) r->method_str().c_str(), r->url().c_str(), r->content_length()); // use default server mux to serve http request. - if ((ret = http_server->mux.serve_http(w, r)) != ERROR_SUCCESS) { + if ((ret = http_mux->serve_http(w, r)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("serve http msg failed. ret=%d", ret); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 418364294..63f87a5ff 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsServer; class SrsSource; class SrsRequest; class SrsConsumer; @@ -375,9 +376,9 @@ class SrsHttpConn : public SrsConnection { private: SrsHttpParser* parser; - SrsHttpServer* http_server; + SrsHttpServeMux* http_mux; public: - SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m); + SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); virtual ~SrsHttpConn(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 588a6cdf4..6b0583f49 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -75,12 +75,13 @@ using namespace std; // when edge timeout, retry next. #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) -SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) - : SrsConnection(srs_server, client_stfd) +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) + : SrsConnection(svr, c) { + server = svr; req = new SrsRequest(); res = new SrsResponse(); - skt = new SrsStSocket(client_stfd); + skt = new SrsStSocket(c); rtmp = new SrsRtmpServer(skt); refer = new SrsRefer(); bandwidth = new SrsBandwidth(); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 7c1a2205a..39aad6e0e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsServer; class SrsRtmpServer; class SrsRequest; class SrsResponse; @@ -61,6 +62,7 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl // for the thread to directly access any field of connection. friend class SrsPublishRecvThread; private: + SrsServer* server; SrsRequest* req; SrsResponse* res; SrsStSocket* skt; @@ -81,7 +83,7 @@ private: // @see https://github.com/simple-rtmp-server/srs/issues/257 bool realtime; public: - SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd); + SrsRtmpConn(SrsServer* svr, st_netfd_t c); virtual ~SrsRtmpConn(); protected: virtual int do_cycle(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index b522c2545..851c8023c 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -106,6 +106,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "MPEG-TS over UDP"; case SrsListenerRtsp: return "RTSP"; + case SrsListenerFlv: + return "HTTP-FLV"; default: return "UNKONWN"; } @@ -238,8 +240,8 @@ SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* server, SrsListenerType type, // the caller already ensure the type is ok, // we just assert here for unknown stream caster. - srs_assert(_type == SrsListenerRtsp); - if (_type == SrsListenerRtsp) { + srs_assert(_type == SrsListenerFlv); + if (_type == SrsListenerFlv) { caster = new SrsAppCasterFlv(c); } } @@ -256,11 +258,15 @@ int SrsHttpFlvListener::listen(string ip, int port) // the caller already ensure the type is ok, // we just assert here for unknown stream caster. - srs_assert(_type == SrsListenerRtsp); + srs_assert(_type == SrsListenerFlv); _ip = ip; _port = port; + if ((ret = caster->initialize()) != ERROR_SUCCESS) { + return ret; + } + srs_freep(listener); listener = new SrsTcpListener(this, ip, port); @@ -1157,7 +1163,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) #endif } else if (type == SrsListenerHttpStream) { #ifdef SRS_AUTO_HTTP_SERVER - conn = new SrsHttpConn(this, client_stfd, http_stream_mux); + conn = new SrsHttpConn(this, client_stfd, &http_stream_mux->mux); #else srs_warn("close http client for server not support http-server"); srs_close_stfd(client_stfd); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index b2ecafbb1..1ff7cee90 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include class SrsServer; class SrsConnection; @@ -51,6 +52,9 @@ class ISrsTcpHandler; class ISrsUdpHandler; class SrsUdpListener; class SrsTcpListener; +#ifdef SRS_AUTO_STREAM_CASTER +class SrsAppCasterFlv; +#endif // listener type for server to identify the connection, // that is, use different type to process the connection. @@ -66,7 +70,7 @@ enum SrsListenerType SrsListenerMpegTsOverUdp = 3, // TCP stream, RTSP stream. SrsListenerRtsp = 4, - // HTTP stream, FLV over HTTP POST. + // TCP stream, FLV stream over HTTP. SrsListenerFlv = 5, }; @@ -126,13 +130,13 @@ public: }; /** - * the tcp listener, for http flv server. + * the tcp listener, for flv stream server. */ class SrsHttpFlvListener : virtual public SrsListener, virtual public ISrsTcpHandler { private: SrsTcpListener* listener; - ISrsTcpHandler* caster; + SrsAppCasterFlv* caster; public: SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); virtual ~SrsHttpFlvListener(); @@ -215,6 +219,7 @@ public: */ class SrsServer : virtual public ISrsReloadHandler , virtual public ISrsSourceHandler, virtual public ISrsHlsHandler + , virtual public IConnectionManager { private: #ifdef SRS_AUTO_HTTP_API @@ -279,7 +284,7 @@ public: virtual int http_handle(); virtual int ingest(); virtual int cycle(); -// server utility +// IConnectionManager public: /** * callback for connection to remove itself. @@ -287,6 +292,8 @@ public: * @see SrsConnection.on_thread_stop(). */ virtual void remove(SrsConnection* conn); +// server utilities. +public: /** * callback for signal manager got a signal. * the signal manager convert signal to io message,