From 5c9a12e72ad608ad2aa3b636256fa04b0be58620 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 29 Jul 2017 12:45:17 +0800 Subject: [PATCH] For #913, use complex error for listener --- trunk/src/app/srs_app_caster_flv.cpp | 28 ++--- trunk/src/app/srs_app_caster_flv.hpp | 4 +- trunk/src/app/srs_app_edge.cpp | 16 ++- trunk/src/app/srs_app_forward.cpp | 10 +- trunk/src/app/srs_app_http_api.cpp | 10 +- trunk/src/app/srs_app_http_conn.cpp | 10 +- trunk/src/app/srs_app_listener.cpp | 99 ++++++--------- trunk/src/app/srs_app_listener.hpp | 10 +- trunk/src/app/srs_app_mpegts_udp.cpp | 8 +- trunk/src/app/srs_app_mpegts_udp.hpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 10 +- trunk/src/app/srs_app_rtsp.cpp | 40 +++--- trunk/src/app/srs_app_rtsp.hpp | 6 +- trunk/src/app/srs_app_server.cpp | 168 +++++++++++--------------- trunk/src/app/srs_app_server.hpp | 18 +-- trunk/src/app/srs_app_st.hpp | 10 +- trunk/src/core/srs_core.hpp | 4 +- trunk/src/kernel/srs_kernel_error.cpp | 26 ++-- trunk/src/kernel/srs_kernel_error.hpp | 32 ++--- 19 files changed, 244 insertions(+), 267 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 42d464172..93c1325d0 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -60,33 +60,23 @@ SrsAppCasterFlv::~SrsAppCasterFlv() srs_freep(manager); } -int SrsAppCasterFlv::initialize() +srs_error_t SrsAppCasterFlv::initialize() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if ((err = http_mux->handle("/", this)) != srs_success) { - // TODO: FIXME: Use error. - ret = srs_error_code(err); - srs_freep(err); - - return ret; + return srs_error_wrap(err, "handle root"); } if ((err = manager->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - return ret; + return srs_error_wrap(err, "start manager"); } - return ret; + return err; } -int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; string ip = srs_get_peer_ip(srs_netfd_fileno(stfd)); @@ -94,14 +84,10 @@ int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) conns.push_back(conn); if ((err = conn->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - return ret; + return srs_error_wrap(err, "start tcp listener"); } - return ret; + return err; } void SrsAppCasterFlv::remove(ISrsConnection* c) diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index f22e074a6..da76343f5 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -63,10 +63,10 @@ public: SrsAppCasterFlv(SrsConfDirective* c); virtual ~SrsAppCasterFlv(); public: - virtual int initialize(); + virtual srs_error_t initialize(); // ISrsTcpHandler public: - virtual int on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); // IConnectionManager public: virtual void remove(ISrsConnection* c); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index a4da33dd8..b0f04c844 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -302,7 +302,15 @@ int SrsEdgeIngester::ingest() // set to larger timeout to read av data from origin. upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS); - while (!trd->pull()) { + while (true) { + srs_error_t err = srs_success; + if ((err = trd->pull()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; + } + pprint->elapse(); // pithy print @@ -555,7 +563,11 @@ srs_error_t SrsEdgeForwarder::do_cycle() SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "edge forward pull"); + } + if (send_error_code != ERROR_SUCCESS) { srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000); continue; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index e5de37a50..f418e8b34 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -313,7 +313,15 @@ int SrsForwarder::forward() } } - while (!trd->pull()) { + while (true) { + srs_error_t err = srs_success; + if ((err = trd->pull()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; + } + pprint->elapse(); // read from client. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 90bcea87c..d75bcae1d 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1353,7 +1353,15 @@ int SrsHttpApi::do_cycle() } // process http messages. - while(!trd->pull()) { + while (true) { + srs_error_t err = srs_success; + if ((err = trd->pull()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; + } + ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index e9a527cc3..a04d4dba1 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -121,7 +121,15 @@ int SrsHttpConn::do_cycle() } // process http messages. - while (!trd->pull()) { + while (true) { + srs_error_t err = srs_success; + if ((err = trd->pull()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; + } + ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 152d82214..72c13c21d 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -55,9 +55,9 @@ ISrsUdpHandler::~ISrsUdpHandler() { } -int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) +srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) { - return ERROR_SUCCESS; + return srs_success; } ISrsTcpHandler::ISrsTcpHandler() @@ -107,17 +107,13 @@ srs_netfd_t SrsUdpListener::stfd() return _stfd; } -int SrsUdpListener::listen() +srs_error_t SrsUdpListener::listen() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if ((_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { - ret = ERROR_SOCKET_CREATE; - srs_error("create linux socket error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_SOCKET_CREATE, "create socket"); } - srs_verbose("create linux socket success. ip=%s, port=%d, fd=%d", ip.c_str(), port, _fd); srs_fd_close_exec(_fd); srs_socket_reuse_addr(_fd); @@ -127,40 +123,31 @@ int SrsUdpListener::listen() addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { - ret = ERROR_SOCKET_BIND; - srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_SOCKET_BIND, "bind socket"); } - srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket"); } - srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_freep(trd); trd = new SrsSTCoroutine("udp", this); if ((err = trd->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_wrap(err, "start thread"); } - srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port); - return ret; + return err; } srs_error_t SrsUdpListener::cycle() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "udp listener"); + } + // TODO: FIXME: support ipv6, @see man 7 ipv6 sockaddr_in from; int nb_from = sizeof(sockaddr_in); @@ -170,8 +157,8 @@ srs_error_t SrsUdpListener::cycle() return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); } - if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { - return srs_error_new(ret, "handle packet %d bytes", nread); + if ((err = handler->on_udp_packet(&from, buf, nread)) != srs_success) { + return srs_error_wrap(err, "handle packet %d bytes", nread); } if (SrsUdpPacketRecvCycleMS > 0) { @@ -206,17 +193,13 @@ int SrsTcpListener::fd() return _fd; } -int SrsTcpListener::listen() +srs_error_t SrsTcpListener::listen() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - ret = ERROR_SOCKET_CREATE; - srs_error("create linux socket error. port=%d, ret=%d", port, ret); - return ret; + return srs_error_new(ERROR_SOCKET_CREATE, "create socket"); } - srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd); srs_fd_close_exec(_fd); srs_socket_reuse_addr(_fd); @@ -226,59 +209,45 @@ int SrsTcpListener::listen() addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { - ret = ERROR_SOCKET_BIND; - srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_SOCKET_BIND, "bind socket"); } - srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) { - ret = ERROR_SOCKET_LISTEN; - srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket"); } - srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket"); } - srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_freep(trd); trd = new SrsSTCoroutine("tcp", this); if ((err = trd->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); - return ret; + return srs_error_wrap(err, "start coroutine"); } - srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port); - return ret; + return err; } srs_error_t SrsTcpListener::cycle() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; - while (!trd->pull()) { - srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); - int fd = srs_netfd_fileno(stfd); - - srs_fd_close_exec(fd); + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "tcp listener"); + } - if(stfd == NULL){ - return err; + srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + if(cstfd == NULL){ + return srs_error_new(ERROR_SOCKET_CREATE, "accept failed"); } - srs_verbose("get a client. fd=%d", fd); - if ((ret = handler->on_tcp_client(stfd)) != ERROR_SUCCESS) { - return srs_error_new(ret, "handle fd=%d", fd); + int cfd = srs_netfd_fileno(cstfd); + srs_fd_close_exec(cfd); + + if ((err = handler->on_tcp_client(cstfd)) != srs_success) { + return srs_error_wrap(err, "handle fd=%d", cfd); } } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index bd4be0518..7d0c75389 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -46,7 +46,7 @@ public: * when fd changed, for instance, reload the listen port, * notify the handler and user can do something. */ - virtual int on_stfd_change(srs_netfd_t fd); + virtual srs_error_t on_stfd_change(srs_netfd_t fd); public: /** * when udp listener got a udp packet, notice server to process it. @@ -57,7 +57,7 @@ public: * @param nb_buf, the size of udp packet bytes. * @remark user should never use the buf, for it's a shared memory bytes. */ - virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0; + virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0; }; /** @@ -72,7 +72,7 @@ public: /** * when got tcp client. */ - virtual int on_tcp_client(srs_netfd_t stfd) = 0; + virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0; }; /** @@ -98,7 +98,7 @@ public: virtual int fd(); virtual srs_netfd_t stfd(); public: - virtual int listen(); + virtual srs_error_t listen(); // interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); @@ -123,7 +123,7 @@ public: public: virtual int fd(); public: - virtual int listen(); + virtual srs_error_t listen(); // interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 1af799f73..730c28626 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -157,7 +157,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() srs_freep(pprint); } -int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) +srs_error_t SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) { std::string peer_ip = inet_ntoa(from->sin_addr); int peer_port = ntohs(from->sin_port); @@ -168,7 +168,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) srs_info("udp: got %s:%d packet %d/%d bytes", peer_ip.c_str(), peer_port, nb_buf, buffer->length()); - return on_udp_bytes(peer_ip, peer_port, buf, nb_buf); + int ret = on_udp_bytes(peer_ip, peer_port, buf, nb_buf); + if (ret != ERROR_SUCCESS) { + return srs_error_new(ret, "process udp"); + } + return srs_success; } int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index a983869af..a1c045558 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -101,7 +101,7 @@ public: virtual ~SrsMpegtsOverUdp(); // interface ISrsUdpHandler public: - virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); + virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); private: virtual int on_udp_bytes(std::string host, int port, char* buf, int nb_buf); // interface ISrsTsHandler diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index b0e031629..399431644 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -450,7 +450,15 @@ int SrsRtmpConn::service_cycle() } srs_verbose("on_bw_done success"); - while (!trd->pull()) { + while (true) { + srs_error_t err = srs_success; + if ((err = trd->pull()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + return ret; + } + ret = stream_service_cycle(); // stream service must terminated with error, never success. diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 5d929c899..583af3702 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -71,14 +71,15 @@ int SrsRtpConn::port() return _port; } -int SrsRtpConn::listen() +srs_error_t SrsRtpConn::listen() { return listener->listen(); } -int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) +srs_error_t SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; pprint->elapse(); @@ -86,13 +87,12 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) SrsBuffer stream; if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "stream"); } SrsRtpPacket pkt; if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) { - srs_error("rtsp: decode rtp packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "decode"); } if (pkt.chunked) { @@ -106,7 +106,7 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc, cache->payload->length() ); - return ret; + return err; } } else { srs_freep(cache); @@ -126,11 +126,10 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) SrsAutoFree(SrsRtpPacket, cache); if ((ret = rtsp->on_rtp_packet(cache, stream_id)) != ERROR_SUCCESS) { - srs_error("rtsp: process rtp packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "process rtp packet"); } - return ret; + return err; } SrsRtspAudioCache::SrsRtspAudioCache() @@ -256,7 +255,11 @@ srs_error_t SrsRtspConn::do_cycle() srs_trace("rtsp: serve %s", ip.c_str()); // consume all rtsp messages. - while (!trd->pull()) { + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "rtsp cycle"); + } + SrsRtspRequest* req = NULL; if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) { return srs_error_new(ret, "recv message"); @@ -316,8 +319,8 @@ srs_error_t SrsRtspConn::do_cycle() srs_freep(audio_rtp); rtp = audio_rtp = new SrsRtpConn(this, lpm, audio_id); } - if ((ret = rtp->listen()) != ERROR_SUCCESS) { - return srs_error_new(ret, "rtp listen"); + if ((err = rtp->listen()) != srs_success) { + return srs_error_wrap(err, "rtp listen"); } srs_trace("rtsp: #%d %s over %s/%s/%s %s client-port=%d-%d, server-port=%d-%d", req->stream_id, (req->stream_id == video_id)? "Video":"Audio", @@ -728,27 +731,20 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax) srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax); } -int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsRtspCaster::on_tcp_client(srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; SrsRtspConn* conn = new SrsRtspConn(this, stfd, output); if ((err = conn->serve()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - srs_error("rtsp: serve client failed. ret=%d", ret); srs_freep(conn); - return ret; + return srs_error_wrap(err, "serve conn"); } clients.push_back(conn); - srs_info("rtsp: start thread to serve client."); - return ret; + return err; } void SrsRtspCaster::remove(SrsRtspConn* conn) diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 7373fd8ad..516df163b 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -71,10 +71,10 @@ public: virtual ~SrsRtpConn(); public: virtual int port(); - virtual int listen(); + virtual srs_error_t listen(); // interface ISrsUdpHandler public: - virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); + virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf); }; /** @@ -206,7 +206,7 @@ public: virtual void free_port(int lpmin, int lpmax); // interface ISrsTcpHandler public: - virtual int on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); // internal methods. public: virtual void remove(SrsRtspConn* conn); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 73c90b538..9b4d37c56 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -139,9 +139,9 @@ SrsBufferListener::~SrsBufferListener() srs_freep(listener); } -int SrsBufferListener::listen(string i, int p) +srs_error_t SrsBufferListener::listen(string i, int p) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; ip = i; port = p; @@ -149,30 +149,25 @@ int SrsBufferListener::listen(string i, int p) srs_freep(listener); listener = new SrsTcpListener(this, ip, port); - if ((ret = listener->listen()) != ERROR_SUCCESS) { - srs_error("tcp listen failed. ret=%d", ret); - return ret; + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "buffer tcp listen %s:%d", ip.c_str(), port); } - srs_info("listen thread current_cid=%d, " - "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", - _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p); + string v = srs_listener_type2string(type); + srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); - - return ret; + return err; } -int SrsBufferListener::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; - - if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - return ret; + srs_error_t err = server->accept_client(type, stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); } - return ret; + return srs_success; } #ifdef SRS_AUTO_STREAM_CASTER @@ -194,9 +189,9 @@ SrsRtspListener::~SrsRtspListener() srs_freep(listener); } -int SrsRtspListener::listen(string i, int p) +srs_error_t SrsRtspListener::listen(string i, int p) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // the caller already ensure the type is ok, // we just assert here for unknown stream caster. @@ -208,27 +203,25 @@ int SrsRtspListener::listen(string i, int p) srs_freep(listener); listener = new SrsTcpListener(this, ip, port); - if ((ret = listener->listen()) != ERROR_SUCCESS) { - srs_error("rtsp caster listen failed. ret=%d", ret); - return ret; + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "rtsp listen %s:%d", ip.c_str(), port); } - srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port); - srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); + string v = srs_listener_type2string(type); + srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - return ret; + return err; } -int SrsRtspListener::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsRtspListener::on_tcp_client(srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; - - if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - return ret; + srs_error_t err = caster->on_tcp_client(stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); } - return ret; + return srs_success; } SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) @@ -249,9 +242,9 @@ SrsHttpFlvListener::~SrsHttpFlvListener() srs_freep(listener); } -int SrsHttpFlvListener::listen(string i, int p) +srs_error_t SrsHttpFlvListener::listen(string i, int p) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // the caller already ensure the type is ok, // we just assert here for unknown stream caster. @@ -260,35 +253,32 @@ int SrsHttpFlvListener::listen(string i, int p) ip = i; port = p; - if ((ret = caster->initialize()) != ERROR_SUCCESS) { - return ret; + if ((err = caster->initialize()) != srs_success) { + return srs_error_wrap(err, "init caster %s:%d", ip.c_str(), port); } srs_freep(listener); listener = new SrsTcpListener(this, ip, port); - if ((ret = listener->listen()) != ERROR_SUCCESS) { - srs_error("flv caster listen failed. ret=%d", ret); - return ret; + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen"); } - srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port); - - srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); + string v = srs_listener_type2string(type); + srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - return ret; + return err; } -int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) +srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; - - if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - return ret; + srs_error_t err = caster->on_tcp_client(stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); } - return ret; + return err; } #endif @@ -303,9 +293,9 @@ SrsUdpStreamListener::~SrsUdpStreamListener() srs_freep(listener); } -int SrsUdpStreamListener::listen(string i, int p) +srs_error_t SrsUdpStreamListener::listen(string i, int p) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // the caller already ensure the type is ok, // we just assert here for unknown stream caster. @@ -317,24 +307,19 @@ int SrsUdpStreamListener::listen(string i, int p) srs_freep(listener); listener = new SrsUdpListener(caster, ip, port); - if ((ret = listener->listen()) != ERROR_SUCCESS) { - srs_error("udp caster listen failed. ret=%d", ret); - return ret; + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } - srs_info("listen thread current_cid=%d, " - "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", - _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p); - // notify the handler the fd changed. - if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) { - srs_error("notify handler fd changed. ret=%d", ret); - return ret; + if ((err = caster->on_stfd_change(listener->stfd())) != srs_success) { + return srs_error_wrap(err, "notify fd change failed"); } - srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); + string v = srs_listener_type2string(type); + srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); - return ret; + return err; } #ifdef SRS_AUTO_STREAM_CASTER @@ -1044,7 +1029,7 @@ srs_error_t SrsServer::do_cycle() srs_error_t SrsServer::listen_rtmp() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // stream service port. std::vector ip_ports = _srs_config->get_listens(); @@ -1060,17 +1045,17 @@ srs_error_t SrsServer::listen_rtmp() int port; srs_parse_endpoint(ip_ports[i], ip, port); - if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) { - srs_error_new(ret, "rtmp listen %s:%d", ip.c_str(), port); + if ((err = listener->listen(ip, port)) != srs_success) { + srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port); } } - return srs_success; + return err; } srs_error_t SrsServer::listen_http_api() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; close_listeners(SrsListenerHttpApi); if (_srs_config->get_http_api_enabled()) { @@ -1083,17 +1068,17 @@ srs_error_t SrsServer::listen_http_api() int port; srs_parse_endpoint(ep, ip, port); - if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) { - return srs_error_new(ret, "http api listen %s:%d", ip.c_str(), port); + if ((err = listener->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port); } } - return srs_success; + return err; } srs_error_t SrsServer::listen_http_stream() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; close_listeners(SrsListenerHttpStream); if (_srs_config->get_http_stream_enabled()) { @@ -1106,17 +1091,17 @@ srs_error_t SrsServer::listen_http_stream() int port; srs_parse_endpoint(ep, ip, port); - if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) { - return srs_error_new(ret, "http stream listen %s:%d", ip.c_str(), port); + if ((err = listener->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "http stream listen %s:%d", ip.c_str(), port); } } - return srs_success; + return err; } srs_error_t SrsServer::listen_stream_caster() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; #ifdef SRS_AUTO_STREAM_CASTER close_listeners(SrsListenerMpegTsOverUdp); @@ -1152,13 +1137,13 @@ srs_error_t SrsServer::listen_stream_caster() } // TODO: support listen at <[ip:]port> - if ((ret = listener->listen("0.0.0.0", port)) != ERROR_SUCCESS) { - return srs_error_new(ret, "listen at %d", port); + if ((err = listener->listen("0.0.0.0", port)) != srs_success) { + return srs_error_wrap(err, "listen at %d", port); } } #endif - return srs_success; + return err; } void SrsServer::close_listeners(SrsListenerType type) @@ -1198,40 +1183,27 @@ void SrsServer::resample_kbps() srs_update_rtmp_server((int)conns.size(), kbps); } -int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) +srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; SrsConnection* conn = NULL; if ((err = fd2conn(type, stfd, &conn)) != srs_success) { - srs_error("accept client failed, err=%s", srs_error_desc(err).c_str()); - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - srs_close_stfd(stfd); - return ERROR_SUCCESS; + return srs_error_wrap(err, "fd2conn"); } srs_assert(conn); // directly enqueue, the cycle thread will remove the client. conns.push_back(conn); - srs_verbose("add conn to vector."); // cycle will start process thread and when finished remove the client. // @remark never use the conn, for it maybe destroyed. if ((err = conn->start()) != srs_success) { - // TODO: FIXME: Use error - ret = srs_error_code(err); - srs_freep(err); - - return ret; + return srs_error_wrap(err, "start conn coroutine"); } - srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); - return ret; + return err; } srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn) diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index c15f19e8c..f045e652d 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -91,7 +91,7 @@ public: virtual ~SrsListener(); public: virtual SrsListenerType listen_type(); - virtual int listen(std::string i, int p) = 0; + virtual srs_error_t listen(std::string i, int p) = 0; }; /** @@ -105,10 +105,10 @@ public: SrsBufferListener(SrsServer* server, SrsListenerType type); virtual ~SrsBufferListener(); public: - virtual int listen(std::string ip, int port); + virtual srs_error_t listen(std::string ip, int port); // ISrsTcpHandler public: - virtual int on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; #ifdef SRS_AUTO_STREAM_CASTER @@ -124,10 +124,10 @@ public: SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); virtual ~SrsRtspListener(); public: - virtual int listen(std::string i, int p); + virtual srs_error_t listen(std::string i, int p); // ISrsTcpHandler public: - virtual int on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; /** @@ -142,10 +142,10 @@ public: SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); virtual ~SrsHttpFlvListener(); public: - virtual int listen(std::string i, int p); + virtual srs_error_t listen(std::string i, int p); // ISrsTcpHandler public: - virtual int on_tcp_client(srs_netfd_t stfd); + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); }; #endif @@ -161,7 +161,7 @@ public: SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c); virtual ~SrsUdpStreamListener(); public: - virtual int listen(std::string i, int p); + virtual srs_error_t listen(std::string i, int p); }; /** @@ -358,7 +358,7 @@ public: * for instance RTMP connection to serve client. * @param stfd, the client fd in st boxed, the underlayer fd. */ - virtual int accept_client(SrsListenerType type, srs_netfd_t stfd); + virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd); private: virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn); // IConnectionManager diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index b7d30022d..35e1bf290 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -45,9 +45,13 @@ * * Thread has its inside loop, such as the RTMP receive thread: * class SrsReceiveThread : public ISrsCoroutineHandler { - * public: SrsCoroutine trd; + * public: SrsCoroutine* trd; * public: virtual srs_error_t cycle() { - * while (!trd.pull()) { // Check whether thread interrupted. + * while (true) { + * // Check whether thread interrupted. + * if ((err = trd->pull()) != srs_success) { + * return err; + * } * // Do something, such as st_read() packets, it'll be wakeup * // when user stop or interrupt the thread. * } @@ -79,6 +83,8 @@ public: virtual srs_error_t start() = 0; virtual void stop() = 0; virtual void interrupt() = 0; + // @return a copy of error, which should be freed by user. + // NULL if not terminated and user should pull again. virtual srs_error_t pull() = 0; virtual int cid() = 0; }; diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 82b6f8a4c..f81fae0ce 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -116,7 +116,7 @@ #endif // Error predefined for all modules. -class SrsError; -typedef SrsError* srs_error_t; +class SrsCplxError; +typedef SrsCplxError* srs_error_t; #endif diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp index 92e1cc53c..b12ded81b 100644 --- a/trunk/src/kernel/srs_kernel_error.cpp +++ b/trunk/src/kernel/srs_kernel_error.cpp @@ -44,23 +44,23 @@ bool srs_is_client_gracefully_close(int error_code) || error_code == ERROR_SOCKET_WRITE; } -SrsError::SrsError() +SrsCplxError::SrsCplxError() { code = ERROR_SUCCESS; wrapped = NULL; cid = rerrno = line = 0; } -SrsError::~SrsError() +SrsCplxError::~SrsCplxError() { } -std::string SrsError::description() { +std::string SrsCplxError::description() { if (desc.empty()) { stringstream ss; ss << "code=" << code; - SrsError* next = this; + SrsCplxError* next = this; while (next) { ss << " : " << next->msg; next = next->wrapped; @@ -82,7 +82,7 @@ std::string SrsError::description() { return desc; } -SrsError* SrsError::create(const char* func, const char* file, int line, int code, const char* fmt, ...) { +SrsCplxError* SrsCplxError::create(const char* func, const char* file, int line, int code, const char* fmt, ...) { int rerrno = (int)errno; va_list ap; @@ -91,7 +91,7 @@ SrsError* SrsError::create(const char* func, const char* file, int line, int cod vsnprintf(buffer, sizeof(buffer), fmt, ap); va_end(ap); - SrsError* err = new SrsError(); + SrsCplxError* err = new SrsCplxError(); err->func = func; err->file = file; @@ -107,7 +107,7 @@ SrsError* SrsError::create(const char* func, const char* file, int line, int cod return err; } -SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError* v, const char* fmt, ...) { +SrsCplxError* SrsCplxError::wrap(const char* func, const char* file, int line, SrsCplxError* v, const char* fmt, ...) { int rerrno = (int)errno; va_list ap; @@ -116,7 +116,7 @@ SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError* vsnprintf(buffer, sizeof(buffer), fmt, ap); va_end(ap); - SrsError* err = new SrsError(); + SrsCplxError* err = new SrsCplxError(); err->func = func; err->file = file; @@ -132,17 +132,17 @@ SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError* return err; } -SrsError* SrsError::success() { +SrsCplxError* SrsCplxError::success() { return NULL; } -SrsError* SrsError::copy(SrsError* from) +SrsCplxError* SrsCplxError::copy(SrsCplxError* from) { if (from == srs_success) { return srs_success; } - SrsError* err = new SrsError(); + SrsCplxError* err = new SrsCplxError(); err->code = from->code; err->wrapped = srs_error_copy(from->wrapped); @@ -157,12 +157,12 @@ SrsError* SrsError::copy(SrsError* from) return err; } -string SrsError::description(SrsError* err) +string SrsCplxError::description(SrsCplxError* err) { return err? err->description() : "Success"; } -int SrsError::error_code(SrsError* err) +int SrsCplxError::error_code(SrsCplxError* err) { return err? err->code : ERROR_SUCCESS; } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 154bd92cf..1c914b9a2 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -336,11 +336,11 @@ extern bool srs_is_system_control_error(int error_code); extern bool srs_is_client_gracefully_close(int error_code); // Use complex errors, @read https://github.com/ossrs/srs/issues/913 -class SrsError +class SrsCplxError { private: int code; - SrsError* wrapped; + SrsCplxError* wrapped; std::string msg; std::string func; @@ -352,27 +352,27 @@ private: std::string desc; private: - SrsError(); + SrsCplxError(); public: - virtual ~SrsError(); + virtual ~SrsCplxError(); private: virtual std::string description(); public: - static SrsError* create(const char* func, const char* file, int line, int code, const char* fmt, ...); - static SrsError* wrap(const char* func, const char* file, int line, SrsError* err, const char* fmt, ...); - static SrsError* success(); - static SrsError* copy(SrsError* from); - static std::string description(SrsError* err); - static int error_code(SrsError* err); + static SrsCplxError* create(const char* func, const char* file, int line, int code, const char* fmt, ...); + static SrsCplxError* wrap(const char* func, const char* file, int line, SrsCplxError* err, const char* fmt, ...); + static SrsCplxError* success(); + static SrsCplxError* copy(SrsCplxError* from); + static std::string description(SrsCplxError* err); + static int error_code(SrsCplxError* err); }; // Error helpers, should use these functions to new or wrap an error. -#define srs_success SrsError::success() -#define srs_error_new(ret, fmt, ...) SrsError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__) -#define srs_error_wrap(err, fmt, ...) SrsError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__) -#define srs_error_copy(err) SrsError::copy(err) -#define srs_error_desc(err) SrsError::description(err) -#define srs_error_code(err) SrsError::error_code(err) +#define srs_success SrsCplxError::success() +#define srs_error_new(ret, fmt, ...) SrsCplxError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__) +#define srs_error_wrap(err, fmt, ...) SrsCplxError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__) +#define srs_error_copy(err) SrsCplxError::copy(err) +#define srs_error_desc(err) SrsCplxError::description(err) +#define srs_error_code(err) SrsCplxError::error_code(err) #endif