RTC: Refine resouce management

pull/1969/head
winlin 4 years ago
parent 1173d35e33
commit 64705d1cc8

@ -49,7 +49,7 @@ SrsAppCasterFlv::SrsAppCasterFlv(SrsConfDirective* c)
{
http_mux = new SrsHttpServeMux();
output = _srs_config->get_stream_caster_output(c);
manager = new SrsConnectionManager();
manager = new SrsResourceManager("CasterFLV");
}
SrsAppCasterFlv::~SrsAppCasterFlv()
@ -95,7 +95,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
return err;
}
void SrsAppCasterFlv::remove(ISrsConnection* c)
void SrsAppCasterFlv::remove(ISrsResource* c)
{
SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(c);
@ -141,7 +141,7 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return err;
}
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
{
sdk = NULL;
pprint = SrsPithyPrint::create_caster();

@ -48,13 +48,13 @@ class SrsSimpleRtmpClient;
// The stream caster for flv stream over HTTP POST.
class SrsAppCasterFlv : virtual public ISrsTcpHandler
, virtual public IConnectionManager, virtual public ISrsHttpHandler
, virtual public ISrsResourceManager, virtual public ISrsHttpHandler
{
private:
std::string output;
SrsHttpServeMux* http_mux;
std::vector<SrsHttpConn*> conns;
SrsConnectionManager* manager;
SrsResourceManager* manager;
public:
SrsAppCasterFlv(SrsConfDirective* c);
virtual ~SrsAppCasterFlv();
@ -63,9 +63,9 @@ public:
// Interface ISrsTcpHandler
public:
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
// Interface IConnectionManager
// Interface ISrsResourceManager
public:
virtual void remove(ISrsConnection* c);
virtual void remove(ISrsResource* c);
// Interface ISrsHttpHandler
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
@ -79,7 +79,7 @@ private:
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
public:
SrsDynamicHttpConn(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsDynamicHttpConn();
public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);

@ -31,28 +31,44 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_service_log.hpp>
SrsConnectionManager::SrsConnectionManager()
ISrsDisposingHandler::ISrsDisposingHandler()
{
}
ISrsDisposingHandler::~ISrsDisposingHandler()
{
}
SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose)
{
verbose_ = verbose;
label_ = label;
cond = srs_cond_new();
trd = new SrsSTCoroutine("manager", this);
trd = NULL;
}
SrsConnectionManager::~SrsConnectionManager()
SrsResourceManager::~SrsResourceManager()
{
if (trd) {
srs_cond_signal(cond);
trd->stop();
srs_freep(trd);
srs_cond_destroy(cond);
}
clear();
}
srs_error_t SrsConnectionManager::start()
srs_error_t SrsResourceManager::start()
{
srs_error_t err = srs_success;
cid_ = _srs_context->generate_id();
trd = new SrsSTCoroutine("manager", this, cid_);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "conn manager");
}
@ -60,20 +76,22 @@ srs_error_t SrsConnectionManager::start()
return err;
}
bool SrsConnectionManager::empty()
bool SrsResourceManager::empty()
{
return conns_.empty();
}
size_t SrsConnectionManager::size()
size_t SrsResourceManager::size()
{
return conns_.size();
}
srs_error_t SrsConnectionManager::cycle()
srs_error_t SrsResourceManager::cycle()
{
srs_error_t err = srs_success;
srs_trace("%s connection manager run", label_.c_str());
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "conn manager");
@ -91,67 +109,110 @@ srs_error_t SrsConnectionManager::cycle()
return err;
}
void SrsConnectionManager::add(ISrsConnection* conn)
void SrsResourceManager::add(ISrsResource* conn)
{
if (std::find(conns_.begin(), conns_.end(), conn) == conns_.end()) {
conns_.push_back(conn);
}
}
void SrsConnectionManager::add_with_id(const std::string& id, ISrsConnection* conn)
void SrsResourceManager::add_with_id(const std::string& id, ISrsResource* conn)
{
add(conn);
conns_id_.insert(make_pair(id, conn));
}
void SrsConnectionManager::add_with_name(const std::string& name, ISrsConnection* conn)
void SrsResourceManager::add_with_name(const std::string& name, ISrsResource* conn)
{
add(conn);
conns_name_.insert(make_pair(name, conn));
}
ISrsConnection* SrsConnectionManager::at(int index)
ISrsResource* SrsResourceManager::at(int index)
{
return conns_.at(index);
return (index < (int)conns_.size())? conns_.at(index) : NULL;
}
ISrsConnection* SrsConnectionManager::find_by_id(std::string id)
ISrsResource* SrsResourceManager::find_by_id(std::string id)
{
map<string, ISrsConnection*>::iterator it = conns_id_.find(id);
map<string, ISrsResource*>::iterator it = conns_id_.find(id);
return (it != conns_id_.end())? it->second : NULL;
}
ISrsConnection* SrsConnectionManager::find_by_name(std::string name)
ISrsResource* SrsResourceManager::find_by_name(std::string name)
{
map<string, ISrsConnection*>::iterator it = conns_name_.find(name);
map<string, ISrsResource*>::iterator it = conns_name_.find(name);
return (it != conns_name_.end())? it->second : NULL;
}
void SrsConnectionManager::remove(ISrsConnection* c)
void SrsResourceManager::subscribe(ISrsDisposingHandler* h)
{
if (std::find(handlers_.begin(), handlers_.end(), h) == handlers_.end()) {
handlers_.push_back(h);
}
}
void SrsResourceManager::unsubscribe(ISrsDisposingHandler* h)
{
vector<ISrsDisposingHandler*>::iterator it = find(handlers_.begin(), handlers_.end(), h);
if (it != handlers_.end()) {
handlers_.erase(it);
}
}
void SrsResourceManager::remove(ISrsResource* c)
{
SrsContextRestore(cid_);
if (verbose_) {
_srs_context->set_id(c->get_id());
srs_trace("before dispose resource(%s), zombies=%d", c->desc().c_str(), (int)zombies_.size());
}
if (std::find(zombies_.begin(), zombies_.end(), c) == zombies_.end()) {
zombies_.push_back(c);
srs_cond_signal(cond);
}
for (int i = 0; i < (int)handlers_.size(); i++) {
ISrsDisposingHandler* h = handlers_.at(i);
h->on_before_dispose(c);
}
srs_cond_signal(cond);
}
void SrsConnectionManager::clear()
void SrsResourceManager::clear()
{
if (zombies_.empty()) {
return;
}
SrsContextRestore(cid_);
if (verbose_) {
srs_trace("clear zombies=%d connections", (int)zombies_.size());
}
// To prevent thread switch when delete connection,
// we copy all connections then free one by one.
vector<ISrsConnection*> copy;
vector<ISrsResource*> copy;
copy.swap(zombies_);
vector<ISrsConnection*>::iterator it;
vector<ISrsResource*>::iterator it;
for (it = copy.begin(); it != copy.end(); ++it) {
ISrsConnection* conn = *it;
ISrsResource* conn = *it;
if (verbose_) {
_srs_context->set_id(conn->get_id());
srs_trace("disposing resource(%s), zombies=%d/%d", conn->desc().c_str(),
(int)copy.size(), (int)zombies_.size());
}
dispose(conn);
}
}
void SrsConnectionManager::dispose(ISrsConnection* c)
void SrsResourceManager::dispose(ISrsResource* c)
{
for (map<string, ISrsConnection*>::iterator it = conns_name_.begin(); it != conns_name_.end();) {
for (map<string, ISrsResource*>::iterator it = conns_name_.begin(); it != conns_name_.end();) {
if (c != it->second) {
++it;
} else {
@ -160,7 +221,7 @@ void SrsConnectionManager::dispose(ISrsConnection* c)
}
}
for (map<string, ISrsConnection*>::iterator it = conns_id_.begin(); it != conns_id_.end();) {
for (map<string, ISrsResource*>::iterator it = conns_id_.begin(); it != conns_id_.end();) {
if (c != it->second) {
++it;
} else {
@ -169,15 +230,20 @@ void SrsConnectionManager::dispose(ISrsConnection* c)
}
}
vector<ISrsConnection*>::iterator it = std::find(conns_.begin(), conns_.end(), c);
vector<ISrsResource*>::iterator it = std::find(conns_.begin(), conns_.end(), c);
if (it != conns_.end()) {
conns_.erase(it);
}
for (int i = 0; i < (int)handlers_.size(); i++) {
ISrsDisposingHandler* h = handlers_.at(i);
h->on_disposing(c);
}
srs_freep(c);
}
SrsTcpConnection::SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport)
SrsTcpConnection::SrsTcpConnection(ISrsResourceManager* cm, srs_netfd_t c, string cip, int cport)
{
manager = cm;
stfd = c;
@ -357,6 +423,11 @@ string SrsTcpConnection::remote_ip()
return ip;
}
const SrsContextId& SrsTcpConnection::get_id()
{
return trd->cid();
}
void SrsTcpConnection::expire()
{
trd->interrupt();

@ -37,24 +37,43 @@
class SrsWallClock;
// The connection manager remove connection and delete it asynchronously.
class SrsConnectionManager : virtual public ISrsCoroutineHandler, virtual public IConnectionManager
// Hooks for connection manager, to handle the event when disposing connections.
class ISrsDisposingHandler
{
public:
ISrsDisposingHandler();
virtual ~ISrsDisposingHandler();
public:
// When before disposing resource, trigger when manager.remove(c), sync API.
virtual void on_before_dispose(ISrsResource* c) = 0;
// When disposing resource, async API, c is freed after it.
virtual void on_disposing(ISrsResource* c) = 0;
};
// The resource manager remove resource and delete it asynchronously.
class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public ISrsResourceManager
{
private:
std::string label_;
SrsContextId cid_;
bool verbose_;
private:
SrsCoroutine* trd;
srs_cond_t cond;
// Callback handlers.
std::vector<ISrsDisposingHandler*> handlers_;
// The zombie connections, we will delete it asynchronously.
std::vector<ISrsConnection*> zombies_;
std::vector<ISrsResource*> zombies_;
private:
// The connections without any id.
std::vector<ISrsConnection*> conns_;
// The connections with connection id.
std::map<std::string, ISrsConnection*> conns_id_;
// The connections with connection name.
std::map<std::string, ISrsConnection*> conns_name_;
std::vector<ISrsResource*> conns_;
// The connections with resource id.
std::map<std::string, ISrsResource*> conns_id_;
// The connections with resource name.
std::map<std::string, ISrsResource*> conns_name_;
public:
SrsConnectionManager();
virtual ~SrsConnectionManager();
SrsResourceManager(const std::string& label, bool verbose = false);
virtual ~SrsResourceManager();
public:
srs_error_t start();
bool empty();
@ -63,18 +82,21 @@ public:
public:
virtual srs_error_t cycle();
public:
void add(ISrsConnection* conn);
void add_with_id(const std::string& id, ISrsConnection* conn);
void add_with_name(const std::string& name, ISrsConnection* conn);
ISrsConnection* at(int index);
ISrsConnection* find_by_id(std::string id);
ISrsConnection* find_by_name(std::string name);
// Interface IConnectionManager
void add(ISrsResource* conn);
void add_with_id(const std::string& id, ISrsResource* conn);
void add_with_name(const std::string& name, ISrsResource* conn);
ISrsResource* at(int index);
ISrsResource* find_by_id(std::string id);
ISrsResource* find_by_name(std::string name);
public:
void subscribe(ISrsDisposingHandler* h);
void unsubscribe(ISrsDisposingHandler* h);
// Interface ISrsResourceManager
public:
virtual void remove(ISrsConnection* c);
virtual void remove(ISrsResource* c);
private:
void clear();
void dispose(ISrsConnection* c);
void dispose(ISrsResource* c);
};
// The basic connection of SRS, for TCP based protocols,
@ -88,7 +110,7 @@ protected:
// when thread stop, the connection will be delete by server.
SrsCoroutine* trd;
// The manager object to manage the connection.
IConnectionManager* manager;
ISrsResourceManager* manager;
// The underlayer st fd handler.
srs_netfd_t stfd;
// The ip and port of client.
@ -106,7 +128,7 @@ protected:
// for current connection to log self create time and calculate the living time.
int64_t create_time;
public:
SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip, int cport);
SrsTcpConnection(ISrsResourceManager* cm, srs_netfd_t c, std::string cip, int cport);
virtual ~SrsTcpConnection();
// Interface ISrsKbpsDelta
public:
@ -136,8 +158,11 @@ public:
// Get the srs id which identify the client.
// TODO: FIXME: Rename to cid.
virtual SrsContextId srs_id();
// Get the remote ip of peer.
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
public:
// Set connection to expired.
virtual void expire();
protected:

@ -1011,6 +1011,11 @@ std::string SrsGb28181RtmpMuxer::remote_ip()
return "";
}
std::string SrsGb28181RtmpMuxer::desc()
{
return "GBConn";
}
std::string SrsGb28181RtmpMuxer::get_channel_id()
{
return channel_id;
@ -1890,7 +1895,7 @@ SrsGb28181Manger::SrsGb28181Manger(SrsServer *s, SrsConfDirective* c)
// TODO: FIXME: support reload.
server = s;
config = new SrsGb28181Config(c);
manager = new SrsConnectionManager();
manager = new SrsResourceManager("GB28181");
}
SrsGb28181Manger::~SrsGb28181Manger()

@ -348,6 +348,7 @@ private:
public:
virtual srs_error_t cycle();
virtual std::string remote_ip();
virtual std::string desc();
public:
virtual srs_error_t on_rtp_video(SrsSimpleStream* stream, int64_t dts);
virtual srs_error_t on_rtp_audio(SrsSimpleStream* stream, int64_t dts, int type);
@ -473,7 +474,7 @@ private:
std::map<uint32_t, SrsPsRtpListener*> rtp_pool;
std::map<uint32_t, SrsGb28181RtmpMuxer*> rtmpmuxers_ssrc;
std::map<std::string, SrsGb28181RtmpMuxer*> rtmpmuxers;
SrsConnectionManager* manager;
SrsResourceManager* manager;
SrsGb28181SipService* sip_service;
SrsServer* server;
public:

@ -301,6 +301,11 @@ std::string SrsGb28181SipSession::remote_ip()
return _peer_ip;
}
std::string SrsGb28181SipSession::desc()
{
return "SipConn";
}
srs_error_t SrsGb28181SipSession::cycle()
{
srs_error_t err = do_cycle();

@ -144,6 +144,7 @@ public:
public:
virtual srs_error_t cycle();
virtual std::string remote_ip();
virtual std::string desc();
private:
virtual srs_error_t do_cycle();
};

@ -1674,7 +1674,7 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
#endif
SrsHttpApi::SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
: SrsTcpConnection(cm, fd, cip, port)
{
mux = m;
@ -1692,6 +1692,11 @@ SrsHttpApi::~SrsHttpApi()
_srs_config->unsubscribe(this);
}
std::string SrsHttpApi::desc()
{
return "HttpConn";
}
void SrsHttpApi::remark(int64_t* in, int64_t* out)
{
// TODO: FIXME: implements it

@ -261,8 +261,11 @@ private:
SrsHttpCorsMux* cors;
SrsHttpServeMux* mux;
public:
SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpApi();
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);

@ -59,7 +59,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_app_st.hpp>
SrsHttpConn::SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
: SrsTcpConnection(cm, fd, cip, port)
{
parser = new SrsHttpParser();
@ -73,6 +73,11 @@ SrsHttpConn::~SrsHttpConn()
srs_freep(cors);
}
std::string SrsHttpConn::desc()
{
return "HttpConn";
}
void SrsHttpConn::remark(int64_t* in, int64_t* out)
{
// TODO: FIXME: implements it
@ -185,7 +190,7 @@ srs_error_t SrsHttpConn::on_reload_http_stream_crossdomain()
return err;
}
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
{
}

@ -62,8 +62,11 @@ protected:
ISrsHttpServeMux* http_mux;
SrsHttpCorsMux* cors;
public:
SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpConn();
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
@ -89,7 +92,7 @@ public:
class SrsResponseOnlyHttpConn : public SrsHttpConn
{
public:
SrsResponseOnlyHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsResponseOnlyHttpConn();
public:
// Directly read a HTTP request message.

@ -1603,10 +1603,14 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
nn_simulate_player_nack_drop = 0;
pp_address_change = new SrsErrorPithyPrint();
pli_epp = new SrsErrorPithyPrint();
_srs_rtc_manager->subscribe(this);
}
SrsRtcConnection::~SrsRtcConnection()
{
_srs_rtc_manager->unsubscribe(this);
srs_freep(timer_);
// Cleanup publishers.
@ -1640,6 +1644,31 @@ SrsRtcConnection::~SrsRtcConnection()
srs_freep(pli_epp);
}
void SrsRtcConnection::on_before_dispose(ISrsResource* c)
{
if (disposing_) {
return;
}
SrsRtcConnection* session = dynamic_cast<SrsRtcConnection*>(c);
if (session == this) {
disposing_ = true;
}
if (session && session == this) {
_srs_context->set_id(cid_);
srs_trace("RTC: session detach from [%s](%s), disposing=%d", c->get_id().c_str(),
c->desc().c_str(), disposing_);
}
}
void SrsRtcConnection::on_disposing(ISrsResource* c)
{
if (disposing_) {
return;
}
}
SrsSdp* SrsRtcConnection::get_local_sdp()
{
return &local_sdp;
@ -1688,12 +1717,14 @@ vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
return addresses;
}
string SrsRtcConnection::remote_ip()
const SrsContextId& SrsRtcConnection::get_id()
{
if (sendonly_skt) {
return sendonly_skt->get_peer_ip();
return cid_;
}
return "";
std::string SrsRtcConnection::desc()
{
return "RtcConn";
}
void SrsRtcConnection::switch_to_context()
@ -2126,17 +2157,14 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc)
{
srs_error_t err = srs_success;
SrsRtcConnection* session = this;
// CN(Close Notify) is sent when client close the PeerConnection.
if (type == "warning" && desc == "CN") {
SrsContextRestore(_srs_context->get_id());
session->switch_to_context();
string username = session->username();
srs_trace("RTC: session DTLS alert, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str());
switch_to_context();
server_->dispose(session);
srs_trace("RTC: session destroy by DTLS alert, username=%s, summary: %s",
username_.c_str(), stat_->summary().c_str());
_srs_rtc_manager->remove(this);
}
return err;

@ -39,6 +39,7 @@
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_service_conn.hpp>
#include <srs_app_conn.hpp>
#include <string>
#include <map>
@ -416,7 +417,8 @@ public:
};
// A RTC Peer Connection, SDP level object.
class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsConnection
class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsResource
, virtual public ISrsDisposingHandler
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayStream;
@ -469,6 +471,10 @@ private:
public:
SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid);
virtual ~SrsRtcConnection();
// interface ISrsDisposingHandler
public:
virtual void on_before_dispose(ISrsResource* c);
virtual void on_disposing(ISrsResource* c);
public:
// TODO: FIXME: save only connection info.
SrsSdp* get_local_sdp();
@ -482,9 +488,10 @@ public:
std::string username();
// Get all addresses client used.
std::vector<SrsUdpMuxSocket*> peer_addresses();
// interface ISrsConnection
// Interface ISrsResource.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
virtual std::string desc();
public:
void switch_to_context();
const SrsContextId& context_id();

@ -216,7 +216,6 @@ SrsRtcServer::SrsRtcServer()
{
handler = NULL;
hijacker = NULL;
manager = new SrsConnectionManager();
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
}
@ -231,8 +230,6 @@ SrsRtcServer::~SrsRtcServer()
srs_freep(listener);
}
}
srs_freep(manager);
}
srs_error_t SrsRtcServer::initialize()
@ -251,10 +248,6 @@ srs_error_t SrsRtcServer::initialize()
return srs_error_wrap(err, "black hole");
}
if ((err = manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
srs_trace("RTC server init ok");
return err;
@ -311,7 +304,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
SrsRtcConnection* session = NULL;
if (true) {
ISrsConnection* conn = manager->find_by_id(peer_id);
ISrsResource* conn = _srs_rtc_manager->find_by_id(peer_id);
if (conn) {
// Switch to the session to write logs to the context.
session = dynamic_cast<SrsRtcConnection*>(conn);
@ -458,7 +451,7 @@ srs_error_t SrsRtcServer::do_create_session(
local_ufrag = srs_random_str(8);
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (!manager->find_by_name(username)) {
if (!_srs_rtc_manager->find_by_name(username)) {
break;
}
}
@ -504,7 +497,7 @@ srs_error_t SrsRtcServer::do_create_session(
}
// We allows username is optional, but it never empty here.
manager->add_with_name(username, session);
_srs_rtc_manager->add_with_name(username, session);
return err;
}
@ -567,7 +560,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest*
}
// We allows username is optional, but it never empty here.
manager->add_with_name(username, session);
_srs_rtc_manager->add_with_name(username, session);
session->set_remote_sdp(remote_sdp);
session->set_state(WAITING_STUN);
@ -575,37 +568,14 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest*
return err;
}
void SrsRtcServer::dispose(SrsRtcConnection* session)
{
if (session->disposing_) {
return;
}
destroy(session);
if (handler) {
handler->on_timeout(session);
}
}
void SrsRtcServer::destroy(SrsRtcConnection* session)
{
if (session->disposing_) {
return;
}
session->disposing_ = true;
manager->remove(session);
}
void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session)
{
manager->add_with_id(peer_id, session);
_srs_rtc_manager->add_with_id(peer_id, session);
}
SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username)
{
ISrsConnection* conn = manager->find_by_name(username);
ISrsResource* conn = _srs_rtc_manager->find_by_name(username);
return dynamic_cast<SrsRtcConnection*>(conn);
}
@ -614,11 +584,9 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
srs_error_t err = srs_success;
// Check all sessions and dispose the dead sessions.
for (int i = 0; i < (int)manager->size(); i++) {
SrsRtcConnection* session = dynamic_cast<SrsRtcConnection*>(manager->at(i));
srs_assert(session);
if (!session->is_stun_timeout()) {
for (int i = 0; i < (int)_srs_rtc_manager->size(); i++) {
SrsRtcConnection* session = dynamic_cast<SrsRtcConnection*>(_srs_rtc_manager->at(i));
if (!session || !session->is_stun_timeout() || session->disposing_) {
continue;
}
@ -626,10 +594,11 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
session->switch_to_context();
string username = session->username();
srs_trace("RTC: session STUN timeout, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str());
srs_trace("RTC: session destroy by timeout, username=%s, summary: %s", username.c_str(),
session->stat_->summary().c_str());
// Destroy session and notify the handler.
dispose(session);
// Use manager to free session and notify other objects.
_srs_rtc_manager->remove(session);
}
return err;
@ -672,6 +641,10 @@ srs_error_t RtcServerAdapter::run()
return srs_error_wrap(err, "listen api");
}
if ((err = _srs_rtc_manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
return err;
}
@ -679,3 +652,5 @@ void RtcServerAdapter::stop()
{
}
SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true);

@ -40,7 +40,7 @@ class SrsRtcConnection;
class SrsRequest;
class SrsSdp;
class SrsRtcStream;
class SrsConnectionManager;
class SrsResourceManager;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@ -92,7 +92,6 @@ private:
std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler;
ISrsRtcServerHijacker* hijacker;
SrsConnectionManager* manager;
public:
SrsRtcServer();
virtual ~SrsRtcServer();
@ -123,10 +122,6 @@ public:
// We start offering, create_session2 to generate offer, setup_session2 to handle answer.
srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession);
srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp);
// Destroy the session and notify the callback.
void dispose(SrsRtcConnection* session);
// Destroy the session from server, without notify callback.
void destroy(SrsRtcConnection* session);
public:
void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session);
public:
@ -150,5 +145,8 @@ public:
virtual void stop();
};
// Manager for RTC connections.
extern SrsResourceManager* _srs_rtc_manager;
#endif

@ -140,6 +140,11 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(security);
}
std::string SrsRtmpConn::desc()
{
return "RtmpConn";
}
void SrsRtmpConn::dispose()
{
SrsTcpConnection::dispose();

@ -119,6 +119,9 @@ private:
public:
SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip, int port);
virtual ~SrsRtmpConn();
// Interface ISrsResource.
public:
virtual std::string desc();
public:
virtual void dispose();
protected:

@ -256,6 +256,16 @@ std::string SrsRtspConn::remote_ip()
return "";
}
std::string SrsRtspConn::desc()
{
return "RtspConn";
}
const SrsContextId& SrsRtspConn::get_id()
{
return _srs_context->get_id();
}
srs_error_t SrsRtspConn::do_cycle()
{
srs_error_t err = srs_success;
@ -712,7 +722,7 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
output = _srs_config->get_stream_caster_output(c);
local_port_min = _srs_config->get_stream_caster_rtp_port_min(c);
local_port_max = _srs_config->get_stream_caster_rtp_port_max(c);
manager = new SrsConnectionManager();
manager = new SrsResourceManager("CasterRTSP");
}
SrsRtspCaster::~SrsRtspCaster()

@ -51,7 +51,7 @@ class SrsAudioFrame;
class SrsSimpleStream;
class SrsPithyPrint;
class SrsSimpleRtmpClient;
class SrsConnectionManager;
class SrsResourceManager;
// A rtp connection which transport a stream.
class SrsRtpConn: public ISrsUdpHandler
@ -144,7 +144,11 @@ public:
virtual ~SrsRtspConn();
public:
virtual srs_error_t serve();
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
virtual std::string desc();
private:
virtual srs_error_t do_cycle();
// internal methods
@ -182,7 +186,7 @@ private:
std::map<int, bool> used_ports;
private:
std::vector<SrsRtspConn*> clients;
SrsConnectionManager* manager;
SrsResourceManager* manager;
public:
SrsRtspCaster(SrsConfDirective* c);
virtual ~SrsRtspCaster();

@ -656,7 +656,7 @@ SrsServer::SrsServer()
pid_fd = -1;
signal_manager = new SrsSignalManager(this);
conn_manager = new SrsConnectionManager();
conn_manager = new SrsResourceManager("RTMP/API");
handler = NULL;
ppid = ::getppid();
@ -1575,7 +1575,7 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon
return err;
}
void SrsServer::remove(ISrsConnection* c)
void SrsServer::remove(ISrsResource* c)
{
SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(c);

@ -53,7 +53,7 @@ class SrsUdpListener;
class SrsTcpListener;
class SrsAppCasterFlv;
class SrsRtspCaster;
class SrsConnectionManager;
class SrsResourceManager;
class SrsGb28181Caster;
@ -241,7 +241,7 @@ public:
};
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public IConnectionManager
class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public ISrsResourceManager
{
private:
// TODO: FIXME: rename to http_api
@ -249,7 +249,7 @@ private:
SrsHttpServer* http_server;
SrsHttpHeartbeat* http_heartbeat;
SrsIngester* ingester;
SrsConnectionManager* conn_manager;
SrsResourceManager* conn_manager;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
@ -342,12 +342,12 @@ public:
virtual SrsHttpServeMux* api_server();
private:
virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn);
// Interface IConnectionManager
// Interface ISrsResourceManager
public:
// A callback for connection to remove itself.
// When connection thread cycle terminated, callback this to delete connection.
// @see SrsTcpConnection.on_thread_stop().
virtual void remove(ISrsConnection* c);
virtual void remove(ISrsResource* c);
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_listen();

@ -74,9 +74,9 @@ srs_error_t SrsDummyCoroutine::pull()
return srs_error_new(ERROR_THREAD_DUMMY, "dummy pull");
}
SrsContextId SrsDummyCoroutine::cid()
const SrsContextId& SrsDummyCoroutine::cid()
{
return SrsContextId();
return _srs_context->get_id();
}
_ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create;
@ -189,7 +189,7 @@ srs_error_t SrsSTCoroutine::pull()
return srs_error_copy(trd_err);
}
SrsContextId SrsSTCoroutine::cid()
const SrsContextId& SrsSTCoroutine::cid()
{
return cid_;
}

@ -81,7 +81,7 @@ public:
// @return a copy of error, which should be freed by user.
// NULL if not terminated and user should pull again.
virtual srs_error_t pull() = 0;
virtual SrsContextId cid() = 0;
virtual const SrsContextId& cid() = 0;
};
// An empty coroutine, user can default to this object before create any real coroutine.
@ -96,7 +96,7 @@ public:
virtual void stop();
virtual void interrupt();
virtual srs_error_t pull();
virtual SrsContextId cid();
virtual const SrsContextId& cid();
};
// For utest to mock the thread create.
@ -156,7 +156,7 @@ public:
// @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.
virtual srs_error_t pull();
// Get the context id of thread.
virtual SrsContextId cid();
virtual const SrsContextId& cid();
private:
virtual srs_error_t cycle();
static void* pfn(void* arg);

@ -23,19 +23,27 @@
#include <srs_service_conn.hpp>
ISrsConnection::ISrsConnection()
ISrsResource::ISrsResource()
{
}
ISrsConnection::~ISrsConnection()
ISrsResource::~ISrsResource()
{
}
ISrsResourceManager::ISrsResourceManager()
{
}
IConnectionManager::IConnectionManager()
ISrsResourceManager::~ISrsResourceManager()
{
}
IConnectionManager::~IConnectionManager()
ISrsConnection::ISrsConnection()
{
}
ISrsConnection::~ISrsConnection()
{
}

@ -28,26 +28,39 @@
#include <string>
// The connection interface for all HTTP/RTMP/RTSP object.
class ISrsConnection
// The resource managed by ISrsResourceManager.
class ISrsResource
{
public:
ISrsConnection();
virtual ~ISrsConnection();
ISrsResource();
virtual ~ISrsResource();
public:
// Get remote ip address.
virtual std::string remote_ip() = 0;
// Get the context id of connection.
virtual const SrsContextId& get_id() = 0;
// The resource description, optional.
virtual std::string desc() = 0;
};
// The manager for connection.
class IConnectionManager
// The manager for resource.
class ISrsResourceManager
{
public:
IConnectionManager();
virtual ~IConnectionManager();
ISrsResourceManager();
virtual ~ISrsResourceManager();
public:
// Remove then free the specified connection.
virtual void remove(ISrsConnection* c) = 0;
virtual void remove(ISrsResource* c) = 0;
};
// The connection interface for all HTTP/RTMP/RTSP object.
class ISrsConnection : public ISrsResource
{
public:
ISrsConnection();
virtual ~ISrsConnection();
public:
// Get remote ip address.
virtual std::string remote_ip() = 0;
};
#endif

@ -43,7 +43,7 @@ VOID TEST(KernelRTCTest, ConnectionManagerTest)
// Normal scenario, free object by manager.
if (true) {
SrsConnectionManager manager;
SrsResourceManager manager;
HELPER_EXPECT_SUCCESS(manager.start());
EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty());
@ -58,7 +58,7 @@ VOID TEST(KernelRTCTest, ConnectionManagerTest)
// Coroutine switch context, signal is lost.
if (true) {
SrsConnectionManager manager;
SrsResourceManager manager;
HELPER_EXPECT_SUCCESS(manager.start());
EXPECT_EQ(0, manager.size()); EXPECT_TRUE(manager.empty());

@ -54,6 +54,16 @@ MockSrsConnection::~MockSrsConnection()
}
}
const SrsContextId& MockSrsConnection::get_id()
{
return _srs_context->get_id();
}
std::string MockSrsConnection::desc()
{
return "Mock";
}
std::string MockSrsConnection::remote_ip()
{
return "127.0.0.1";
@ -1371,7 +1381,7 @@ VOID TEST(HTTPClientTest, HTTPClientUtility)
}
}
class MockConnectionManager : public IConnectionManager
class MockConnectionManager : public ISrsResourceManager
{
public:
MockConnectionManager() {
@ -1379,7 +1389,7 @@ public:
virtual ~MockConnectionManager() {
}
public:
virtual void remove(ISrsConnection* /*c*/) {
virtual void remove(ISrsResource* /*c*/) {
}
};

@ -40,6 +40,8 @@ public:
public:
MockSrsConnection();
virtual ~MockSrsConnection();
virtual const SrsContextId& get_id();
virtual std::string desc();
virtual std::string remote_ip();
};

Loading…
Cancel
Save