From c5cd58761c23efd395749943f3e9f9c7af8c9a39 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 19 Sep 2015 13:46:55 +0800 Subject: [PATCH] move the srs thread to st. --- trunk/src/app/srs_app_st.cpp | 224 +++++++++++++++++++++++++++++++ trunk/src/app/srs_app_st.hpp | 131 ++++++++++++++++++ trunk/src/app/srs_app_thread.cpp | 223 ------------------------------ trunk/src/app/srs_app_thread.hpp | 130 ------------------ 4 files changed, 355 insertions(+), 353 deletions(-) diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index be57af3eb..949b46661 100644 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -26,6 +26,230 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +namespace internal +{ + ISrsThreadHandler::ISrsThreadHandler() + { + } + + ISrsThreadHandler::~ISrsThreadHandler() + { + } + + void ISrsThreadHandler::on_thread_start() + { + } + + int ISrsThreadHandler::on_before_cycle() + { + int ret = ERROR_SUCCESS; + return ret; + } + + int ISrsThreadHandler::on_end_cycle() + { + int ret = ERROR_SUCCESS; + return ret; + } + + void ISrsThreadHandler::on_thread_stop() + { + } + + SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable) + { + _name = name; + handler = thread_handler; + cycle_interval_us = interval_us; + + tid = NULL; + loop = false; + really_terminated = true; + _cid = -1; + _joinable = joinable; + disposed = false; + + // in start(), the thread cycle method maybe stop and remove the thread itself, + // and the thread start() is waiting for the _cid, and segment fault then. + // @see https://github.com/simple-rtmp-server/srs/issues/110 + // thread will set _cid, callback on_thread_start(), then wait for the can_run signal. + can_run = false; + } + + SrsThread::~SrsThread() + { + stop(); + } + + int SrsThread::cid() + { + return _cid; + } + + int SrsThread::start() + { + int ret = ERROR_SUCCESS; + + if(tid) { + srs_info("thread %s already running.", _name); + return ret; + } + + if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ + ret = ERROR_ST_CREATE_CYCLE_THREAD; + srs_error("st_thread_create failed. ret=%d", ret); + return ret; + } + + // we set to loop to true for thread to run. + loop = true; + + // wait for cid to ready, for parent thread to get the cid. + while (_cid < 0 && loop) { + st_usleep(10 * 1000); + } + + // now, cycle thread can run. + can_run = true; + + return ret; + } + + void SrsThread::stop() + { + if (!tid) { + return; + } + + loop = false; + + dispose(); + + tid = NULL; + } + + bool SrsThread::can_loop() + { + return loop; + } + + void SrsThread::stop_loop() + { + loop = false; + } + + void SrsThread::dispose() + { + if (disposed) { + return; + } + + // the interrupt will cause the socket to read/write error, + // which will terminate the cycle thread. + st_thread_interrupt(tid); + + // when joinable, wait util quit. + if (_joinable) { + // wait the thread to exit. + int ret = st_thread_join(tid, NULL); + if (ret) { + srs_warn("core: ignore join thread failed."); + } + } + + // wait the thread actually terminated. + // sometimes the thread join return -1, for example, + // when thread use st_recvfrom, the thread join return -1. + // so here, we use a variable to ensure the thread stopped. + // @remark even the thread not joinable, we must ensure the thread stopped when stop. + while (!really_terminated) { + st_usleep(10 * 1000); + + if (really_terminated) { + break; + } + srs_warn("core: wait thread to actually terminated"); + } + + disposed = true; + } + + void SrsThread::thread_cycle() + { + int ret = ERROR_SUCCESS; + + _srs_context->generate_id(); + srs_info("thread %s cycle start", _name); + + _cid = _srs_context->get_id(); + + srs_assert(handler); + handler->on_thread_start(); + + // thread is running now. + really_terminated = false; + + // wait for cid to ready, for parent thread to get the cid. + while (!can_run && loop) { + st_usleep(10 * 1000); + } + + while (loop) { + if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { + srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); + goto failed; + } + srs_info("thread %s on before cycle success"); + + if ((ret = handler->cycle()) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { + srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); + } + goto failed; + } + srs_info("thread %s cycle success", _name); + + if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { + srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); + goto failed; + } + srs_info("thread %s on end cycle success", _name); + + failed: + if (!loop) { + break; + } + + // to improve performance, donot sleep when interval is zero. + // @see: https://github.com/simple-rtmp-server/srs/issues/237 + if (cycle_interval_us != 0) { + st_usleep(cycle_interval_us); + } + } + + // readly terminated now. + really_terminated = true; + + handler->on_thread_stop(); + srs_info("thread %s cycle finished", _name); + + // when thread terminated normally, also disposed. + disposed = true; + } + + void* SrsThread::thread_fun(void* arg) + { + SrsThread* obj = (SrsThread*)arg; + srs_assert(obj); + + obj->thread_cycle(); + + st_thread_exit(NULL); + + return NULL; + } +} + SrsStSocket::SrsStSocket(st_netfd_t client_stfd) { stfd = client_stfd; diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index ef70cd08e..c794e4a32 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -35,6 +35,137 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +// the internal classes, user should never use it. +// user should use the public classes at the bellow: +// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread +namespace internal +{ + /** + * the handler for the thread, callback interface. + * the thread model defines as: + * handler->on_thread_start() + * while loop: + * handler->on_before_cycle() + * handler->cycle() + * handler->on_end_cycle() + * if !loop then break for user stop thread. + * sleep(CycleIntervalMilliseconds) + * handler->on_thread_stop() + * when stop, the thread will interrupt the st_thread, + * which will cause the socket to return error and + * terminate the cycle thread. + * + * @remark why should check can_loop() in cycle method? + * when thread interrupt, the socket maybe not got EINT, + * espectially on st_usleep(), so the cycle must check the loop, + * when handler->cycle() has loop itself, for example: + * while (true): + * if (read_from_socket(skt) < 0) break; + * if thread stop when read_from_socket, it's ok, the loop will break, + * but when thread stop interrupt the s_usleep(0), then the loop is + * death loop. + * in a word, the handler->cycle() must: + * while (pthread->can_loop()): + * if (read_from_socket(skt) < 0) break; + * check the loop, then it works. + * + * @remark why should use stop_loop() to terminate thread in itself? + * in the thread itself, that is the cycle method, + * if itself want to terminate the thread, should never use stop(), + * but use stop_loop() to set the loop to false and terminate normally. + * + * @remark when should set the interval_us, and when not? + * the cycle will invoke util cannot loop, eventhough the return code of cycle is error, + * so the interval_us used to sleep for each cycle. + */ + class ISrsThreadHandler + { + public: + ISrsThreadHandler(); + virtual ~ISrsThreadHandler(); + public: + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int cycle() = 0; + virtual int on_end_cycle(); + virtual void on_thread_stop(); + }; + + /** + * provides servies from st_thread_t, + * for common thread usage. + */ + class SrsThread + { + private: + st_thread_t tid; + int _cid; + bool loop; + bool can_run; + bool really_terminated; + bool _joinable; + const char* _name; + bool disposed; + private: + ISrsThreadHandler* handler; + int64_t cycle_interval_us; + public: + /** + * initialize the thread. + * @param name, human readable name for st debug. + * @param thread_handler, the cycle handler for the thread. + * @param interval_us, the sleep interval when cycle finished. + * @param joinable, if joinable, other thread must stop the thread. + * @remark if joinable, thread never quit itself, or memory leak. + * @see: https://github.com/simple-rtmp-server/srs/issues/78 + * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag + */ + /** + * TODO: FIXME: maybe all thread must be reap by others threads, + * @see: https://github.com/simple-rtmp-server/srs/issues/77 + */ + SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable); + virtual ~SrsThread(); + public: + /** + * get the context id. @see: ISrsThreadContext.get_id(). + * used for parent thread to get the id. + * @remark when start thread, parent thread will block and wait for this id ready. + */ + virtual int cid(); + /** + * start the thread, invoke the cycle of handler util + * user stop the thread. + * @remark ignore any error of cycle of handler. + * @remark user can start multiple times, ignore if already started. + * @remark wait for the cid is set by thread pfn. + */ + virtual int start(); + /** + * stop the thread, wait for the thread to terminate. + * @remark user can stop multiple times, ignore if already stopped. + */ + virtual void stop(); + public: + /** + * whether the thread should loop, + * used for handler->cycle() which has a loop method, + * to check this method, break if false. + */ + virtual bool can_loop(); + /** + * for the loop thread to stop the loop. + * other thread can directly use stop() to stop loop and wait for quit. + * this stop loop method only set loop to false. + */ + virtual void stop_loop(); + private: + virtual void dispose(); + virtual void thread_cycle(); + static void* thread_fun(void* arg); + }; +} + /** * the socket provides TCP socket over st, * that is, the sync socket mechanism. diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index e4f5da414..7062e9d18 100644 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -26,229 +26,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -namespace internal { - ISrsThreadHandler::ISrsThreadHandler() - { - } - - ISrsThreadHandler::~ISrsThreadHandler() - { - } - - void ISrsThreadHandler::on_thread_start() - { - } - - int ISrsThreadHandler::on_before_cycle() - { - int ret = ERROR_SUCCESS; - return ret; - } - - int ISrsThreadHandler::on_end_cycle() - { - int ret = ERROR_SUCCESS; - return ret; - } - - void ISrsThreadHandler::on_thread_stop() - { - } - - SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable) - { - _name = name; - handler = thread_handler; - cycle_interval_us = interval_us; - - tid = NULL; - loop = false; - really_terminated = true; - _cid = -1; - _joinable = joinable; - disposed = false; - - // in start(), the thread cycle method maybe stop and remove the thread itself, - // and the thread start() is waiting for the _cid, and segment fault then. - // @see https://github.com/simple-rtmp-server/srs/issues/110 - // thread will set _cid, callback on_thread_start(), then wait for the can_run signal. - can_run = false; - } - - SrsThread::~SrsThread() - { - stop(); - } - - int SrsThread::cid() - { - return _cid; - } - - int SrsThread::start() - { - int ret = ERROR_SUCCESS; - - if(tid) { - srs_info("thread %s already running.", _name); - return ret; - } - - if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ - ret = ERROR_ST_CREATE_CYCLE_THREAD; - srs_error("st_thread_create failed. ret=%d", ret); - return ret; - } - - // we set to loop to true for thread to run. - loop = true; - - // wait for cid to ready, for parent thread to get the cid. - while (_cid < 0 && loop) { - st_usleep(10 * 1000); - } - - // now, cycle thread can run. - can_run = true; - - return ret; - } - - void SrsThread::stop() - { - if (!tid) { - return; - } - - loop = false; - - dispose(); - - tid = NULL; - } - - bool SrsThread::can_loop() - { - return loop; - } - - void SrsThread::stop_loop() - { - loop = false; - } - - void SrsThread::dispose() - { - if (disposed) { - return; - } - - // the interrupt will cause the socket to read/write error, - // which will terminate the cycle thread. - st_thread_interrupt(tid); - - // when joinable, wait util quit. - if (_joinable) { - // wait the thread to exit. - int ret = st_thread_join(tid, NULL); - if (ret) { - srs_warn("core: ignore join thread failed."); - } - } - - // wait the thread actually terminated. - // sometimes the thread join return -1, for example, - // when thread use st_recvfrom, the thread join return -1. - // so here, we use a variable to ensure the thread stopped. - // @remark even the thread not joinable, we must ensure the thread stopped when stop. - while (!really_terminated) { - st_usleep(10 * 1000); - - if (really_terminated) { - break; - } - srs_warn("core: wait thread to actually terminated"); - } - - disposed = true; - } - - void SrsThread::thread_cycle() - { - int ret = ERROR_SUCCESS; - - _srs_context->generate_id(); - srs_info("thread %s cycle start", _name); - - _cid = _srs_context->get_id(); - - srs_assert(handler); - handler->on_thread_start(); - - // thread is running now. - really_terminated = false; - - // wait for cid to ready, for parent thread to get the cid. - while (!can_run && loop) { - st_usleep(10 * 1000); - } - - while (loop) { - if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); - goto failed; - } - srs_info("thread %s on before cycle success"); - - if ((ret = handler->cycle()) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { - srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); - } - goto failed; - } - srs_info("thread %s cycle success", _name); - - if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); - goto failed; - } - srs_info("thread %s on end cycle success", _name); - - failed: - if (!loop) { - break; - } - - // to improve performance, donot sleep when interval is zero. - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - if (cycle_interval_us != 0) { - st_usleep(cycle_interval_us); - } - } - - // readly terminated now. - really_terminated = true; - - handler->on_thread_stop(); - srs_info("thread %s cycle finished", _name); - - // when thread terminated normally, also disposed. - disposed = true; - } - - void* SrsThread::thread_fun(void* arg) - { - SrsThread* obj = (SrsThread*)arg; - srs_assert(obj); - - obj->thread_cycle(); - - st_thread_exit(NULL); - - return NULL; - } -} - ISrsEndlessThreadHandler::ISrsEndlessThreadHandler() { } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index d00d94aa5..a4307e099 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -31,136 +31,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -// the internal classes, user should never use it. -// user should use the public classes at the bellow: -// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread -namespace internal { - /** - * the handler for the thread, callback interface. - * the thread model defines as: - * handler->on_thread_start() - * while loop: - * handler->on_before_cycle() - * handler->cycle() - * handler->on_end_cycle() - * if !loop then break for user stop thread. - * sleep(CycleIntervalMilliseconds) - * handler->on_thread_stop() - * when stop, the thread will interrupt the st_thread, - * which will cause the socket to return error and - * terminate the cycle thread. - * - * @remark why should check can_loop() in cycle method? - * when thread interrupt, the socket maybe not got EINT, - * espectially on st_usleep(), so the cycle must check the loop, - * when handler->cycle() has loop itself, for example: - * while (true): - * if (read_from_socket(skt) < 0) break; - * if thread stop when read_from_socket, it's ok, the loop will break, - * but when thread stop interrupt the s_usleep(0), then the loop is - * death loop. - * in a word, the handler->cycle() must: - * while (pthread->can_loop()): - * if (read_from_socket(skt) < 0) break; - * check the loop, then it works. - * - * @remark why should use stop_loop() to terminate thread in itself? - * in the thread itself, that is the cycle method, - * if itself want to terminate the thread, should never use stop(), - * but use stop_loop() to set the loop to false and terminate normally. - * - * @remark when should set the interval_us, and when not? - * the cycle will invoke util cannot loop, eventhough the return code of cycle is error, - * so the interval_us used to sleep for each cycle. - */ - class ISrsThreadHandler - { - public: - ISrsThreadHandler(); - virtual ~ISrsThreadHandler(); - public: - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int cycle() = 0; - virtual int on_end_cycle(); - virtual void on_thread_stop(); - }; - - /** - * provides servies from st_thread_t, - * for common thread usage. - */ - class SrsThread - { - private: - st_thread_t tid; - int _cid; - bool loop; - bool can_run; - bool really_terminated; - bool _joinable; - const char* _name; - bool disposed; - private: - ISrsThreadHandler* handler; - int64_t cycle_interval_us; - public: - /** - * initialize the thread. - * @param name, human readable name for st debug. - * @param thread_handler, the cycle handler for the thread. - * @param interval_us, the sleep interval when cycle finished. - * @param joinable, if joinable, other thread must stop the thread. - * @remark if joinable, thread never quit itself, or memory leak. - * @see: https://github.com/simple-rtmp-server/srs/issues/78 - * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag - */ - /** - * TODO: FIXME: maybe all thread must be reap by others threads, - * @see: https://github.com/simple-rtmp-server/srs/issues/77 - */ - SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable); - virtual ~SrsThread(); - public: - /** - * get the context id. @see: ISrsThreadContext.get_id(). - * used for parent thread to get the id. - * @remark when start thread, parent thread will block and wait for this id ready. - */ - virtual int cid(); - /** - * start the thread, invoke the cycle of handler util - * user stop the thread. - * @remark ignore any error of cycle of handler. - * @remark user can start multiple times, ignore if already started. - * @remark wait for the cid is set by thread pfn. - */ - virtual int start(); - /** - * stop the thread, wait for the thread to terminate. - * @remark user can stop multiple times, ignore if already stopped. - */ - virtual void stop(); - public: - /** - * whether the thread should loop, - * used for handler->cycle() which has a loop method, - * to check this method, break if false. - */ - virtual bool can_loop(); - /** - * for the loop thread to stop the loop. - * other thread can directly use stop() to stop loop and wait for quit. - * this stop loop method only set loop to false. - */ - virtual void stop_loop(); - private: - virtual void dispose(); - virtual void thread_cycle(); - static void* thread_fun(void* arg); - }; -} - /** * the endless thread is a loop thread never quit. * user can create thread always running util server terminate.