diff --git a/README.md b/README.md index 46cf4344a..628a135f6 100755 --- a/README.md +++ b/README.md @@ -153,6 +153,8 @@ For previous versions, please read: ## V4 changes +* v4.0, 2020-02-18, For [#1579][bug #1579], support start/final wait for gracefully quit. 4.0.6 +* v4.0, 2020-02-18, For [#1579][bug #1579], support gracefully quit and force to. 4.0.5 * v4.0, 2020-02-13, SRT supports detail config for [DynamicEncoding](https://github.com/runner365/srt_encoder). 4.0.4 * v4.0, 2020-02-04, Update project code. 4.0.3 * v4.0, 2020-01-26, Allow use libsrt.so for SRT is MPL license. 4.0.2 @@ -160,6 +162,10 @@ For previous versions, please read: ## V3 changes +* v3.0, 2020-02-18, For [#1579][bug #1579], support start/final wait for gracefully quit. 3.0.121 +* v3.0, 2020-02-18, For [#1579][bug #1579], support force gracefully quit. 3.0.120 +* v3.0, 2020-02-18, For [#1579][bug #1579], support gracefully quit. 3.0.119 +* v3.0, 2020-02-17, For [#1601][bug #1601], flush async on_dvr/on_hls events before stop. 3.0.118 * v3.0, 2020-02-14, [3.0 beta1(3.0.117)][r3.0b1] released. 121964 lines. * v3.0, 2020-02-14, For [#1595][bug #1595], migrating streaming from ossrs.net to r.ossrs.net. 3.0.117 * v3.0, 2020-02-05, For [#665][bug #665], fix HTTP-FLV reloading bug. 
3.0.116 @@ -1672,6 +1678,8 @@ Winlin [bug #1592]: https://github.com/ossrs/srs/issues/1592 [bug #665]: https://github.com/ossrs/srs/issues/665 [bug #1595]: https://github.com/ossrs/srs/issues/1595 +[bug #1601]: https://github.com/ossrs/srs/issues/1601 +[bug #1579]: https://github.com/ossrs/srs/issues/1579 [bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx [exo #828]: https://github.com/google/ExoPlayer/pull/828 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index bbfce255f..45fd96d02 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -73,6 +73,23 @@ work_dir ./; # default: off asprocess off; +# For gracefully quit, wait for a while (in milliseconds) then close listeners, +# because K8S notifies SRS with SIGQUIT and updates the Service simultaneously, +# so there may be some new connections incoming before the Service is updated. +# @see https://github.com/ossrs/srs/issues/1595#issuecomment-587516567 +# default: 2300 +grace_start_wait 2300; +# For gracefully quit, final wait for cleanup in milliseconds. +# @see https://github.com/ossrs/srs/issues/1579#issuecomment-587414898 +# default: 3200 +grace_final_wait 3200; +# Whether to force gracefully quit, never fast quit. +# By default, SIGTERM which means fast quit, is sent by K8S, so we need to +# force SRS to treat SIGTERM as gracefully quit for gray release or canary. +# @see https://github.com/ossrs/srs/issues/1579#issuecomment-587475077 +# default: off +force_grace_quit off; + ############################################################################################# # heartbeat/stats sections ############################################################################################# diff --git a/trunk/etc/init.d/srs b/trunk/etc/init.d/srs index 7e749044d..0e2afca4b 100755 --- a/trunk/etc/init.d/srs +++ b/trunk/etc/init.d/srs @@ -178,6 +178,18 @@ logrotate() { return 0 } +grace() { + # not start, exit + load_process_info + if [[ 0 -ne $? 
]]; then failed_msg "SRS not start."; return 0; fi + + ok_msg "Gracefully quit for SRS(pid ${srs_pid})..." + kill -s SIGQUIT ${srs_pid} + + ok_msg "Gracefully quit" + return 0 +} + menu() { case "$1" in start) @@ -199,10 +211,14 @@ menu() { rotate) logrotate ;; + grace) + grace + ;; *) - echo "Usage: $0 {start|stop|status|restart|reload|rotate}" + echo "Usage: $0 {start|stop|status|restart|reload|rotate|grace}" echo " reload Apply log file by not restarting SRS" echo " rotate For log rotate, to send SIGUSR1 to SRS to reopen the log file." + echo " grace For gracefully quit, to send SIGQUIT to SRS." return 1 ;; esac diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index 42df06b5d..271964d99 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -40,6 +40,7 @@ SrsAsyncCallWorker::SrsAsyncCallWorker() { trd = new SrsDummyCoroutine(); wait = srs_cond_new(); + lock = srs_mutex_new(); } SrsAsyncCallWorker::~SrsAsyncCallWorker() @@ -54,6 +55,7 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker() tasks.clear(); srs_cond_destroy(wait); + srs_mutex_destroy(lock); } srs_error_t SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) @@ -87,6 +89,7 @@ srs_error_t SrsAsyncCallWorker::start() void SrsAsyncCallWorker::stop() { + flush_tasks(); srs_cond_signal(wait); trd->stop(); } @@ -103,23 +106,40 @@ srs_error_t SrsAsyncCallWorker::cycle() if (tasks.empty()) { srs_cond_wait(wait); } - - std::vector copy = tasks; - tasks.clear(); - - std::vector::iterator it; - for (it = copy.begin(); it != copy.end(); ++it) { - ISrsAsyncCallTask* task = *it; - - if ((err = task->call()) != srs_success) { - srs_warn("ignore task failed %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - srs_freep(task); - } + + flush_tasks(); } return err; } +void SrsAsyncCallWorker::flush_tasks() +{ + srs_error_t err = srs_success; + + // Avoid the async call blocking other coroutines. 
+ std::vector copy; + if (true) { + SrsLocker(lock); + + if (tasks.empty()) { + return; + } + + copy = tasks; + tasks.clear(); + } + + std::vector::iterator it; + for (it = copy.begin(); it != copy.end(); ++it) { + ISrsAsyncCallTask* task = *it; + + if ((err = task->call()) != srs_success) { + srs_warn("ignore task failed %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + srs_freep(task); + } +} + diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 4576f312e..7cc183f4a 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -61,6 +61,7 @@ private: protected: std::vector tasks; srs_cond_t wait; + srs_mutex_t lock; public: SrsAsyncCallWorker(); virtual ~SrsAsyncCallWorker(); @@ -73,6 +74,8 @@ public: // Interface ISrsReusableThreadHandler public: virtual srs_error_t cycle(); +private: + virtual void flush_tasks(); }; #endif diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7a9f86a0a..2a7b98bd3 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3488,7 +3488,8 @@ srs_error_t SrsConfig::check_normal_config() && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" - && n != "ff_log_level" + && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" + && n != "grace_start_wait" ) { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); } @@ -4064,6 +4065,42 @@ bool SrsConfig::get_asprocess() return SRS_CONF_PERFER_FALSE(conf->arg0()); } +srs_utime_t SrsConfig::get_grace_start_wait() +{ + static srs_utime_t DEFAULT = 2300 * SRS_UTIME_MILLISECONDS; + + SrsConfDirective* conf = root->get("grace_start_wait"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return (srs_utime_t)(::atol(conf->arg0().c_str()) 
* SRS_UTIME_MILLISECONDS); +} + +srs_utime_t SrsConfig::get_grace_final_wait() +{ + static srs_utime_t DEFAULT = 3200 * SRS_UTIME_MILLISECONDS; + + SrsConfDirective* conf = root->get("grace_final_wait"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return (srs_utime_t)(::atol(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); +} + +bool SrsConfig::is_force_grace_quit() +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("force_grace_quit"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 90a57e0c3..6a58f2ef8 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -468,6 +468,12 @@ public: virtual std::string get_work_dir(); // Whether use asprocess mode. virtual bool get_asprocess(); + // Get the start wait in ms for gracefully quit. + virtual srs_utime_t get_grace_start_wait(); + // Get the final wait in ms for gracefully quit. + virtual srs_utime_t get_grace_final_wait(); + // Whether force to gracefully quit, never fast quit. + virtual bool is_force_grace_quit(); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index aca14581a..cb281e7bd 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -740,8 +740,6 @@ srs_error_t SrsDvrSessionPlan::on_publish() void SrsDvrSessionPlan::on_unpublish() { - SrsDvrPlan::on_unpublish(); - // support multiple publish. if (!dvr_enabled) { return; @@ -754,6 +752,10 @@ void SrsDvrSessionPlan::on_unpublish() } dvr_enabled = false; + + // We should notify the on_dvr, then stop the async. 
+ // @see https://github.com/ossrs/srs/issues/1601 + SrsDvrPlan::on_unpublish(); } SrsDvrSegmentPlan::SrsDvrSegmentPlan() @@ -815,14 +817,16 @@ void SrsDvrSegmentPlan::on_unpublish() { srs_error_t err = srs_success; - SrsDvrPlan::on_unpublish(); - if ((err = segment->close()) != srs_success) { srs_warn("ignore err %s", srs_error_desc(err).c_str()); srs_freep(err); } dvr_enabled = false; + + // We should notify the on_dvr, then stop the async. + // @see https://github.com/ossrs/srs/issues/1601 + SrsDvrPlan::on_unpublish(); } srs_error_t SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) diff --git a/trunk/src/app/srs_app_ffmpeg.cpp b/trunk/src/app/srs_app_ffmpeg.cpp index e9c87a70e..1cde4f5f9 100644 --- a/trunk/src/app/srs_app_ffmpeg.cpp +++ b/trunk/src/app/srs_app_ffmpeg.cpp @@ -230,7 +230,7 @@ srs_error_t SrsFFMPEG::start() params.push_back(ffmpeg); // input params - for (int i = 0; i < iparams.size(); i++) { + for (int i = 0; i < (int)iparams.size(); i++) { string iparam = iparams.at(i); if (!iparam.empty()) { params.push_back(iparam); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 5ffe4b0cd..98bfcd5fc 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -406,7 +406,7 @@ srs_error_t SrsRtmpConn::service_cycle() rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT); rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT); - srs_trace("rtmp: retry for republish"); + srs_info("rtmp: retry for republish"); srs_freep(err); continue; } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 61eeb2309..e95b4734e 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -392,6 +392,11 @@ srs_error_t SrsSignalManager::start() sa.sa_flags = 0; sigaction(SRS_SIGNAL_RELOAD, &sa, NULL); + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + 
sigaction(SRS_SIGNAL_FAST_QUIT, &sa, NULL); + sa.sa_handler = SrsSignalManager::sig_catcher; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; @@ -407,8 +412,8 @@ srs_error_t SrsSignalManager::start() sa.sa_flags = 0; sigaction(SRS_SIGNAL_REOPEN_LOG, &sa, NULL); - srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d", - SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT); + srs_trace("signal installed, reload=%d, reopen=%d, fast_quit=%d, grace_quit=%d", + SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_FAST_QUIT, SRS_SIGNAL_GRACEFULLY_QUIT); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "signal manager"); @@ -465,6 +470,7 @@ SrsServer::SrsServer() signal_reload = false; signal_persistence_config = false; signal_gmc_stop = false; + signal_fast_quit = false; signal_gracefully_quit = false; pid_fd = -1; @@ -533,6 +539,48 @@ void SrsServer::dispose() #endif } +void SrsServer::gracefully_dispose() +{ + _srs_config->unsubscribe(this); + + // Always wait for a while to start. + srs_usleep(_srs_config->get_grace_start_wait()); + srs_trace("start wait for %dms", srsu2msi(_srs_config->get_grace_start_wait())); + + // prevent fresh clients. + close_listeners(SrsListenerRtmpStream); + close_listeners(SrsListenerHttpApi); + close_listeners(SrsListenerHttpStream); + close_listeners(SrsListenerMpegTsOverUdp); + close_listeners(SrsListenerRtsp); + close_listeners(SrsListenerFlv); + + // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. + ingester->stop(); + + // Wait for connections to quit. + // While gracefully quiting, user can requires SRS to fast quit. + int wait_step = 1; + while (!conns.empty() && !signal_fast_quit) { + for (int i = 0; i < wait_step && !conns.empty() && !signal_fast_quit; i++) { + srs_usleep(1000 * SRS_UTIME_MILLISECONDS); + } + + wait_step = (wait_step * 2) % 33; + srs_trace("wait for %d conns to quit", conns.size()); + } + + // dispose the source for hls and dvr. 
+ _srs_sources->dispose(); + +#ifdef SRS_AUTO_MEM_WATCH + srs_memory_report(); +#endif + + srs_usleep(_srs_config->get_grace_final_wait()); + srs_trace("final wait for %dms", srsu2msi(_srs_config->get_grace_final_wait())); +} + srs_error_t SrsServer::initialize(ISrsServerCycle* ch) { srs_error_t err = srs_success; @@ -802,19 +850,33 @@ srs_error_t SrsServer::cycle() srs_warn("sleep a long time for system st-threads to cleanup."); srs_usleep(3 * 1000 * 1000); srs_warn("system quit"); -#else - // normally quit with neccessary cleanup by dispose(). + + return err; +#endif + + // quit normally. srs_warn("main cycle terminated, system quit normally."); - dispose(); + + // fast quit, do some essential cleanup. + if (signal_fast_quit) { + dispose(); + srs_trace("srs disposed"); + } + + // gracefully quit, do carefully cleanup. + if (signal_gracefully_quit) { + gracefully_dispose(); + srs_trace("srs gracefully quit"); + } + srs_trace("srs terminated"); // for valgrind to detect. srs_freep(_srs_config); srs_freep(_srs_log); - + exit(0); -#endif - + return err; } @@ -857,9 +919,22 @@ void SrsServer::on_signal(int signo) #endif #endif } - - if ((signo == SIGINT || signo == SRS_SIGNAL_GRACEFULLY_QUIT) && !signal_gracefully_quit) { - srs_trace("sig=%d, user terminate program, gracefully quit", signo); + + // For K8S, force to gracefully quit for gray release or canary. 
+ // @see https://github.com/ossrs/srs/issues/1595#issuecomment-587473037 + if (signo == SRS_SIGNAL_FAST_QUIT && _srs_config->is_force_grace_quit()) { + srs_trace("force gracefully quit, signo=%d", signo); + signo = SRS_SIGNAL_GRACEFULLY_QUIT; + } + + if ((signo == SIGINT || signo == SRS_SIGNAL_FAST_QUIT) && !signal_fast_quit) { + srs_trace("sig=%d, user terminate program, fast quit", signo); + signal_fast_quit = true; + return; + } + + if (signo == SRS_SIGNAL_GRACEFULLY_QUIT && !signal_gracefully_quit) { + srs_trace("sig=%d, user start gracefully quit", signo); signal_gracefully_quit = true; return; } @@ -904,9 +979,9 @@ srs_error_t SrsServer::do_cycle() return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid()); } - // gracefully quit for SIGINT or SIGTERM. - if (signal_gracefully_quit) { - srs_trace("cleanup for gracefully terminate."); + // gracefully quit for SIGINT or SIGTERM or SIGQUIT. + if (signal_fast_quit || signal_gracefully_quit) { + srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit); return err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index fd8cbe3ad..28154600e 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -227,6 +227,7 @@ private: bool signal_reload; bool signal_persistence_config; bool signal_gmc_stop; + bool signal_fast_quit; bool signal_gracefully_quit; // Parent pid for asprocess. int ppid; @@ -241,6 +242,9 @@ private: // When SIGTERM, SRS should do cleanup, for example, // to stop all ingesters, cleanup HLS and dvr. virtual void dispose(); + // Close listener to stop accepting new connections, + // then wait and quit when all connections finished. + virtual void gracefully_dispose(); // server startup workflow, @see run_master() public: // Initialize server with callback handler ch. 
@@ -260,12 +264,13 @@ public: // The signal manager convert signal to io message, // whatever, we will got the signo like the orignal signal(int signo) handler. // @param signo the signal number from user, where: - // SRS_SIGNAL_GRACEFULLY_QUIT, the SIGTERM, dispose then quit. + // SRS_SIGNAL_FAST_QUIT, the SIGTERM, do essential dispose then quit. + // SRS_SIGNAL_GRACEFULLY_QUIT, the SIGQUIT, do careful dispose then quit. // SRS_SIGNAL_REOPEN_LOG, the SIGUSR1, reopen the log file. // SRS_SIGNAL_RELOAD, the SIGHUP, reload the config. // SRS_SIGNAL_PERSISTENCE_CONFIG, application level signal, persistence config to file. // @remark, for SIGINT: - // no gmc, directly exit. + // no gmc, fast quit, do essential dispose then quit. // for gmc, set the variable signal_gmc_stop, the cycle will return and cleanup for gmc. // @remark, maybe the HTTP RAW API will trigger the on_signal() also. virtual void on_signal(int signo); diff --git a/trunk/src/core/srs_core_version3.hpp b/trunk/src/core/srs_core_version3.hpp index c8793a0a8..ac541b5e4 100644 --- a/trunk/src/core/srs_core_version3.hpp +++ b/trunk/src/core/srs_core_version3.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION3_HPP #define SRS_CORE_VERSION3_HPP -#define SRS_VERSION3_REVISION 117 +#define SRS_VERSION3_REVISION 121 #endif diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 208807ee9..8e9e26875 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 4 +#define SRS_VERSION4_REVISION 6 #endif diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index b470c644e..208765ae7 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -124,8 +124,15 @@ #define SRS_SIGNAL_RELOAD SIGHUP // Reopen the log file. 
#define SRS_SIGNAL_REOPEN_LOG SIGUSR1 -// The signal for srs to gracefully quit, do dispose then exit. -#define SRS_SIGNAL_GRACEFULLY_QUIT SIGTERM +// For graceful upgrade: start a new SRS, then gracefully quit the old one. +// @see https://github.com/ossrs/srs/issues/1579 +// TODO: Not implemented. +#define SRS_SIGNAL_UPGRADE SIGUSR2 +// The signal for srs to fast quit, do essential dispose then exit. +#define SRS_SIGNAL_FAST_QUIT SIGTERM +// The signal for srs to gracefully quit, do careful dispose then exit. +// @see https://github.com/ossrs/srs/issues/1579 +#define SRS_SIGNAL_GRACEFULLY_QUIT SIGQUIT // The application level signals. // Persistence the config in memory to config file. @@ -146,7 +153,7 @@ // Downloading speed-up, play to edge, ingest from origin #define SRS_CONSTS_LOG_EDGE_PLAY "EIG" // Uploading speed-up, publish to edge, foward to origin -#define SRS_CONSTS_LOG_EDGE_PUBLISH "EFW" +#define SRS_CONSTS_LOG_EDGE_PUBLISH "EPB" // The edge/origin forwarder. #define SRS_CONSTS_LOG_FOWARDER "FWR" // Play stream on edge/origin.