From a7feedabc618dff8ff839263c175327654f04f7b Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Fri, 27 Aug 2021 07:44:19 +0800 Subject: [PATCH] Use SrsAsyncCallWorker in http hooks instead, to covert to async call. (#2542) * Use SrsAsyncCallWorker in http hooks instead, to covert to async call. * delete invalid function --- trunk/src/app/srs_app_rtc_conn.cpp | 196 ++++++++++++++++----------- trunk/src/app/srs_app_rtc_conn.hpp | 38 +++++- trunk/src/app/srs_app_rtc_server.cpp | 11 ++ trunk/src/app/srs_app_rtc_server.hpp | 3 + 4 files changed, 162 insertions(+), 86 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d9b9552a1..1d4bd742b 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -351,6 +351,53 @@ srs_error_t SrsRtcPLIWorker::cycle() return err; } + +SrsRtcAsyncCallOnStop::SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest * r) +{ + cid = c; + req = r->copy(); +} + +SrsRtcAsyncCallOnStop::~SrsRtcAsyncCallOnStop() +{ + srs_freep(req); +} + +srs_error_t SrsRtcAsyncCallOnStop::call() +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + return err; + } + + // the http hooks will cause context switch, + // so we must copy all hooks for the on_connect may freed. + // @see https://github.com/ossrs/srs/issues/475 + vector hooks; + + if (true) { + SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost); + + if (!conf) { + return err; + } + + hooks = conf->args; + } + + for (int i = 0; i < (int)hooks.size(); i++) { + std::string url = hooks.at(i); + SrsHttpHooks::on_stop(url, req); + } + + return err; +} + +std::string SrsRtcAsyncCallOnStop::to_string() +{ + return std::string(""); +} SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) { @@ -379,9 +426,8 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) SrsRtcPlayStream::~SrsRtcPlayStream() { - // TODO: FIXME: Use SrsAsyncCallWorker in http hooks instead, to covert to async call. if (req_) { - http_hooks_on_stop(); + session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_)); } // TODO: FIXME: Should not do callback in de-constructor? @@ -871,35 +917,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci return err; } -void SrsRtcPlayStream::http_hooks_on_stop() -{ - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost)) { - return; - } - - // the http hooks will cause context switch, - // so we must copy all hooks for the on_connect may freed. - // @see https://github.com/ossrs/srs/issues/475 - vector hooks; - - if (true) { - SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req_->vhost); - - if (!conf) { - return; - } - - hooks = conf->args; - } - - for (int i = 0; i < (int)hooks.size(); i++) { - std::string url = hooks.at(i); - SrsHttpHooks::on_stop(url, req_); - } - - return; -} - SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p) { _srs_hybrid->timer1s()->subscribe(this); @@ -979,6 +996,54 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval) return err; } + +SrsRtcAsyncCallOnUnpublish::SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest * r) +{ + cid = c; + req = r->copy(); +} + +SrsRtcAsyncCallOnUnpublish::~SrsRtcAsyncCallOnUnpublish() +{ + srs_freep(req); +} + +srs_error_t SrsRtcAsyncCallOnUnpublish::call() +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + return err; + } + + // the http hooks will cause context switch, + // so we must copy all hooks for the on_connect may freed. + // @see https://github.com/ossrs/srs/issues/475 + vector hooks; + + if (true) { + SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost); + + if (!conf) { + return err; + } + + hooks = conf->args; + } + + for (int i = 0; i < (int)hooks.size(); i++) { + std::string url = hooks.at(i); + SrsHttpHooks::on_unpublish(url, req); + } + + return err; +} + +std::string SrsRtcAsyncCallOnUnpublish::to_string() +{ + return std::string(""); +} + SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) { cid_ = cid; @@ -988,7 +1053,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon pli_epp = new SrsErrorPithyPrint(); twcc_epp_ = new SrsErrorPithyPrint(3.0); - req = NULL; + req_ = NULL; source = NULL; nn_simulate_nack_drop = 0; nack_enabled_ = false; @@ -1009,8 +1074,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon SrsRtcPublishStream::~SrsRtcPublishStream() { - if (req) { - http_hooks_on_unpublish(); + if (req_) { + session_->server_->exec_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_)); } srs_freep(timer_rtcp_); @@ -1027,7 +1092,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream() // it must be called after source stream unpublish (set source stream is_created=false). // if not, it lead to republish failed. if (_srs_rtc_hijacker) { - _srs_rtc_hijacker->on_stop_publish(session_, this, req); + _srs_rtc_hijacker->on_stop_publish(session_, this, req_); } for (int i = 0; i < (int)video_tracks_.size(); ++i) { @@ -1045,7 +1110,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream() srs_freep(pli_worker_); srs_freep(twcc_epp_); srs_freep(pli_epp); - srs_freep(req); + srs_freep(req_); // update the statistic when client coveried. SrsStatistic* stat = SrsStatistic::instance(); @@ -1056,7 +1121,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti { srs_error_t err = srs_success; - req = r->copy(); + req_ = r->copy(); if (stream_desc->audio_track_desc_) { audio_tracks_.push_back(new SrsRtcAudioRecvTrack(session_, stream_desc->audio_track_desc_)); @@ -1083,10 +1148,10 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti rtcp_twcc_.set_media_ssrc(media_ssrc); } - nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); - nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req->vhost); - pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost); - twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req->vhost); + nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost); + nack_no_copy_ = _srs_config->get_rtc_nack_no_copy(req_->vhost); + pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req_->vhost); + twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req_->vhost); // No TWCC when negotiate, disable it. if (twcc_id <= 0) { @@ -1107,14 +1172,14 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti } // Setup the publish stream in source to enable PLI as such. - if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req_, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } source->set_publish_stream(this); // Bridge to rtmp #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) - bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost); + bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); if (rtc_to_rtmp) { SrsLiveSource *rtmp = NULL; if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { @@ -1160,14 +1225,14 @@ srs_error_t SrsRtcPublishStream::start() } if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) { + if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req_)) != srs_success) { return srs_error_wrap(err, "on start publish"); } } // update the statistic when client discoveried. SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->on_client(cid_.c_str(), req, session_, SrsRtcConnPublish)) != srs_success) { + if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPublish)) != srs_success) { return srs_error_wrap(err, "rtc: stat client"); } @@ -1385,7 +1450,7 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff } if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt)) != srs_success) { + if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req_, pkt)) != srs_success) { return srs_error_wrap(err, "on rtp packet"); } } @@ -1677,33 +1742,6 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& n } } -void SrsRtcPublishStream::http_hooks_on_unpublish() -{ - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { - return; - } - - // the http hooks will cause context switch, - // so we must copy all hooks for the on_connect may freed. - // @see https://github.com/ossrs/srs/issues/475 - vector hooks; - - if (true) { - SrsConfDirective* conf = _srs_config->get_vhost_on_unpublish(req->vhost); - - if (!conf) { - return; - } - - hooks = conf->args; - } - - for (int i = 0; i < (int)hooks.size(); i++) { - std::string url = hooks.at(i); - SrsHttpHooks::on_unpublish(url, req); - } -} - ISrsRtcConnectionHijacker::ISrsRtcConnectionHijacker() { } @@ -1753,7 +1791,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval) SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) { - req = NULL; + req_ = NULL; cid_ = cid; hijacker_ = NULL; @@ -1820,7 +1858,7 @@ SrsRtcConnection::~SrsRtcConnection() srs_freep(cache_buffer_); srs_freep(transport_); - srs_freep(req); + srs_freep(req_); srs_freep(pp_address_change); srs_freep(pli_epp); } @@ -2019,7 +2057,7 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st srs_error_t err = srs_success; username_ = username; - req = r->copy(); + req_ = r->copy(); if (!srtp) { srs_freep(transport_); @@ -2036,10 +2074,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st } // TODO: FIXME: Support reload. - session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost); + session_timeout = _srs_config->get_rtc_stun_timeout(req_->vhost); last_stun_time = srs_get_system_time(); - nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); + nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost); srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms, nack=%d", username.c_str(), r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), @@ -2666,7 +2704,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) ++_srs_pps_sstuns->sugar; - bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); + bool strict_check = _srs_config->get_rtc_stun_strict_check(req_->vhost); if (strict_check && r->get_ice_controlled()) { // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 // TODO: Send 487 (Role Conflict) error response. diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index ae968ba75..17e5a7155 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -195,6 +196,20 @@ public: virtual srs_error_t cycle(); }; +// the rtc on_stop async call. +class SrsRtcAsyncCallOnStop : public ISrsAsyncCallTask +{ +private: + SrsContextId cid; + SrsRequest* req; +public: + SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest* r); + virtual ~SrsRtcAsyncCallOnStop(); +public: + virtual srs_error_t call(); + virtual std::string to_string(); +}; + // A RTC play stream, client pull and play stream from SRS. class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler , public ISrsRtcPLIWorkerHandler, public ISrsRtcSourceChangeCallback @@ -264,8 +279,6 @@ private: // Interface ISrsRtcPLIWorkerHandler public: virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); -private: - virtual void http_hooks_on_stop(); }; // A fast timer for publish stream, for RTCP feedback. @@ -294,6 +307,20 @@ private: srs_error_t on_timer(srs_utime_t interval); }; +// the rtc on_unpublish async call. +class SrsRtcAsyncCallOnUnpublish : public ISrsAsyncCallTask +{ +private: + SrsContextId cid; + SrsRequest* req; +public: + SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest* r); + virtual ~SrsRtcAsyncCallOnUnpublish(); +public: + virtual srs_error_t call(); + virtual std::string to_string(); +}; + // A RTC publish stream, client push and publish stream to SRS. class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler , public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler @@ -319,7 +346,7 @@ private: bool request_keyframe_; SrsErrorPithyPrint* pli_epp; private: - SrsRequest* req; + SrsRequest* req_; SrsRtcSource* source; // Simulators. int nn_simulate_nack_drop; @@ -377,8 +404,6 @@ private: SrsRtcVideoRecvTrack* get_video_track(uint32_t ssrc); void update_rtt(uint32_t ssrc, int rtt); void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time); -private: - virtual void http_hooks_on_unpublish(); }; // Callback for RTC connection. @@ -451,8 +476,7 @@ private: private: // For each RTC session, we use a specified cid for debugging logs. SrsContextId cid_; - // TODO: FIXME: Rename to req_. - SrsRequest* req; + SrsRequest* req_; SrsSdp remote_sdp; SrsSdp local_sdp; private: diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 62b9a418c..8a77428d9 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -243,6 +243,7 @@ SrsRtcServer::SrsRtcServer() { handler = NULL; hijacker = NULL; + async = new SrsAsyncCallWorker(); _srs_config->subscribe(this); } @@ -258,6 +259,9 @@ SrsRtcServer::~SrsRtcServer() srs_freep(listener); } } + + async->stop(); + srs_freep(async); } srs_error_t SrsRtcServer::initialize() @@ -273,6 +277,8 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "black hole"); } + async->start(); + return err; } @@ -289,6 +295,11 @@ void SrsRtcServer::set_handler(ISrsRtcServerHandler* h) void SrsRtcServer::set_hijacker(ISrsRtcServerHijacker* h) { hijacker = h; +} + +srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask * t) +{ + return async->execute(t); } srs_error_t SrsRtcServer::listen_udp() diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index d90ed6e7c..f138e4638 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -94,6 +95,7 @@ private: std::vector listeners; ISrsRtcServerHandler* handler; ISrsRtcServerHijacker* hijacker; + SrsAsyncCallWorker* async; public: SrsRtcServer(); virtual ~SrsRtcServer(); @@ -106,6 +108,7 @@ public: // Set the handler for server events. void set_handler(ISrsRtcServerHandler* h); void set_hijacker(ISrsRtcServerHijacker* h); + srs_error_t exec_async_work(ISrsAsyncCallTask* t); public: // TODO: FIXME: Support gracefully quit. // TODO: FIXME: Support reload.