refine the thread, add all callback.

pull/499/head
winlin 10 years ago
parent e5f449ce36
commit cfc0877ec9

@ -70,7 +70,7 @@ SrsEdgeIngester::SrsEdgeIngester()
origin_index = 0; origin_index = 0;
stream_id = 0; stream_id = 0;
stfd = NULL; stfd = NULL;
pthread = new SrsReusableThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
} }
SrsEdgeIngester::~SrsEdgeIngester() SrsEdgeIngester::~SrsEdgeIngester()
@ -397,7 +397,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
origin_index = 0; origin_index = 0;
stream_id = 0; stream_id = 0;
stfd = NULL; stfd = NULL;
pthread = new SrsReusableThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue(); queue = new SrsMessageQueue();
send_error_code = ERROR_SUCCESS; send_error_code = ERROR_SUCCESS;
} }

@ -75,7 +75,7 @@ enum SrsEdgeUserState
/** /**
* edge used to ingest stream from origin. * edge used to ingest stream from origin.
*/ */
class SrsEdgeIngester : public ISrsReusableThreadHandler class SrsEdgeIngester : public ISrsReusableThread2Handler
{ {
private: private:
int stream_id; int stream_id;
@ -83,7 +83,7 @@ private:
SrsSource* _source; SrsSource* _source;
SrsPlayEdge* _edge; SrsPlayEdge* _edge;
SrsRequest* _req; SrsRequest* _req;
SrsReusableThread* pthread; SrsReusableThread2* pthread;
st_netfd_t stfd; st_netfd_t stfd;
ISrsProtocolReaderWriter* io; ISrsProtocolReaderWriter* io;
SrsKbps* kbps; SrsKbps* kbps;
@ -96,7 +96,7 @@ public:
virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req); virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req);
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
// interface ISrsReusableThreadHandler // interface ISrsReusableThread2Handler
public: public:
virtual int cycle(); virtual int cycle();
private: private:
@ -110,7 +110,7 @@ private:
/** /**
* edge used to forward stream to origin. * edge used to forward stream to origin.
*/ */
class SrsEdgeForwarder : public ISrsReusableThreadHandler class SrsEdgeForwarder : public ISrsReusableThread2Handler
{ {
private: private:
int stream_id; int stream_id;
@ -118,7 +118,7 @@ private:
SrsSource* _source; SrsSource* _source;
SrsPublishEdge* _edge; SrsPublishEdge* _edge;
SrsRequest* _req; SrsRequest* _req;
SrsReusableThread* pthread; SrsReusableThread2* pthread;
st_netfd_t stfd; st_netfd_t stfd;
ISrsProtocolReaderWriter* io; ISrsProtocolReaderWriter* io;
SrsKbps* kbps; SrsKbps* kbps;
@ -144,7 +144,7 @@ public:
virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req);
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
// interface ISrsReusableThreadHandler // interface ISrsReusableThread2Handler
public: public:
virtual int cycle(); virtual int cycle();
public: public:

@ -59,7 +59,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
kbps = new SrsKbps(); kbps = new SrsKbps();
stream_id = 0; stream_id = 0;
pthread = new SrsReusableThread("forward", this, SRS_FORWARDER_SLEEP_US); pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue(); queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter(); jitter = new SrsRtmpJitter();

@ -48,7 +48,7 @@ class SrsKbps;
* forward the stream to other servers. * forward the stream to other servers.
*/ */
// TODO: FIXME: refine the error log, comments it. // TODO: FIXME: refine the error log, comments it.
class SrsForwarder : public ISrsReusableThreadHandler class SrsForwarder : public ISrsReusableThread2Handler
{ {
private: private:
// the ep to forward, server[:port]. // the ep to forward, server[:port].
@ -57,7 +57,7 @@ private:
int stream_id; int stream_id;
private: private:
st_netfd_t stfd; st_netfd_t stfd;
SrsReusableThread* pthread; SrsReusableThread2* pthread;
private: private:
SrsSource* source; SrsSource* source;
ISrsProtocolReaderWriter* io; ISrsProtocolReaderWriter* io;
@ -95,7 +95,7 @@ public:
* @param shared_video, directly ptr, copy it if need to save it. * @param shared_video, directly ptr, copy it if need to save it.
*/ */
virtual int on_video(SrsSharedPtrMessage* shared_video); virtual int on_video(SrsSharedPtrMessage* shared_video);
// interface ISrsReusableThreadHandler. // interface ISrsReusableThread2Handler.
public: public:
virtual int cycle(); virtual int cycle();
private: private:

@ -157,7 +157,6 @@ int SrsUdpListener::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
while (!pthread->interrupted()) {
// TODO: FIXME: support ipv6, @see man 7 ipv6 // TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from; sockaddr_in from;
int nb_from = sizeof(sockaddr_in); int nb_from = sizeof(sockaddr_in);
@ -165,18 +164,17 @@ int SrsUdpListener::cycle()
if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread); srs_warn("ignore recv udp packet failed, nread=%d", nread);
continue; return ret;
} }
if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) {
srs_warn("handle udp packet failed. ret=%d", ret); srs_warn("handle udp packet failed. ret=%d", ret);
continue; return ret;
} }
if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
} }
}
return ret; return ret;
} }

@ -50,7 +50,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtm
timeout = timeout_ms; timeout = timeout_ms;
handler = msg_handler; handler = msg_handler;
rtmp = rtmp_sdk; rtmp = rtmp_sdk;
trd = new SrsReusableThread("recv", this); trd = new SrsReusableThread2("recv", this);
} }
SrsRecvThread::~SrsRecvThread() SrsRecvThread::~SrsRecvThread()
@ -72,6 +72,11 @@ void SrsRecvThread::stop()
trd->stop(); trd->stop();
} }
void SrsRecvThread::stop_loop()
{
trd->interrupt();
}
int SrsRecvThread::cycle() int SrsRecvThread::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -109,11 +114,6 @@ int SrsRecvThread::cycle()
return ret; return ret;
} }
void SrsRecvThread::stop_loop()
{
trd->interrupt();
}
void SrsRecvThread::on_thread_start() void SrsRecvThread::on_thread_start()
{ {
// the multiple messages writev improve performance large, // the multiple messages writev improve performance large,

@ -79,10 +79,10 @@ public:
/** /**
* the recv thread, use message handler to handle each received message. * the recv thread, use message handler to handle each received message.
*/ */
class SrsRecvThread : public ISrsReusableThreadHandler class SrsRecvThread : public ISrsReusableThread2Handler
{ {
protected: protected:
SrsReusableThread* trd; SrsReusableThread2* trd;
ISrsMessageHandler* handler; ISrsMessageHandler* handler;
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
int timeout; int timeout;
@ -92,9 +92,10 @@ public:
public: public:
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
virtual int cycle();
virtual void stop_loop(); virtual void stop_loop();
// interface ISrsReusableThread2Handler
public: public:
virtual int cycle();
virtual void on_thread_start(); virtual void on_thread_start();
virtual void on_thread_stop(); virtual void on_thread_stop();
}; };

@ -239,6 +239,24 @@ ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler()
{ {
} }
void ISrsEndlessThreadHandler::on_thread_start()
{
}
int ISrsEndlessThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsEndlessThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsEndlessThreadHandler::on_thread_stop()
{
}
SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h) SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h)
{ {
handler = h; handler = h;
@ -261,6 +279,26 @@ int SrsEndlessThread::cycle()
return handler->cycle(); return handler->cycle();
} }
void SrsEndlessThread::on_thread_start()
{
handler->on_thread_start();
}
int SrsEndlessThread::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsEndlessThread::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsEndlessThread::on_thread_stop()
{
handler->on_thread_stop();
}
ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{ {
} }
@ -269,6 +307,20 @@ ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler()
{ {
} }
void ISrsOneCycleThreadHandler::on_thread_start()
{
}
int ISrsOneCycleThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsOneCycleThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsOneCycleThreadHandler::on_thread_stop() void ISrsOneCycleThreadHandler::on_thread_stop()
{ {
} }
@ -297,6 +349,21 @@ int SrsOneCycleThread::cycle()
return ret; return ret;
} }
void SrsOneCycleThread::on_thread_start()
{
handler->on_thread_start();
}
int SrsOneCycleThread::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsOneCycleThread::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsOneCycleThread::on_thread_stop() void SrsOneCycleThread::on_thread_stop()
{ {
handler->on_thread_stop(); handler->on_thread_stop();
@ -310,6 +377,20 @@ ISrsReusableThreadHandler::~ISrsReusableThreadHandler()
{ {
} }
void ISrsReusableThreadHandler::on_thread_start()
{
}
int ISrsReusableThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsReusableThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsReusableThreadHandler::on_thread_stop() void ISrsReusableThreadHandler::on_thread_stop()
{ {
} }
@ -341,22 +422,116 @@ int SrsReusableThread::cid()
return pthread->cid(); return pthread->cid();
} }
void SrsReusableThread::interrupt() int SrsReusableThread::cycle()
{
return handler->cycle();
}
void SrsReusableThread::on_thread_start()
{
handler->on_thread_start();
}
int SrsReusableThread::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsReusableThread::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsReusableThread::on_thread_stop()
{
handler->on_thread_stop();
}
ISrsReusableThread2Handler::ISrsReusableThread2Handler()
{
}
ISrsReusableThread2Handler::~ISrsReusableThread2Handler()
{
}
void ISrsReusableThread2Handler::on_thread_start()
{
}
int ISrsReusableThread2Handler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsReusableThread2Handler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsReusableThread2Handler::on_thread_stop()
{
}
SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
}
SrsReusableThread2::~SrsReusableThread2()
{
pthread->stop();
srs_freep(pthread);
}
int SrsReusableThread2::start()
{
return pthread->start();
}
void SrsReusableThread2::stop()
{
pthread->stop();
}
int SrsReusableThread2::cid()
{
return pthread->cid();
}
void SrsReusableThread2::interrupt()
{ {
pthread->stop_loop(); pthread->stop_loop();
} }
bool SrsReusableThread::interrupted() bool SrsReusableThread2::interrupted()
{ {
return !pthread->can_loop(); return !pthread->can_loop();
} }
int SrsReusableThread::cycle() int SrsReusableThread2::cycle()
{ {
return handler->cycle(); return handler->cycle();
} }
void SrsReusableThread::on_thread_stop() void SrsReusableThread2::on_thread_start()
{
handler->on_thread_start();
}
int SrsReusableThread2::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsReusableThread2::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsReusableThread2::on_thread_stop()
{ {
handler->on_thread_stop(); handler->on_thread_stop();
} }

@ -184,6 +184,15 @@ public:
* @remark user must use block method in cycle method, for example, sleep or socket io. * @remark user must use block method in cycle method, for example, sleep or socket io.
*/ */
virtual int cycle() = 0; virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
}; };
class SrsEndlessThread : public internal::ISrsThreadHandler class SrsEndlessThread : public internal::ISrsThreadHandler
{ {
@ -201,6 +210,10 @@ public:
// interface internal::ISrsThreadHandler // interface internal::ISrsThreadHandler
public: public:
virtual int cycle(); virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
}; };
/** /**
@ -236,10 +249,14 @@ public:
* the cycle method for the one cycle thread. * the cycle method for the one cycle thread.
*/ */
virtual int cycle() = 0; virtual int cycle() = 0;
public:
/** /**
* when thread stop, the handler can do cleanup. * other callback for handler.
* @remark this method is optional, handler can ignore it. * @remark all callback is optional, handler can ignore it.
*/ */
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop(); virtual void on_thread_stop();
}; };
class SrsOneCycleThread : public internal::ISrsThreadHandler class SrsOneCycleThread : public internal::ISrsThreadHandler
@ -258,6 +275,9 @@ public:
// interface internal::ISrsThreadHandler // interface internal::ISrsThreadHandler
public: public:
virtual int cycle(); virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop(); virtual void on_thread_stop();
}; };
@ -266,11 +286,11 @@ public:
* user can create thread and stop then start again and again, * user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester. * generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread: * the step to create a thread stop by other thread:
* 1. create SrsThread field, with joinable true. * 1. create SrsReusableThread field.
* 2. must use stop to stop and join the thread. * 2. must manually stop the thread when started it.
* for example: * for example:
* class SrsIngester : public ISrsThreadHandler { * class SrsIngester : public ISrsReusableThreadHandler {
* public: SrsIngester() { pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); } * public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); }
* public: virtual int start() { return pthread->start(); } * public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); } * public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() { * public: virtual int cycle() {
@ -290,10 +310,14 @@ public:
* the thread is interrupted. * the thread is interrupted.
*/ */
virtual int cycle() = 0; virtual int cycle() = 0;
public:
/** /**
* when thread stop, the handler can do cleanup. * other callback for handler.
* @remark this method is optional, handler can ignore it. * @remark all callback is optional, handler can ignore it.
*/ */
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop(); virtual void on_thread_stop();
}; };
class SrsReusableThread : public internal::ISrsThreadHandler class SrsReusableThread : public internal::ISrsThreadHandler
@ -314,6 +338,89 @@ public:
* @remark user can stop multiple times, ignore if already stopped. * @remark user can stop multiple times, ignore if already stopped.
*/ */
virtual void stop(); virtual void stop();
public:
/**
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
/**
* the reuse thread is a thread stop and start by other thread.
* the version 2, is the thread cycle has its inner loop, which should
* check the intterrupt, and should interrupt thread when the inner loop want
* to quit the thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
* 1. create SrsReusableThread field.
* 2. must manually stop the thread when started it.
* for example:
* class SrsIngester : public ISrsReusableThreadHandler {
* public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
* while (!pthread->interrupted()) {
* // quit thread when error.
* if (ret != ERROR_SUCCESS) {
* pthread->interrupt();
* }
*
* // do something.
* }
* }
* };
*/
class ISrsReusableThread2Handler
{
public:
ISrsReusableThread2Handler();
virtual ~ISrsReusableThread2Handler();
public:
/**
* the cycle method for the one cycle thread.
* @remark when the cycle has its inner loop, it must check whether
* the thread is interrupted.
*/
virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
class SrsReusableThread2 : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsReusableThread2Handler* handler;
public:
SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t interval_us = 0);
virtual ~SrsReusableThread2();
public:
/**
* for the reusable thread, start and stop by user.
*/
virtual int start();
/**
* stop the thread, wait for the thread to terminate.
* @remark user can stop multiple times, ignore if already stopped.
*/
virtual void stop();
public: public:
/** /**
* get the context id. @see: ISrsThreadContext.get_id(). * get the context id. @see: ISrsThreadContext.get_id().
@ -335,6 +442,9 @@ public:
// interface internal::ISrsThreadHandler // interface internal::ISrsThreadHandler
public: public:
virtual int cycle(); virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop(); virtual void on_thread_stop();
}; };

Loading…
Cancel
Save