For #906, #902, use connection manager to remove connection

pull/908/head
winlin 8 years ago
parent 44f542f77f
commit 3ffb0980f5

@ -51,10 +51,13 @@ SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c)
{
http_mux = new SrsHttpServeMux();
output = _srs_config->get_stream_caster_output(c);
manager = new SrsCoroutineManager();
}
SrsAppCasterFlv::~SrsAppCasterFlv()
{
srs_freep(http_mux);
srs_freep(manager);
}
int SrsAppCasterFlv::initialize()
@ -65,6 +68,10 @@ int SrsAppCasterFlv::initialize()
return ret;
}
if ((ret = manager->start()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -95,7 +102,7 @@ void SrsAppCasterFlv::remove(ISrsConnection* c)
// fixbug: SrsHttpConn for CasterFlv is not freed, which could cause memory leak
// so, free conn which is not managed by SrsServer->conns;
// @see: https://github.com/ossrs/srs/issues/826
srs_freep(c);
manager->remove(c);
}
int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)

@ -42,7 +42,7 @@ class SrsFlvDecoder;
class SrsTcpClient;
class SrsSimpleRtmpClient;
#include <srs_app_st.hpp>
#include <srs_app_thread.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_http_conn.hpp>
@ -52,24 +52,25 @@ class SrsSimpleRtmpClient;
* the stream caster for flv stream over HTTP POST.
*/
class SrsAppCasterFlv : virtual public ISrsTcpHandler
, virtual public IConnectionManager, virtual public ISrsHttpHandler
, virtual public IConnectionManager, virtual public ISrsHttpHandler
{
private:
std::string output;
SrsHttpServeMux* http_mux;
std::vector<SrsHttpConn*> conns;
SrsCoroutineManager* manager;
public:
SrsAppCasterFlv(SrsConfDirective* c);
virtual ~SrsAppCasterFlv();
public:
virtual int initialize();
// ISrsTcpHandler
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
// IConnectionManager
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);
// ISrsHttpHandler
// ISrsHttpHandler
public:
virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
};

@ -50,6 +50,7 @@ using namespace std;
#include <srs_core_mem_watch.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_app_kafka.hpp>
#include <srs_app_thread.hpp>
// system interval in ms,
// all resolution times should be times togother,
@ -483,6 +484,7 @@ SrsServer::SrsServer()
pid_fd = -1;
signal_manager = NULL;
conn_manager = new SrsCoroutineManager();
handler = NULL;
ppid = ::getppid();
@ -524,6 +526,7 @@ void SrsServer::destroy()
}
srs_freep(signal_manager);
srs_freep(conn_manager);
}
void SrsServer::dispose()
@ -742,6 +745,10 @@ int SrsServer::listen()
return ret;
}
if ((ret = conn_manager->start()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -1334,7 +1341,7 @@ void SrsServer::remove(ISrsConnection* c)
// all connections are created by server,
// so we free it here.
srs_freep(conn);
conn_manager->remove(c);
}
int SrsServer::on_reload_listen()

@ -54,6 +54,7 @@ class SrsAppCasterFlv;
#ifdef SRS_AUTO_KAFKA
class SrsKafkaProducer;
#endif
class SrsCoroutineManager;
// listener type for server to identify the connection,
// that is, use different type to process the connection.
@ -233,8 +234,8 @@ public:
* start connection service thread, destroy client.
*/
class SrsServer : virtual public ISrsReloadHandler
, virtual public ISrsSourceHandler
, virtual public IConnectionManager
, virtual public ISrsSourceHandler
, virtual public IConnectionManager
{
private:
// TODO: FIXME: rename to http_api
@ -244,6 +245,7 @@ private:
#ifdef SRS_AUTO_INGEST
SrsIngester* ingester;
#endif
SrsCoroutineManager* conn_manager;
private:
/**
* the pid file fd, lock the file write when server is running.

@ -26,6 +26,58 @@
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <vector>
using namespace std;
SrsCoroutineManager::SrsCoroutineManager()
{
cond = st_cond_new();
trd = new SrsCoroutine("manager", this);
}
SrsCoroutineManager::~SrsCoroutineManager()
{
srs_freep(trd);
st_cond_destroy(cond);
clear();
}
int SrsCoroutineManager::start()
{
return trd->start();
}
int SrsCoroutineManager::cycle()
{
while (!trd->pull()) {
st_cond_wait(cond);
clear();
}
return ERROR_SUCCESS;
}
void SrsCoroutineManager::remove(ISrsConnection* c)
{
conns.push_back(c);
st_cond_signal(cond);
}
void SrsCoroutineManager::clear()
{
// To prevent thread switch when delete connection,
// we copy all connections then free one by one.
vector<ISrsConnection*> copy = conns;
conns.clear();
vector<ISrsConnection*>::iterator it;
for (it = copy.begin(); it != copy.end(); ++it) {
ISrsConnection* conn = *it;
srs_freep(conn);
}
}
ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{
}

@ -26,7 +26,37 @@
#include <srs_core.hpp>
#include <vector>
#include <srs_app_st.hpp>
#include <srs_service_conn.hpp>
/**
* The coroutine manager use a thread to delete a connection, which will stop the service
* thread, for example, when the RTMP connection thread cycle terminated, it will notify
* the manager(the server) to remove the connection from list of server and push it to
* the manager thread to delete it, finally the thread of connection will stop.
*/
class SrsCoroutineManager : virtual public ISrsCoroutineHandler, virtual public IConnectionManager
{
private:
SrsCoroutine* trd;
std::vector<ISrsConnection*> conns;
st_cond_t cond;
public:
SrsCoroutineManager();
virtual ~SrsCoroutineManager();
public:
int start();
// ISrsCoroutineHandler
public:
virtual int cycle();
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);
private:
void clear();
};
/**
* the one cycle thread is a thread do the cycle only one time,

Loading…
Cancel
Save