fix #78 st joinable thread must be stop by other threads, 0.9.113

pull/133/head
winlin 11 years ago
parent 6fba0db9b5
commit 0eb9e0af5b

@ -231,6 +231,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.<br/> * 2013-10-17, Created.<br/>
## History ## History
* v1.0, 2014-05-22, fix [#78](https://github.com/winlinvip/simple-rtmp-server/issues/78), st joinable thread must be stop by other threads, 0.9.113
* v1.0, 2014-05-22, support amf0 StrictArray(0x0a). 0.9.111. * v1.0, 2014-05-22, support amf0 StrictArray(0x0a). 0.9.111.
* v1.0, 2014-05-22, support flv parser, add amf0 to librtmp. 0.9.110 * v1.0, 2014-05-22, support flv parser, add amf0 to librtmp. 0.9.110
* v1.0, 2014-05-22, fix [#74](https://github.com/winlinvip/simple-rtmp-server/issues/74), add tcUrl for http callback on_connect, 0.9.109 * v1.0, 2014-05-22, fix [#74](https://github.com/winlinvip/simple-rtmp-server/issues/74), add tcUrl for http callback on_connect, 0.9.109

@ -35,7 +35,11 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
server = srs_server; server = srs_server;
stfd = client_stfd; stfd = client_stfd;
connection_id = 0; connection_id = 0;
pthread = new SrsThread(this, 0); // the client thread should reap itself,
// so we never use joinable.
// TODO: FIXME: maybe other thread need to stop it.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/78
pthread = new SrsThread(this, 0, false);
} }
SrsConnection::~SrsConnection() SrsConnection::~SrsConnection()

@ -69,7 +69,7 @@ SrsEdgeIngester::SrsEdgeIngester()
origin_index = 0; origin_index = 0;
stream_id = 0; stream_id = 0;
stfd = NULL; stfd = NULL;
pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US); pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US, true);
} }
SrsEdgeIngester::~SrsEdgeIngester() SrsEdgeIngester::~SrsEdgeIngester()
@ -344,7 +344,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
origin_index = 0; origin_index = 0;
stream_id = 0; stream_id = 0;
stfd = NULL; stfd = NULL;
pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US); pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US, true);
queue = new SrsMessageQueue(); queue = new SrsMessageQueue();
send_error_code = ERROR_SUCCESS; send_error_code = ERROR_SUCCESS;
} }

@ -44,7 +44,7 @@ static std::vector<std::string> _transcoded_url;
SrsEncoder::SrsEncoder() SrsEncoder::SrsEncoder()
{ {
pthread = new SrsThread(this, SRS_RTMP_ENCODER_SLEEP_US); pthread = new SrsThread(this, SRS_RTMP_ENCODER_SLEEP_US, true);
pithy_print = new SrsPithyPrint(SRS_STAGE_ENCODER); pithy_print = new SrsPithyPrint(SRS_STAGE_ENCODER);
} }

@ -54,7 +54,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
kbps = new SrsKbps(); kbps = new SrsKbps();
stream_id = 0; stream_id = 0;
pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_US); pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_US, true);
queue = new SrsMessageQueue(); queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter(); jitter = new SrsRtmpJitter();
} }

@ -291,6 +291,13 @@ SrsHttpHandler* SrsHttpHandler::res_content_type_mpegts(stringstream& ss)
return this; return this;
} }
SrsHttpHandler* SrsHttpHandler::res_content_type_flv(stringstream& ss)
{
ss << "Content-Type: video/x-flv" << __CRLF
<< "Allow: DELETE, GET, HEAD, OPTIONS, POST, PUT" << __CRLF;
return this;
}
SrsHttpHandler* SrsHttpHandler::res_content_length(stringstream& ss, int64_t length) SrsHttpHandler* SrsHttpHandler::res_content_length(stringstream& ss, int64_t length)
{ {
ss << "Content-Length: "<< length << __CRLF; ss << "Content-Length: "<< length << __CRLF;
@ -1008,7 +1015,7 @@ const char* SrsHttpUri::get_path()
const char* SrsHttpUri::get_query() const char* SrsHttpUri::get_query()
{ {
return path.data(); return query.data();
} }
string SrsHttpUri::get_uri_field(string uri, http_parser_url* hp_u, http_parser_url_fields field) string SrsHttpUri::get_uri_field(string uri, http_parser_url* hp_u, http_parser_url_fields field)

@ -250,6 +250,7 @@ public:
virtual SrsHttpHandler* res_content_type_json(std::stringstream& ss); virtual SrsHttpHandler* res_content_type_json(std::stringstream& ss);
virtual SrsHttpHandler* res_content_type_m3u8(std::stringstream& ss); virtual SrsHttpHandler* res_content_type_m3u8(std::stringstream& ss);
virtual SrsHttpHandler* res_content_type_mpegts(std::stringstream& ss); virtual SrsHttpHandler* res_content_type_mpegts(std::stringstream& ss);
virtual SrsHttpHandler* res_content_type_flv(std::stringstream& ss);
virtual SrsHttpHandler* res_content_length(std::stringstream& ss, int64_t length); virtual SrsHttpHandler* res_content_length(std::stringstream& ss, int64_t length);
virtual SrsHttpHandler* res_enable_crossdomain(std::stringstream& ss); virtual SrsHttpHandler* res_enable_crossdomain(std::stringstream& ss);
virtual SrsHttpHandler* res_header_eof(std::stringstream& ss); virtual SrsHttpHandler* res_header_eof(std::stringstream& ss);

@ -165,6 +165,8 @@ int SrsHttpVhost::do_process_request(SrsSocket* skt, SrsHttpMessage* req)
if (srs_string_ends_with(fullpath, ".ts")) { if (srs_string_ends_with(fullpath, ".ts")) {
return response_ts_file(skt, req, fullpath); return response_ts_file(skt, req, fullpath);
} else if (srs_string_ends_with(fullpath, ".flv") || srs_string_ends_with(fullpath, ".fhv")) {
return response_flv_file(skt, req, fullpath);
} else { } else {
return response_regular_file(skt, req, fullpath); return response_regular_file(skt, req, fullpath);
} }
@ -225,6 +227,62 @@ int SrsHttpVhost::response_regular_file(SrsSocket* skt, SrsHttpMessage* req, str
return ret; return ret;
} }
int SrsHttpVhost::response_flv_file(SrsSocket* skt, SrsHttpMessage* req, string fullpath)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: use more advance cache.
// for ts video large file, use bytes to write it.
int fd = ::open(fullpath.c_str(), O_RDONLY);
if (fd < 0) {
ret = ERROR_HTTP_OPEN_FILE;
srs_warn("open file %s failed, ret=%d", fullpath.c_str(), ret);
return ret;
}
int64_t length = (int64_t)::lseek(fd, 0, SEEK_END);
::lseek(fd, 0, SEEK_SET);
// write http header for ts.
std::stringstream ss;
res_status_line(ss)->res_content_type_flv(ss)
->res_content_length(ss, (int)length);
if (req->requires_crossdomain()) {
res_enable_crossdomain(ss);
}
res_header_eof(ss);
// flush http header to peer
if ((ret = res_flush(skt, ss)) != ERROR_SUCCESS) {
return ret;
}
// write body.
int64_t left = length;
char* buf = req->http_ts_send_buffer();
while (left > 0) {
ssize_t nread = -1;
// TODO: FIXME: use st_read.
if ((nread = ::read(fd, buf, HTTP_TS_SEND_BUFFER_SIZE)) < 0) {
ret = ERROR_HTTP_READ_FILE;
srs_warn("read file %s failed, ret=%d", fullpath.c_str(), ret);
break;
}
left -= nread;
if ((ret = skt->write(buf, nread, NULL)) != ERROR_SUCCESS) {
break;
}
}
::close(fd);
return ret;
}
int SrsHttpVhost::response_ts_file(SrsSocket* skt, SrsHttpMessage* req, string fullpath) int SrsHttpVhost::response_ts_file(SrsSocket* skt, SrsHttpMessage* req, string fullpath)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -266,10 +324,9 @@ int SrsHttpVhost::response_ts_file(SrsSocket* skt, SrsHttpMessage* req, string f
ssize_t nread = -1; ssize_t nread = -1;
// TODO: FIXME: use st_read. // TODO: FIXME: use st_read.
if ((nread = ::read(fd, buf, HTTP_TS_SEND_BUFFER_SIZE)) < 0) { if ((nread = ::read(fd, buf, HTTP_TS_SEND_BUFFER_SIZE)) < 0) {
::close(fd);
ret = ERROR_HTTP_READ_FILE; ret = ERROR_HTTP_READ_FILE;
srs_warn("read file %s failed, ret=%d", fullpath.c_str(), ret); srs_warn("read file %s failed, ret=%d", fullpath.c_str(), ret);
return ret; break;
} }
left -= nread; left -= nread;

@ -71,6 +71,7 @@ protected:
virtual int do_process_request(SrsSocket* skt, SrsHttpMessage* req); virtual int do_process_request(SrsSocket* skt, SrsHttpMessage* req);
private: private:
virtual int response_regular_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath); virtual int response_regular_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath);
virtual int response_flv_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath);
virtual int response_ts_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath); virtual int response_ts_file(SrsSocket* skt, SrsHttpMessage* req, std::string fullpath);
virtual std::string get_request_file(SrsHttpMessage* req); virtual std::string get_request_file(SrsHttpMessage* req);
public: public:

@ -53,7 +53,7 @@ SrsIngester::SrsIngester()
{ {
_srs_config->subscribe(this); _srs_config->subscribe(this);
pthread = new SrsThread(this, SRS_AUTO_INGESTER_SLEEP_US); pthread = new SrsThread(this, SRS_AUTO_INGESTER_SLEEP_US, true);
pithy_print = new SrsPithyPrint(SRS_STAGE_INGESTER); pithy_print = new SrsPithyPrint(SRS_STAGE_INGESTER);
} }

@ -84,7 +84,7 @@ SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
_server = server; _server = server;
_type = type; _type = type;
pthread = new SrsThread(this, 0); pthread = new SrsThread(this, 0, true);
} }
SrsListener::~SrsListener() SrsListener::~SrsListener()
@ -197,7 +197,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* server)
_server = server; _server = server;
sig_pipe[0] = sig_pipe[1] = -1; sig_pipe[0] = sig_pipe[1] = -1;
pthread = new SrsThread(this, 0); pthread = new SrsThread(this, 0, true);
signal_read_stfd = NULL; signal_read_stfd = NULL;
} }

@ -54,7 +54,7 @@ void ISrsThreadHandler::on_thread_stop()
{ {
} }
SrsThread::SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_us) SrsThread::SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
{ {
handler = thread_handler; handler = thread_handler;
cycle_interval_us = interval_us; cycle_interval_us = interval_us;
@ -62,6 +62,7 @@ SrsThread::SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_us)
tid = NULL; tid = NULL;
loop = false; loop = false;
_cid = -1; _cid = -1;
_joinable = joinable;
} }
SrsThread::~SrsThread() SrsThread::~SrsThread()
@ -83,7 +84,7 @@ int SrsThread::start()
return ret; return ret;
} }
if((tid = st_thread_create(thread_fun, this, 1, 0)) == NULL){ if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD; ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret); srs_error("st_thread_create failed. ret=%d", ret);
return ret; return ret;
@ -93,7 +94,7 @@ int SrsThread::start()
loop = true; loop = true;
// wait for cid to ready, for parent thread to get the cid. // wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) { while (_cid < 0 && loop) {
st_usleep(10 * SRS_TIME_MILLISECONDS); st_usleep(10 * SRS_TIME_MILLISECONDS);
} }

@ -88,6 +88,7 @@ private:
st_thread_t tid; st_thread_t tid;
int _cid; int _cid;
bool loop; bool loop;
bool _joinable;
private: private:
ISrsThreadHandler* handler; ISrsThreadHandler* handler;
int64_t cycle_interval_us; int64_t cycle_interval_us;
@ -96,8 +97,15 @@ public:
* initialize the thread. * initialize the thread.
* @param thread_handler, the cycle handler for the thread. * @param thread_handler, the cycle handler for the thread.
* @param interval_us, the sleep interval when cycle finished. * @param interval_us, the sleep interval when cycle finished.
* @param joinable, if joinable, other thread must stop the thread.
* @remark if joinable, thread never quit itself, or memory leak.
* @see: https://github.com/winlinvip/simple-rtmp-server/issues/78
*/ */
SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_us); /**
* TODO: FIXME: maybe all thread must be reap by others threads,
* @see: https://github.com/winlinvip/simple-rtmp-server/issues/77
*/
SrsThread(ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
virtual ~SrsThread(); virtual ~SrsThread();
public: public:
/** /**

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR "0" #define VERSION_MAJOR "0"
#define VERSION_MINOR "9" #define VERSION_MINOR "9"
#define VERSION_REVISION "112" #define VERSION_REVISION "113"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info. // server info.
#define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_KEY "SRS"

Loading…
Cancel
Save