For #906, #902, remove the thread start and stop event

pull/908/head
winlin 8 years ago
parent 3ffb0980f5
commit b21f92f97a

@ -130,13 +130,10 @@ int SrsConnection::cycle()
srs_warn("client disconnect peer. oret=%d, ret=%d", oret, ret);
}
return ERROR_SUCCESS;
}
void SrsConnection::on_thread_stop()
{
// TODO: FIXME: never remove itself, use isolate thread to do cleanup.
// Notify manager to remove it.
manager->remove(this);
return ERROR_SUCCESS;
}
int SrsConnection::srs_id()

@ -123,12 +123,6 @@ public:
* thread will invoke the on_thread_stop() when it terminated.
*/
virtual int cycle();
/**
* when the thread cycle finished, thread will invoke the on_thread_stop(),
* which will remove self from server, server will remove the connection from manager
* then delete the connection.
*/
virtual void on_thread_stop();
public:
/**
* get the srs id which identify the client.

@ -92,6 +92,21 @@ void SrsEncoder::on_unpublish()
}
int SrsEncoder::cycle()
{
int ret = do_cycle();
// kill ffmpeg when finished and it alive
std::vector<SrsFFMPEG*>::iterator it;
for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
ffmpeg->stop();
}
return ret;
}
int SrsEncoder::do_cycle()
{
int ret = ERROR_SUCCESS;
@ -118,17 +133,6 @@ int SrsEncoder::cycle()
return ret;
}
void SrsEncoder::on_thread_stop()
{
// kill ffmpeg when finished and it alive
std::vector<SrsFFMPEG*>::iterator it;
for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
ffmpeg->stop();
}
}
void SrsEncoder::clear_engines()
{
std::vector<SrsFFMPEG*>::iterator it;

@ -59,7 +59,8 @@ public:
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int do_cycle();
private:
virtual void clear_engines();
virtual SrsFFMPEG* at(int index);

@ -214,10 +214,6 @@ int SrsIngester::cycle()
return ret;
}
void SrsIngester::on_thread_stop()
{
}
void SrsIngester::clear_engines()
{
std::vector<SrsIngesterFFMPEG*>::iterator it;

@ -95,7 +95,6 @@ private:
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual void clear_engines();
virtual int parse();

@ -79,6 +79,19 @@ void SrsNgExec::on_unpublish()
}
int SrsNgExec::cycle()
{
int ret = do_cycle();
std::vector<SrsProcess*>::iterator it;
for (it = exec_publishs.begin(); it != exec_publishs.end(); ++it) {
SrsProcess* ep = *it;
ep->stop();
}
return ret;
}
int SrsNgExec::do_cycle()
{
int ret = ERROR_SUCCESS;
@ -110,15 +123,6 @@ int SrsNgExec::cycle()
return ret;
}
void SrsNgExec::on_thread_stop()
{
std::vector<SrsProcess*>::iterator it;
for (it = exec_publishs.begin(); it != exec_publishs.end(); ++it) {
SrsProcess* ep = *it;
ep->stop();
}
}
int SrsNgExec::parse_exec_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;

@ -56,7 +56,8 @@ public:
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int do_cycle();
private:
virtual int parse_exec_publish(SrsRequest* req);
virtual void clear_exec_publish();

@ -96,6 +96,29 @@ int SrsRecvThread::cycle()
{
int ret = ERROR_SUCCESS;
// the multiple messages writev improve performance large,
// but the timeout recv will cause 33% sys call performance,
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/ossrs/srs/issues/194
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);
pumper->on_start();
ret = do_cycle();
// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);
pumper->on_stop();
return ret;
}
int SrsRecvThread::do_cycle()
{
int ret = ERROR_SUCCESS;
while (!trd->interrupted()) {
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
@ -129,26 +152,6 @@ int SrsRecvThread::cycle()
return ret;
}
void SrsRecvThread::on_thread_start()
{
// the multiple messages writev improve performance large,
// but the timeout recv will cause 33% sys call performance,
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/ossrs/srs/issues/194
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);
pumper->on_start();
}
void SrsRecvThread::on_thread_stop()
{
// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);
pumper->on_stop();
}
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
: trd(this, rtmp_sdk, timeout_ms)
{

@ -112,8 +112,8 @@ public:
// interface ISrsReusableThread2Handler
public:
virtual int cycle();
virtual void on_thread_start();
virtual void on_thread_stop();
private:
virtual int do_cycle();
};
/**

@ -417,11 +417,6 @@ int SrsRtspConn::cycle()
srs_warn("client disconnect peer. ret=%d", ret);
}
return ERROR_SUCCESS;
}
void SrsRtspConn::on_thread_stop()
{
if (video_rtp) {
caster->free_port(video_rtp->port(), video_rtp->port() + 1);
}
@ -431,6 +426,8 @@ void SrsRtspConn::on_thread_stop()
}
caster->remove(this);
return ERROR_SUCCESS;
}
int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts)

@ -161,7 +161,6 @@ public:
// interface ISrsOneCycleThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
virtual int on_rtp_audio(SrsRtpPacket* pkt, int64_t dts);

Loading…
Cancel
Save