diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index d4a576b45..7bfb19b4c 100755 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif +#include #include #define srs_assert(expression) assert(expression) diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 9ce1b1e78..a98909858 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -31,9 +31,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include -// wait for client message. -#define SRS_PULSE_TIME_MS 100 +#define SRS_PULSE_TIMEOUT_MS 100 +#define SRS_SEND_TIMEOUT_MS 5000 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) @@ -61,6 +62,9 @@ int SrsClient::do_cycle() return ret; } srs_verbose("get peer ip success. ip=%s", ip); + + rtmp->set_recv_timeout(SRS_SEND_TIMEOUT_MS); + rtmp->set_send_timeout(SRS_SEND_TIMEOUT_MS); if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { srs_error("rtmp handshake failed. ret=%d", ret); @@ -167,27 +171,33 @@ int SrsClient::streaming_play(SrsSource* source) SrsAutoFree(SrsConsumer, consumer, false); srs_verbose("consumer created success."); + rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_MS); + + int64_t report_time = 0; + int64_t reported_time = 0; + while (true) { + report_time += SRS_PULSE_TIMEOUT_MS; + // switch to other st-threads. st_usleep(0); - - bool ready = false; - if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) { - srs_error("wait client control message failed. ret=%d", ret); - return ret; - } - srs_verbose("client pulse %dms, ready=%d", SRS_PULSE_TIME_MS, ready); // read from client. - if (ready) { + int ctl_msg_ret = ERROR_SUCCESS; + if (true) { SrsCommonMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + ctl_msg_ret = ret = rtmp->recv_message(&msg); + + srs_verbose("play loop recv message. ret=%d", ret); + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { srs_error("recv client control message failed. ret=%d", ret); return ret; } - - SrsAutoFree(SrsCommonMessage, msg, false); - // TODO: process it. + if (ret == ERROR_SUCCESS && !msg) { + srs_info("play loop got a message."); + SrsAutoFree(SrsCommonMessage, msg, false); + // TODO: process it. + } } // get messages from consumer. @@ -197,6 +207,11 @@ int SrsClient::streaming_play(SrsSource* source) srs_error("get messages from consumer failed. ret=%d", ret); return ret; } + + // reportable + if (server->can_report(reported_time, report_time)) { + srs_trace("play report, time=%"PRId64", ctl_msg_ret=%d, msgs=%d", report_time, ctl_msg_ret, count); + } if (count <= 0) { srs_verbose("no packets in queue."); diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 44a69dfcd..6304c1181 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -49,6 +49,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SOCKET_READ_FULLY 208 #define ERROR_SOCKET_WRITE 209 #define ERROR_SOCKET_WAIT 210 +#define ERROR_SOCKET_TIMEOUT 211 #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 diff --git a/trunk/src/core/srs_core_log.hpp b/trunk/src/core/srs_core_log.hpp index d2627f6cf..8c91ca94b 100755 --- a/trunk/src/core/srs_core_log.hpp +++ b/trunk/src/core/srs_core_log.hpp @@ -78,7 +78,7 @@ extern ILogContext* log_context; #undef srs_verbose #define srs_verbose(msg, ...) (void)0 #endif -#if 1 +#if 0 #undef srs_info #define srs_info(msg, ...) (void)0 #endif diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 0a5f503cf..0b007b8ac 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -274,9 +274,14 @@ SrsProtocol::~SrsProtocol() srs_freep(skt); } -int SrsProtocol::can_read(int timeout_ms, bool& ready) +void SrsProtocol::set_recv_timeout(int timeout_ms) { - return skt->can_read(timeout_ms, ready); + return skt->set_recv_timeout(timeout_ms); +} + +void SrsProtocol::set_send_timeout(int timeout_ms) +{ + return skt->set_send_timeout(timeout_ms); } int SrsProtocol::recv_message(SrsCommonMessage** pmsg) @@ -289,7 +294,9 @@ int SrsProtocol::recv_message(SrsCommonMessage** pmsg) SrsCommonMessage* msg = NULL; if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv interlaced message failed. ret=%d", ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("recv interlaced message failed. ret=%d", ret); + } return ret; } srs_verbose("entire msg received"); @@ -518,7 +525,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) int cid = 0; int bh_size = 0; if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { - srs_error("read basic header failed. ret=%d", ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read basic header failed. ret=%d", ret); + } return ret; } srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); @@ -539,7 +548,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) // chunk stream message header int mh_size = 0; if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { - srs_error("read message header failed. ret=%d", ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read message header failed. ret=%d", ret); + } return ret; } srs_info("read message header success. " @@ -551,7 +562,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) SrsCommonMessage* msg = NULL; int payload_size = 0; if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { - srs_error("read message payload failed. ret=%d", ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read message payload failed. ret=%d", ret); + } return ret; } @@ -577,7 +590,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) int required_size = 1; if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } return ret; } @@ -595,7 +610,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) if (cid == 0) { required_size = 2; if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } return ret; } @@ -606,7 +623,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) } else if (cid == 1) { required_size = 3; if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { - srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); + } return ret; } @@ -682,7 +701,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz int required_size = bh_size + mh_size; if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + } return ret; } char* p = buffer->bytes() + bh_size; @@ -768,7 +789,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz required_size = bh_size + mh_size; srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); + } return ret; } @@ -833,7 +856,9 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // read payload to buffer int required_size = bh_size + mh_size + payload_size; if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { - srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); + if (ret != ERROR_SOCKET_TIMEOUT) { + srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); + } return ret; } memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 14f1dd83b..b526f0b31 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -95,9 +95,11 @@ public: virtual ~SrsProtocol(); public: /** - * whether the peer can read. + * set the timeout in ms. + * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. */ - virtual int can_read(int timeout_ms, bool& ready); + virtual void set_recv_timeout(int timeout_ms); + virtual void set_send_timeout(int timeout_ms); /** * recv a message with raw/undecoded payload from peer. * the payload is not decoded, use srs_rtmp_expect_message if requires diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index dde8fe1c8..dc2105fe0 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -146,14 +146,19 @@ SrsRtmp::~SrsRtmp() srs_freep(protocol); } -int SrsRtmp::recv_message(SrsCommonMessage** pmsg) +void SrsRtmp::set_recv_timeout(int timeout_ms) { - return protocol->recv_message(pmsg); + return protocol->set_recv_timeout(timeout_ms); } -int SrsRtmp::can_read(int timeout_ms, bool& ready) +void SrsRtmp::set_send_timeout(int timeout_ms) { - return protocol->can_read(timeout_ms, ready); + return protocol->set_send_timeout(timeout_ms); +} + +int SrsRtmp::recv_message(SrsCommonMessage** pmsg) +{ + return protocol->recv_message(pmsg); } int SrsRtmp::send_message(ISrsMessage* msg) diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 023fe692e..054669e84 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -101,8 +101,9 @@ public: SrsRtmp(st_netfd_t client_stfd); virtual ~SrsRtmp(); public: + virtual void set_recv_timeout(int timeout_ms); + virtual void set_send_timeout(int timeout_ms); virtual int recv_message(SrsCommonMessage** pmsg); - virtual int can_read(int timeout_ms, bool& ready); virtual int send_message(ISrsMessage* msg); public: virtual int handshake(); diff --git a/trunk/src/core/srs_core_server.cpp b/trunk/src/core/srs_core_server.cpp index bc7dcb063..945d931b2 100755 --- a/trunk/src/core/srs_core_server.cpp +++ b/trunk/src/core/srs_core_server.cpp @@ -37,8 +37,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SERVER_LISTEN_BACKLOG 10 +// global value, ensure the report interval, +// it will be changed when clients increase. +#define SRS_CONST_REPORT_INTERVAL_MS 3000 + SrsServer::SrsServer() { + srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS; } SrsServer::~SrsServer() @@ -155,6 +160,20 @@ void SrsServer::remove(SrsConnection* conn) srs_freep(conn); } +bool SrsServer::can_report(int64_t& reported, int64_t time) +{ + if (srs_report_interval_ms <= 0) { + return false; + } + + if (time - reported < srs_report_interval_ms) { + return false; + } + + reported = time; + return true; +} + int SrsServer::accept_client(st_netfd_t client_stfd) { int ret = ERROR_SUCCESS; @@ -164,6 +183,9 @@ int SrsServer::accept_client(st_netfd_t client_stfd) // directly enqueue, the cycle thread will remove the client. conns.push_back(conn); srs_verbose("add conn to vector. conns=%d", (int)conns.size()); + + // ensure the report interval is consts + srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS * (int)conns.size(); // cycle will start process thread and when finished remove the client. if ((ret = conn->start()) != ERROR_SUCCESS) { diff --git a/trunk/src/core/srs_core_server.hpp b/trunk/src/core/srs_core_server.hpp index e5007b46f..a32fce47f 100755 --- a/trunk/src/core/srs_core_server.hpp +++ b/trunk/src/core/srs_core_server.hpp @@ -41,6 +41,7 @@ private: int fd; st_netfd_t stfd; std::vector conns; + int srs_report_interval_ms; public: SrsServer(); virtual ~SrsServer(); @@ -49,6 +50,7 @@ public: virtual int listen(int port); virtual int cycle(); virtual void remove(SrsConnection* conn); + virtual bool can_report(int64_t& reported, int64_t time); private: virtual int accept_client(st_netfd_t client_stfd); virtual void listen_cycle(); diff --git a/trunk/src/core/srs_core_socket.cpp b/trunk/src/core/srs_core_socket.cpp index 7708c2902..41abcea6a 100755 --- a/trunk/src/core/srs_core_socket.cpp +++ b/trunk/src/core/srs_core_socket.cpp @@ -28,41 +28,37 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. SrsSocket::SrsSocket(st_netfd_t client_stfd) { stfd = client_stfd; + recv_timeout = ST_UTIME_NO_TIMEOUT; + send_timeout = ST_UTIME_NO_TIMEOUT; } SrsSocket::~SrsSocket() { } -int SrsSocket::can_read(int timeout_ms, bool& ready) +void SrsSocket::set_recv_timeout(int timeout_ms) { - ready = false; - int ret = ERROR_SUCCESS; - - // If the named file descriptor object is ready for I/O within the specified amount of time, - // a value of 0 is returned. Otherwise, a value of -1 is returned and errno is set to - // indicate the error - if(st_netfd_poll(stfd, POLLIN, timeout_ms * 1000) == -1){ - if(errno == ETIME){ - return ret; - } - - return ERROR_SOCKET_WAIT; - } - - ready = true; - return ret; + recv_timeout = timeout_ms * 1000; +} + +void SrsSocket::set_send_timeout(int timeout_ms) +{ + send_timeout = timeout_ms * 1000; } int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; - *nread = st_read(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); + *nread = st_read(stfd, (void*)buf, size, recv_timeout); // On success a non-negative integer indicating the number of bytes actually read is returned // (a value of 0 means the network connection is closed or end of file is reached). if (*nread <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + if (*nread == 0) { errno = ECONNRESET; } @@ -77,11 +73,15 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; - *nread = st_read_fully(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); + *nread = st_read_fully(stfd, (void*)buf, size, recv_timeout); // On success a non-negative integer indicating the number of bytes actually read is returned // (a value less than nbyte means the network connection is closed or end of file is reached) if (*nread != (ssize_t)size) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + if (*nread >= 0) { errno = ECONNRESET; } @@ -96,9 +96,13 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite) { int ret = ERROR_SUCCESS; - *nwrite = st_write(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); + *nwrite = st_write(stfd, (void*)buf, size, send_timeout); if (*nwrite <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + ret = ERROR_SOCKET_WRITE; } @@ -109,9 +113,13 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { int ret = ERROR_SUCCESS; - *nwrite = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); + *nwrite = st_writev(stfd, iov, iov_size, send_timeout); if (*nwrite <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + ret = ERROR_SOCKET_WRITE; } diff --git a/trunk/src/core/srs_core_socket.hpp b/trunk/src/core/srs_core_socket.hpp index 678306020..e680aac17 100755 --- a/trunk/src/core/srs_core_socket.hpp +++ b/trunk/src/core/srs_core_socket.hpp @@ -39,12 +39,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsSocket { private: + int64_t recv_timeout; + int64_t send_timeout; st_netfd_t stfd; public: SrsSocket(st_netfd_t client_stfd); virtual ~SrsSocket(); public: - virtual int can_read(int timeout_ms, bool& ready); + virtual void set_recv_timeout(int timeout_ms); + virtual void set_send_timeout(int timeout_ms); virtual int read(const void* buf, size_t size, ssize_t* nread); virtual int read_fully(const void* buf, size_t size, ssize_t* nread); virtual int write(const void* buf, size_t size, ssize_t* nwrite); diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 5236ac5fb..f9d376a5a 100755 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include + #include #include #include @@ -40,8 +42,9 @@ SrsSource* SrsSource::find(std::string stream_url) return pool[stream_url]; } -SrsConsumer::SrsConsumer() +SrsConsumer::SrsConsumer(SrsSource* _source) { + source = _source; } SrsConsumer::~SrsConsumer() @@ -52,6 +55,8 @@ SrsConsumer::~SrsConsumer() srs_freep(msg); } msgs.clear(); + + source->on_consumer_destroy(this); } int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) @@ -235,7 +240,7 @@ int SrsSource::on_video(SrsCommonMessage* video) { int ret = ERROR_SUCCESS; - consumer = new SrsConsumer(); + consumer = new SrsConsumer(this); consumers.push_back(consumer); if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) { @@ -259,3 +264,13 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } +void SrsSource::on_consumer_destroy(SrsConsumer* consumer) +{ + std::vector::iterator it; + it = std::find(consumers.begin(), consumers.end(), consumer); + if (it != consumers.end()) { + consumers.erase(it); + } + srs_info("handle consumer destroy success."); +} + diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 0cf0ac524..624c8ad80 100755 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsSource; class SrsCommonMessage; class SrsOnMetaDataPacket; class SrsSharedPtrMessage; @@ -44,9 +45,10 @@ class SrsSharedPtrMessage; class SrsConsumer { private: + SrsSource* source; std::vector msgs; public: - SrsConsumer(); + SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); public: /** @@ -95,6 +97,7 @@ public: virtual int on_video(SrsCommonMessage* video); public: virtual int create_consumer(SrsConsumer*& consumer); + virtual void on_consumer_destroy(SrsConsumer* consumer); }; #endif \ No newline at end of file