From cfc0877ec9ff4fe7826a24a76f7a109fe1210f8f Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 23 May 2015 09:49:15 +0800 Subject: [PATCH] refine the thread, add all callback. --- trunk/src/app/srs_app_edge.cpp | 4 +- trunk/src/app/srs_app_edge.hpp | 12 +- trunk/src/app/srs_app_forward.cpp | 2 +- trunk/src/app/srs_app_forward.hpp | 6 +- trunk/src/app/srs_app_listener.cpp | 36 +++-- trunk/src/app/srs_app_recv_thread.cpp | 12 +- trunk/src/app/srs_app_recv_thread.hpp | 7 +- trunk/src/app/srs_app_thread.cpp | 183 +++++++++++++++++++++++++- trunk/src/app/srs_app_thread.hpp | 126 ++++++++++++++++-- 9 files changed, 336 insertions(+), 52 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 57dfa5cb6..c95480c15 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -70,7 +70,7 @@ SrsEdgeIngester::SrsEdgeIngester() origin_index = 0; stream_id = 0; stfd = NULL; - pthread = new SrsReusableThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); + pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } SrsEdgeIngester::~SrsEdgeIngester() @@ -397,7 +397,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() origin_index = 0; stream_id = 0; stfd = NULL; - pthread = new SrsReusableThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); + pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); send_error_code = ERROR_SUCCESS; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index a2d4fd7cc..aa0cfc82d 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -75,7 +75,7 @@ enum SrsEdgeUserState /** * edge used to ingest stream from origin. */ -class SrsEdgeIngester : public ISrsReusableThreadHandler +class SrsEdgeIngester : public ISrsReusableThread2Handler { private: int stream_id; @@ -83,7 +83,7 @@ private: SrsSource* _source; SrsPlayEdge* _edge; SrsRequest* _req; - SrsReusableThread* pthread; + SrsReusableThread2* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsKbps* kbps; @@ -96,7 +96,7 @@ public: virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); -// interface ISrsReusableThreadHandler +// interface ISrsReusableThread2Handler public: virtual int cycle(); private: @@ -110,7 +110,7 @@ private: /** * edge used to forward stream to origin. */ -class SrsEdgeForwarder : public ISrsReusableThreadHandler +class SrsEdgeForwarder : public ISrsReusableThread2Handler { private: int stream_id; @@ -118,7 +118,7 @@ private: SrsSource* _source; SrsPublishEdge* _edge; SrsRequest* _req; - SrsReusableThread* pthread; + SrsReusableThread2* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsKbps* kbps; @@ -144,7 +144,7 @@ public: virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); -// interface ISrsReusableThreadHandler +// interface ISrsReusableThread2Handler public: virtual int cycle(); public: diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 986799057..4e68c0e9f 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -59,7 +59,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source) kbps = new SrsKbps(); stream_id = 0; - pthread = new SrsReusableThread("forward", this, SRS_FORWARDER_SLEEP_US); + pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index f57060fa4..68b4314ab 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -48,7 +48,7 @@ class SrsKbps; * forward the stream to other servers. */ // TODO: FIXME: refine the error log, comments it. -class SrsForwarder : public ISrsReusableThreadHandler +class SrsForwarder : public ISrsReusableThread2Handler { private: // the ep to forward, server[:port]. @@ -57,7 +57,7 @@ private: int stream_id; private: st_netfd_t stfd; - SrsReusableThread* pthread; + SrsReusableThread2* pthread; private: SrsSource* source; ISrsProtocolReaderWriter* io; @@ -95,7 +95,7 @@ public: * @param shared_video, directly ptr, copy it if need to save it. */ virtual int on_video(SrsSharedPtrMessage* shared_video); -// interface ISrsReusableThreadHandler. +// interface ISrsReusableThread2Handler. public: virtual int cycle(); private: diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index bc6626d11..42d77414f 100644 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -156,26 +156,24 @@ int SrsUdpListener::listen() int SrsUdpListener::cycle() { int ret = ERROR_SUCCESS; + + // TODO: FIXME: support ipv6, @see man 7 ipv6 + sockaddr_in from; + int nb_from = sizeof(sockaddr_in); + int nread = 0; + + if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + srs_warn("ignore recv udp packet failed, nread=%d", nread); + return ret; + } - while (!pthread->interrupted()) { - // TODO: FIXME: support ipv6, @see man 7 ipv6 - sockaddr_in from; - int nb_from = sizeof(sockaddr_in); - int nread = 0; - - if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { - srs_warn("ignore recv udp packet failed, nread=%d", nread); - continue; - } - - if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { - srs_warn("handle udp packet failed. ret=%d", ret); - continue; - } - - if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { - st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); - } + if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { + srs_warn("handle udp packet failed. ret=%d", ret); + return ret; + } + + if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { + st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); } return ret; diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 05352e16f..dad1a7823 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -50,7 +50,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtm timeout = timeout_ms; handler = msg_handler; rtmp = rtmp_sdk; - trd = new SrsReusableThread("recv", this); + trd = new SrsReusableThread2("recv", this); } SrsRecvThread::~SrsRecvThread() @@ -72,6 +72,11 @@ void SrsRecvThread::stop() trd->stop(); } +void SrsRecvThread::stop_loop() +{ + trd->interrupt(); +} + int SrsRecvThread::cycle() { int ret = ERROR_SUCCESS; @@ -109,11 +114,6 @@ int SrsRecvThread::cycle() return ret; } -void SrsRecvThread::stop_loop() -{ - trd->interrupt(); -} - void SrsRecvThread::on_thread_start() { // the multiple messages writev improve performance large, diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index e99ca1666..7d0271e28 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -79,10 +79,10 @@ public: /** * the recv thread, use message handler to handle each received message. */ -class SrsRecvThread : public ISrsReusableThreadHandler +class SrsRecvThread : public ISrsReusableThread2Handler { protected: - SrsReusableThread* trd; + SrsReusableThread2* trd; ISrsMessageHandler* handler; SrsRtmpServer* rtmp; int timeout; @@ -92,9 +92,10 @@ public: public: virtual int start(); virtual void stop(); - virtual int cycle(); virtual void stop_loop(); +// interface ISrsReusableThread2Handler public: + virtual int cycle(); virtual void on_thread_start(); virtual void on_thread_stop(); }; diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index c4a004a96..dbc09ee0d 100644 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -239,6 +239,24 @@ ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler() { } +void ISrsEndlessThreadHandler::on_thread_start() +{ +} + +int ISrsEndlessThreadHandler::on_before_cycle() +{ + return ERROR_SUCCESS; +} + +int ISrsEndlessThreadHandler::on_end_cycle() +{ + return ERROR_SUCCESS; +} + +void ISrsEndlessThreadHandler::on_thread_stop() +{ +} + SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h) { handler = h; @@ -261,6 +279,26 @@ int SrsEndlessThread::cycle() return handler->cycle(); } +void SrsEndlessThread::on_thread_start() +{ + handler->on_thread_start(); +} + +int SrsEndlessThread::on_before_cycle() +{ + return handler->on_before_cycle(); +} + +int SrsEndlessThread::on_end_cycle() +{ + return handler->on_end_cycle(); +} + +void SrsEndlessThread::on_thread_stop() +{ + handler->on_thread_stop(); +} + ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() { } @@ -269,6 +307,20 @@ ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler() { } +void ISrsOneCycleThreadHandler::on_thread_start() +{ +} + +int ISrsOneCycleThreadHandler::on_before_cycle() +{ + return ERROR_SUCCESS; +} + +int ISrsOneCycleThreadHandler::on_end_cycle() +{ + return ERROR_SUCCESS; +} + void ISrsOneCycleThreadHandler::on_thread_stop() { } @@ -297,6 +349,21 @@ int SrsOneCycleThread::cycle() return ret; } +void SrsOneCycleThread::on_thread_start() +{ + handler->on_thread_start(); +} + +int SrsOneCycleThread::on_before_cycle() +{ + return handler->on_before_cycle(); +} + +int SrsOneCycleThread::on_end_cycle() +{ + return handler->on_end_cycle(); +} + void SrsOneCycleThread::on_thread_stop() { handler->on_thread_stop(); @@ -310,6 +377,20 @@ ISrsReusableThreadHandler::~ISrsReusableThreadHandler() { } +void ISrsReusableThreadHandler::on_thread_start() +{ +} + +int ISrsReusableThreadHandler::on_before_cycle() +{ + return ERROR_SUCCESS; +} + +int ISrsReusableThreadHandler::on_end_cycle() +{ + return ERROR_SUCCESS; +} + void ISrsReusableThreadHandler::on_thread_stop() { } @@ -341,22 +422,116 @@ int SrsReusableThread::cid() return pthread->cid(); } -void SrsReusableThread::interrupt() +int SrsReusableThread::cycle() +{ + return handler->cycle(); +} + +void SrsReusableThread::on_thread_start() +{ + handler->on_thread_start(); +} + +int SrsReusableThread::on_before_cycle() +{ + return handler->on_before_cycle(); +} + +int SrsReusableThread::on_end_cycle() +{ + return handler->on_end_cycle(); +} + +void SrsReusableThread::on_thread_stop() +{ + handler->on_thread_stop(); +} + +ISrsReusableThread2Handler::ISrsReusableThread2Handler() +{ +} + +ISrsReusableThread2Handler::~ISrsReusableThread2Handler() +{ +} + +void ISrsReusableThread2Handler::on_thread_start() +{ +} + +int ISrsReusableThread2Handler::on_before_cycle() +{ + return ERROR_SUCCESS; +} + +int ISrsReusableThread2Handler::on_end_cycle() +{ + return ERROR_SUCCESS; +} + +void ISrsReusableThread2Handler::on_thread_stop() +{ +} + +SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us) +{ + handler = h; + pthread = new internal::SrsThread(n, this, interval_us, true); +} + +SrsReusableThread2::~SrsReusableThread2() +{ + pthread->stop(); + srs_freep(pthread); +} + +int SrsReusableThread2::start() +{ + return pthread->start(); +} + +void SrsReusableThread2::stop() +{ + pthread->stop(); +} + +int SrsReusableThread2::cid() +{ + return pthread->cid(); +} + +void SrsReusableThread2::interrupt() { pthread->stop_loop(); } -bool SrsReusableThread::interrupted() +bool SrsReusableThread2::interrupted() { return !pthread->can_loop(); } -int SrsReusableThread::cycle() +int SrsReusableThread2::cycle() { return handler->cycle(); } -void SrsReusableThread::on_thread_stop() +void SrsReusableThread2::on_thread_start() +{ + handler->on_thread_start(); +} + +int SrsReusableThread2::on_before_cycle() +{ + return handler->on_before_cycle(); +} + +int SrsReusableThread2::on_end_cycle() +{ + return handler->on_end_cycle(); +} + +void SrsReusableThread2::on_thread_stop() { handler->on_thread_stop(); } + diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index d6e399c10..1974cff22 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -184,6 +184,15 @@ public: * @remark user must use block method in cycle method, for example, sleep or socket io. */ virtual int cycle() = 0; +public: + /** + * other callback for handler. + * @remark all callback is optional, handler can ignore it. + */ + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); + virtual void on_thread_stop(); }; class SrsEndlessThread : public internal::ISrsThreadHandler { @@ -201,6 +210,10 @@ public: // interface internal::ISrsThreadHandler public: virtual int cycle(); + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); + virtual void on_thread_stop(); }; /** @@ -236,10 +249,14 @@ public: * the cycle method for the one cycle thread. */ virtual int cycle() = 0; +public: /** - * when thread stop, the handler can do cleanup. - * @remark this method is optional, handler can ignore it. + * other callback for handler. + * @remark all callback is optional, handler can ignore it. */ + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); virtual void on_thread_stop(); }; class SrsOneCycleThread : public internal::ISrsThreadHandler @@ -258,6 +275,9 @@ public: // interface internal::ISrsThreadHandler public: virtual int cycle(); + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); virtual void on_thread_stop(); }; @@ -266,11 +286,11 @@ public: * user can create thread and stop then start again and again, * generally must provides a start and stop method, @see SrsIngester. * the step to create a thread stop by other thread: - * 1. create SrsThread field, with joinable true. - * 2. must use stop to stop and join the thread. + * 1. create SrsReusableThread field. + * 2. must manually stop the thread when started it. * for example: - * class SrsIngester : public ISrsThreadHandler { - * public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); } + * class SrsIngester : public ISrsReusableThreadHandler { + * public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); } * public: virtual int start() { return pthread->start(); } * public: virtual void stop() { pthread->stop(); } * public: virtual int cycle() { @@ -290,10 +310,14 @@ public: * the thread is interrupted. */ virtual int cycle() = 0; +public: /** - * when thread stop, the handler can do cleanup. - * @remark this method is optional, handler can ignore it. + * other callback for handler. + * @remark all callback is optional, handler can ignore it. */ + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); virtual void on_thread_stop(); }; class SrsReusableThread : public internal::ISrsThreadHandler @@ -314,6 +338,89 @@ public: * @remark user can stop multiple times, ignore if already stopped. */ virtual void stop(); +public: + /** + * get the context id. @see: ISrsThreadContext.get_id(). + * used for parent thread to get the id. + * @remark when start thread, parent thread will block and wait for this id ready. + */ + virtual int cid(); +// interface internal::ISrsThreadHandler +public: + virtual int cycle(); + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); + virtual void on_thread_stop(); +}; + +/** + * the reuse thread is a thread stop and start by other thread. + * the version 2, is the thread cycle has its inner loop, which should + * check the intterrupt, and should interrupt thread when the inner loop want + * to quit the thread. + * user can create thread and stop then start again and again, + * generally must provides a start and stop method, @see SrsIngester. + * the step to create a thread stop by other thread: + * 1. create SrsReusableThread field. + * 2. must manually stop the thread when started it. + * for example: + * class SrsIngester : public ISrsReusableThreadHandler { + * public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); } + * public: virtual int start() { return pthread->start(); } + * public: virtual void stop() { pthread->stop(); } + * public: virtual int cycle() { + * while (!pthread->interrupted()) { + * // quit thread when error. + * if (ret != ERROR_SUCCESS) { + * pthread->interrupt(); + * } + * + * // do something. + * } + * } + * }; + */ +class ISrsReusableThread2Handler +{ +public: + ISrsReusableThread2Handler(); + virtual ~ISrsReusableThread2Handler(); +public: + /** + * the cycle method for the one cycle thread. + * @remark when the cycle has its inner loop, it must check whether + * the thread is interrupted. + */ + virtual int cycle() = 0; +public: + /** + * other callback for handler. + * @remark all callback is optional, handler can ignore it. + */ + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); + virtual void on_thread_stop(); +}; +class SrsReusableThread2 : public internal::ISrsThreadHandler +{ +private: + internal::SrsThread* pthread; + ISrsReusableThread2Handler* handler; +public: + SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us = 0); + virtual ~SrsReusableThread2(); +public: + /** + * for the reusable thread, start and stop by user. + */ + virtual int start(); + /** + * stop the thread, wait for the thread to terminate. + * @remark user can stop multiple times, ignore if already stopped. + */ + virtual void stop(); public: /** * get the context id. @see: ISrsThreadContext.get_id(). @@ -335,6 +442,9 @@ public: // interface internal::ISrsThreadHandler public: virtual int cycle(); + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int on_end_cycle(); virtual void on_thread_stop(); };