diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index 882331e78..c3a30d288 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -28,9 +28,6 @@ using namespace std; #include #include -// the sleep interval in ms for http async callback. -#define SRS_AUTO_ASYNC_CALLBACL_CIMS 30 - ISrsAsyncCallTask::ISrsAsyncCallTask() { } @@ -41,13 +38,13 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() SrsAsyncCallWorker::SrsAsyncCallWorker() { - pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_CIMS); + trd = NULL; wait = st_cond_new(); } SrsAsyncCallWorker::~SrsAsyncCallWorker() { - srs_freep(pthread); + srs_freep(trd); std::vector::iterator it; for (it = tasks.begin(); it != tasks.end(); ++it) { @@ -76,29 +73,31 @@ int SrsAsyncCallWorker::count() int SrsAsyncCallWorker::start() { - return pthread->start(); + srs_freep(trd); + trd = new SrsCoroutine("async", this, _srs_context->get_id()); + return trd->start(); } void SrsAsyncCallWorker::stop() { st_cond_signal(wait); - pthread->stop(); + trd->stop(); } int SrsAsyncCallWorker::cycle() { int ret = ERROR_SUCCESS; - while (pthread->can_loop()) { + while (!trd->pull()) { if (tasks.empty()) { st_cond_wait(wait); } - std::vector copies = tasks; + std::vector copy = tasks; tasks.clear(); std::vector::iterator it; - for (it = copies.begin(); it != copies.end(); ++it) { + for (it = copy.begin(); it != copy.end(); ++it) { ISrsAsyncCallTask* task = *it; if ((ret = task->call()) != ERROR_SUCCESS) { srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret); diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 2da094cab..0d303dfc5 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -63,10 +63,10 @@ public: * when worker call with the task, the worker will do it in isolate thread. * that is, the task is execute/call in async mode. */ -class SrsAsyncCallWorker : public ISrsReusableThreadHandler +class SrsAsyncCallWorker : public ISrsCoroutineHandler { private: - SrsReusableThread* pthread; + SrsCoroutine* trd; protected: std::vector tasks; st_cond_t wait; diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 5f5378658..d4021b0a3 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -36,15 +36,12 @@ using namespace std; #ifdef SRS_AUTO_TRANSCODE -// when error, encoder sleep for a while and retry. -#define SRS_RTMP_ENCODER_CIMS (3000) - // for encoder to detect the dead loop static std::vector _transcoded_url; SrsEncoder::SrsEncoder() { - pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_CIMS); + trd = NULL; pprint = SrsPithyPrint::create_encoder(); } @@ -52,7 +49,7 @@ SrsEncoder::~SrsEncoder() { on_unpublish(); - srs_freep(pthread); + srs_freep(trd); srs_freep(pprint); } @@ -76,24 +73,35 @@ int SrsEncoder::on_publish(SrsRequest* req) } // start thread to run all encoding engines. - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("encoder", this, _srs_context->get_id()); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; } - srs_trace("encoder thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id()); return ret; } void SrsEncoder::on_unpublish() { - pthread->stop(); + trd->stop(); clear_engines(); } +// when error, encoder sleep for a while and retry. +#define SRS_RTMP_ENCODER_CIMS (3000) + int SrsEncoder::cycle() { - int ret = do_cycle(); + int ret = ERROR_SUCCESS; + + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("Encoder: Ignore error, ret=%d", ret); + } + st_usleep(SRS_RTMP_ENCODER_CIMS * 1000); + } // kill ffmpeg when finished and it alive std::vector::iterator it; diff --git a/trunk/src/app/srs_app_encoder.hpp b/trunk/src/app/srs_app_encoder.hpp index cc68c3e13..ba83483d3 100644 --- a/trunk/src/app/srs_app_encoder.hpp +++ b/trunk/src/app/srs_app_encoder.hpp @@ -42,13 +42,13 @@ class SrsFFMPEG; * the encoder for a stream, * may use multiple ffmpegs to transcode the specified stream. */ -class SrsEncoder : public ISrsReusableThreadHandler +class SrsEncoder : public ISrsCoroutineHandler { private: std::string input_stream_name; std::vector ffmpegs; private: - SrsReusableThread* pthread; + SrsCoroutine* trd; SrsPithyPrint* pprint; public: SrsEncoder(); diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index b3f6d772d..562f4a2fb 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -37,10 +37,6 @@ using namespace std; #include #include -// when error, ingester sleep for a while and retry. -// ingest never sleep a long time, for we must start the stream ASAP. -#define SRS_AUTO_INGESTER_CIMS (3000) - SrsIngesterFFMPEG::SrsIngesterFFMPEG() { ffmpeg = NULL; @@ -109,7 +105,7 @@ SrsIngester::SrsIngester() expired = false; - pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS); + trd = NULL; pprint = SrsPithyPrint::create_ingester(); } @@ -117,7 +113,7 @@ SrsIngester::~SrsIngester() { _srs_config->unsubscribe(this); - srs_freep(pthread); + srs_freep(trd); clear_engines(); } @@ -144,18 +140,19 @@ int SrsIngester::start() // for the reload may add more ingesters. // start thread to run all encoding engines. - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("ingest", this, _srs_context->get_id()); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; } - srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id()); return ret; } void SrsIngester::stop() { - pthread->stop(); + trd->stop(); clear_engines(); } @@ -172,10 +169,28 @@ void SrsIngester::fast_stop() } } +// when error, ingester sleep for a while and retry. +// ingest never sleep a long time, for we must start the stream ASAP. +#define SRS_AUTO_INGESTER_CIMS (3000) + int SrsIngester::cycle() { int ret = ERROR_SUCCESS; + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("Ingester: Ignore error, ret=%d", ret); + } + st_usleep(SRS_AUTO_INGESTER_CIMS * 1000); + } + + return ret; +} + +int SrsIngester::do_cycle() +{ + int ret = ERROR_SUCCESS; + // when expired, restart all ingesters. if (expired) { expired = false; diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index a40d27af0..a4f5ca9a4 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -71,12 +71,12 @@ public: * encode with FFMPEG(optional), * push to SRS(or any RTMP server) over RTMP. */ -class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler +class SrsIngester : public ISrsCoroutineHandler, public ISrsReloadHandler { private: std::vector ingesters; private: - SrsReusableThread* pthread; + SrsCoroutine* trd; SrsPithyPrint* pprint; // whether the ingesters are expired, // for example, the listen port changed, @@ -95,6 +95,8 @@ private: // interface ISrsReusableThreadHandler. public: virtual int cycle(); +private: + virtual int do_cycle(); private: virtual void clear_engines(); virtual int parse(); diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index d832db03f..21d6f753e 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -40,7 +40,6 @@ using namespace std; #ifdef SRS_AUTO_KAFKA -#define SRS_KAKFA_CIMS 3000 #define SRS_KAFKA_PRODUCER_TIMEOUT 30000 #define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1 @@ -366,7 +365,7 @@ SrsKafkaProducer::SrsKafkaProducer() metadata_expired = st_cond_new(); lock = st_mutex_new(); - pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CIMS); + trd = NULL; worker = new SrsAsyncCallWorker(); cache = new SrsKafkaCache(); @@ -380,7 +379,7 @@ SrsKafkaProducer::~SrsKafkaProducer() srs_freep(lb); srs_freep(worker); - srs_freep(pthread); + srs_freep(trd); srs_freep(cache); st_mutex_destroy(lock); @@ -410,7 +409,9 @@ int SrsKafkaProducer::start() return ret; } - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("kafka", this, _srs_context->get_id()); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start kafka thread failed. ret=%d", ret); } @@ -425,7 +426,7 @@ void SrsKafkaProducer::stop() return; } - pthread->stop(); + trd->stop(); worker->stop(); } @@ -491,12 +492,16 @@ int SrsKafkaProducer::on_close(int key) return worker->execute(new SrsKafkaMessage(this, key, obj)); } +#define SRS_KAKFA_CIMS 3000 int SrsKafkaProducer::cycle() { int ret = ERROR_SUCCESS; - if ((ret = do_cycle()) != ERROR_SUCCESS) { - srs_warn("ignore kafka error. ret=%d", ret); + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("ignore kafka error. ret=%d", ret); + } + st_usleep(SRS_KAKFA_CIMS * 1000); } return ret; diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 919152f3d..ef2dcec1a 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -158,13 +158,13 @@ extern void srs_dispose_kafka(); /** * the kafka producer used to save log to kafka cluster. */ -class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster +class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster { private: // TODO: FIXME: support reload. bool enabled; st_mutex_t lock; - SrsReusableThread* pthread; + SrsCoroutine* trd; private: bool metadata_ok; st_cond_t metadata_expired; diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c897ec138..5b047f2a4 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -79,7 +79,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; - pthread = new SrsReusableThread("udp", this); + trd = NULL; } SrsUdpListener::~SrsUdpListener() @@ -87,8 +87,7 @@ SrsUdpListener::~SrsUdpListener() // close the stfd to trigger thread to interrupted. srs_close_stfd(_stfd); - pthread->stop(); - srs_freep(pthread); + srs_freep(trd); // st does not close it sometimes, // close it manually. @@ -118,13 +117,8 @@ int SrsUdpListener::listen() } srs_verbose("create linux socket success. ip=%s, port=%d, fd=%d", ip.c_str(), port, _fd); - int reuse_socket = 1; - if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { - ret = ERROR_SOCKET_SETREUSE; - srs_error("setsockopt reuse-addr error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; - } - srs_verbose("setsockopt reuse-addr success. ip=%s, port=%d, fd=%d", ip.c_str(), port, _fd); + srs_fd_close_exec(_fd); + srs_socket_reuse_addr(_fd); sockaddr_in addr; addr.sin_family = AF_INET; @@ -144,7 +138,9 @@ int SrsUdpListener::listen() } srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("udp", this); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; } @@ -157,23 +153,25 @@ int SrsUdpListener::cycle() { int ret = ERROR_SUCCESS; - // TODO: FIXME: support ipv6, @see man 7 ipv6 - sockaddr_in from; - int nb_from = sizeof(sockaddr_in); - int nread = 0; - - if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { - srs_warn("ignore recv udp packet failed, nread=%d", nread); - return ret; - } - - if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { - srs_warn("handle udp packet failed. ret=%d", ret); - return ret; - } - - if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { - st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + while (!trd->pull()) { + // TODO: FIXME: support ipv6, @see man 7 ipv6 + sockaddr_in from; + int nb_from = sizeof(sockaddr_in); + int nread = 0; + + if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + srs_warn("ignore recv udp packet failed, nread=%d", nread); + return ret; + } + + if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { + srs_warn("handle udp packet failed. ret=%d", ret); + return ret; + } + + if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { + st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + } } return ret; @@ -188,13 +186,12 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) _fd = -1; _stfd = NULL; - pthread = new SrsReusableThread("tcp", this); + trd = NULL; } SrsTcpListener::~SrsTcpListener() { - pthread->stop(); - srs_freep(pthread); + srs_freep(trd); srs_close_stfd(_stfd); } @@ -215,13 +212,8 @@ int SrsTcpListener::listen() } srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd); - int reuse_socket = 1; - if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { - ret = ERROR_SOCKET_SETREUSE; - srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd); + srs_fd_close_exec(_fd); + srs_socket_reuse_addr(_fd); sockaddr_in addr; addr.sin_family = AF_INET; @@ -248,7 +240,9 @@ int SrsTcpListener::listen() } srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("tcp", this); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; } @@ -261,20 +255,25 @@ int SrsTcpListener::cycle() { int ret = ERROR_SUCCESS; - st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); - - if(client_stfd == NULL){ - // ignore error. - if (errno != EINTR) { - srs_error("ignore accept thread stoppped for accept client error"); + while (!trd->pull()) { + st_netfd_t stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); + int fd = st_netfd_fileno(stfd); + + srs_fd_close_exec(fd); + + if(stfd == NULL){ + // ignore error. + if (errno != EINTR) { + srs_error("ignore accept thread stoppped for accept client error"); + } + return ret; + } + srs_verbose("get a client. fd=%d", fd); + + if ((ret = handler->on_tcp_client(stfd)) != ERROR_SUCCESS) { + srs_warn("accept client error. ret=%d", ret); + return ret; } - return ret; - } - srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); - - if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - return ret; } return ret; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index fc9966229..cad00c87e 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -78,12 +78,12 @@ public: /** * bind udp port, start thread to recv packet and handler it. */ -class SrsUdpListener : public ISrsReusableThreadHandler +class SrsUdpListener : public ISrsCoroutineHandler { private: int _fd; st_netfd_t _stfd; - SrsReusableThread* pthread; + SrsCoroutine* trd; private: char* buf; int nb_buf; @@ -107,12 +107,12 @@ public: /** * bind and listen tcp port, use handler to process the client. */ -class SrsTcpListener : public ISrsReusableThreadHandler +class SrsTcpListener : public ISrsCoroutineHandler { private: int _fd; st_netfd_t _stfd; - SrsReusableThread* pthread; + SrsCoroutine* trd; private: ISrsTcpHandler* handler; std::string ip; diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index dfaef8ecb..7f5e920f6 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -36,12 +36,9 @@ using namespace std; #include #include -// when error, ng-exec sleep for a while and retry. -#define SRS_RTMP_EXEC_CIMS (3000) - SrsNgExec::SrsNgExec() { - pthread = new SrsReusableThread("encoder", this, SRS_RTMP_EXEC_CIMS); + trd = NULL; pprint = SrsPithyPrint::create_exec(); } @@ -49,7 +46,7 @@ SrsNgExec::~SrsNgExec() { on_unpublish(); - srs_freep(pthread); + srs_freep(trd); srs_freep(pprint); } @@ -63,24 +60,34 @@ int SrsNgExec::on_publish(SrsRequest* req) } // start thread to run all processes. - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("encoder", this, _srs_context->get_id()); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; } - srs_trace("exec thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id()); return ret; } void SrsNgExec::on_unpublish() { - pthread->stop(); + trd->stop(); clear_exec_publish(); } +// when error, ng-exec sleep for a while and retry. +#define SRS_RTMP_EXEC_CIMS (3000) int SrsNgExec::cycle() { - int ret = do_cycle(); + int ret = ERROR_SUCCESS; + + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("EXEC: Ignore error, ret=%d", ret); + } + st_usleep(SRS_RTMP_EXEC_CIMS * 1000); + } std::vector::iterator it; for (it = exec_publishs.begin(); it != exec_publishs.end(); ++it) { diff --git a/trunk/src/app/srs_app_ng_exec.hpp b/trunk/src/app/srs_app_ng_exec.hpp index 06ac2e01c..1b963e71b 100644 --- a/trunk/src/app/srs_app_ng_exec.hpp +++ b/trunk/src/app/srs_app_ng_exec.hpp @@ -40,10 +40,10 @@ class SrsProcess; * @see https://github.com/arut/nginx-rtmp-module/wiki/Directives#exec_push * @see https://github.com/ossrs/srs/issues/367 */ -class SrsNgExec : public ISrsReusableThreadHandler +class SrsNgExec : public ISrsCoroutineHandler { private: - SrsReusableThread* pthread; + SrsCoroutine* trd; SrsPithyPrint* pprint; std::string input_stream_name; std::vector exec_publishs; diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 7080d0251..3713d8251 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -124,8 +124,12 @@ int SrsCoroutine::cid() int SrsCoroutine::cycle() { - if (!context && _srs_context) { - context = _srs_context->generate_id(); + if (_srs_context) { + if (context) { + _srs_context->set_id(context); + } else { + context = _srs_context->generate_id(); + } } srs_info("Thread.cycle: Start with cid=%d, err=%d", context, err); diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 1fa5668f6..014c69b4b 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -78,89 +78,6 @@ void SrsCoroutineManager::clear() } } -ISrsReusableThreadHandler::ISrsReusableThreadHandler() -{ -} - -ISrsReusableThreadHandler::~ISrsReusableThreadHandler() -{ -} - -void ISrsReusableThreadHandler::on_thread_start() -{ -} - -int ISrsReusableThreadHandler::on_before_cycle() -{ - return ERROR_SUCCESS; -} - -int ISrsReusableThreadHandler::on_end_cycle() -{ - return ERROR_SUCCESS; -} - -void ISrsReusableThreadHandler::on_thread_stop() -{ -} - -SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t cims) -{ - handler = h; - pthread = new internal::SrsThread(n, this, cims, true); -} - -SrsReusableThread::~SrsReusableThread() -{ - pthread->stop(); - srs_freep(pthread); -} - -int SrsReusableThread::start() -{ - return pthread->start(); -} - -void SrsReusableThread::stop() -{ - pthread->stop(); -} - -bool SrsReusableThread::can_loop() -{ - return pthread->can_loop(); -} - -int SrsReusableThread::cid() -{ - return pthread->cid(); -} - -int SrsReusableThread::cycle() -{ - return handler->cycle(); -} - -void SrsReusableThread::on_thread_start() -{ - handler->on_thread_start(); -} - -int SrsReusableThread::on_before_cycle() -{ - return handler->on_before_cycle(); -} - -int SrsReusableThread::on_end_cycle() -{ - return handler->on_end_cycle(); -} - -void SrsReusableThread::on_thread_stop() -{ - handler->on_thread_stop(); -} - ISrsReusableThread2Handler::ISrsReusableThread2Handler() { } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index 85feabf8f..ee9983493 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -58,85 +58,6 @@ private: void clear(); }; -/** - * the reuse thread is a thread stop and start by other thread. - * user can create thread and stop then start again and again, - * generally must provides a start and stop method, @see SrsIngester. - * the step to create a thread stop by other thread: - * 1. create SrsReusableThread field. - * 2. must manually stop the thread when started it. - * for example: - * class SrsIngester : public ISrsReusableThreadHandler { - * public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS); } - * public: virtual int start() { return pthread->start(); } - * public: virtual void stop() { pthread->stop(); } - * public: virtual int cycle() { - * // check status, start ffmpeg when stopped. - * } - * }; - */ -class ISrsReusableThreadHandler -{ -public: - ISrsReusableThreadHandler(); - virtual ~ISrsReusableThreadHandler(); -public: - /** - * the cycle method for the one cycle thread. - * @remark when the cycle has its inner loop, it must check whether - * the thread is interrupted. - */ - virtual int cycle() = 0; -public: - /** - * other callback for handler. - * @remark all callback is optional, handler can ignore it. - */ - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; -class SrsReusableThread : public internal::ISrsThreadHandler -{ -private: - internal::SrsThread* pthread; - ISrsReusableThreadHandler* handler; -public: - SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t cims = 0); - virtual ~SrsReusableThread(); -public: - /** - * for the reusable thread, start and stop by user. - */ - virtual int start(); - /** - * stop the thread, wait for the thread to terminate. - * @remark user can stop multiple times, ignore if already stopped. - */ - virtual void stop(); - /** - * whether the thread should loop, - * used for handler->cycle() which has a loop method, - * to check this method, break if false. - */ - virtual bool can_loop(); -public: - /** - * get the context id. @see: ISrsThreadContext.get_id(). - * used for parent thread to get the id. - * @remark when start thread, parent thread will block and wait for this id ready. - */ - virtual int cid(); -// interface internal::ISrsThreadHandler -public: - virtual int cycle(); - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; - /** * the reuse thread is a thread stop and start by other thread. * the version 2, is the thread cycle has its inner loop, which should diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index e713f2bfa..d921cf54f 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -23,6 +23,8 @@ #include +#include +#include using namespace std; #include @@ -88,6 +90,19 @@ void srs_close_stfd(st_netfd_t& stfd) } } +void srs_fd_close_exec(int fd) +{ + int flags = fcntl(fd, F_GETFD); + flags |= FD_CLOEXEC; + fcntl(fd, F_SETFD, flags); +} + +void srs_socket_reuse_addr(int fd) +{ + int v = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)); +} + SrsStSocket::SrsStSocket() { stfd = NULL; diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 281de85e4..1faf95373 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -38,6 +38,12 @@ extern int srs_st_init(); // @remark when close, user must ensure io completed. extern void srs_close_stfd(st_netfd_t& stfd); +// Set the FD_CLOEXEC of FD. +extern void srs_fd_close_exec(int fd); + +// Set the SO_REUSEADDR of socket. +extern void srs_socket_reuse_addr(int fd); + /** * the socket provides TCP socket over st, * that is, the sync socket mechanism. diff --git a/trunk/src/service/srs_service_utility.cpp b/trunk/src/service/srs_service_utility.cpp index e3454782a..2d36d2387 100644 --- a/trunk/src/service/srs_service_utility.cpp +++ b/trunk/src/service/srs_service_utility.cpp @@ -56,6 +56,8 @@ int srs_socket_connect(string server, int port, int64_t tm, st_netfd_t* pstfd) return ret; } + srs_fd_close_exec(sock); + srs_assert(!stfd); stfd = st_netfd_open_socket(sock); if(stfd == NULL){