From a9bb6061c3bf282c646dd6071d20c6858496125b Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 13 Oct 2015 17:59:51 +0800 Subject: [PATCH] use tcp client for raw connect. --- trunk/src/app/srs_app_caster_flv.cpp | 16 +++-- trunk/src/app/srs_app_caster_flv.hpp | 4 +- trunk/src/app/srs_app_st.cpp | 89 ++++++++++++++++++++++++++++ trunk/src/app/srs_app_st.hpp | 29 ++++++++- 4 files changed, 126 insertions(+), 12 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index a0d38cda9..20dc505f7 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -119,9 +119,8 @@ SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, Sr { req = NULL; - io = NULL; + transport = new SrsTcpClient(); client = NULL; - stfd = NULL; stream_id = 0; pprint = SrsPithyPrint::create_caster(); @@ -131,6 +130,7 @@ SrsDynamicHttpConn::~SrsDynamicHttpConn() { close(); + srs_freep(transport); srs_freep(pprint); } @@ -261,7 +261,7 @@ int SrsDynamicHttpConn::connect() // when ok, ignore. // TODO: FIXME: should reconnect when disconnected. - if (io || client) { + if (transport->connected()) { return ret; } @@ -273,12 +273,10 @@ int SrsDynamicHttpConn::connect() } // connect host. - if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { - srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { return ret; } - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); + client = new SrsRtmpClient(transport); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); @@ -360,10 +358,10 @@ int SrsDynamicHttpConn::connect_app(string ep_server, int ep_port) void SrsDynamicHttpConn::close() { + transport->close(); + srs_freep(client); - srs_freep(io); srs_freep(req); - srs_close_stfd(stfd); } SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index f6a03838e..7020fde41 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -43,6 +43,7 @@ class SrsRequest; class SrsPithyPrint; class ISrsHttpResponseReader; class SrsFlvDecoder; +class SrsTcpClient; #include #include @@ -86,8 +87,7 @@ private: SrsPithyPrint* pprint; private: SrsRequest* req; - st_netfd_t stfd; - SrsStSocket* io; + SrsTcpClient* transport; SrsRtmpClient* client; int stream_id; public: diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 8d90e8084..f240d6798 100644 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -28,6 +28,7 @@ using namespace std; #include #include +#include namespace internal { @@ -411,18 +412,106 @@ int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) SrsTcpClient::SrsTcpClient() { + io = NULL; + stfd = NULL; } SrsTcpClient::~SrsTcpClient() { + close(); +} + +bool SrsTcpClient::connected() +{ + return io; } int SrsTcpClient::connect(string host, int port, int64_t timeout) { int ret = ERROR_SUCCESS; + + // when connected, ignore. + if (io) { + return ret; + } + + // connect host. + if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) { + srs_error("mpegts: connect server %s:%d failed. ret=%d", host.c_str(), port, ret); + return ret; + } + + io = new SrsStSocket(stfd); + return ret; } +void SrsTcpClient::close() +{ + // when closed, ignore. + if (!io) { + return; + } + + srs_freep(io); + srs_close_stfd(stfd); +} + +bool SrsTcpClient::is_never_timeout(int64_t timeout_us) +{ + return io->is_never_timeout(timeout_us); +} + +void SrsTcpClient::set_recv_timeout(int64_t timeout_us) +{ + io->set_recv_timeout(timeout_us); +} + +int64_t SrsTcpClient::get_recv_timeout() +{ + return io->get_recv_timeout(); +} + +void SrsTcpClient::set_send_timeout(int64_t timeout_us) +{ + io->set_send_timeout(timeout_us); +} + +int64_t SrsTcpClient::get_send_timeout() +{ + return io->get_send_timeout(); +} + +int64_t SrsTcpClient::get_recv_bytes() +{ + return io->get_recv_bytes(); +} + +int64_t SrsTcpClient::get_send_bytes() +{ + return io->get_send_bytes(); +} + +int SrsTcpClient::read(void* buf, size_t size, ssize_t* nread) +{ + return io->read(buf, size, nread); +} + +int SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread) +{ + return io->read_fully(buf, size, nread); +} + +int SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite) +{ + return io->write(buf, size, nwrite); +} + +int SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite) +{ + return io->writev(iov, iov_size, nwrite); +} + #ifdef __linux__ #include diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 6125c50f0..faa30f6d5 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -208,19 +208,46 @@ public: * the common tcp client, to connect to specified TCP server, * reconnect and close the connection. */ -class SrsTcpClient +class SrsTcpClient : public ISrsProtocolReaderWriter { +private: + st_netfd_t stfd; + SrsStSocket* io; public: SrsTcpClient(); virtual ~SrsTcpClient(); +public: + /** + * whether connected to server. + */ + virtual bool connected(); public: /** * connect to server over TCP. * @param host the ip or hostname of server. * @param port the port to connect to. * @param timeout the timeout in us. + * @remark ignore when connected. */ virtual int connect(std::string host, int port, int64_t timeout); + /** + * close the connection. + * @remark ignore when closed. + */ + virtual void close(); +// interface ISrsProtocolReaderWriter +public: + virtual bool is_never_timeout(int64_t timeout_us); + virtual void set_recv_timeout(int64_t timeout_us); + virtual int64_t get_recv_timeout(); + virtual void set_send_timeout(int64_t timeout_us); + virtual int64_t get_send_timeout(); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); + virtual int read(void* buf, size_t size, ssize_t* nread); + virtual int read_fully(void* buf, size_t size, ssize_t* nread); + virtual int write(void* buf, size_t size, ssize_t* nwrite); + virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite); }; // initialize st, requires epoll.