diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index f7e5ee802..b174c96e0 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -116,6 +116,7 @@ auto_reload_for_docker on; # the rtmp listen ports, split by space, each listen entry is <[ip:]port> # for example, 192.168.1.100:1935 10.10.10.100:1935 # where the ip is optional, default to 0.0.0.0, that is 1935 equals to 0.0.0.0:1935 +# Overwrite by env SRS_LISTEN listen 1935; # the default chunk size is 128, max is 65536, # some client does not support chunk size change, @@ -135,12 +136,14 @@ chunk_size 60000; # where the cli can only be used in shell/terminate. http_api { # whether http api is enabled. + # Overwrite by env SRS_HTTP_API_ENABLED # default: off enabled on; # The http api listen entry is <[ip:]port>, For example, 192.168.1.100:8080, where the ip is optional, default to # 0.0.0.0, that is 8080 equals to 0.0.0.0:8080. # Note that you're able to use a dedicated port for HTTP API, such as 1985, to be different with HTTP server. In # this situation, you you must also set another HTTPS API port. + # Overwrite by env SRS_HTTP_API_LISTEN # Default: 1985 listen 8080; # whether enable crossdomain request. @@ -164,11 +167,13 @@ http_api { # For https_api or HTTPS API. https { # Whether enable HTTPS API. + # Overwrite by env SRS_HTTP_API_HTTPS_ENABLED # default: off enabled on; # The listen endpoint for HTTPS API. # Note that you're able to use a dedicated port for HTTPS API, such as 1990, and the HTTP API should not be # the same of HTTP server(8080) neither. + # Overwrite by env SRS_HTTP_API_HTTPS_LISTEN # Default: 1990 listen 8088; # The SSL private key file, generated by: @@ -193,12 +198,14 @@ http_api { # need to open the feature http of vhost. http_server { # whether http streaming service is enabled. + # Overwrite by env SRS_HTTP_SERVER_ENABLED # default: off enabled on; # the http streaming listen entry is <[ip:]port> # for example, 192.168.1.100:8080 # where the ip is optional, default to 0.0.0.0, that is 8080 equals to 0.0.0.0:8080 # @remark, if use lower port, for instance 80, user must start srs by root. + # Overwrite by env SRS_HTTP_SERVER_LISTEN # default: 8080 listen 8080; # the default dir for http root. @@ -211,9 +218,11 @@ http_server { # For https_server or HTTPS Streaming. https { # Whether enable HTTPS Streaming. + # Overwrite by env SRS_HTTP_SERVER_HTTTPS_ENABLED # default: off enabled on; # The listen endpoint for HTTPS Streaming. + # Overwrite by env SRS_HTTP_SERVER_HTTTPS_LISTEN # default: 8088 listen 8088; # The SSL private key file, generated by: @@ -297,9 +306,11 @@ vhost srt.vhost.srs.com { ############################################################################################# rtc_server { # Whether enable WebRTC server. + # Overwrite by env SRS_RTC_SERVER_ENABLED # default: off enabled on; # The udp listen port, we will reuse it for connections. + # Overwrite by env SRS_RTC_SERVER_LISTEN # default: 8000 listen 8000; # For WebRTC over TCP directly, not TURN, see https://github.com/ossrs/srs/issues/2852 @@ -329,11 +340,13 @@ rtc_server { # x.x.x.x A specified IP address or DNS name, use * if 0.0.0.0. # @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006 # @see https://ossrs.net/lts/zh-cn/docs/v4/doc/webrtc#config-candidate + # Overwrite by env SRS_RTC_SERVER_CANDIDATE # default: * candidate *; # If candidate is * or 0.0.0.0, means SRS could detect IP automatically, filtered by ip_family. # You can config this to off to disable the detecting, then SRS will try to parse the API hostname. # Note that browser might fail if no CANDIDATE specified. + # Overwrite by env SRS_RTC_SERVER_USE_AUTO_DETECT_NETWORK_IP # Default: on use_auto_detect_network_ip on; # The IP family filter for auto discover candidate, it can be: @@ -341,20 +354,24 @@ rtc_server { # ipv6 Filter IP v6 candidates. # all Filter all IP v4 or v6 candidates. # For example, if set to ipv4, we only use the IPv4 address as candidate. + # Overwrite by env SRS_RTC_SERVER_IP_FAMILY # default: ipv4 ip_family ipv4; # If api_as_candidates is on, SRS would try to use the IP of api server, specified by srs.sdk.js request: # api:string "http://r.ossrs.net:1985/rtc/v1/play/" # in this case, the r.ossrs.net and 39.107.238.185 will be added as candidates. + # Overwrite by env SRS_RTC_SERVER_API_AS_CANDIDATES # Default: on api_as_candidates on; # If use api as CANDIDATE, whether resolve the api hostname. # Note that use original domain name as CANDIDATE, which might make Firefox failed, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006 # Note that if hostname is IPv4 address, always directly use it. + # Overwrite by env SRS_RTC_SERVER_RESOLVE_API_DOMAIN # Default: on resolve_api_domain on; # If use api as CANDIDATE, whether keep original api domain name as CANDIDATE. # Note that use original domain name as CANDIDATE, which might make Firefox failed, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006 + # Overwrite by env SRS_RTC_SERVER_KEEP_API_DOMAIN # Default: off keep_api_domain off; # Whether use ECDSA certificate. @@ -390,6 +407,7 @@ rtc_server { vhost rtc.vhost.srs.com { rtc { # Whether enable WebRTC server. + # Overwrite by env SRS_VHOST_RTC_ENABLED for all vhosts. # default: off enabled on; # Whether support NACK. @@ -420,6 +438,7 @@ vhost rtc.vhost.srs.com { ############################################################### # Whether enable transmuxing RTMP to RTC. # If enabled, transcode aac to opus. + # Overwrite by env SRS_VHOST_RTC_RTMP_TO_RTC for all vhosts. # default: off rtmp_to_rtc off; # Whether keep B-frame, which is normal feature in live streaming, @@ -428,6 +447,7 @@ vhost rtc.vhost.srs.com { keep_bframe off; ############################################################### # Whether enable transmuxing RTC to RTMP. + # Overwrite by env SRS_VHOST_RTC_RTC_TO_RTMP for all vhosts. # Default: off rtc_to_rtmp off; # The PLI interval in seconds, for RTC to RTMP. diff --git a/trunk/configure b/trunk/configure index 317640733..48e19ad15 100755 --- a/trunk/configure +++ b/trunk/configure @@ -260,7 +260,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" - "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_rtc_network" + "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid" "srs_app_threads") @@ -268,7 +268,7 @@ if [[ $SRS_SRT == YES ]]; then MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source") fi if [[ $SRS_RTC == YES ]]; then - MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_sdp" + MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_sdp" "srs_app_rtc_network" "srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source" "srs_app_rtc_api") fi if [[ $SRS_FFMPEG_FIT == YES ]]; then diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index c51d6559d..55dfed564 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-09-04, Fix [#2852](https://github.com/ossrs/srs/issues/2852): WebRTC: WebRTC over TCP directly, not TURN. v5.0.60 * v5.0, 2022-09-01, Fix [#1405](https://github.com/ossrs/srs/issues/1405): Restore the stream when parsing failed. v5.0.59 * v5.0, 2022-09-01, Fix [#1405](https://github.com/ossrs/srs/issues/1405): Support guessing IBMF first. v5.0.58 * v5.0, 2022-09-01, ST: Define and use a new jmpbuf. v5.0.57 diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 7c5db1d3b..8c0762136 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -290,10 +290,6 @@ srs_error_t SrsDynamicHttpConn::start() return srs_error_wrap(err, "set cors=%d", v); } - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - return conn->start(); } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7aac3ad53..d8e181134 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2868,6 +2868,12 @@ int SrsConfig::get_max_connections() vector SrsConfig::get_listens() { std::vector ports; + + // SRS_OVERWRITE_BY_ENV_STRING("SRS_LISTEN") + if (getenv("SRS_LISTEN")) { + ports.push_back(getenv("SRS_LISTEN")); + return ports; + } SrsConfDirective* conf = root->get("listen"); if (!conf) { @@ -3575,6 +3581,8 @@ bool SrsConfig::get_rtc_server_enabled() bool SrsConfig::get_rtc_server_enabled(SrsConfDirective* conf) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_RTC_SERVER_ENABLED"); + static bool DEFAULT = false; if (!conf) { @@ -3591,6 +3599,8 @@ bool SrsConfig::get_rtc_server_enabled(SrsConfDirective* conf) int SrsConfig::get_rtc_server_listen() { + SRS_OVERWRITE_BY_ENV_INT("SRS_RTC_SERVER_LISTEN"); + static int DEFAULT = 8000; SrsConfDirective* conf = root->get("rtc_server"); @@ -3608,6 +3618,8 @@ int SrsConfig::get_rtc_server_listen() std::string SrsConfig::get_rtc_server_candidates() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_RTC_SERVER_CANDIDATE"); + static string DEFAULT = "*"; SrsConfDirective* conf = root->get("rtc_server"); @@ -3635,6 +3647,8 @@ std::string SrsConfig::get_rtc_server_candidates() bool SrsConfig::get_api_as_candidates() { + SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_API_AS_CANDIDATES"); + static bool DEFAULT = true; SrsConfDirective* conf = root->get("rtc_server"); @@ -3652,6 +3666,8 @@ bool SrsConfig::get_api_as_candidates() bool SrsConfig::get_resolve_api_domain() { + SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_RESOLVE_API_DOMAIN"); + static bool DEFAULT = true; SrsConfDirective* conf = root->get("rtc_server"); @@ -3669,6 +3685,8 @@ bool SrsConfig::get_resolve_api_domain() bool SrsConfig::get_keep_api_domain() { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_RTC_SERVER_KEEP_API_DOMAIN"); + static bool DEFAULT = false; SrsConfDirective* conf = root->get("rtc_server"); @@ -3686,6 +3704,8 @@ bool SrsConfig::get_keep_api_domain() bool SrsConfig::get_use_auto_detect_network_ip() { + SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_USE_AUTO_DETECT_NETWORK_IP"); + static bool DEFAULT = true; SrsConfDirective* conf = root->get("rtc_server"); @@ -3770,6 +3790,8 @@ std::string SrsConfig::get_rtc_server_protocol() std::string SrsConfig::get_rtc_server_ip_family() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_RTC_SERVER_IP_FAMILY"); + static string DEFAULT = "ipv4"; SrsConfDirective* conf = root->get("rtc_server"); @@ -3919,6 +3941,8 @@ SrsConfDirective* SrsConfig::get_rtc(string vhost) bool SrsConfig::get_rtc_enabled(string vhost) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_ENABLED"); + static bool DEFAULT = false; SrsConfDirective* conf = get_rtc(vhost); @@ -3955,6 +3979,8 @@ bool SrsConfig::get_rtc_keep_bframe(string vhost) bool SrsConfig::get_rtc_from_rtmp(string vhost) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_RTMP_TO_RTC"); + static bool DEFAULT = false; SrsConfDirective* conf = get_rtc(vhost); @@ -4062,6 +4088,8 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) bool SrsConfig::get_rtc_to_rtmp(string vhost) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_RTC_TO_RTMP"); + static bool DEFAULT = false; SrsConfDirective* conf = get_rtc(vhost); @@ -6686,6 +6714,8 @@ bool SrsConfig::get_http_api_enabled() bool SrsConfig::get_http_api_enabled(SrsConfDirective* conf) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_API_ENABLED"); + static bool DEFAULT = false; if (!conf) { @@ -6702,6 +6732,8 @@ bool SrsConfig::get_http_api_enabled(SrsConfDirective* conf) string SrsConfig::get_http_api_listen() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_API_LISTEN"); + static string DEFAULT = "1985"; SrsConfDirective* conf = root->get("http_api"); @@ -6803,6 +6835,8 @@ SrsConfDirective* SrsConfig::get_https_api() bool SrsConfig::get_https_api_enabled() { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_API_HTTPS_ENABLED"); + static bool DEFAULT = false; SrsConfDirective* conf = get_https_api(); @@ -6820,6 +6854,8 @@ bool SrsConfig::get_https_api_enabled() string SrsConfig::get_https_api_listen() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_API_HTTPS_LISTEN"); + #ifdef SRS_UTEST // We should not use static default, because we need to reset for different testcase. string DEFAULT = ""; @@ -7178,6 +7214,8 @@ bool SrsConfig::get_http_stream_enabled() bool SrsConfig::get_http_stream_enabled(SrsConfDirective* conf) { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_SERVER_ENABLED"); + static bool DEFAULT = false; if (!conf) { @@ -7194,6 +7232,8 @@ bool SrsConfig::get_http_stream_enabled(SrsConfDirective* conf) string SrsConfig::get_http_stream_listen() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_SERVER_LISTEN"); + static string DEFAULT = "8080"; SrsConfDirective* conf = root->get("http_server"); @@ -7255,6 +7295,8 @@ SrsConfDirective* SrsConfig::get_https_stream() bool SrsConfig::get_https_stream_enabled() { + SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_SERVER_HTTTPS_ENABLED"); + static bool DEFAULT = false; SrsConfDirective* conf = get_https_stream(); @@ -7272,6 +7314,8 @@ bool SrsConfig::get_https_stream_enabled() string SrsConfig::get_https_stream_listen() { + SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_SERVER_HTTTPS_LISTEN"); + static string DEFAULT = "8088"; SrsConfDirective* conf = get_https_stream(); diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 394baa05f..1f30ca86c 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -18,7 +18,7 @@ using namespace std; #include #include #include - +#include #include SrsPps* _srs_pps_ids = NULL; @@ -417,7 +417,7 @@ ISrsExpire::~ISrsExpire() SrsTcpConnection::SrsTcpConnection(srs_netfd_t c) { stfd = c; - skt = new SrsStSocket(); + skt = new SrsStSocket(c); } SrsTcpConnection::~SrsTcpConnection() @@ -426,17 +426,6 @@ SrsTcpConnection::~SrsTcpConnection() srs_close_stfd(stfd); } -srs_error_t SrsTcpConnection::initialize() -{ - srs_error_t err = srs_success; - - if ((err = skt->initialize(stfd)) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - - return err; -} - srs_error_t SrsTcpConnection::set_tcp_nodelay(bool v) { srs_error_t err = srs_success; @@ -570,6 +559,130 @@ srs_error_t SrsTcpConnection::writev(const iovec *iov, int iov_size, ssize_t* nw return skt->writev(iov, iov_size, nwrite); } +SrsBufferedReader::SrsBufferedReader(ISrsProtocolReadWriter* io) +{ + io_ = io; + buf_ = NULL; +} + +SrsBufferedReader::~SrsBufferedReader() +{ + srs_freep(buf_); +} + +srs_error_t SrsBufferedReader::peek(char* buf, int* size) +{ + srs_error_t err = srs_success; + + if ((err = reload_buffer()) != srs_success) { + return srs_error_wrap(err, "reload buffer"); + } + + int nn = srs_min(buf_->left(), *size); + *size = nn; + + if (nn) { + memcpy(buf, buf_->head(), nn); + } + + return err; +} + +srs_error_t SrsBufferedReader::reload_buffer() +{ + srs_error_t err = srs_success; + + if (buf_ && !buf_->empty()) { + return err; + } + + // We use read_fully to always full fill the cache, to avoid peeking failed. + ssize_t nread = 0; + if ((err = io_->read_fully(cache_, sizeof(cache_), &nread)) != srs_success) { + return srs_error_wrap(err, "read"); + } + + srs_freep(buf_); + buf_ = new SrsBuffer(cache_, nread); + + return err; +} + +srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread) +{ + if (!buf_ || buf_->empty()) { + return io_->read(buf, size, nread); + } + + int nn = srs_min(buf_->left(), size); + *nread = nn; + + if (nn) { + buf_->read_bytes((char*)buf, nn); + } + return srs_success; +} + +srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread) +{ + if (!buf_ || buf_->empty()) { + return io_->read_fully(buf, size, nread); + } + + int nn = srs_min(buf_->left(), size); + if (nn) { + buf_->read_bytes((char*)buf, nn); + } + + int left = size - nn; + *nread = size; + + if (left) { + return io_->read_fully((char*)buf + nn, left, NULL); + } + return srs_success; +} + +void SrsBufferedReader::set_recv_timeout(srs_utime_t tm) +{ + return io_->set_recv_timeout(tm); +} + +srs_utime_t SrsBufferedReader::get_recv_timeout() +{ + return io_->get_recv_timeout(); +} + +int64_t SrsBufferedReader::get_recv_bytes() +{ + return io_->get_recv_bytes(); +} + +int64_t SrsBufferedReader::get_send_bytes() +{ + return io_->get_send_bytes(); +} + +void SrsBufferedReader::set_send_timeout(srs_utime_t tm) +{ + return io_->set_send_timeout(tm); +} + +srs_utime_t SrsBufferedReader::get_send_timeout() +{ + return io_->get_send_timeout(); +} + +srs_error_t SrsBufferedReader::write(void* buf, size_t size, ssize_t* nwrite) +{ + return io_->write(buf, size, nwrite); +} + +srs_error_t SrsBufferedReader::writev(const iovec *iov, int iov_size, ssize_t* nwrite) +{ + return io_->writev(iov, iov_size, nwrite); +} + SrsSslConnection::SrsSslConnection(ISrsProtocolReadWriter* c) { transport = c; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 6cba50549..99255cc98 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -21,6 +21,7 @@ #include class SrsWallClock; +class SrsBuffer; // Hooks for connection manager, to handle the event when disposing connections. class ISrsDisposingHandler @@ -148,8 +149,6 @@ private: public: SrsTcpConnection(srs_netfd_t c); virtual ~SrsTcpConnection(); -public: - virtual srs_error_t initialize(); public: // Set socket option TCP_NODELAY. virtual srs_error_t set_tcp_nodelay(bool v); @@ -169,6 +168,40 @@ public: virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); }; +// With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any +// cache or buffer. +class SrsBufferedReader : public ISrsProtocolReadWriter +{ +private: + // The under-layer transport. + ISrsProtocolReadWriter* io_; + // Fixed, small and fast buffer. Note that it must be very small piece of cache, make sure matches all protocols, + // because we will full fill it when peeking. + char cache_[16]; + // Current reading position. + SrsBuffer* buf_; +public: + SrsBufferedReader(ISrsProtocolReadWriter* io); + virtual ~SrsBufferedReader(); +public: + // Peek the head of cache to buf in size of bytes. + srs_error_t peek(char* buf, int* size); +private: + srs_error_t reload_buffer(); +// Interface ISrsProtocolReadWriter +public: + virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); + virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread); + virtual void set_recv_timeout(srs_utime_t tm); + virtual srs_utime_t get_recv_timeout(); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); + virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); +}; + // The SSL connection over TCP transport, in server mode. class SrsSslConnection : public ISrsProtocolReadWriter { diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 6860a6f0c..652689446 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -285,21 +285,21 @@ void SrsHttpConn::expire() trd->interrupt(); } -SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) +SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, string cip, int port) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); + io_ = io; manager = cm; - skt = new SrsTcpConnection(fd); enable_stat_ = false; if (https) { - ssl = new SrsSslConnection(skt); + ssl = new SrsSslConnection(io_); conn = new SrsHttpConn(this, ssl, m, cip, port); } else { ssl = NULL; - conn = new SrsHttpConn(this, skt, m, cip, port); + conn = new SrsHttpConn(this, io_, m, cip, port); } _srs_config->subscribe(this); @@ -311,7 +311,7 @@ SrsHttpxConn::~SrsHttpxConn() srs_freep(conn); srs_freep(ssl); - srs_freep(skt); + srs_freep(io_); } void SrsHttpxConn::set_enable_stat(bool v) @@ -323,7 +323,7 @@ srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq) { srs_error_t err = srs_success; - ISrsProtocolReadWriter* io = skt; + ISrsProtocolReadWriter* io = io_; if (ssl) { io = ssl; } @@ -424,16 +424,6 @@ srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0) return r0; } -srs_error_t SrsHttpxConn::set_tcp_nodelay(bool v) -{ - return skt->set_tcp_nodelay(v); -} - -srs_error_t SrsHttpxConn::set_socket_buffer(srs_utime_t buffer_v) -{ - return skt->set_socket_buffer(buffer_v); -} - std::string SrsHttpxConn::desc() { if (ssl) { @@ -461,10 +451,6 @@ srs_error_t SrsHttpxConn::start() return srs_error_wrap(err, "set cors=%d", v); } - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - return conn->start(); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 5970cd9ee..aebd484e9 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -128,13 +128,13 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt private: // The manager object to manage the connection. ISrsResourceManager* manager; - SrsTcpConnection* skt; + ISrsProtocolReadWriter* io_; SrsSslConnection* ssl; SrsHttpConn* conn; // We should never enable the stat, unless HTTP stream connection requires. bool enable_stat_; public: - SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); + SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpxConn(); public: // Require statistic about HTTP connection, for HTTP streaming clients only. @@ -151,12 +151,6 @@ public: virtual srs_error_t on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w); virtual srs_error_t on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w); virtual srs_error_t on_conn_done(srs_error_t r0); -// Extract APIs from SrsTcpConnection. -public: - // Set socket option TCP_NODELAY. - virtual srs_error_t set_tcp_nodelay(bool v); - // Set socket option SO_SNDBUF in srs_utime_t. - virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); // Interface ISrsResource. public: virtual std::string desc(); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index b143124ca..bb56420cd 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -628,19 +628,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Note that the handler of hc now is hxc. SrsHttpxConn* hxc = dynamic_cast(hc->handler()); srs_assert(hxc); - - // Set the socket options for transport. - bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost); - if (tcp_nodelay) { - if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) { - return srs_error_wrap(err, "set tcp nodelay"); - } - } - - srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); - if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) { - return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); - } // Start a thread to receive all messages from client, then drop them. SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc); @@ -649,10 +636,10 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start recv thread"); } - - srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d", - entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep), - enc->has_cache(), msgs.max); + + srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); + srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(), + srsu2msi(mw_sleep), enc->has_cache(), msgs.max); // TODO: free and erase the disabled entry after all related connections is closed. // TODO: FXIME: Support timeout for player, quit infinite-loop. diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index f14d156d9..85e836136 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1314,7 +1314,7 @@ srs_error_t SrsRtcPublishStream::on_twcc(uint16_t sn) { return err; } -srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) +srs_error_t SrsRtcPublishStream::on_rtp_cipher(char* data, int nb_data) { srs_error_t err = srs_success; @@ -1350,33 +1350,6 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) } } - // Decrypt the cipher to plaintext RTP data. - char* plaintext = data; - int nb_plaintext = nb_data; - if ((err = session_->network_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) { - // We try to decode the RTP header for more detail error informations. - SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); - srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. - - err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(), - h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos()); - - return err; - } - - // Handle the plaintext RTP packet. - if ((err = on_rtp_plaintext(plaintext, nb_plaintext)) != srs_success) { - // We try to decode the RTP header for more detail error informations. - SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); - srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. - - int nb_header = h.nb_bytes(); - const char* body = data + nb_header; - int nb_body = nb_data - nb_header; - return srs_error_wrap(err, "cipher=%u, plaintext=%u, body=[%s]", nb_data, nb_plaintext, - srs_string_dumps_hex(body, nb_body, 8).c_str()); - } - return err; } @@ -1769,14 +1742,13 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) cid_ = cid; server_ = s; - network_ = new SrsRtcNetwork(this); + networks_ = new SrsRtcNetworks(this); cache_iov_ = new iovec(); cache_iov_->iov_base = new char[kRtpPacketSize]; cache_iov_->iov_len = kRtpPacketSize; cache_buffer_ = new SrsBuffer((char*)cache_iov_->iov_base, kRtpPacketSize); - state_ = INIT; last_stun_time = 0; session_timeout = 0; disposing_ = false; @@ -1814,7 +1786,7 @@ SrsRtcConnection::~SrsRtcConnection() players_ssrc_map_.clear(); // Free network over UDP or TCP. - srs_freep(network_); + srs_freep(networks_); if (true) { char* iov_base = (char*)cache_iov_->iov_base; @@ -1872,14 +1844,9 @@ void SrsRtcConnection::set_remote_sdp(const SrsSdp& sdp) remote_sdp = sdp; } -SrsRtcConnectionStateType SrsRtcConnection::state() -{ - return state_; -} - -void SrsRtcConnection::set_state(SrsRtcConnectionStateType state) +void SrsRtcConnection::set_state_as_waiting_stun() { - state_ = state; + networks_->set_state(SrsRtcNetworkStateWaitingStun); } string SrsRtcConnection::username() @@ -1889,7 +1856,7 @@ string SrsRtcConnection::username() ISrsKbpsDelta* SrsRtcConnection::delta() { - return network_->delta(); + return networks_->delta(); } const SrsContextId& SrsRtcConnection::get_id() @@ -2010,7 +1977,7 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st req_ = r->copy(); SrsSessionConfig* cfg = &local_sdp.session_negotiate_; - if ((err = network_->initialize(cfg, dtls, srtp)) != srs_success) { + if ((err = networks_->initialize(cfg, dtls, srtp)) != srs_success) { return srs_error_wrap(err, "init"); } @@ -2027,45 +1994,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st return err; } -srs_error_t SrsRtcConnection::on_stun(SrsStunPacket* r, char* data, int nb_data) -{ - srs_error_t err = srs_success; - - // Write STUN messages to blackhole. - if (_srs_blackhole->blackhole) { - _srs_blackhole->sendto(data, nb_data); - } - - if (!r->is_binding_request()) { - return err; - } - - if ((err = on_binding_request(r)) != srs_success) { - return srs_error_wrap(err, "stun binding request failed"); - } - - return err; -} - -srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data) -{ - return network_->on_dtls(data, nb_data); -} - -srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcConnection::on_rtcp(char* unprotected_buf, int nb_unprotected_buf) { srs_error_t err = srs_success; - int nb_unprotected_buf = nb_data; - if ((err = network_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { - return srs_error_wrap(err, "rtcp unprotect"); - } - - char* unprotected_buf = data; - if (_srs_blackhole->blackhole) { - _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); - } - SrsBuffer* buffer = new SrsBuffer(unprotected_buf, nb_unprotected_buf); SrsAutoFree(SrsBuffer, buffer); @@ -2082,7 +2014,7 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) SrsAutoFree(SrsRtcpCommon, rtcp); if(srs_success != err) { - return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=[%s], rtcp=(%u,%u,%u,%u)", nb_data, nb_unprotected_buf, + return srs_error_wrap(err, "plaintext=%u, bytes=[%s], rtcp=(%u,%u,%u,%u)", nb_unprotected_buf, srs_string_dumps_hex(rtcp->data(), rtcp->size(), rtcp->size()).c_str(), rtcp->get_rc(), rtcp->type(), rtcp->get_ssrc(), rtcp->size()); } @@ -2185,7 +2117,7 @@ srs_error_t SrsRtcConnection::on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp) return srs_success; } -srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) +srs_error_t SrsRtcConnection::on_rtp_cipher(char* data, int nb_data) { srs_error_t err = srs_success; @@ -2195,7 +2127,20 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) } srs_assert(publisher); - return publisher->on_rtp(data, nb_data); + return publisher->on_rtp_cipher(data, nb_data); +} + +srs_error_t SrsRtcConnection::on_rtp_plaintext(char* data, int nb_data) +{ + srs_error_t err = srs_success; + + SrsRtcPublishStream* publisher = NULL; + if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) { + return srs_error_wrap(err, "find"); + } + srs_assert(publisher); + + return publisher->on_rtp_plaintext(data, nb_data); } srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher) @@ -2230,12 +2175,6 @@ srs_error_t SrsRtcConnection::on_connection_established() return err; } - // If DTLS done packet received many times, such as ARQ, ignore. - if(ESTABLISHED == state_) { - return err; - } - state_ = ESTABLISHED; - srs_trace("RTC: session pub=%u, sub=%u, to=%dms connection established", publishers_.size(), players_.size(), srsu2msi(session_timeout)); @@ -2294,7 +2233,7 @@ void SrsRtcConnection::alive() SrsRtcUdpNetwork* SrsRtcConnection::udp() { - return network_->udp(); + return networks_->udp(); } srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) @@ -2304,11 +2243,11 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) ++_srs_pps_srtcps->sugar; int nb_buf = nb_data; - if ((err = network_->protect_rtcp(data, &nb_buf)) != srs_success) { + if ((err = networks_->available()->protect_rtcp(data, &nb_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp"); } - if ((err = network_->write(data, nb_buf, NULL)) != srs_success) { + if ((err = networks_->available()->write(data, nb_buf, NULL)) != srs_success) { return srs_error_wrap(err, "send"); } @@ -2492,7 +2431,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt) // Cipher RTP to SRTP packet. if (true) { int nn_encrypt = (int)iov->iov_len; - if ((err = network_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { + if ((err = networks_->available()->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); } iov->iov_len = (size_t)nn_encrypt; @@ -2507,7 +2446,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt) ++_srs_pps_srtps->sugar; - if ((err = network_->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) { + if ((err = networks_->available()->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) { srs_warn("RTC: Write %d bytes err %s", iov->iov_len, srs_error_desc(err).c_str()); srs_freep(err); return err; @@ -2544,14 +2483,7 @@ void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_pub player->set_all_tracks_status(status); } -#ifdef SRS_OSX -// These functions are similar to the older byteorder(3) family of functions. -// For example, be32toh() is identical to ntohl(). -// @see https://linux.die.net/man/3/be32toh -#define be32toh ntohl -#endif - -srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) +srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r, string& ice_pwd) { srs_error_t err = srs_success; @@ -2564,40 +2496,8 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); } - SrsStunPacket stun_binding_response; - char buf[kRtpPacketSize]; - SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); - SrsAutoFree(SrsBuffer, stream); - - stun_binding_response.set_message_type(BindingResponse); - stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); - stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); - stun_binding_response.set_transcation_id(r->get_transcation_id()); - // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(network_->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(network_->get_peer_port()); - - if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { - return srs_error_wrap(err, "stun binding response encode failed"); - } - - if ((err = network_->write(stream->data(), stream->pos(), NULL)) != srs_success) { - return srs_error_wrap(err, "stun binding response send failed"); - } - - if (state_ == WAITING_STUN) { - state_ = DOING_DTLS_HANDSHAKE; - // TODO: FIXME: Add cost. - srs_trace("RTC: session STUN done, waiting DTLS handshake."); - - if((err = network_->start_active_handshake()) != srs_success) { - return srs_error_wrap(err, "fail to dtls handshake"); - } - } - - if (_srs_blackhole->blackhole) { - _srs_blackhole->sendto(stream->data(), stream->pos()); - } + // If success, return the ice password to verify the STUN response. + ice_pwd = local_sdp.get_ice_pwd(); return err; } @@ -3373,13 +3273,9 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::mapstart())) { - return srs_error_wrap(err, "start player"); - } - } return err; } @@ -3453,13 +3349,9 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc } } + // TODO: Start player when DTLS done. Removed it because we don't support single PC now. // If DTLS done, start the publisher. Because maybe create some publishers after DTLS done. // For example, for single PC, we maybe start publisher when create it, because DTLS is done. - if(ESTABLISHED == state()) { - if(srs_success != (err = publisher->start())) { - return srs_error_wrap(err, "start publisher"); - } - } return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 1d6970948..8e49eb854 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -52,7 +52,7 @@ class SrsRtcUserConfig; class SrsRtcSendTrack; class SrsRtcPublishStream; class SrsEphemeralDelta; -class SrsRtcNetwork; +class SrsRtcNetworks; class SrsRtcUdpNetwork; class ISrsRtcNetwork; @@ -67,17 +67,6 @@ const uint8_t kRtpFb = 205; const uint8_t kPsFb = 206; const uint8_t kXR = 207; -enum SrsRtcConnectionStateType -{ - // TODO: FIXME: Should prefixed by enum name. - INIT = -1, - WAITING_ANSWER = 1, - WAITING_STUN = 2, - DOING_DTLS_HANDSHAKE = 3, - ESTABLISHED = 4, - CLOSED = 5, -}; - // The transport for RTC connection. class ISrsRtcTransport : public ISrsDtlsCallback { @@ -378,10 +367,8 @@ private: srs_error_t send_rtcp_rr(); srs_error_t send_rtcp_xr_rrtr(); public: - srs_error_t on_rtp(char* buf, int nb_buf); -private: - // @remark We copy the plaintext, user should free it. - srs_error_t on_rtp_plaintext(char* plaintext, int nb_plaintext); + srs_error_t on_rtp_cipher(char* buf, int nb_buf); + srs_error_t on_rtp_plaintext(char* buf, int nb_buf); private: srs_error_t do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuffer* buf); public: @@ -439,7 +426,6 @@ public: bool disposing_; private: SrsRtcServer* server_; - SrsRtcConnectionStateType state_; private: iovec* cache_iov_; SrsBuffer* cache_buffer_; @@ -455,8 +441,8 @@ private: private: // The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128. std::string username_; - // Use one UDP network and one TCP network. - SrsRtcNetwork* network_; + // A group of networks, each has its own DTLS and SRTP context. + SrsRtcNetworks* networks_; private: // TODO: FIXME: Rename it. // The timeout of session, keep alive by STUN ping pong. @@ -491,9 +477,8 @@ public: void set_local_sdp(const SrsSdp& sdp); SrsSdp* get_remote_sdp(); void set_remote_sdp(const SrsSdp& sdp); - // Connection level state machine, for ARQ of UDP packets. - SrsRtcConnectionStateType state(); - void set_state(SrsRtcConnectionStateType state); + // Change network to waiting stun state. + void set_state_as_waiting_stun(); // Get username pair for this connection, used as ID of session. std::string username(); public: @@ -514,10 +499,8 @@ public: public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username); - // The peer address may change, we can identify that by STUN messages. - srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data); - srs_error_t on_dtls(char* data, int nb_data); - srs_error_t on_rtp(char* data, int nb_data); + srs_error_t on_rtp_cipher(char* data, int nb_data); + srs_error_t on_rtp_plaintext(char* data, int nb_data); private: // Decode the RTP header from buf, find the publisher by SSRC. srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher); @@ -549,8 +532,10 @@ public: srs_error_t do_send_packet(SrsRtpPacket* pkt); // Directly set the status of play track, generally for init to set the default value. void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status); +public: + // Notify by specified network. + srs_error_t on_binding_request(SrsStunPacket* r, std::string& ice_pwd); private: - srs_error_t on_binding_request(SrsStunPacket* r); // publish media capabilitiy negotiate srs_error_t negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcSourceDescription* stream_desc); srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan); diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp index 9537ad628..d154673d9 100644 --- a/trunk/src/app/srs_app_rtc_network.cpp +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -6,7 +6,7 @@ #include -#include +#include using namespace std; #include @@ -19,31 +19,31 @@ using namespace std; #include #include #include +#include +#include +#include -ISrsRtcNetwork::ISrsRtcNetwork() -{ -} - -ISrsRtcNetwork::~ISrsRtcNetwork() -{ -} +#ifdef SRS_OSX +// These functions are similar to the older byteorder(3) family of functions. +// For example, be32toh() is identical to ntohl(). +// @see https://linux.die.net/man/3/be32toh +#define be32toh ntohl +#endif -SrsRtcNetwork::SrsRtcNetwork(SrsRtcConnection* conn) +SrsRtcNetworks::SrsRtcNetworks(SrsRtcConnection* conn) { conn_ = conn; - udp_ = new SrsRtcUdpNetwork(this); delta_ = new SrsEphemeralDelta(); + udp_ = new SrsRtcUdpNetwork(conn_, delta_); } -SrsRtcNetwork::~SrsRtcNetwork() +SrsRtcNetworks::~SrsRtcNetworks() { - // Free the UDP network after transport deleted. srs_freep(udp_); - srs_freep(delta_); } -srs_error_t SrsRtcNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp) +srs_error_t SrsRtcNetworks::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp) { srs_error_t err = srs_success; @@ -54,91 +54,40 @@ srs_error_t SrsRtcNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srt return err; } -srs_error_t SrsRtcNetwork::start_active_handshake() -{ - return udp_->start_active_handshake(); -} - -srs_error_t SrsRtcNetwork::on_dtls(char* data, int nb_data) -{ - return udp_->on_dtls(data, nb_data); -} - -srs_error_t SrsRtcNetwork::on_dtls_alert(std::string type, std::string desc) -{ - return conn_->on_dtls_alert(type, desc); -} - -srs_error_t SrsRtcNetwork::on_connection_established() -{ - return conn_->on_connection_established(); -} - -srs_error_t SrsRtcNetwork::protect_rtp(void* packet, int* nb_cipher) -{ - return udp_->protect_rtp(packet, nb_cipher); -} - -srs_error_t SrsRtcNetwork::protect_rtcp(void* packet, int* nb_cipher) -{ - return udp_->protect_rtcp(packet, nb_cipher); -} - -srs_error_t SrsRtcNetwork::unprotect_rtp(void* packet, int* nb_plaintext) -{ - return udp_->unprotect_rtp(packet, nb_plaintext); -} - -srs_error_t SrsRtcNetwork::unprotect_rtcp(void* packet, int* nb_plaintext) +void SrsRtcNetworks::set_state(SrsRtcNetworkState state) { - return udp_->unprotect_rtcp(packet, nb_plaintext); + udp_->set_state(state); } -srs_error_t SrsRtcNetwork::on_rtcp(char* data, int nb_data) +SrsRtcUdpNetwork* SrsRtcNetworks::udp() { - // Update stat when we received data. - delta_->add_delta(nb_data, 0); - - return conn_->on_rtcp(data, nb_data); -} - -srs_error_t SrsRtcNetwork::on_rtp(char* data, int nb_data) -{ - // Update stat when we received data. - delta_->add_delta(nb_data, 0); - - return conn_->on_rtp(data, nb_data); -} - -string SrsRtcNetwork::get_peer_ip() -{ - return udp_->get_peer_ip(); + return udp_; } -int SrsRtcNetwork::get_peer_port() +ISrsRtcNetwork* SrsRtcNetworks::available() { - return udp_->get_peer_port(); + return udp_; } -SrsRtcUdpNetwork* SrsRtcNetwork::udp() +ISrsKbpsDelta* SrsRtcNetworks::delta() { - return udp_; + return delta_; } -ISrsKbpsDelta* SrsRtcNetwork::delta() +ISrsRtcNetwork::ISrsRtcNetwork() { - return delta_; } -srs_error_t SrsRtcNetwork::write(void* buf, size_t size, ssize_t* nwrite) +ISrsRtcNetwork::~ISrsRtcNetwork() { - return udp_->write(buf, size, nwrite); } -SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcNetwork* network) +SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) { - network_ = network; - sendonly_skt = NULL; + state_ = SrsRtcNetworkStateInit; + conn_ = conn; + delta_ = delta; + sendonly_skt_ = NULL; pp_address_change_ = new SrsErrorPithyPrint(); transport_ = new SrsSecurityTransport(this); } @@ -187,19 +136,31 @@ srs_error_t SrsRtcUdpNetwork::start_active_handshake() srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data) { // Update stat when we received data. - network_->delta_->add_delta(nb_data, 0); + delta_->add_delta(nb_data, 0); return transport_->on_dtls(data, nb_data); } srs_error_t SrsRtcUdpNetwork::on_dtls_alert(std::string type, std::string desc) { - return network_->conn_->on_dtls_alert(type, desc); + return conn_->on_dtls_alert(type, desc); } srs_error_t SrsRtcUdpNetwork::on_connection_established() { - return network_->conn_->on_connection_established(); + srs_error_t err = srs_success; + + // If DTLS done packet received many times, such as ARQ, ignore. + if(SrsRtcNetworkStateClosed == state_) { + return err; + } + + if ((err = conn_->on_connection_established()) != srs_success) { + return srs_error_wrap(err, "udp"); + } + + state_ = SrsRtcNetworkStateClosed; + return err; } srs_error_t SrsRtcUdpNetwork::protect_rtp(void* packet, int* nb_cipher) @@ -212,53 +173,86 @@ srs_error_t SrsRtcUdpNetwork::protect_rtcp(void* packet, int* nb_cipher) return transport_->protect_rtcp(packet, nb_cipher); } -srs_error_t SrsRtcUdpNetwork::unprotect_rtp(void* packet, int* nb_plaintext) +srs_error_t SrsRtcUdpNetwork::on_rtcp(char* data, int nb_data) { - return transport_->unprotect_rtp(packet, nb_plaintext); -} + srs_error_t err = srs_success; -srs_error_t SrsRtcUdpNetwork::unprotect_rtcp(void* packet, int* nb_plaintext) -{ // Update stat when we received data. - network_->delta_->add_delta(*nb_plaintext, 0); + delta_->add_delta(nb_data, 0); + + int nb_unprotected_buf = nb_data; + if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtcp unprotect"); + } + + char* unprotected_buf = data; + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); + } - return transport_->unprotect_rtcp(packet, nb_plaintext); + if ((err = conn_->on_rtcp(unprotected_buf, nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + return err; } -srs_error_t SrsRtcUdpNetwork::on_rtcp(char* data, int nb_data) +srs_error_t SrsRtcUdpNetwork::on_rtp(char* data, int nb_data) { + srs_error_t err = srs_success; + // Update stat when we received data. - network_->delta_->add_delta(nb_data, 0); + delta_->add_delta(nb_data, 0); + + if ((err = conn_->on_rtp_cipher(data, nb_data)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + int nb_unprotected_buf = nb_data; + if ((err = transport_->unprotect_rtp(data, &nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtp unprotect"); + } + + char* unprotected_buf = data; + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); + } - return network_->conn_->on_rtcp(data, nb_data); + if ((err = conn_->on_rtp_plaintext(unprotected_buf, nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + return err; } -srs_error_t SrsRtcUdpNetwork::on_rtp(char* data, int nb_data) +void SrsRtcUdpNetwork::set_state(SrsRtcNetworkState state) { - // Update stat when we received data. - network_->delta_->add_delta(nb_data, 0); + if (state_ > state) { + srs_warn("RTC: Ignore setting state=%d, now=%d", state, state_); + return; + } - return network_->conn_->on_rtp(data, nb_data); + state_ = state; } string SrsRtcUdpNetwork::get_peer_ip() { - srs_assert(sendonly_skt); - return sendonly_skt->get_peer_ip(); + srs_assert(sendonly_skt_); + return sendonly_skt_->get_peer_ip(); } int SrsRtcUdpNetwork::get_peer_port() { - srs_assert(sendonly_skt); - return sendonly_skt->get_peer_port(); + srs_assert(sendonly_skt_); + return sendonly_skt_->get_peer_port(); } void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket* skt) { // TODO: FIXME: Refine performance. string prev_peer_id, peer_id = skt->peer_id(); - if (sendonly_skt) { - prev_peer_id = sendonly_skt->peer_id(); + if (sendonly_skt_) { + prev_peer_id = sendonly_skt_->peer_id(); } // Ignore if same address. @@ -289,33 +283,100 @@ void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket* skt) // If no cache, build cache and setup the relations in connection. if (!addr_cache) { peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly(); - _srs_rtc_manager->add_with_id(peer_id, network_->conn_); + _srs_rtc_manager->add_with_id(peer_id, conn_); uint64_t fast_id = skt->fast_id(); if (fast_id) { - _srs_rtc_manager->add_with_fast_id(fast_id, network_->conn_); + _srs_rtc_manager->add_with_fast_id(fast_id, conn_); } } // Update the transport. - sendonly_skt = addr_cache; + sendonly_skt_ = addr_cache; +} + +srs_error_t SrsRtcUdpNetwork::on_stun(SrsStunPacket* r, char* data, int nb_data) +{ + srs_error_t err = srs_success; + + // Write STUN messages to blackhole. + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(data, nb_data); + } + + if (!r->is_binding_request()) { + return err; + } + + string ice_pwd; + if ((err = conn_->on_binding_request(r, ice_pwd)) != srs_success) { + return srs_error_wrap(err, "udp"); + } + + if ((err = on_binding_request(r, ice_pwd)) != srs_success) { + return srs_error_wrap(err, "stun binding request failed"); + } + + return err; +} + +srs_error_t SrsRtcUdpNetwork::on_binding_request(SrsStunPacket* r, string ice_pwd) +{ + srs_error_t err = srs_success; + + SrsStunPacket stun_binding_response; + char buf[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); + stun_binding_response.set_transcation_id(r->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(get_peer_port()); + + if ((err = stun_binding_response.encode(ice_pwd, stream)) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); + } + + if ((err = write(stream->data(), stream->pos(), NULL)) != srs_success) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (state_ == SrsRtcNetworkStateWaitingStun) { + state_ = SrsRtcNetworkStateDtls; + // TODO: FIXME: Add cost. + srs_trace("RTC: session STUN done, waiting DTLS handshake."); + + if((err = start_active_handshake()) != srs_success) { + return srs_error_wrap(err, "fail to dtls handshake"); + } + } + + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(stream->data(), stream->pos()); + } + + return err; } srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite) { // Update stat when we sending data. - network_->delta_->add_delta(0, size); + delta_->add_delta(0, size); if (nwrite) *nwrite = size; - return sendonly_skt->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); + return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); } -SrsRtcTcpConn::SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm) +SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm) { manager_ = cm; ip_ = cip; port_ = port; - skt_ = new SrsTcpConnection(fd); + skt_ = skt; delta_ = new SrsNetworkDelta(); delta_->set_io(skt_, skt_); trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp index b883701e5..b9c935dfe 100644 --- a/trunk/src/app/srs_app_rtc_network.hpp +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -28,84 +28,94 @@ class ISrsRtcTransport; class SrsEphemeralDelta; class ISrsKbpsDelta; class SrsRtcUdpNetwork; +class ISrsRtcNetwork; -// For DTLS to call network service. -class ISrsRtcNetwork : public ISrsStreamWriter +// The network stat. +enum SrsRtcNetworkState { -public: - ISrsRtcNetwork(); - virtual ~ISrsRtcNetwork(); -public: - // Callback when DTLS connected. - virtual srs_error_t on_connection_established() = 0; - // Callback when DTLS disconnected. - virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; + SrsRtcNetworkStateInit = -1, + SrsRtcNetworkStateWaitingAnswer = 1, + SrsRtcNetworkStateWaitingStun = 2, + SrsRtcNetworkStateDtls = 3, + SrsRtcNetworkStateEstablished = 4, + SrsRtcNetworkStateClosed = 5, }; -// The UDP network, default for WebRTC. -class SrsRtcNetwork : public ISrsRtcNetwork +// A group of networks, each has its own DTLS and SRTP context. +class SrsRtcNetworks { private: - friend class SrsRtcUdpNetwork; + // Network over UDP. + SrsRtcUdpNetwork* udp_; private: // WebRTC session object. SrsRtcConnection* conn_; - // Network over UDP. - SrsRtcUdpNetwork* udp_; // Delta object for statistics. SrsEphemeralDelta* delta_; public: - SrsRtcNetwork(SrsRtcConnection* conn); - virtual ~SrsRtcNetwork(); + SrsRtcNetworks(SrsRtcConnection* conn); + virtual ~SrsRtcNetworks(); // DTLS transport functions. public: srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); - virtual srs_error_t start_active_handshake(); - virtual srs_error_t on_dtls(char* data, int nb_data); - virtual srs_error_t on_dtls_alert(std::string type, std::string desc); - srs_error_t on_connection_established(); - srs_error_t protect_rtp(void* packet, int* nb_cipher); - srs_error_t protect_rtcp(void* packet, int* nb_cipher); - srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); - srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); -// When got data from socket. -public: - srs_error_t on_rtcp(char* data, int nb_data); - srs_error_t on_rtp(char* data, int nb_data); -// Other functions. public: - // ICE reflexive address functions. - std::string get_peer_ip(); - int get_peer_port(); + // Connection level state machine, for ARQ of UDP packets. + void set_state(SrsRtcNetworkState state); // Get the UDP network object. SrsRtcUdpNetwork* udp(); + // Get an available network. + ISrsRtcNetwork* available(); +public: // Get the delta object for statistics. virtual ISrsKbpsDelta* delta(); -// Interface ISrsStreamWriter. +}; + +// For DTLS or Session to call network service. +class ISrsRtcNetwork : public ISrsStreamWriter +{ public: - virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); + ISrsRtcNetwork(); + virtual ~ISrsRtcNetwork(); +public: + // Callback when DTLS connected. + virtual srs_error_t on_connection_established() = 0; + // Callback when DTLS disconnected. + virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; +public: + // Protect RTP packet by SRTP context. + virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0; + // Protect RTCP packet by SRTP context. + virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0; }; // The WebRTC over UDP network. class SrsRtcUdpNetwork : public ISrsRtcNetwork { private: - SrsRtcNetwork* network_; + // WebRTC session object. + SrsRtcConnection* conn_; + // Delta object for statistics. + SrsEphemeralDelta* delta_; + SrsRtcNetworkState state_; private: // Pithy print for address change, use port as error code. SrsErrorPithyPrint* pp_address_change_; // The peer address, client maybe use more than one address, it's the current selected one. - SrsUdpMuxSocket* sendonly_skt; + SrsUdpMuxSocket* sendonly_skt_; // The address list, client may use multiple addresses. std::map peer_addresses_; // The DTLS transport over this network. ISrsRtcTransport* transport_; public: - SrsRtcUdpNetwork(SrsRtcNetwork* network); + SrsRtcUdpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta); virtual ~SrsRtcUdpNetwork(); public: // Update the UDP connection. void update_sendonly_socket(SrsUdpMuxSocket* skt); + // When got STUN ping message. The peer address may change, we can identify that by STUN messages. + srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data); +private: + srs_error_t on_binding_request(SrsStunPacket* r, std::string ice_pwd); // DTLS transport functions. public: srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); @@ -115,14 +125,14 @@ public: srs_error_t on_connection_established(); srs_error_t protect_rtp(void* packet, int* nb_cipher); srs_error_t protect_rtcp(void* packet, int* nb_cipher); - srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); - srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); // When got data from socket. public: srs_error_t on_rtcp(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); // Other functions. public: + // Connection level state machine, for ARQ of UDP packets. + void set_state(SrsRtcNetworkState state); // ICE reflexive address functions. std::string get_peer_ip(); int get_peer_port(); @@ -145,9 +155,9 @@ private: // The delta for statistic. SrsNetworkDelta* delta_; // TCP Transport object. - SrsTcpConnection* skt_; + ISrsProtocolReadWriter* skt_; public: - SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm); + SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm); virtual ~SrsRtcTcpConn(); public: ISrsKbpsDelta* delta(); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 879d148af..3e38771b3 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -419,7 +419,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) session->udp()->update_sendonly_socket(skt); } - return session->on_stun(&ping, data, size); + return session->udp()->on_stun(&ping, data, size); } // For DTLS, RTCP or RTP, which does not support peer address changing. @@ -432,7 +432,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) if (is_rtp_or_rtcp && !is_rtcp) { ++_srs_pps_rrtps->sugar; - err = session->on_rtp(data, size); + err = session->udp()->on_rtp(data, size); if (err != srs_success) { session->switch_to_context(); } @@ -443,12 +443,12 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) if (is_rtp_or_rtcp && is_rtcp) { ++_srs_pps_rrtcps->sugar; - return session->on_rtcp(data, size); + return session->udp()->on_rtcp(data, size); } if (srs_is_dtls((uint8_t*)data, size)) { ++_srs_pps_rstuns->sugar; - return session->on_dtls(data, size); + return session->udp()->on_dtls(data, size); } return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } @@ -546,17 +546,22 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local // We allows to mock the eip of server. if (true) { - int listen_port = _srs_config->get_rtc_server_listen(); + int udp_port = _srs_config->get_rtc_server_listen(); + int tcp_port = _srs_config->get_rtc_server_tcp_listen(); string protocol = _srs_config->get_rtc_server_protocol(); set candidates = discover_candidates(ruc); for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { - string hostname; int port = listen_port; - srs_parse_hostport(*it, hostname, port); - if (protocol == "udp" || protocol == "tcp") { - local_sdp.add_candidate(protocol, hostname, port, "host"); + string hostname; + int uport = udp_port; srs_parse_hostport(*it, hostname, uport); + int tport = tcp_port; srs_parse_hostport(*it, hostname, tport); + + if (protocol == "udp") { + local_sdp.add_candidate("udp", hostname, uport, "host"); + } else if (protocol == "tcp") { + local_sdp.add_candidate("tcp", hostname, tport, "host"); } else { - local_sdp.add_candidate("udp", hostname, port, "host"); - local_sdp.add_candidate("tcp", hostname, port, "host"); + local_sdp.add_candidate("udp", hostname, uport, "host"); + local_sdp.add_candidate("tcp", hostname, tport, "host"); } } @@ -585,7 +590,7 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local session->set_remote_sdp(ruc->remote_sdp_); // We must setup the local SDP, then initialize the session object. session->set_local_sdp(local_sdp); - session->set_state(WAITING_STUN); + session->set_state_as_waiting_stun(); // Before session initialize, we must setup the local SDP. if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 023a90fd1..dc49fcb1a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1444,10 +1444,6 @@ srs_error_t SrsRtmpConn::start() { srs_error_t err = srs_success; - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "coroutine"); } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 94f2890b0..d041e8620 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -36,7 +36,10 @@ using namespace std; #include #include #include +#include +#ifdef SRS_RTC #include +#endif std::string srs_listener_type2string(SrsListenerType type) { @@ -533,6 +536,7 @@ SrsServer::SrsServer() http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); reuse_api_over_server_ = false; + reuse_rtc_over_server_ = false; http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); @@ -655,16 +659,35 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) return srs_error_wrap(err, "handler initialize"); } + bool stream = _srs_config->get_http_stream_enabled(); + string http_listen = _srs_config->get_http_stream_listen(); + string https_listen = _srs_config->get_https_stream_listen(); + +#ifdef SRS_RTC + bool rtc = _srs_config->get_rtc_server_enabled(); + bool rtc_tcp = _srs_config->get_rtc_server_tcp_enabled(); + string rtc_listen = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); + // If enabled and listen is the same value, resue port for WebRTC over TCP. + if (stream && rtc && rtc_tcp && http_listen == rtc_listen) { + srs_trace("WebRTC tcp=%s reuses http=%s server", rtc_listen.c_str(), http_listen.c_str()); + reuse_rtc_over_server_ = true; + } + if (stream && rtc && rtc_tcp && https_listen == rtc_listen) { + srs_trace("WebRTC tcp=%s reuses https=%s server", rtc_listen.c_str(), https_listen.c_str()); + reuse_rtc_over_server_ = true; + } +#endif + // If enabled and the listen is the same value, reuse port. - if (_srs_config->get_http_stream_enabled() && _srs_config->get_http_api_enabled() - && _srs_config->get_http_api_listen() == _srs_config->get_http_stream_listen() - && _srs_config->get_https_api_listen() == _srs_config->get_https_stream_listen() - ) { - srs_trace("API reuse listen to https server at %s", _srs_config->get_https_stream_listen().c_str()); + bool api = _srs_config->get_http_api_enabled(); + string api_listen = _srs_config->get_http_api_listen(); + string apis_listen = _srs_config->get_https_api_listen(); + if (stream && api && api_listen == http_listen && apis_listen == https_listen) { + srs_trace("API reuses http=%s and https=%s server", http_listen.c_str(), https_listen.c_str()); reuse_api_over_server_ = true; } - // If reuse port, use the same object as server. + // Only init HTTP API when not reusing HTTP server. if (!reuse_api_over_server_) { SrsHttpServeMux *api = dynamic_cast(http_api_mux); srs_assert(api); @@ -744,22 +767,26 @@ srs_error_t SrsServer::listen() return srs_error_wrap(err, "stream caster listen"); } - // TODO: FIXME: Refine the listeners. - close_listeners(SrsListenerTcp); - if (_srs_config->get_rtc_server_tcp_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); - listeners.push_back(listener); +#ifdef SRS_RTC + if (!reuse_rtc_over_server_) { + // TODO: FIXME: Refine the listeners. + close_listeners(SrsListenerTcp); + if (_srs_config->get_rtc_server_tcp_enabled()) { + SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); + listeners.push_back(listener); - std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); + std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); + std::string ip; + int port; + srs_parse_endpoint(ep, ip, port); - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + if ((err = listener->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + } } } +#endif if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); @@ -1376,11 +1403,13 @@ void SrsServer::resample_kbps() continue; } +#ifdef SRS_RTC SrsRtcTcpConn* tcp = dynamic_cast(c); if (tcp) { stat->kbps_add_delta(c->get_id().c_str(), tcp->delta()); continue; } +#endif // Impossible path, because we only create these connections above. srs_assert(false); @@ -1397,7 +1426,6 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) ISrsResource* resource = NULL; if ((err = fd_to_resource(type, stfd, &resource)) != srs_success) { - //close fd on conn error, otherwise will lead to fd leak -gs srs_close_stfd(stfd); if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { srs_error_reset(err); @@ -1405,7 +1433,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) } return srs_error_wrap(err, "fd to resource"); } - srs_assert(resource); + + // Ignore if no resource found. + if (!resource) { + return err; + } // directly enqueue, the cycle thread will remove the client. conn_manager->add(resource); @@ -1423,7 +1455,7 @@ ISrsHttpServeMux* SrsServer::api_server() return http_api_mux; } -srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr) +srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr) { srs_error_t err = srs_success; @@ -1462,24 +1494,56 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS } } + // We will free the stfd from now on. + srs_netfd_t fd2 = stfd; + stfd = NULL; + // The context id may change during creating the bellow objects. SrsContextRestore(_srs_context->get_id()); + +#ifdef SRS_RTC + // If reuse HTTP server with WebRTC TCP, peek to detect the client. + if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) { + SrsTcpConnection* skt = new SrsTcpConnection(fd2); + SrsBufferedReader* io = new SrsBufferedReader(skt); + + uint8_t b[10]; int nn = sizeof(b); + if ((err = io->peek((char*)b, &nn)) != srs_success) { + srs_freep(io); srs_freep(skt); + return srs_error_wrap(err, "peek"); + } + + // If first message is BindingRequest(00 01), prefixed with length(2B), it's WebRTC client. Generally, the frame + // length minus message length should be 20, that is the header size of STUN is 20 bytes. For example: + // 00 6c # Frame length: 0x006c = 108 + // 00 01 # Message Type: Binding Request(0x0001) + // 00 58 # Message Length: 0x005 = 88 + // 21 12 a4 42 # Message Cookie: 0x2112a442 + // 48 32 6c 61 6b 42 35 71 42 35 4a 71 # Message Transaction ID: 12 bytes + if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20 + && b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42 + ) { + *pr = new SrsRtcTcpConn(io, ip, port, this); + } else { + *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, io, http_server, ip, port); + } + return err; + } +#endif if (type == SrsListenerRtmpStream) { - *pr = new SrsRtmpConn(this, stfd, ip, port); - } else if (type == SrsListenerHttpApi) { - *pr = new SrsHttpxConn(false, this, stfd, http_api_mux, ip, port); - } else if (type == SrsListenerHttpsApi) { - *pr = new SrsHttpxConn(true, this, stfd, http_api_mux, ip, port); - } else if (type == SrsListenerHttpStream) { - *pr = new SrsHttpxConn(false, this, stfd, http_server, ip, port); - } else if (type == SrsListenerHttpsStream) { - *pr = new SrsHttpxConn(true, this, stfd, http_server, ip, port); + *pr = new SrsRtmpConn(this, fd2, ip, port); + } else if (type == SrsListenerHttpApi || type == SrsListenerHttpsApi) { + *pr = new SrsHttpxConn(type == SrsListenerHttpsApi, this, new SrsTcpConnection(fd2), http_api_mux, ip, port); + } else if (type == SrsListenerHttpStream || type == SrsListenerHttpsStream) { + *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, new SrsTcpConnection(fd2), http_server, ip, port); +#ifdef SRS_RTC } else if (type == SrsListenerTcp) { - *pr = new SrsRtcTcpConn(stfd, ip, port, this); + *pr = new SrsRtcTcpConn(new SrsTcpConnection(fd2), ip, port, this); +#endif } else { srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); - srs_close_stfd(stfd); + srs_close_stfd(fd2); return err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 723f84bd3..e3df09a67 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -203,8 +203,10 @@ private: // TODO: FIXME: Extract an HttpApiServer. ISrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; - // If reuse, HTTP API use the same port of HTTP server. + // If reusing, HTTP API use the same port of HTTP server. bool reuse_api_over_server_; + // If reusing, WebRTC TCP use the same port of HTTP server. + bool reuse_rtc_over_server_; private: SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; @@ -313,7 +315,7 @@ public: // TODO: FIXME: Fetch from hybrid server manager. virtual ISrsHttpServeMux* api_server(); private: - virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr); + virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr); // Interface ISrsResourceManager public: // A callback for connection to remove itself. diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index 6960338f3..2dfa82d4f 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 59 +#define VERSION_REVISION 60 #endif diff --git a/trunk/src/protocol/srs_protocol_http_conn.cpp b/trunk/src/protocol/srs_protocol_http_conn.cpp index c78dc1daa..ad832233c 100644 --- a/trunk/src/protocol/srs_protocol_http_conn.cpp +++ b/trunk/src/protocol/srs_protocol_http_conn.cpp @@ -220,7 +220,7 @@ int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; srs_assert(obj); - + if (length > 0) { // Note that this function might be called for multiple times, and we got pieces of content. obj->url.append(at, (int)length); diff --git a/trunk/src/protocol/srs_protocol_st.cpp b/trunk/src/protocol/srs_protocol_st.cpp index 73d2ed10a..32d320f6f 100644 --- a/trunk/src/protocol/srs_protocol_st.cpp +++ b/trunk/src/protocol/srs_protocol_st.cpp @@ -469,19 +469,23 @@ bool srs_is_never_timeout(srs_utime_t tm) SrsStSocket::SrsStSocket() { - stfd = NULL; - stm = rtm = SRS_UTIME_NO_TIMEOUT; - rbytes = sbytes = 0; + init(NULL); +} + +SrsStSocket::SrsStSocket(srs_netfd_t fd) +{ + init(fd); } SrsStSocket::~SrsStSocket() { } -srs_error_t SrsStSocket::initialize(srs_netfd_t fd) +void SrsStSocket::init(srs_netfd_t fd) { - stfd = fd; - return srs_success; + stfd_ = fd; + stm = rtm = SRS_UTIME_NO_TIMEOUT; + rbytes = sbytes = 0; } void SrsStSocket::set_recv_timeout(srs_utime_t tm) @@ -517,12 +521,14 @@ int64_t SrsStSocket::get_send_bytes() srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + + srs_assert(stfd_); + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { - nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); + nb_read = st_read((st_netfd_t)stfd_, buf, size, rtm); } if (nread) { @@ -552,12 +558,14 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { - nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); + nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, rtm); } if (nread) { @@ -576,7 +584,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) errno = ECONNRESET; } - return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); + return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully, size=%d, nn=%d", size, nb_read); } rbytes += nb_read; @@ -587,12 +595,14 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { - nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_write = st_write((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_write((st_netfd_t)stfd, buf, size, stm); + nb_write = st_write((st_netfd_t)stfd_, buf, size, stm); } if (nwrite) { @@ -617,12 +627,14 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { - nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); + nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); + nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, stm); } if (nwrite) { @@ -646,7 +658,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) { - stfd = NULL; + stfd_ = NULL; io = new SrsStSocket(); host = h; @@ -656,37 +668,26 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) SrsTcpClient::~SrsTcpClient() { - close(); - srs_freep(io); + srs_close_stfd(stfd_); } srs_error_t SrsTcpClient::connect() { srs_error_t err = srs_success; - close(); - - srs_assert(stfd == NULL); + srs_netfd_t stfd = NULL; if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } - - if ((err = io->initialize(stfd)) != srs_success) { - return srs_error_wrap(err, "tcp: init socket object"); - } - - return err; -} -void SrsTcpClient::close() -{ - // Ignore when already closed. - if (!io) { - return; - } + srs_freep(io); + io = new SrsStSocket(stfd); + + srs_close_stfd(stfd_); + stfd_ = stfd; - srs_close_stfd(stfd); + return err; } void SrsTcpClient::set_recv_timeout(srs_utime_t tm) diff --git a/trunk/src/protocol/srs_protocol_st.hpp b/trunk/src/protocol/srs_protocol_st.hpp index 5603f02e4..ae409a823 100644 --- a/trunk/src/protocol/srs_protocol_st.hpp +++ b/trunk/src/protocol/srs_protocol_st.hpp @@ -127,13 +127,13 @@ private: int64_t rbytes; int64_t sbytes; // The underlayer st fd. - srs_netfd_t stfd; + srs_netfd_t stfd_; public: SrsStSocket(); + SrsStSocket(srs_netfd_t fd); virtual ~SrsStSocket(); -public: - // Initialize the socket with stfd, user must manage it. - virtual srs_error_t initialize(srs_netfd_t fd); +private: + void init(srs_netfd_t fd); public: virtual void set_recv_timeout(srs_utime_t tm); virtual srs_utime_t get_recv_timeout(); @@ -161,7 +161,7 @@ public: class SrsTcpClient : public ISrsProtocolReadWriter { private: - srs_netfd_t stfd; + srs_netfd_t stfd_; SrsStSocket* io; private: std::string host; @@ -179,10 +179,6 @@ public: // Connect to server over TCP. // @remark We will close the exists connection before do connect. virtual srs_error_t connect(); -private: - // Close the connection to server. - // @remark User should never use the client when close it. - virtual void close(); // Interface ISrsProtocolReadWriter public: virtual void set_recv_timeout(srs_utime_t tm); diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index dbd09742c..3b564dbe1 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -113,12 +113,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); @@ -135,12 +134,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); HELPER_EXPECT_SUCCESS(c.write((void*)" ", 1, NULL)); @@ -159,12 +157,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello SRS", 9, NULL)); EXPECT_EQ(9, c.get_send_bytes()); @@ -194,12 +191,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); char buf[16] = {0}; @@ -216,12 +212,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); char buf[16] = {0}; @@ -238,12 +233,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); @@ -418,12 +412,11 @@ VOID TEST(TCPServerTest, WritevIOVC) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); iovec iovs[3]; iovs[0].iov_base = (void*)"H"; @@ -448,12 +441,11 @@ VOID TEST(TCPServerTest, WritevIOVC) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); iovec iovs[3]; iovs[0].iov_base = (void*)"H"; @@ -1012,11 +1004,7 @@ public: virtual srs_error_t do_cycle(srs_netfd_t cfd) { srs_error_t err = srs_success; - SrsStSocket skt; - if ((err = skt.initialize(cfd)) != srs_success) { - return err; - } - + SrsStSocket skt(cfd); skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); skt.set_send_timeout(1 * SRS_UTIME_SECONDS); @@ -1085,7 +1073,7 @@ VOID TEST(TCPServerTest, TCPClientServer) HELPER_ASSERT_SUCCESS(c.write((void*)"Hello", 5, NULL)); char buf[6]; HELPER_ARRAY_INIT(buf, 6, 0); - ASSERT_EQ(5, srs_read(c.stfd, buf, 5, 1*SRS_UTIME_SECONDS)); + ASSERT_EQ(5, srs_read(c.stfd_, buf, 5, 1*SRS_UTIME_SECONDS)); EXPECT_STREQ("Hello", buf); } } @@ -1263,11 +1251,7 @@ public: virtual srs_error_t do_cycle(srs_netfd_t cfd) { srs_error_t err = srs_success; - SrsStSocket skt; - if ((err = skt.initialize(cfd)) != srs_success) { - return err; - } - + SrsStSocket skt(cfd); skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); skt.set_send_timeout(1 * SRS_UTIME_SECONDS);