Use SrsAsyncCallWorker in http hooks instead, to covert to async call. (#2542)

* Use SrsAsyncCallWorker in http hooks instead, to covert to async call.

* delete invalid function
pull/2578/head
Haibo Chen 4 years ago committed by GitHub
parent 826f5121c5
commit a7feedabc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -351,6 +351,53 @@ srs_error_t SrsRtcPLIWorker::cycle()
return err;
}
SrsRtcAsyncCallOnStop::SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest * r)
{
cid = c;
req = r->copy();
}
SrsRtcAsyncCallOnStop::~SrsRtcAsyncCallOnStop()
{
srs_freep(req);
}
srs_error_t SrsRtcAsyncCallOnStop::call()
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req);
}
return err;
}
std::string SrsRtcAsyncCallOnStop::to_string()
{
return std::string("");
}
SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
{
@ -379,9 +426,8 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
SrsRtcPlayStream::~SrsRtcPlayStream()
{
// TODO: FIXME: Use SrsAsyncCallWorker in http hooks instead, to covert to async call.
if (req_) {
http_hooks_on_stop();
session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
}
// TODO: FIXME: Should not do callback in de-constructor?
@ -871,35 +917,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
return err;
}
void SrsRtcPlayStream::http_hooks_on_stop()
{
if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost)) {
return;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req_->vhost);
if (!conf) {
return;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req_);
}
return;
}
SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
@ -979,6 +996,54 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
return err;
}
SrsRtcAsyncCallOnUnpublish::SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest * r)
{
cid = c;
req = r->copy();
}
SrsRtcAsyncCallOnUnpublish::~SrsRtcAsyncCallOnUnpublish()
{
srs_freep(req);
}
srs_error_t SrsRtcAsyncCallOnUnpublish::call()
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_unpublish(url, req);
}
return err;
}
std::string SrsRtcAsyncCallOnUnpublish::to_string()
{
return std::string("");
}
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{
cid_ = cid;
@ -988,7 +1053,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
pli_epp = new SrsErrorPithyPrint();
twcc_epp_ = new SrsErrorPithyPrint(3.0);
req = NULL;
req_ = NULL;
source = NULL;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
@ -1009,8 +1074,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
if (req) {
http_hooks_on_unpublish();
if (req_) {
session_->server_->exec_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
}
srs_freep(timer_rtcp_);
@ -1027,7 +1092,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
// it must be called after source stream unpublish (set source stream is_created=false).
// if not, it lead to republish failed.
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_publish(session_, this, req);
_srs_rtc_hijacker->on_stop_publish(session_, this, req_);
}
for (int i = 0; i < (int)video_tracks_.size(); ++i) {
@ -1045,7 +1110,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(pli_worker_);
srs_freep(twcc_epp_);
srs_freep(pli_epp);
srs_freep(req);
srs_freep(req_);
// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
@ -1056,7 +1121,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
{
srs_error_t err = srs_success;
req = r->copy();
req_ = r->copy();
if (stream_desc->audio_track_desc_) {
audio_tracks_.push_back(new SrsRtcAudioRecvTrack(session_, stream_desc->audio_track_desc_));
@ -1083,10 +1148,10 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
rtcp_twcc_.set_media_ssrc(media_ssrc);
}
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);
nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost);
twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req->vhost);
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost);
nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req_->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req_->vhost);
twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req_->vhost);
// No TWCC when negotiate, disable it.
if (twcc_id <= 0) {
@ -1107,14 +1172,14 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
}
// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
if ((err = _srs_rtc_sources->fetch_or_create(req_, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->set_publish_stream(this);
// Bridge to rtmp
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost);
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
SrsLiveSource *rtmp = NULL;
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
@ -1160,14 +1225,14 @@ srs_error_t SrsRtcPublishStream::start()
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req_)) != srs_success) {
return srs_error_wrap(err, "on start publish");
}
}
// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req, session_, SrsRtcConnPublish)) != srs_success) {
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
@ -1385,7 +1450,7 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt)) != srs_success) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req_, pkt)) != srs_success) {
return srs_error_wrap(err, "on rtp packet");
}
}
@ -1677,33 +1742,6 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& n
}
}
void SrsRtcPublishStream::http_hooks_on_unpublish()
{
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost);
if (!conf) {
return;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_unpublish(url, req);
}
}
ISrsRtcConnectionHijacker::ISrsRtcConnectionHijacker()
{
}
@ -1753,7 +1791,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
{
req = NULL;
req_ = NULL;
cid_ = cid;
hijacker_ = NULL;
@ -1820,7 +1858,7 @@ SrsRtcConnection::~SrsRtcConnection()
srs_freep(cache_buffer_);
srs_freep(transport_);
srs_freep(req);
srs_freep(req_);
srs_freep(pp_address_change);
srs_freep(pli_epp);
}
@ -2019,7 +2057,7 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
srs_error_t err = srs_success;
username_ = username;
req = r->copy();
req_ = r->copy();
if (!srtp) {
srs_freep(transport_);
@ -2036,10 +2074,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
}
// TODO: FIXME: Support reload.
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
session_timeout = _srs_config->get_rtc_stun_timeout(req_->vhost);
last_stun_time = srs_get_system_time();
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost);
srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms, nack=%d",
username.c_str(), r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role.c_str(), cfg->dtls_version.c_str(),
@ -2666,7 +2704,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
++_srs_pps_sstuns->sugar;
bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost);
bool strict_check = _srs_config->get_rtc_stun_strict_check(req_->vhost);
if (strict_check && r->get_ice_controlled()) {
// @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1
// TODO: Send 487 (Role Conflict) error response.

@ -23,6 +23,7 @@
#include <srs_app_rtc_dtls.hpp>
#include <srs_service_conn.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_async_call.hpp>
#include <string>
#include <map>
@ -195,6 +196,20 @@ public:
virtual srs_error_t cycle();
};
// the rtc on_stop async call.
class SrsRtcAsyncCallOnStop : public ISrsAsyncCallTask
{
private:
SrsContextId cid;
SrsRequest* req;
public:
SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest* r);
virtual ~SrsRtcAsyncCallOnStop();
public:
virtual srs_error_t call();
virtual std::string to_string();
};
// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
, public ISrsRtcPLIWorkerHandler, public ISrsRtcSourceChangeCallback
@ -264,8 +279,6 @@ private:
// Interface ISrsRtcPLIWorkerHandler
public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
private:
virtual void http_hooks_on_stop();
};
// A fast timer for publish stream, for RTCP feedback.
@ -294,6 +307,20 @@ private:
srs_error_t on_timer(srs_utime_t interval);
};
// the rtc on_unpublish async call.
class SrsRtcAsyncCallOnUnpublish : public ISrsAsyncCallTask
{
private:
SrsContextId cid;
SrsRequest* req;
public:
SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest* r);
virtual ~SrsRtcAsyncCallOnUnpublish();
public:
virtual srs_error_t call();
virtual std::string to_string();
};
// A RTC publish stream, client push and publish stream to SRS.
class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler
, public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler
@ -319,7 +346,7 @@ private:
bool request_keyframe_;
SrsErrorPithyPrint* pli_epp;
private:
SrsRequest* req;
SrsRequest* req_;
SrsRtcSource* source;
// Simulators.
int nn_simulate_nack_drop;
@ -377,8 +404,6 @@ private:
SrsRtcVideoRecvTrack* get_video_track(uint32_t ssrc);
void update_rtt(uint32_t ssrc, int rtt);
void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time);
private:
virtual void http_hooks_on_unpublish();
};
// Callback for RTC connection.
@ -451,8 +476,7 @@ private:
private:
// For each RTC session, we use a specified cid for debugging logs.
SrsContextId cid_;
// TODO: FIXME: Rename to req_.
SrsRequest* req;
SrsRequest* req_;
SrsSdp remote_sdp;
SrsSdp local_sdp;
private:

@ -243,6 +243,7 @@ SrsRtcServer::SrsRtcServer()
{
handler = NULL;
hijacker = NULL;
async = new SrsAsyncCallWorker();
_srs_config->subscribe(this);
}
@ -258,6 +259,9 @@ SrsRtcServer::~SrsRtcServer()
srs_freep(listener);
}
}
async->stop();
srs_freep(async);
}
srs_error_t SrsRtcServer::initialize()
@ -273,6 +277,8 @@ srs_error_t SrsRtcServer::initialize()
return srs_error_wrap(err, "black hole");
}
async->start();
return err;
}
@ -289,6 +295,11 @@ void SrsRtcServer::set_handler(ISrsRtcServerHandler* h)
void SrsRtcServer::set_hijacker(ISrsRtcServerHijacker* h)
{
hijacker = h;
}
srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask * t)
{
return async->execute(t);
}
srs_error_t SrsRtcServer::listen_udp()

@ -15,6 +15,7 @@
#include <srs_app_hourglass.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_rtc_sdp.hpp>
#include <srs_app_async_call.hpp>
#include <string>
@ -94,6 +95,7 @@ private:
std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler;
ISrsRtcServerHijacker* hijacker;
SrsAsyncCallWorker* async;
public:
SrsRtcServer();
virtual ~SrsRtcServer();
@ -106,6 +108,7 @@ public:
// Set the handler for server events.
void set_handler(ISrsRtcServerHandler* h);
void set_hijacker(ISrsRtcServerHijacker* h);
srs_error_t exec_async_work(ISrsAsyncCallTask* t);
public:
// TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload.

Loading…
Cancel
Save