For #913, use complex error for listener

pull/970/head
winlin 8 years ago
parent b88265fc78
commit 5c9a12e72a

@ -60,33 +60,23 @@ SrsAppCasterFlv::~SrsAppCasterFlv()
srs_freep(manager);
}
int SrsAppCasterFlv::initialize()
srs_error_t SrsAppCasterFlv::initialize()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((err = http_mux->handle("/", this)) != srs_success) {
// TODO: FIXME: Use error.
ret = srs_error_code(err);
srs_freep(err);
return ret;
return srs_error_wrap(err, "handle root");
}
if ((err = manager->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
return srs_error_wrap(err, "start manager");
}
return ret;
return err;
}
int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
string ip = srs_get_peer_ip(srs_netfd_fileno(stfd));
@ -94,14 +84,10 @@ int SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
conns.push_back(conn);
if ((err = conn->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
return srs_error_wrap(err, "start tcp listener");
}
return ret;
return err;
}
void SrsAppCasterFlv::remove(ISrsConnection* c)

@ -63,10 +63,10 @@ public:
SrsAppCasterFlv(SrsConfDirective* c);
virtual ~SrsAppCasterFlv();
public:
virtual int initialize();
virtual srs_error_t initialize();
// ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
// IConnectionManager
public:
virtual void remove(ISrsConnection* c);

@ -302,7 +302,15 @@ int SrsEdgeIngester::ingest()
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS);
while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
pprint->elapse();
// pithy print
@ -555,7 +563,11 @@ srs_error_t SrsEdgeForwarder::do_cycle()
SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "edge forward pull");
}
if (send_error_code != ERROR_SUCCESS) {
srs_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;

@ -313,7 +313,15 @@ int SrsForwarder::forward()
}
}
while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
pprint->elapse();
// read from client.

@ -1353,7 +1353,15 @@ int SrsHttpApi::do_cycle()
}
// process http messages.
while(!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
ISrsHttpMessage* req = NULL;
// get a http message

@ -121,7 +121,15 @@ int SrsHttpConn::do_cycle()
}
// process http messages.
while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
ISrsHttpMessage* req = NULL;
// get a http message

@ -55,9 +55,9 @@ ISrsUdpHandler::~ISrsUdpHandler()
{
}
int ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
return srs_success;
}
ISrsTcpHandler::ISrsTcpHandler()
@ -107,17 +107,13 @@ srs_netfd_t SrsUdpListener::stfd()
return _stfd;
}
int SrsUdpListener::listen()
srs_error_t SrsUdpListener::listen()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_verbose("create linux socket success. ip=%s, port=%d, fd=%d", ip.c_str(), port, _fd);
srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
@ -127,40 +123,31 @@ int SrsUdpListener::listen()
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
srs_freep(trd);
trd = new SrsSTCoroutine("udp", this);
if ((err = trd->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_wrap(err, "start thread");
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
return ret;
return err;
}
srs_error_t SrsUdpListener::cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "udp listener");
}
// TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from;
int nb_from = sizeof(sockaddr_in);
@ -170,8 +157,8 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}
if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) {
return srs_error_new(ret, "handle packet %d bytes", nread);
if ((err = handler->on_udp_packet(&from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}
if (SrsUdpPacketRecvCycleMS > 0) {
@ -206,17 +193,13 @@ int SrsTcpListener::fd()
return _fd;
}
int SrsTcpListener::listen()
srs_error_t SrsTcpListener::listen()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
@ -226,59 +209,45 @@ int SrsTcpListener::listen()
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
ret = ERROR_SOCKET_LISTEN;
srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this);
if ((err = trd->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_wrap(err, "start coroutine");
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
return ret;
return err;
}
srs_error_t SrsTcpListener::cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
while (!trd->pull()) {
srs_netfd_t stfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
int fd = srs_netfd_fileno(stfd);
srs_fd_close_exec(fd);
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
if(stfd == NULL){
return err;
srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(cstfd == NULL){
return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
}
srs_verbose("get a client. fd=%d", fd);
if ((ret = handler->on_tcp_client(stfd)) != ERROR_SUCCESS) {
return srs_error_new(ret, "handle fd=%d", fd);
int cfd = srs_netfd_fileno(cstfd);
srs_fd_close_exec(cfd);
if ((err = handler->on_tcp_client(cstfd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", cfd);
}
}

@ -46,7 +46,7 @@ public:
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
virtual int on_stfd_change(srs_netfd_t fd);
virtual srs_error_t on_stfd_change(srs_netfd_t fd);
public:
/**
* when udp listener got a udp packet, notice server to process it.
@ -57,7 +57,7 @@ public:
* @param nb_buf, the size of udp packet bytes.
* @remark user should never use the buf, for it's a shared memory bytes.
*/
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0;
virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0;
};
/**
@ -72,7 +72,7 @@ public:
/**
* when got tcp client.
*/
virtual int on_tcp_client(srs_netfd_t stfd) = 0;
virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0;
};
/**
@ -98,7 +98,7 @@ public:
virtual int fd();
virtual srs_netfd_t stfd();
public:
virtual int listen();
virtual srs_error_t listen();
// interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
@ -123,7 +123,7 @@ public:
public:
virtual int fd();
public:
virtual int listen();
virtual srs_error_t listen();
// interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();

@ -157,7 +157,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
srs_freep(pprint);
}
int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
srs_error_t SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
{
std::string peer_ip = inet_ntoa(from->sin_addr);
int peer_port = ntohs(from->sin_port);
@ -168,7 +168,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
srs_info("udp: got %s:%d packet %d/%d bytes",
peer_ip.c_str(), peer_port, nb_buf, buffer->length());
return on_udp_bytes(peer_ip, peer_port, buf, nb_buf);
int ret = on_udp_bytes(peer_ip, peer_port, buf, nb_buf);
if (ret != ERROR_SUCCESS) {
return srs_error_new(ret, "process udp");
}
return srs_success;
}
int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf)

@ -101,7 +101,7 @@ public:
virtual ~SrsMpegtsOverUdp();
// interface ISrsUdpHandler
public:
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
private:
virtual int on_udp_bytes(std::string host, int port, char* buf, int nb_buf);
// interface ISrsTsHandler

@ -450,7 +450,15 @@ int SrsRtmpConn::service_cycle()
}
srs_verbose("on_bw_done success");
while (!trd->pull()) {
while (true) {
srs_error_t err = srs_success;
if ((err = trd->pull()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
}
ret = stream_service_cycle();
// stream service must terminated with error, never success.

@ -71,14 +71,15 @@ int SrsRtpConn::port()
return _port;
}
int SrsRtpConn::listen()
srs_error_t SrsRtpConn::listen()
{
return listener->listen();
}
int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
srs_error_t SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
pprint->elapse();
@ -86,13 +87,12 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
SrsBuffer stream;
if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) {
return ret;
return srs_error_new(ret, "stream");
}
SrsRtpPacket pkt;
if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) {
srs_error("rtsp: decode rtp packet failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "decode");
}
if (pkt.chunked) {
@ -106,7 +106,7 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
nb_buf, pprint->age(), cache->version, cache->payload_type, cache->sequence_number, cache->timestamp, cache->ssrc,
cache->payload->length()
);
return ret;
return err;
}
} else {
srs_freep(cache);
@ -126,11 +126,10 @@ int SrsRtpConn::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
SrsAutoFree(SrsRtpPacket, cache);
if ((ret = rtsp->on_rtp_packet(cache, stream_id)) != ERROR_SUCCESS) {
srs_error("rtsp: process rtp packet failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "process rtp packet");
}
return ret;
return err;
}
SrsRtspAudioCache::SrsRtspAudioCache()
@ -256,7 +255,11 @@ srs_error_t SrsRtspConn::do_cycle()
srs_trace("rtsp: serve %s", ip.c_str());
// consume all rtsp messages.
while (!trd->pull()) {
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtsp cycle");
}
SrsRtspRequest* req = NULL;
if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) {
return srs_error_new(ret, "recv message");
@ -316,8 +319,8 @@ srs_error_t SrsRtspConn::do_cycle()
srs_freep(audio_rtp);
rtp = audio_rtp = new SrsRtpConn(this, lpm, audio_id);
}
if ((ret = rtp->listen()) != ERROR_SUCCESS) {
return srs_error_new(ret, "rtp listen");
if ((err = rtp->listen()) != srs_success) {
return srs_error_wrap(err, "rtp listen");
}
srs_trace("rtsp: #%d %s over %s/%s/%s %s client-port=%d-%d, server-port=%d-%d",
req->stream_id, (req->stream_id == video_id)? "Video":"Audio",
@ -728,27 +731,20 @@ void SrsRtspCaster::free_port(int lpmin, int lpmax)
srs_trace("rtsp: free rtp port=%d-%d", lpmin, lpmax);
}
int SrsRtspCaster::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsRtspCaster::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
SrsRtspConn* conn = new SrsRtspConn(this, stfd, output);
if ((err = conn->serve()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("rtsp: serve client failed. ret=%d", ret);
srs_freep(conn);
return ret;
return srs_error_wrap(err, "serve conn");
}
clients.push_back(conn);
srs_info("rtsp: start thread to serve client.");
return ret;
return err;
}
void SrsRtspCaster::remove(SrsRtspConn* conn)

@ -71,10 +71,10 @@ public:
virtual ~SrsRtpConn();
public:
virtual int port();
virtual int listen();
virtual srs_error_t listen();
// interface ISrsUdpHandler
public:
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
virtual srs_error_t on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
};
/**
@ -206,7 +206,7 @@ public:
virtual void free_port(int lpmin, int lpmax);
// interface ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
// internal methods.
public:
virtual void remove(SrsRtspConn* conn);

@ -139,9 +139,9 @@ SrsBufferListener::~SrsBufferListener()
srs_freep(listener);
}
int SrsBufferListener::listen(string i, int p)
srs_error_t SrsBufferListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
ip = i;
port = p;
@ -149,30 +149,25 @@ int SrsBufferListener::listen(string i, int p)
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("tcp listen failed. ret=%d", ret);
return ret;
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "buffer tcp listen %s:%d", ip.c_str(), port);
}
srs_info("listen thread current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
_srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
return err;
}
int SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
srs_error_t err = server->accept_client(type, stfd);
if (err != srs_success) {
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return ret;
return srs_success;
}
#ifdef SRS_AUTO_STREAM_CASTER
@ -194,9 +189,9 @@ SrsRtspListener::~SrsRtspListener()
srs_freep(listener);
}
int SrsRtspListener::listen(string i, int p)
srs_error_t SrsRtspListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
@ -208,27 +203,25 @@ int SrsRtspListener::listen(string i, int p)
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("rtsp caster listen failed. ret=%d", ret);
return ret;
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "rtsp listen %s:%d", ip.c_str(), port);
}
srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return ret;
return err;
}
int SrsRtspListener::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsRtspListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
srs_error_t err = caster->on_tcp_client(stfd);
if (err != srs_success) {
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return ret;
return srs_success;
}
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
@ -249,9 +242,9 @@ SrsHttpFlvListener::~SrsHttpFlvListener()
srs_freep(listener);
}
int SrsHttpFlvListener::listen(string i, int p)
srs_error_t SrsHttpFlvListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
@ -260,35 +253,32 @@ int SrsHttpFlvListener::listen(string i, int p)
ip = i;
port = p;
if ((ret = caster->initialize()) != ERROR_SUCCESS) {
return ret;
if ((err = caster->initialize()) != srs_success) {
return srs_error_wrap(err, "init caster %s:%d", ip.c_str(), port);
}
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("flv caster listen failed. ret=%d", ret);
return ret;
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "listen");
}
srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return ret;
return err;
}
int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = caster->on_tcp_client(stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
srs_error_t err = caster->on_tcp_client(stfd);
if (err != srs_success) {
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return ret;
return err;
}
#endif
@ -303,9 +293,9 @@ SrsUdpStreamListener::~SrsUdpStreamListener()
srs_freep(listener);
}
int SrsUdpStreamListener::listen(string i, int p)
srs_error_t SrsUdpStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
@ -317,24 +307,19 @@ int SrsUdpStreamListener::listen(string i, int p)
srs_freep(listener);
listener = new SrsUdpListener(caster, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("udp caster listen failed. ret=%d", ret);
return ret;
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
srs_info("listen thread current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
_srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
// notify the handler the fd changed.
if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) {
srs_error("notify handler fd changed. ret=%d", ret);
return ret;
if ((err = caster->on_stfd_change(listener->stfd())) != srs_success) {
return srs_error_wrap(err, "notify fd change failed");
}
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
string v = srs_listener_type2string(type);
srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return ret;
return err;
}
#ifdef SRS_AUTO_STREAM_CASTER
@ -1044,7 +1029,7 @@ srs_error_t SrsServer::do_cycle()
srs_error_t SrsServer::listen_rtmp()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// stream service port.
std::vector<std::string> ip_ports = _srs_config->get_listens();
@ -1060,17 +1045,17 @@ srs_error_t SrsServer::listen_rtmp()
int port;
srs_parse_endpoint(ip_ports[i], ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error_new(ret, "rtmp listen %s:%d", ip.c_str(), port);
if ((err = listener->listen(ip, port)) != srs_success) {
srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
}
}
return srs_success;
return err;
}
srs_error_t SrsServer::listen_http_api()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
@ -1083,17 +1068,17 @@ srs_error_t SrsServer::listen_http_api()
int port;
srs_parse_endpoint(ep, ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
return srs_error_new(ret, "http api listen %s:%d", ip.c_str(), port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
}
}
return srs_success;
return err;
}
srs_error_t SrsServer::listen_http_stream()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
close_listeners(SrsListenerHttpStream);
if (_srs_config->get_http_stream_enabled()) {
@ -1106,17 +1091,17 @@ srs_error_t SrsServer::listen_http_stream()
int port;
srs_parse_endpoint(ep, ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
return srs_error_new(ret, "http stream listen %s:%d", ip.c_str(), port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "http stream listen %s:%d", ip.c_str(), port);
}
}
return srs_success;
return err;
}
srs_error_t SrsServer::listen_stream_caster()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
#ifdef SRS_AUTO_STREAM_CASTER
close_listeners(SrsListenerMpegTsOverUdp);
@ -1152,13 +1137,13 @@ srs_error_t SrsServer::listen_stream_caster()
}
// TODO: support listen at <[ip:]port>
if ((ret = listener->listen("0.0.0.0", port)) != ERROR_SUCCESS) {
return srs_error_new(ret, "listen at %d", port);
if ((err = listener->listen("0.0.0.0", port)) != srs_success) {
return srs_error_wrap(err, "listen at %d", port);
}
}
#endif
return srs_success;
return err;
}
void SrsServer::close_listeners(SrsListenerType type)
@ -1198,40 +1183,27 @@ void SrsServer::resample_kbps()
srs_update_rtmp_server((int)conns.size(), kbps);
}
int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
SrsConnection* conn = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) {
srs_error("accept client failed, err=%s", srs_error_desc(err).c_str());
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_close_stfd(stfd);
return ERROR_SUCCESS;
return srs_error_wrap(err, "fd2conn");
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn);
srs_verbose("add conn to vector.");
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
return ret;
return srs_error_wrap(err, "start conn coroutine");
}
srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
return ret;
return err;
}
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)

@ -91,7 +91,7 @@ public:
virtual ~SrsListener();
public:
virtual SrsListenerType listen_type();
virtual int listen(std::string i, int p) = 0;
virtual srs_error_t listen(std::string i, int p) = 0;
};
/**
@ -105,10 +105,10 @@ public:
SrsBufferListener(SrsServer* server, SrsListenerType type);
virtual ~SrsBufferListener();
public:
virtual int listen(std::string ip, int port);
virtual srs_error_t listen(std::string ip, int port);
// ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
};
#ifdef SRS_AUTO_STREAM_CASTER
@ -124,10 +124,10 @@ public:
SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsRtspListener();
public:
virtual int listen(std::string i, int p);
virtual srs_error_t listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
};
/**
@ -142,10 +142,10 @@ public:
SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsHttpFlvListener();
public:
virtual int listen(std::string i, int p);
virtual srs_error_t listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(srs_netfd_t stfd);
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
};
#endif
@ -161,7 +161,7 @@ public:
SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c);
virtual ~SrsUdpStreamListener();
public:
virtual int listen(std::string i, int p);
virtual srs_error_t listen(std::string i, int p);
};
/**
@ -358,7 +358,7 @@ public:
* for instance RTMP connection to serve client.
* @param stfd, the client fd in st boxed, the underlayer fd.
*/
virtual int accept_client(SrsListenerType type, srs_netfd_t stfd);
virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd);
private:
virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn);
// IConnectionManager

@ -45,9 +45,13 @@
*
* Thread has its inside loop, such as the RTMP receive thread:
* class SrsReceiveThread : public ISrsCoroutineHandler {
* public: SrsCoroutine trd;
* public: SrsCoroutine* trd;
* public: virtual srs_error_t cycle() {
* while (!trd.pull()) { // Check whether thread interrupted.
* while (true) {
* // Check whether thread interrupted.
* if ((err = trd->pull()) != srs_success) {
* return err;
* }
* // Do something, such as st_read() packets, it'll be wakeup
* // when user stop or interrupt the thread.
* }
@ -79,6 +83,8 @@ public:
virtual srs_error_t start() = 0;
virtual void stop() = 0;
virtual void interrupt() = 0;
// @return a copy of error, which should be freed by user.
// NULL if not terminated and user should pull again.
virtual srs_error_t pull() = 0;
virtual int cid() = 0;
};

@ -116,7 +116,7 @@
#endif
// Error predefined for all modules.
class SrsError;
typedef SrsError* srs_error_t;
class SrsCplxError;
typedef SrsCplxError* srs_error_t;
#endif

@ -44,23 +44,23 @@ bool srs_is_client_gracefully_close(int error_code)
|| error_code == ERROR_SOCKET_WRITE;
}
SrsError::SrsError()
SrsCplxError::SrsCplxError()
{
code = ERROR_SUCCESS;
wrapped = NULL;
cid = rerrno = line = 0;
}
SrsError::~SrsError()
SrsCplxError::~SrsCplxError()
{
}
std::string SrsError::description() {
std::string SrsCplxError::description() {
if (desc.empty()) {
stringstream ss;
ss << "code=" << code;
SrsError* next = this;
SrsCplxError* next = this;
while (next) {
ss << " : " << next->msg;
next = next->wrapped;
@ -82,7 +82,7 @@ std::string SrsError::description() {
return desc;
}
SrsError* SrsError::create(const char* func, const char* file, int line, int code, const char* fmt, ...) {
SrsCplxError* SrsCplxError::create(const char* func, const char* file, int line, int code, const char* fmt, ...) {
int rerrno = (int)errno;
va_list ap;
@ -91,7 +91,7 @@ SrsError* SrsError::create(const char* func, const char* file, int line, int cod
vsnprintf(buffer, sizeof(buffer), fmt, ap);
va_end(ap);
SrsError* err = new SrsError();
SrsCplxError* err = new SrsCplxError();
err->func = func;
err->file = file;
@ -107,7 +107,7 @@ SrsError* SrsError::create(const char* func, const char* file, int line, int cod
return err;
}
SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError* v, const char* fmt, ...) {
SrsCplxError* SrsCplxError::wrap(const char* func, const char* file, int line, SrsCplxError* v, const char* fmt, ...) {
int rerrno = (int)errno;
va_list ap;
@ -116,7 +116,7 @@ SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError*
vsnprintf(buffer, sizeof(buffer), fmt, ap);
va_end(ap);
SrsError* err = new SrsError();
SrsCplxError* err = new SrsCplxError();
err->func = func;
err->file = file;
@ -132,17 +132,17 @@ SrsError* SrsError::wrap(const char* func, const char* file, int line, SrsError*
return err;
}
SrsError* SrsError::success() {
SrsCplxError* SrsCplxError::success() {
return NULL;
}
SrsError* SrsError::copy(SrsError* from)
SrsCplxError* SrsCplxError::copy(SrsCplxError* from)
{
if (from == srs_success) {
return srs_success;
}
SrsError* err = new SrsError();
SrsCplxError* err = new SrsCplxError();
err->code = from->code;
err->wrapped = srs_error_copy(from->wrapped);
@ -157,12 +157,12 @@ SrsError* SrsError::copy(SrsError* from)
return err;
}
string SrsError::description(SrsError* err)
string SrsCplxError::description(SrsCplxError* err)
{
return err? err->description() : "Success";
}
int SrsError::error_code(SrsError* err)
int SrsCplxError::error_code(SrsCplxError* err)
{
return err? err->code : ERROR_SUCCESS;
}

@ -336,11 +336,11 @@ extern bool srs_is_system_control_error(int error_code);
extern bool srs_is_client_gracefully_close(int error_code);
// Use complex errors, @read https://github.com/ossrs/srs/issues/913
class SrsError
class SrsCplxError
{
private:
int code;
SrsError* wrapped;
SrsCplxError* wrapped;
std::string msg;
std::string func;
@ -352,27 +352,27 @@ private:
std::string desc;
private:
SrsError();
SrsCplxError();
public:
virtual ~SrsError();
virtual ~SrsCplxError();
private:
virtual std::string description();
public:
static SrsError* create(const char* func, const char* file, int line, int code, const char* fmt, ...);
static SrsError* wrap(const char* func, const char* file, int line, SrsError* err, const char* fmt, ...);
static SrsError* success();
static SrsError* copy(SrsError* from);
static std::string description(SrsError* err);
static int error_code(SrsError* err);
static SrsCplxError* create(const char* func, const char* file, int line, int code, const char* fmt, ...);
static SrsCplxError* wrap(const char* func, const char* file, int line, SrsCplxError* err, const char* fmt, ...);
static SrsCplxError* success();
static SrsCplxError* copy(SrsCplxError* from);
static std::string description(SrsCplxError* err);
static int error_code(SrsCplxError* err);
};
// Error helpers, should use these functions to new or wrap an error.
#define srs_success SrsError::success()
#define srs_error_new(ret, fmt, ...) SrsError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__)
#define srs_error_wrap(err, fmt, ...) SrsError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__)
#define srs_error_copy(err) SrsError::copy(err)
#define srs_error_desc(err) SrsError::description(err)
#define srs_error_code(err) SrsError::error_code(err)
#define srs_success SrsCplxError::success()
#define srs_error_new(ret, fmt, ...) SrsCplxError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__)
#define srs_error_wrap(err, fmt, ...) SrsCplxError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__)
#define srs_error_copy(err) SrsCplxError::copy(err)
#define srs_error_desc(err) SrsCplxError::description(err)
#define srs_error_code(err) SrsCplxError::error_code(err)
#endif

Loading…
Cancel
Save