refine the http remux for http flv stream.

pull/133/head
winlin 10 years ago
parent f0c24eeacc
commit 022b6aa561

@ -25,23 +25,66 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#ifdef SRS_AUTO_STREAM_CASTER
#include <algorithm>
using namespace std;
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_config.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_http.hpp>
#include <srs_app_http_conn.hpp>
SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c)
{
http_mux = new SrsHttpServeMux();
output = _srs_config->get_stream_caster_output(c);
}
SrsAppCasterFlv::~SrsAppCasterFlv()
{
}
int SrsAppCasterFlv::initialize()
{
int ret = ERROR_SUCCESS;
if ((ret = http_mux->handle("/", this)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsAppCasterFlv::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
SrsHttpConn* conn = new SrsHttpConn(this, stfd, http_mux);
conns.push_back(conn);
if ((ret = conn->start()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
void SrsAppCasterFlv::remove(SrsConnection* c)
{
std::vector<SrsHttpConn*>::iterator it;
if ((it = std::find(conns.begin(), conns.end(), c)) != conns.end()) {
conns.erase(it);
}
}
int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
srs_trace("flv: handle request at %s", r->path().c_str());
return ret;
}

@ -30,21 +30,41 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <string>
#include <vector>
#ifdef SRS_AUTO_STREAM_CASTER
class SrsConfDirective;
class SrsHttpServeMux;
class SrsHttpConn;
#include <srs_app_st.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_http.hpp>
class SrsAppCasterFlv : public ISrsTcpHandler
class SrsAppCasterFlv : virtual public ISrsTcpHandler
, virtual public IConnectionManager, virtual public ISrsHttpHandler
{
private:
std::string output;
SrsHttpServeMux* http_mux;
std::vector<SrsHttpConn*> conns;
public:
SrsAppCasterFlv(SrsConfDirective* c);
virtual ~SrsAppCasterFlv();
public:
virtual int initialize();
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(SrsConnection* c);
// ISrsHttpHandler
public:
virtual int serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r);
};
#endif

@ -25,14 +25,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
IConnectionManager::IConnectionManager()
{
}
IConnectionManager::~IConnectionManager()
{
}
SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
{
id = 0;
server = srs_server;
stfd = client_stfd;
manager = cm;
stfd = c;
// the client thread should reap itself,
// so we never use joinable.
@ -86,7 +93,7 @@ int SrsConnection::cycle()
void SrsConnection::on_thread_stop()
{
// TODO: FIXME: never remove itself, use isolate thread to do cleanup.
server->remove(this);
manager->remove(this);
}
int SrsConnection::srs_id()

@ -36,7 +36,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_thread.hpp>
#include <srs_app_kbps.hpp>
class SrsServer;
class SrsConnection;
/**
* the manager for connection.
*/
class IConnectionManager
{
public:
IConnectionManager();
virtual ~IConnectionManager();
public:
/**
* remove the specified connection.
*/
virtual void remove(SrsConnection* c) = 0;
};
/**
* the basic connection of SRS,
@ -57,9 +72,9 @@ private:
int id;
protected:
/**
* the server object to manage the connection.
* the manager object to manage the connection.
*/
SrsServer* server;
IConnectionManager* manager;
/**
* the underlayer st fd handler.
*/
@ -69,7 +84,7 @@ protected:
*/
std::string ip;
public:
SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd);
SrsConnection(IConnectionManager* cm, st_netfd_t c);
virtual ~SrsConnection();
public:
/**

@ -473,8 +473,8 @@ int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
return srs_go_http_response_json(w, ss.str());
}
SrsHttpApi::SrsHttpApi(SrsServer* svr, st_netfd_t fd, SrsHttpServeMux* m)
: SrsConnection(svr, fd)
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
: SrsConnection(cm, fd)
{
mux = m;
parser = new SrsHttpParser();

@ -166,7 +166,7 @@ private:
SrsHttpServeMux* mux;
bool crossdomain_required;
public:
SrsHttpApi(SrsServer* svr, st_netfd_t fd, SrsHttpServeMux* m);
SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
virtual ~SrsHttpApi();
// interface IKbpsDelta
public:

@ -1334,11 +1334,11 @@ int SrsHttpServer::initialize_hls_streaming()
return ret;
}
SrsHttpConn::SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m)
: SrsConnection(svr, fd)
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m)
: SrsConnection(cm, fd)
{
parser = new SrsHttpParser();
http_server = m;
http_mux = m;
}
SrsHttpConn::~SrsHttpConn()
@ -1424,7 +1424,7 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, SrsHttpMessage* r)
r->method_str().c_str(), r->url().c_str(), r->content_length());
// use default server mux to serve http request.
if ((ret = http_server->mux.serve_http(w, r)) != ERROR_SUCCESS) {
if ((ret = http_mux->serve_http(w, r)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("serve http msg failed. ret=%d", ret);
}

@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_file.hpp>
#include <srs_app_thread.hpp>
class SrsServer;
class SrsSource;
class SrsRequest;
class SrsConsumer;
@ -375,9 +376,9 @@ class SrsHttpConn : public SrsConnection
{
private:
SrsHttpParser* parser;
SrsHttpServer* http_server;
SrsHttpServeMux* http_mux;
public:
SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m);
SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m);
virtual ~SrsHttpConn();
// interface IKbpsDelta
public:

@ -75,12 +75,13 @@ using namespace std;
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL)
SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd)
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
: SrsConnection(svr, c)
{
server = svr;
req = new SrsRequest();
res = new SrsResponse();
skt = new SrsStSocket(client_stfd);
skt = new SrsStSocket(c);
rtmp = new SrsRtmpServer(skt);
refer = new SrsRefer();
bandwidth = new SrsBandwidth();

@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_conn.hpp>
#include <srs_app_reload.hpp>
class SrsServer;
class SrsRtmpServer;
class SrsRequest;
class SrsResponse;
@ -61,6 +62,7 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl
// for the thread to directly access any field of connection.
friend class SrsPublishRecvThread;
private:
SrsServer* server;
SrsRequest* req;
SrsResponse* res;
SrsStSocket* skt;
@ -81,7 +83,7 @@ private:
// @see https://github.com/simple-rtmp-server/srs/issues/257
bool realtime;
public:
SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd);
SrsRtmpConn(SrsServer* svr, st_netfd_t c);
virtual ~SrsRtmpConn();
protected:
virtual int do_cycle();

@ -106,6 +106,8 @@ std::string srs_listener_type2string(SrsListenerType type)
return "MPEG-TS over UDP";
case SrsListenerRtsp:
return "RTSP";
case SrsListenerFlv:
return "HTTP-FLV";
default:
return "UNKONWN";
}
@ -238,8 +240,8 @@ SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* server, SrsListenerType type,
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
if (_type == SrsListenerRtsp) {
srs_assert(_type == SrsListenerFlv);
if (_type == SrsListenerFlv) {
caster = new SrsAppCasterFlv(c);
}
}
@ -256,11 +258,15 @@ int SrsHttpFlvListener::listen(string ip, int port)
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
srs_assert(_type == SrsListenerFlv);
_ip = ip;
_port = port;
if ((ret = caster->initialize()) != ERROR_SUCCESS) {
return ret;
}
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
@ -1157,7 +1163,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
#endif
} else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
conn = new SrsHttpConn(this, client_stfd, http_stream_mux);
conn = new SrsHttpConn(this, client_stfd, &http_stream_mux->mux);
#else
srs_warn("close http client for server not support http-server");
srs_close_stfd(client_stfd);

@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_source.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_conn.hpp>
class SrsServer;
class SrsConnection;
@ -51,6 +52,9 @@ class ISrsTcpHandler;
class ISrsUdpHandler;
class SrsUdpListener;
class SrsTcpListener;
#ifdef SRS_AUTO_STREAM_CASTER
class SrsAppCasterFlv;
#endif
// listener type for server to identify the connection,
// that is, use different type to process the connection.
@ -66,7 +70,7 @@ enum SrsListenerType
SrsListenerMpegTsOverUdp = 3,
// TCP stream, RTSP stream.
SrsListenerRtsp = 4,
// HTTP stream, FLV over HTTP POST.
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
};
@ -126,13 +130,13 @@ public:
};
/**
* the tcp listener, for http flv server.
* the tcp listener, for flv stream server.
*/
class SrsHttpFlvListener : virtual public SrsListener, virtual public ISrsTcpHandler
{
private:
SrsTcpListener* listener;
ISrsTcpHandler* caster;
SrsAppCasterFlv* caster;
public:
SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
virtual ~SrsHttpFlvListener();
@ -215,6 +219,7 @@ public:
*/
class SrsServer : virtual public ISrsReloadHandler
, virtual public ISrsSourceHandler, virtual public ISrsHlsHandler
, virtual public IConnectionManager
{
private:
#ifdef SRS_AUTO_HTTP_API
@ -279,7 +284,7 @@ public:
virtual int http_handle();
virtual int ingest();
virtual int cycle();
// server utility
// IConnectionManager
public:
/**
* callback for connection to remove itself.
@ -287,6 +292,8 @@ public:
* @see SrsConnection.on_thread_stop().
*/
virtual void remove(SrsConnection* conn);
// server utilities.
public:
/**
* callback for signal manager got a signal.
* the signal manager convert signal to io message,

Loading…
Cancel
Save