Refine ST service.

pull/1398/head
winlin 6 years ago
parent 202a584aab
commit 3d57c1c9bc

@ -46,9 +46,6 @@ using namespace std;
// sleep in srs_utime_t for udp recv packet. // sleep in srs_utime_t for udp recv packet.
#define SrsUdpPacketRecvCycleInterval 0 #define SrsUdpPacketRecvCycleInterval 0
// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512
ISrsUdpHandler::ISrsUdpHandler() ISrsUdpHandler::ISrsUdpHandler()
{ {
} }
@ -75,9 +72,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
handler = h; handler = h;
ip = i; ip = i;
port = p; port = p;
lfd = NULL;
_fd = -1;
_stfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE; nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf]; buf = new char[nb_buf];
@ -87,65 +82,27 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
SrsUdpListener::~SrsUdpListener() SrsUdpListener::~SrsUdpListener()
{ {
// close the stfd to trigger thread to interrupted.
srs_close_stfd(_stfd);
srs_freep(trd); srs_freep(trd);
srs_close_stfd(lfd);
// st does not close it sometimes,
// close it manually.
close(_fd);
srs_freepa(buf); srs_freepa(buf);
} }
int SrsUdpListener::fd() int SrsUdpListener::fd()
{ {
return _fd; return srs_netfd_fileno(lfd);
} }
srs_netfd_t SrsUdpListener::stfd() srs_netfd_t SrsUdpListener::stfd()
{ {
return _stfd; return lfd;
} }
srs_error_t SrsUdpListener::listen() srs_error_t SrsUdpListener::listen()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
char sport[8]; if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {
snprintf(sport, sizeof(sport), "%d", port); return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
}
if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port);
}
if ((err = srs_fd_closeexec(_fd)) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = srs_fd_reuseaddr(_fd)) != srs_success) {
return srs_error_wrap(err, "set reuseaddr");
}
if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);;
}
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
} }
srs_freep(trd); srs_freep(trd);
@ -166,11 +123,10 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_wrap(err, "udp listener"); return srs_error_wrap(err, "udp listener");
} }
int nread = 0;
sockaddr_storage from; sockaddr_storage from;
int nb_from = sizeof(from); int nb_from = sizeof(from);
int nread = 0; if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
} }
@ -192,8 +148,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
ip = i; ip = i;
port = p; port = p;
_fd = -1; lfd = NULL;
_stfd = NULL;
trd = new SrsDummyCoroutine(); trd = new SrsDummyCoroutine();
} }
@ -201,62 +156,20 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
SrsTcpListener::~SrsTcpListener() SrsTcpListener::~SrsTcpListener()
{ {
srs_freep(trd); srs_freep(trd);
srs_close_stfd(lfd);
srs_close_stfd(_stfd);
} }
int SrsTcpListener::fd() int SrsTcpListener::fd()
{ {
return _fd; return srs_netfd_fileno(lfd);;
} }
srs_error_t SrsTcpListener::listen() srs_error_t SrsTcpListener::listen()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
char sport[8]; if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
snprintf(sport, sizeof(sport), "%d", port); return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
}
if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port);
}
// Detect alive for TCP connection.
// @see https://github.com/ossrs/srs/issues/1044
if ((err = srs_fd_keepalive(_fd)) != srs_success) {
return srs_error_wrap(err, "set keepalive");
}
if ((err = srs_fd_closeexec(_fd)) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = srs_fd_reuseaddr(_fd)) != srs_success) {
return srs_error_wrap(err, "set reuseaddr");
}
if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);;
}
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
}
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
} }
srs_freep(trd); srs_freep(trd);
@ -277,19 +190,17 @@ srs_error_t SrsTcpListener::cycle()
return srs_error_wrap(err, "tcp listener"); return srs_error_wrap(err, "tcp listener");
} }
srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(cstfd == NULL){ if(fd == NULL){
return srs_error_new(ERROR_SOCKET_CREATE, "accept failed"); return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
} }
int cfd = srs_netfd_fileno(cstfd); if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
if ((err = srs_fd_closeexec(cfd)) != srs_success) {
return srs_error_wrap(err, "set closeexec"); return srs_error_wrap(err, "set closeexec");
} }
if ((err = handler->on_tcp_client(cstfd)) != srs_success) { if ((err = handler->on_tcp_client(fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", cfd); return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
} }
} }

@ -69,8 +69,7 @@ public:
class SrsUdpListener : public ISrsCoroutineHandler class SrsUdpListener : public ISrsCoroutineHandler
{ {
private: private:
int _fd; srs_netfd_t lfd;
srs_netfd_t _stfd;
SrsCoroutine* trd; SrsCoroutine* trd;
private: private:
char* buf; char* buf;
@ -96,8 +95,7 @@ public:
class SrsTcpListener : public ISrsCoroutineHandler class SrsTcpListener : public ISrsCoroutineHandler
{ {
private: private:
int _fd; srs_netfd_t lfd;
srs_netfd_t _stfd;
SrsCoroutine* trd; SrsCoroutine* trd;
private: private:
ISrsTcpHandler* handler; ISrsTcpHandler* handler;

@ -35,6 +35,9 @@ using namespace std;
#include <srs_service_utility.hpp> #include <srs_service_utility.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512
#ifdef __linux__ #ifdef __linux__
#include <sys/epoll.h> #include <sys/epoll.h>
@ -124,7 +127,7 @@ srs_thread_t srs_thread_self()
return (srs_thread_t)st_thread_self(); return (srs_thread_t)st_thread_self();
} }
srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{ {
st_utime_t timeout = ST_UTIME_NO_TIMEOUT; st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_UTIME_NO_TIMEOUT) { if (tm != SRS_UTIME_NO_TIMEOUT) {
@ -169,6 +172,116 @@ srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netf
return srs_success; return srs_success;
} }
srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
int fd = 0;
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
// Detect alive for TCP connection.
// @see https://github.com/ossrs/srs/issues/1044
if ((err = srs_fd_keepalive(fd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "set keepalive fd=%d", fd);
}
if ((err = srs_fd_closeexec(fd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "set closeexec fd=%d", fd);
}
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "set reuseaddr fd=%d", fd);
}
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
::close(fd);
return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd);
}
if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
::close(fd);
return srs_error_new(ERROR_SOCKET_LISTEN, "listen fd=%d", fd);
}
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
::close(fd);
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd);
}
return err;
}
srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd)
{
srs_error_t err = srs_success;
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST;
addrinfo* r = NULL;
SrsAutoFree(addrinfo, r);
if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
hints.ai_family, hints.ai_socktype, hints.ai_flags);
}
int fd = 0;
if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
r->ai_family, r->ai_socktype, r->ai_protocol);
}
if ((err = srs_fd_closeexec(fd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "set closeexec fd=%d", fd);
}
if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
::close(fd);
return srs_error_wrap(err, "set reuseaddr fd=%d", fd);
}
if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
::close(fd);
return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd);
}
if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
::close(fd);
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd);
}
return err;
}
srs_cond_t srs_cond_new() srs_cond_t srs_cond_new()
{ {
return (srs_cond_t)st_cond_new(); return (srs_cond_t)st_cond_new();
@ -459,7 +572,7 @@ srs_error_t SrsTcpClient::connect()
close(); close();
srs_assert(stfd == NULL); srs_assert(stfd == NULL);
if ((err = srs_socket_connect(host, port, timeout, &stfd)) != srs_success) { if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
} }

@ -55,9 +55,15 @@ extern srs_error_t srs_fd_keepalive(int fd);
// Get current coroutine/thread. // Get current coroutine/thread.
extern srs_thread_t srs_thread_self(); extern srs_thread_t srs_thread_self();
// client open socket and connect to server. // For client, to open socket and connect to server.
// @param tm The timeout in srs_utime_t. // @param tm The timeout in srs_utime_t.
extern srs_error_t srs_socket_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd); extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
// For server, listen at TCP endpoint.
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);
// For server, listen at UDP endpoint.
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);
// Wrap for coroutine. // Wrap for coroutine.
extern srs_cond_t srs_cond_new(); extern srs_cond_t srs_cond_new();

Loading…
Cancel
Save