WebRTC: Extract SrsRtcNetwork layer for UDP/TCP.

pull/3166/head
winlin 2 years ago
parent 770d959148
commit 625069af7f

@ -116,6 +116,7 @@ auto_reload_for_docker on;
# the rtmp listen ports, split by space, each listen entry is <[ip:]port>
# for example, 192.168.1.100:1935 10.10.10.100:1935
# where the ip is optional, default to 0.0.0.0, that is 1935 equals to 0.0.0.0:1935
# Overwrite by env SRS_LISTEN
listen 1935;
# the default chunk size is 128, max is 65536,
# some client does not support chunk size change,
@ -135,12 +136,14 @@ chunk_size 60000;
# where the cli can only be used in shell/terminate.
http_api {
# whether http api is enabled.
# Overwrite by env SRS_HTTP_API_ENABLED
# default: off
enabled on;
# The http api listen entry is <[ip:]port>, For example, 192.168.1.100:8080, where the ip is optional, default to
# 0.0.0.0, that is 8080 equals to 0.0.0.0:8080.
# Note that you're able to use a dedicated port for HTTP API, such as 1985, to be different with HTTP server. In
# this situation, you you must also set another HTTPS API port.
# Overwrite by env SRS_HTTP_API_LISTEN
# Default: 1985
listen 8080;
# whether enable crossdomain request.
@ -164,11 +167,13 @@ http_api {
# For https_api or HTTPS API.
https {
# Whether enable HTTPS API.
# Overwrite by env SRS_HTTP_API_HTTPS_ENABLED
# default: off
enabled on;
# The listen endpoint for HTTPS API.
# Note that you're able to use a dedicated port for HTTPS API, such as 1990, and the HTTP API should not be
# the same of HTTP server(8080) neither.
# Overwrite by env SRS_HTTP_API_HTTPS_LISTEN
# Default: 1990
listen 8088;
# The SSL private key file, generated by:
@ -193,12 +198,14 @@ http_api {
# need to open the feature http of vhost.
http_server {
# whether http streaming service is enabled.
# Overwrite by env SRS_HTTP_SERVER_ENABLED
# default: off
enabled on;
# the http streaming listen entry is <[ip:]port>
# for example, 192.168.1.100:8080
# where the ip is optional, default to 0.0.0.0, that is 8080 equals to 0.0.0.0:8080
# @remark, if use lower port, for instance 80, user must start srs by root.
# Overwrite by env SRS_HTTP_SERVER_LISTEN
# default: 8080
listen 8080;
# the default dir for http root.
@ -211,9 +218,11 @@ http_server {
# For https_server or HTTPS Streaming.
https {
# Whether enable HTTPS Streaming.
# Overwrite by env SRS_HTTP_SERVER_HTTTPS_ENABLED
# default: off
enabled on;
# The listen endpoint for HTTPS Streaming.
# Overwrite by env SRS_HTTP_SERVER_HTTTPS_LISTEN
# default: 8088
listen 8088;
# The SSL private key file, generated by:
@ -297,9 +306,11 @@ vhost srt.vhost.srs.com {
#############################################################################################
rtc_server {
# Whether enable WebRTC server.
# Overwrite by env SRS_RTC_SERVER_ENABLED
# default: off
enabled on;
# The udp listen port, we will reuse it for connections.
# Overwrite by env SRS_RTC_SERVER_LISTEN
# default: 8000
listen 8000;
# For WebRTC over TCP directly, not TURN, see https://github.com/ossrs/srs/issues/2852
@ -329,11 +340,13 @@ rtc_server {
# x.x.x.x A specified IP address or DNS name, use * if 0.0.0.0.
# @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006
# @see https://ossrs.net/lts/zh-cn/docs/v4/doc/webrtc#config-candidate
# Overwrite by env SRS_RTC_SERVER_CANDIDATE
# default: *
candidate *;
# If candidate is * or 0.0.0.0, means SRS could detect IP automatically, filtered by ip_family.
# You can config this to off to disable the detecting, then SRS will try to parse the API hostname.
# Note that browser might fail if no CANDIDATE specified.
# Overwrite by env SRS_RTC_SERVER_USE_AUTO_DETECT_NETWORK_IP
# Default: on
use_auto_detect_network_ip on;
# The IP family filter for auto discover candidate, it can be:
@ -341,20 +354,24 @@ rtc_server {
# ipv6 Filter IP v6 candidates.
# all Filter all IP v4 or v6 candidates.
# For example, if set to ipv4, we only use the IPv4 address as candidate.
# Overwrite by env SRS_RTC_SERVER_IP_FAMILY
# default: ipv4
ip_family ipv4;
# If api_as_candidates is on, SRS would try to use the IP of api server, specified by srs.sdk.js request:
# api:string "http://r.ossrs.net:1985/rtc/v1/play/"
# in this case, the r.ossrs.net and 39.107.238.185 will be added as candidates.
# Overwrite by env SRS_RTC_SERVER_API_AS_CANDIDATES
# Default: on
api_as_candidates on;
# If use api as CANDIDATE, whether resolve the api hostname.
# Note that use original domain name as CANDIDATE, which might make Firefox failed, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006
# Note that if hostname is IPv4 address, always directly use it.
# Overwrite by env SRS_RTC_SERVER_RESOLVE_API_DOMAIN
# Default: on
resolve_api_domain on;
# If use api as CANDIDATE, whether keep original api domain name as CANDIDATE.
# Note that use original domain name as CANDIDATE, which might make Firefox failed, see https://bugzilla.mozilla.org/show_bug.cgi?id=1239006
# Overwrite by env SRS_RTC_SERVER_KEEP_API_DOMAIN
# Default: off
keep_api_domain off;
# Whether use ECDSA certificate.
@ -390,6 +407,7 @@ rtc_server {
vhost rtc.vhost.srs.com {
rtc {
# Whether enable WebRTC server.
# Overwrite by env SRS_VHOST_RTC_ENABLED for all vhosts.
# default: off
enabled on;
# Whether support NACK.
@ -420,6 +438,7 @@ vhost rtc.vhost.srs.com {
###############################################################
# Whether enable transmuxing RTMP to RTC.
# If enabled, transcode aac to opus.
# Overwrite by env SRS_VHOST_RTC_RTMP_TO_RTC for all vhosts.
# default: off
rtmp_to_rtc off;
# Whether keep B-frame, which is normal feature in live streaming,
@ -428,6 +447,7 @@ vhost rtc.vhost.srs.com {
keep_bframe off;
###############################################################
# Whether enable transmuxing RTC to RTMP.
# Overwrite by env SRS_VHOST_RTC_RTC_TO_RTMP for all vhosts.
# Default: off
rtc_to_rtmp off;
# The PLI interval in seconds, for RTC to RTMP.

4
trunk/configure vendored

@ -260,7 +260,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc
"srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge"
"srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static"
"srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds"
"srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_rtc_network"
"srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec"
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers" "srs_app_hybrid" "srs_app_threads")
@ -268,7 +268,7 @@ if [[ $SRS_SRT == YES ]]; then
MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source")
fi
if [[ $SRS_RTC == YES ]]; then
MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_sdp"
MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_sdp" "srs_app_rtc_network"
"srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source" "srs_app_rtc_api")
fi
if [[ $SRS_FFMPEG_FIT == YES ]]; then

@ -7,6 +7,7 @@ The changelog for SRS.
## SRS 5.0 Changelog
* v5.0, 2022-09-04, Fix [#2852](https://github.com/ossrs/srs/issues/2852): WebRTC: WebRTC over TCP directly, not TURN. v5.0.60
* v5.0, 2022-09-01, Fix [#1405](https://github.com/ossrs/srs/issues/1405): Restore the stream when parsing failed. v5.0.59
* v5.0, 2022-09-01, Fix [#1405](https://github.com/ossrs/srs/issues/1405): Support guessing IBMF first. v5.0.58
* v5.0, 2022-09-01, ST: Define and use a new jmpbuf. v5.0.57

@ -290,10 +290,6 @@ srs_error_t SrsDynamicHttpConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
return conn->start();
}

@ -2868,6 +2868,12 @@ int SrsConfig::get_max_connections()
vector<string> SrsConfig::get_listens()
{
std::vector<string> ports;
// SRS_OVERWRITE_BY_ENV_STRING("SRS_LISTEN")
if (getenv("SRS_LISTEN")) {
ports.push_back(getenv("SRS_LISTEN"));
return ports;
}
SrsConfDirective* conf = root->get("listen");
if (!conf) {
@ -3575,6 +3581,8 @@ bool SrsConfig::get_rtc_server_enabled()
bool SrsConfig::get_rtc_server_enabled(SrsConfDirective* conf)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_RTC_SERVER_ENABLED");
static bool DEFAULT = false;
if (!conf) {
@ -3591,6 +3599,8 @@ bool SrsConfig::get_rtc_server_enabled(SrsConfDirective* conf)
int SrsConfig::get_rtc_server_listen()
{
SRS_OVERWRITE_BY_ENV_INT("SRS_RTC_SERVER_LISTEN");
static int DEFAULT = 8000;
SrsConfDirective* conf = root->get("rtc_server");
@ -3608,6 +3618,8 @@ int SrsConfig::get_rtc_server_listen()
std::string SrsConfig::get_rtc_server_candidates()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_RTC_SERVER_CANDIDATE");
static string DEFAULT = "*";
SrsConfDirective* conf = root->get("rtc_server");
@ -3635,6 +3647,8 @@ std::string SrsConfig::get_rtc_server_candidates()
bool SrsConfig::get_api_as_candidates()
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_API_AS_CANDIDATES");
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
@ -3652,6 +3666,8 @@ bool SrsConfig::get_api_as_candidates()
bool SrsConfig::get_resolve_api_domain()
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_RESOLVE_API_DOMAIN");
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
@ -3669,6 +3685,8 @@ bool SrsConfig::get_resolve_api_domain()
bool SrsConfig::get_keep_api_domain()
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_RTC_SERVER_KEEP_API_DOMAIN");
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("rtc_server");
@ -3686,6 +3704,8 @@ bool SrsConfig::get_keep_api_domain()
bool SrsConfig::get_use_auto_detect_network_ip()
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_RTC_SERVER_USE_AUTO_DETECT_NETWORK_IP");
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
@ -3770,6 +3790,8 @@ std::string SrsConfig::get_rtc_server_protocol()
std::string SrsConfig::get_rtc_server_ip_family()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_RTC_SERVER_IP_FAMILY");
static string DEFAULT = "ipv4";
SrsConfDirective* conf = root->get("rtc_server");
@ -3919,6 +3941,8 @@ SrsConfDirective* SrsConfig::get_rtc(string vhost)
bool SrsConfig::get_rtc_enabled(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_ENABLED");
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
@ -3955,6 +3979,8 @@ bool SrsConfig::get_rtc_keep_bframe(string vhost)
bool SrsConfig::get_rtc_from_rtmp(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_RTMP_TO_RTC");
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
@ -4062,6 +4088,8 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost)
bool SrsConfig::get_rtc_to_rtmp(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_RTC_RTC_TO_RTMP");
static bool DEFAULT = false;
SrsConfDirective* conf = get_rtc(vhost);
@ -6686,6 +6714,8 @@ bool SrsConfig::get_http_api_enabled()
bool SrsConfig::get_http_api_enabled(SrsConfDirective* conf)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_API_ENABLED");
static bool DEFAULT = false;
if (!conf) {
@ -6702,6 +6732,8 @@ bool SrsConfig::get_http_api_enabled(SrsConfDirective* conf)
string SrsConfig::get_http_api_listen()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_API_LISTEN");
static string DEFAULT = "1985";
SrsConfDirective* conf = root->get("http_api");
@ -6803,6 +6835,8 @@ SrsConfDirective* SrsConfig::get_https_api()
bool SrsConfig::get_https_api_enabled()
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_API_HTTPS_ENABLED");
static bool DEFAULT = false;
SrsConfDirective* conf = get_https_api();
@ -6820,6 +6854,8 @@ bool SrsConfig::get_https_api_enabled()
string SrsConfig::get_https_api_listen()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_API_HTTPS_LISTEN");
#ifdef SRS_UTEST
// We should not use static default, because we need to reset for different testcase.
string DEFAULT = "";
@ -7178,6 +7214,8 @@ bool SrsConfig::get_http_stream_enabled()
bool SrsConfig::get_http_stream_enabled(SrsConfDirective* conf)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_SERVER_ENABLED");
static bool DEFAULT = false;
if (!conf) {
@ -7194,6 +7232,8 @@ bool SrsConfig::get_http_stream_enabled(SrsConfDirective* conf)
string SrsConfig::get_http_stream_listen()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_SERVER_LISTEN");
static string DEFAULT = "8080";
SrsConfDirective* conf = root->get("http_server");
@ -7255,6 +7295,8 @@ SrsConfDirective* SrsConfig::get_https_stream()
bool SrsConfig::get_https_stream_enabled()
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_HTTP_SERVER_HTTTPS_ENABLED");
static bool DEFAULT = false;
SrsConfDirective* conf = get_https_stream();
@ -7272,6 +7314,8 @@ bool SrsConfig::get_https_stream_enabled()
string SrsConfig::get_https_stream_listen()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_HTTP_SERVER_HTTTPS_LISTEN");
static string DEFAULT = "8088";
SrsConfDirective* conf = get_https_stream();

@ -18,7 +18,7 @@ using namespace std;
#include <srs_app_log.hpp>
#include <srs_app_config.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_ids = NULL;
@ -417,7 +417,7 @@ ISrsExpire::~ISrsExpire()
SrsTcpConnection::SrsTcpConnection(srs_netfd_t c)
{
stfd = c;
skt = new SrsStSocket();
skt = new SrsStSocket(c);
}
SrsTcpConnection::~SrsTcpConnection()
@ -426,17 +426,6 @@ SrsTcpConnection::~SrsTcpConnection()
srs_close_stfd(stfd);
}
srs_error_t SrsTcpConnection::initialize()
{
srs_error_t err = srs_success;
if ((err = skt->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}
return err;
}
srs_error_t SrsTcpConnection::set_tcp_nodelay(bool v)
{
srs_error_t err = srs_success;
@ -570,6 +559,130 @@ srs_error_t SrsTcpConnection::writev(const iovec *iov, int iov_size, ssize_t* nw
return skt->writev(iov, iov_size, nwrite);
}
SrsBufferedReader::SrsBufferedReader(ISrsProtocolReadWriter* io)
{
io_ = io;
buf_ = NULL;
}
SrsBufferedReader::~SrsBufferedReader()
{
srs_freep(buf_);
}
srs_error_t SrsBufferedReader::peek(char* buf, int* size)
{
srs_error_t err = srs_success;
if ((err = reload_buffer()) != srs_success) {
return srs_error_wrap(err, "reload buffer");
}
int nn = srs_min(buf_->left(), *size);
*size = nn;
if (nn) {
memcpy(buf, buf_->head(), nn);
}
return err;
}
srs_error_t SrsBufferedReader::reload_buffer()
{
srs_error_t err = srs_success;
if (buf_ && !buf_->empty()) {
return err;
}
// We use read_fully to always full fill the cache, to avoid peeking failed.
ssize_t nread = 0;
if ((err = io_->read_fully(cache_, sizeof(cache_), &nread)) != srs_success) {
return srs_error_wrap(err, "read");
}
srs_freep(buf_);
buf_ = new SrsBuffer(cache_, nread);
return err;
}
srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread)
{
if (!buf_ || buf_->empty()) {
return io_->read(buf, size, nread);
}
int nn = srs_min(buf_->left(), size);
*nread = nn;
if (nn) {
buf_->read_bytes((char*)buf, nn);
}
return srs_success;
}
srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread)
{
if (!buf_ || buf_->empty()) {
return io_->read_fully(buf, size, nread);
}
int nn = srs_min(buf_->left(), size);
if (nn) {
buf_->read_bytes((char*)buf, nn);
}
int left = size - nn;
*nread = size;
if (left) {
return io_->read_fully((char*)buf + nn, left, NULL);
}
return srs_success;
}
void SrsBufferedReader::set_recv_timeout(srs_utime_t tm)
{
return io_->set_recv_timeout(tm);
}
srs_utime_t SrsBufferedReader::get_recv_timeout()
{
return io_->get_recv_timeout();
}
int64_t SrsBufferedReader::get_recv_bytes()
{
return io_->get_recv_bytes();
}
int64_t SrsBufferedReader::get_send_bytes()
{
return io_->get_send_bytes();
}
void SrsBufferedReader::set_send_timeout(srs_utime_t tm)
{
return io_->set_send_timeout(tm);
}
srs_utime_t SrsBufferedReader::get_send_timeout()
{
return io_->get_send_timeout();
}
srs_error_t SrsBufferedReader::write(void* buf, size_t size, ssize_t* nwrite)
{
return io_->write(buf, size, nwrite);
}
srs_error_t SrsBufferedReader::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
return io_->writev(iov, iov_size, nwrite);
}
SrsSslConnection::SrsSslConnection(ISrsProtocolReadWriter* c)
{
transport = c;

@ -21,6 +21,7 @@
#include <srs_protocol_conn.hpp>
class SrsWallClock;
class SrsBuffer;
// Hooks for connection manager, to handle the event when disposing connections.
class ISrsDisposingHandler
@ -148,8 +149,6 @@ private:
public:
SrsTcpConnection(srs_netfd_t c);
virtual ~SrsTcpConnection();
public:
virtual srs_error_t initialize();
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
@ -169,6 +168,40 @@ public:
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any
// cache or buffer.
class SrsBufferedReader : public ISrsProtocolReadWriter
{
private:
// The under-layer transport.
ISrsProtocolReadWriter* io_;
// Fixed, small and fast buffer. Note that it must be very small piece of cache, make sure matches all protocols,
// because we will full fill it when peeking.
char cache_[16];
// Current reading position.
SrsBuffer* buf_;
public:
SrsBufferedReader(ISrsProtocolReadWriter* io);
virtual ~SrsBufferedReader();
public:
// Peek the head of cache to buf in size of bytes.
srs_error_t peek(char* buf, int* size);
private:
srs_error_t reload_buffer();
// Interface ISrsProtocolReadWriter
public:
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// The SSL connection over TCP transport, in server mode.
class SrsSslConnection : public ISrsProtocolReadWriter
{

@ -285,21 +285,21 @@ void SrsHttpConn::expire()
trd->interrupt();
}
SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, string cip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
io_ = io;
manager = cm;
skt = new SrsTcpConnection(fd);
enable_stat_ = false;
if (https) {
ssl = new SrsSslConnection(skt);
ssl = new SrsSslConnection(io_);
conn = new SrsHttpConn(this, ssl, m, cip, port);
} else {
ssl = NULL;
conn = new SrsHttpConn(this, skt, m, cip, port);
conn = new SrsHttpConn(this, io_, m, cip, port);
}
_srs_config->subscribe(this);
@ -311,7 +311,7 @@ SrsHttpxConn::~SrsHttpxConn()
srs_freep(conn);
srs_freep(ssl);
srs_freep(skt);
srs_freep(io_);
}
void SrsHttpxConn::set_enable_stat(bool v)
@ -323,7 +323,7 @@ srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;
ISrsProtocolReadWriter* io = skt;
ISrsProtocolReadWriter* io = io_;
if (ssl) {
io = ssl;
}
@ -424,16 +424,6 @@ srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0)
return r0;
}
srs_error_t SrsHttpxConn::set_tcp_nodelay(bool v)
{
return skt->set_tcp_nodelay(v);
}
srs_error_t SrsHttpxConn::set_socket_buffer(srs_utime_t buffer_v)
{
return skt->set_socket_buffer(buffer_v);
}
std::string SrsHttpxConn::desc()
{
if (ssl) {
@ -461,10 +451,6 @@ srs_error_t SrsHttpxConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
return conn->start();
}

@ -128,13 +128,13 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsTcpConnection* skt;
ISrsProtocolReadWriter* io_;
SrsSslConnection* ssl;
SrsHttpConn* conn;
// We should never enable the stat, unless HTTP stream connection requires.
bool enable_stat_;
public:
SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpxConn();
public:
// Require statistic about HTTP connection, for HTTP streaming clients only.
@ -151,12 +151,6 @@ public:
virtual srs_error_t on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_conn_done(srs_error_t r0);
// Extract APIs from SrsTcpConnection.
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsResource.
public:
virtual std::string desc();

@ -628,19 +628,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Note that the handler of hc now is hxc.
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(hxc);
// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}
srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}
// Start a thread to receive all messages from client, then drop them.
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc);
@ -649,10 +636,10 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start recv thread");
}
srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep),
enc->has_cache(), msgs.max);
srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(),
srsu2msi(mw_sleep), enc->has_cache(), msgs.max);
// TODO: free and erase the disabled entry after all related connections is closed.
// TODO: FXIME: Support timeout for player, quit infinite-loop.

@ -1314,7 +1314,7 @@ srs_error_t SrsRtcPublishStream::on_twcc(uint16_t sn) {
return err;
}
srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
srs_error_t SrsRtcPublishStream::on_rtp_cipher(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -1350,33 +1350,6 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
}
}
// Decrypt the cipher to plaintext RTP data.
char* plaintext = data;
int nb_plaintext = nb_data;
if ((err = session_->network_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(),
h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos());
return err;
}
// Handle the plaintext RTP packet.
if ((err = on_rtp_plaintext(plaintext, nb_plaintext)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
int nb_header = h.nb_bytes();
const char* body = data + nb_header;
int nb_body = nb_data - nb_header;
return srs_error_wrap(err, "cipher=%u, plaintext=%u, body=[%s]", nb_data, nb_plaintext,
srs_string_dumps_hex(body, nb_body, 8).c_str());
}
return err;
}
@ -1769,14 +1742,13 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
cid_ = cid;
server_ = s;
network_ = new SrsRtcNetwork(this);
networks_ = new SrsRtcNetworks(this);
cache_iov_ = new iovec();
cache_iov_->iov_base = new char[kRtpPacketSize];
cache_iov_->iov_len = kRtpPacketSize;
cache_buffer_ = new SrsBuffer((char*)cache_iov_->iov_base, kRtpPacketSize);
state_ = INIT;
last_stun_time = 0;
session_timeout = 0;
disposing_ = false;
@ -1814,7 +1786,7 @@ SrsRtcConnection::~SrsRtcConnection()
players_ssrc_map_.clear();
// Free network over UDP or TCP.
srs_freep(network_);
srs_freep(networks_);
if (true) {
char* iov_base = (char*)cache_iov_->iov_base;
@ -1872,14 +1844,9 @@ void SrsRtcConnection::set_remote_sdp(const SrsSdp& sdp)
remote_sdp = sdp;
}
SrsRtcConnectionStateType SrsRtcConnection::state()
{
return state_;
}
void SrsRtcConnection::set_state(SrsRtcConnectionStateType state)
void SrsRtcConnection::set_state_as_waiting_stun()
{
state_ = state;
networks_->set_state(SrsRtcNetworkStateWaitingStun);
}
string SrsRtcConnection::username()
@ -1889,7 +1856,7 @@ string SrsRtcConnection::username()
ISrsKbpsDelta* SrsRtcConnection::delta()
{
return network_->delta();
return networks_->delta();
}
const SrsContextId& SrsRtcConnection::get_id()
@ -2010,7 +1977,7 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
req_ = r->copy();
SrsSessionConfig* cfg = &local_sdp.session_negotiate_;
if ((err = network_->initialize(cfg, dtls, srtp)) != srs_success) {
if ((err = networks_->initialize(cfg, dtls, srtp)) != srs_success) {
return srs_error_wrap(err, "init");
}
@ -2027,45 +1994,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
return err;
}
srs_error_t SrsRtcConnection::on_stun(SrsStunPacket* r, char* data, int nb_data)
{
srs_error_t err = srs_success;
// Write STUN messages to blackhole.
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(data, nb_data);
}
if (!r->is_binding_request()) {
return err;
}
if ((err = on_binding_request(r)) != srs_success) {
return srs_error_wrap(err, "stun binding request failed");
}
return err;
}
srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
{
return network_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
srs_error_t SrsRtcConnection::on_rtcp(char* unprotected_buf, int nb_unprotected_buf)
{
srs_error_t err = srs_success;
int nb_unprotected_buf = nb_data;
if ((err = network_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect");
}
char* unprotected_buf = data;
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
SrsBuffer* buffer = new SrsBuffer(unprotected_buf, nb_unprotected_buf);
SrsAutoFree(SrsBuffer, buffer);
@ -2082,7 +2014,7 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
SrsAutoFree(SrsRtcpCommon, rtcp);
if(srs_success != err) {
return srs_error_wrap(err, "cipher=%u, plaintext=%u, bytes=[%s], rtcp=(%u,%u,%u,%u)", nb_data, nb_unprotected_buf,
return srs_error_wrap(err, "plaintext=%u, bytes=[%s], rtcp=(%u,%u,%u,%u)", nb_unprotected_buf,
srs_string_dumps_hex(rtcp->data(), rtcp->size(), rtcp->size()).c_str(),
rtcp->get_rc(), rtcp->type(), rtcp->get_ssrc(), rtcp->size());
}
@ -2185,7 +2117,7 @@ srs_error_t SrsRtcConnection::on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp)
return srs_success;
}
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
srs_error_t SrsRtcConnection::on_rtp_cipher(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -2195,7 +2127,20 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
}
srs_assert(publisher);
return publisher->on_rtp(data, nb_data);
return publisher->on_rtp_cipher(data, nb_data);
}
srs_error_t SrsRtcConnection::on_rtp_plaintext(char* data, int nb_data)
{
srs_error_t err = srs_success;
SrsRtcPublishStream* publisher = NULL;
if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {
return srs_error_wrap(err, "find");
}
srs_assert(publisher);
return publisher->on_rtp_plaintext(data, nb_data);
}
srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher)
@ -2230,12 +2175,6 @@ srs_error_t SrsRtcConnection::on_connection_established()
return err;
}
// If DTLS done packet received many times, such as ARQ, ignore.
if(ESTABLISHED == state_) {
return err;
}
state_ = ESTABLISHED;
srs_trace("RTC: session pub=%u, sub=%u, to=%dms connection established", publishers_.size(), players_.size(),
srsu2msi(session_timeout));
@ -2294,7 +2233,7 @@ void SrsRtcConnection::alive()
SrsRtcUdpNetwork* SrsRtcConnection::udp()
{
return network_->udp();
return networks_->udp();
}
srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
@ -2304,11 +2243,11 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
++_srs_pps_srtcps->sugar;
int nb_buf = nb_data;
if ((err = network_->protect_rtcp(data, &nb_buf)) != srs_success) {
if ((err = networks_->available()->protect_rtcp(data, &nb_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp");
}
if ((err = network_->write(data, nb_buf, NULL)) != srs_success) {
if ((err = networks_->available()->write(data, nb_buf, NULL)) != srs_success) {
return srs_error_wrap(err, "send");
}
@ -2492,7 +2431,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt)
// Cipher RTP to SRTP packet.
if (true) {
int nn_encrypt = (int)iov->iov_len;
if ((err = network_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
if ((err = networks_->available()->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
@ -2507,7 +2446,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt)
++_srs_pps_srtps->sugar;
if ((err = network_->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) {
if ((err = networks_->available()->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) {
srs_warn("RTC: Write %d bytes err %s", iov->iov_len, srs_error_desc(err).c_str());
srs_freep(err);
return err;
@ -2544,14 +2483,7 @@ void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_pub
player->set_all_tracks_status(status);
}
#ifdef SRS_OSX
// These functions are similar to the older byteorder(3) family of functions.
// For example, be32toh() is identical to ntohl().
// @see https://linux.die.net/man/3/be32toh
#define be32toh ntohl
#endif
srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r, string& ice_pwd)
{
srs_error_t err = srs_success;
@ -2564,40 +2496,8 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode.");
}
SrsStunPacket stun_binding_response;
char buf[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stun_binding_response.set_message_type(BindingResponse);
stun_binding_response.set_local_ufrag(r->get_remote_ufrag());
stun_binding_response.set_remote_ufrag(r->get_local_ufrag());
stun_binding_response.set_transcation_id(r->get_transcation_id());
// FIXME: inet_addr is deprecated, IPV6 support
stun_binding_response.set_mapped_address(be32toh(inet_addr(network_->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(network_->get_peer_port());
if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}
if ((err = network_->write(stream->data(), stream->pos(), NULL)) != srs_success) {
return srs_error_wrap(err, "stun binding response send failed");
}
if (state_ == WAITING_STUN) {
state_ = DOING_DTLS_HANDSHAKE;
// TODO: FIXME: Add cost.
srs_trace("RTC: session STUN done, waiting DTLS handshake.");
if((err = network_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(stream->data(), stream->pos());
}
// If success, return the ice password to verify the STUN response.
ice_pwd = local_sdp.get_ice_pwd();
return err;
}
@ -3373,13 +3273,9 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map<uint32_t,
}
srs_trace("RTC connection player gcc=%d", twcc_id);
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the player. Because maybe create some players after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
if(ESTABLISHED == state_) {
if(srs_success != (err = player->start())) {
return srs_error_wrap(err, "start player");
}
}
return err;
}
@ -3453,13 +3349,9 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc
}
}
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the publisher. Because maybe create some publishers after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
if(ESTABLISHED == state()) {
if(srs_success != (err = publisher->start())) {
return srs_error_wrap(err, "start publisher");
}
}
return err;
}

@ -52,7 +52,7 @@ class SrsRtcUserConfig;
class SrsRtcSendTrack;
class SrsRtcPublishStream;
class SrsEphemeralDelta;
class SrsRtcNetwork;
class SrsRtcNetworks;
class SrsRtcUdpNetwork;
class ISrsRtcNetwork;
@ -67,17 +67,6 @@ const uint8_t kRtpFb = 205;
const uint8_t kPsFb = 206;
const uint8_t kXR = 207;
enum SrsRtcConnectionStateType
{
// TODO: FIXME: Should prefixed by enum name.
INIT = -1,
WAITING_ANSWER = 1,
WAITING_STUN = 2,
DOING_DTLS_HANDSHAKE = 3,
ESTABLISHED = 4,
CLOSED = 5,
};
// The transport for RTC connection.
class ISrsRtcTransport : public ISrsDtlsCallback
{
@ -378,10 +367,8 @@ private:
srs_error_t send_rtcp_rr();
srs_error_t send_rtcp_xr_rrtr();
public:
srs_error_t on_rtp(char* buf, int nb_buf);
private:
// @remark We copy the plaintext, user should free it.
srs_error_t on_rtp_plaintext(char* plaintext, int nb_plaintext);
srs_error_t on_rtp_cipher(char* buf, int nb_buf);
srs_error_t on_rtp_plaintext(char* buf, int nb_buf);
private:
srs_error_t do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuffer* buf);
public:
@ -439,7 +426,6 @@ public:
bool disposing_;
private:
SrsRtcServer* server_;
SrsRtcConnectionStateType state_;
private:
iovec* cache_iov_;
SrsBuffer* cache_buffer_;
@ -455,8 +441,8 @@ private:
private:
// The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128.
std::string username_;
// Use one UDP network and one TCP network.
SrsRtcNetwork* network_;
// A group of networks, each has its own DTLS and SRTP context.
SrsRtcNetworks* networks_;
private:
// TODO: FIXME: Rename it.
// The timeout of session, keep alive by STUN ping pong.
@ -491,9 +477,8 @@ public:
void set_local_sdp(const SrsSdp& sdp);
SrsSdp* get_remote_sdp();
void set_remote_sdp(const SrsSdp& sdp);
// Connection level state machine, for ARQ of UDP packets.
SrsRtcConnectionStateType state();
void set_state(SrsRtcConnectionStateType state);
// Change network to waiting stun state.
void set_state_as_waiting_stun();
// Get username pair for this connection, used as ID of session.
std::string username();
public:
@ -514,10 +499,8 @@ public:
public:
// Before initialize, user must set the local SDP, which is used to inititlize DTLS.
srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data);
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
srs_error_t on_rtp_cipher(char* data, int nb_data);
srs_error_t on_rtp_plaintext(char* data, int nb_data);
private:
// Decode the RTP header from buf, find the publisher by SSRC.
srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher);
@ -549,8 +532,10 @@ public:
srs_error_t do_send_packet(SrsRtpPacket* pkt);
// Directly set the status of play track, generally for init to set the default value.
void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status);
public:
// Notify by specified network.
srs_error_t on_binding_request(SrsStunPacket* r, std::string& ice_pwd);
private:
srs_error_t on_binding_request(SrsStunPacket* r);
// publish media capabilitiy negotiate
srs_error_t negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcSourceDescription* stream_desc);
srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan);

@ -6,7 +6,7 @@
#include <srs_app_rtc_network.hpp>
#include <string>
#include <arpa/inet.h>
using namespace std;
#include <srs_kernel_log.hpp>
@ -19,31 +19,31 @@ using namespace std;
#include <srs_app_rtc_server.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_protocol_rtc_stun.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_core_autofree.hpp>
ISrsRtcNetwork::ISrsRtcNetwork()
{
}
ISrsRtcNetwork::~ISrsRtcNetwork()
{
}
#ifdef SRS_OSX
// These functions are similar to the older byteorder(3) family of functions.
// For example, be32toh() is identical to ntohl().
// @see https://linux.die.net/man/3/be32toh
#define be32toh ntohl
#endif
SrsRtcNetwork::SrsRtcNetwork(SrsRtcConnection* conn)
SrsRtcNetworks::SrsRtcNetworks(SrsRtcConnection* conn)
{
conn_ = conn;
udp_ = new SrsRtcUdpNetwork(this);
delta_ = new SrsEphemeralDelta();
udp_ = new SrsRtcUdpNetwork(conn_, delta_);
}
SrsRtcNetwork::~SrsRtcNetwork()
SrsRtcNetworks::~SrsRtcNetworks()
{
// Free the UDP network after transport deleted.
srs_freep(udp_);
srs_freep(delta_);
}
srs_error_t SrsRtcNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp)
srs_error_t SrsRtcNetworks::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp)
{
srs_error_t err = srs_success;
@ -54,91 +54,40 @@ srs_error_t SrsRtcNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srt
return err;
}
srs_error_t SrsRtcNetwork::start_active_handshake()
{
return udp_->start_active_handshake();
}
srs_error_t SrsRtcNetwork::on_dtls(char* data, int nb_data)
{
return udp_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcNetwork::on_dtls_alert(std::string type, std::string desc)
{
return conn_->on_dtls_alert(type, desc);
}
srs_error_t SrsRtcNetwork::on_connection_established()
{
return conn_->on_connection_established();
}
srs_error_t SrsRtcNetwork::protect_rtp(void* packet, int* nb_cipher)
{
return udp_->protect_rtp(packet, nb_cipher);
}
srs_error_t SrsRtcNetwork::protect_rtcp(void* packet, int* nb_cipher)
{
return udp_->protect_rtcp(packet, nb_cipher);
}
srs_error_t SrsRtcNetwork::unprotect_rtp(void* packet, int* nb_plaintext)
{
return udp_->unprotect_rtp(packet, nb_plaintext);
}
srs_error_t SrsRtcNetwork::unprotect_rtcp(void* packet, int* nb_plaintext)
void SrsRtcNetworks::set_state(SrsRtcNetworkState state)
{
return udp_->unprotect_rtcp(packet, nb_plaintext);
udp_->set_state(state);
}
srs_error_t SrsRtcNetwork::on_rtcp(char* data, int nb_data)
SrsRtcUdpNetwork* SrsRtcNetworks::udp()
{
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
return conn_->on_rtcp(data, nb_data);
}
srs_error_t SrsRtcNetwork::on_rtp(char* data, int nb_data)
{
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
return conn_->on_rtp(data, nb_data);
}
string SrsRtcNetwork::get_peer_ip()
{
return udp_->get_peer_ip();
return udp_;
}
int SrsRtcNetwork::get_peer_port()
ISrsRtcNetwork* SrsRtcNetworks::available()
{
return udp_->get_peer_port();
return udp_;
}
SrsRtcUdpNetwork* SrsRtcNetwork::udp()
ISrsKbpsDelta* SrsRtcNetworks::delta()
{
return udp_;
return delta_;
}
ISrsKbpsDelta* SrsRtcNetwork::delta()
ISrsRtcNetwork::ISrsRtcNetwork()
{
return delta_;
}
srs_error_t SrsRtcNetwork::write(void* buf, size_t size, ssize_t* nwrite)
ISrsRtcNetwork::~ISrsRtcNetwork()
{
return udp_->write(buf, size, nwrite);
}
SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcNetwork* network)
SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta)
{
network_ = network;
sendonly_skt = NULL;
state_ = SrsRtcNetworkStateInit;
conn_ = conn;
delta_ = delta;
sendonly_skt_ = NULL;
pp_address_change_ = new SrsErrorPithyPrint();
transport_ = new SrsSecurityTransport(this);
}
@ -187,19 +136,31 @@ srs_error_t SrsRtcUdpNetwork::start_active_handshake()
srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data)
{
// Update stat when we received data.
network_->delta_->add_delta(nb_data, 0);
delta_->add_delta(nb_data, 0);
return transport_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcUdpNetwork::on_dtls_alert(std::string type, std::string desc)
{
return network_->conn_->on_dtls_alert(type, desc);
return conn_->on_dtls_alert(type, desc);
}
srs_error_t SrsRtcUdpNetwork::on_connection_established()
{
return network_->conn_->on_connection_established();
srs_error_t err = srs_success;
// If DTLS done packet received many times, such as ARQ, ignore.
if(SrsRtcNetworkStateClosed == state_) {
return err;
}
if ((err = conn_->on_connection_established()) != srs_success) {
return srs_error_wrap(err, "udp");
}
state_ = SrsRtcNetworkStateClosed;
return err;
}
srs_error_t SrsRtcUdpNetwork::protect_rtp(void* packet, int* nb_cipher)
@ -212,53 +173,86 @@ srs_error_t SrsRtcUdpNetwork::protect_rtcp(void* packet, int* nb_cipher)
return transport_->protect_rtcp(packet, nb_cipher);
}
srs_error_t SrsRtcUdpNetwork::unprotect_rtp(void* packet, int* nb_plaintext)
srs_error_t SrsRtcUdpNetwork::on_rtcp(char* data, int nb_data)
{
return transport_->unprotect_rtp(packet, nb_plaintext);
}
srs_error_t err = srs_success;
srs_error_t SrsRtcUdpNetwork::unprotect_rtcp(void* packet, int* nb_plaintext)
{
// Update stat when we received data.
network_->delta_->add_delta(*nb_plaintext, 0);
delta_->add_delta(nb_data, 0);
int nb_unprotected_buf = nb_data;
if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect");
}
char* unprotected_buf = data;
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
return transport_->unprotect_rtcp(packet, nb_plaintext);
if ((err = conn_->on_rtcp(unprotected_buf, nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "cipher=%d", nb_data);
}
return err;
}
srs_error_t SrsRtcUdpNetwork::on_rtcp(char* data, int nb_data)
srs_error_t SrsRtcUdpNetwork::on_rtp(char* data, int nb_data)
{
srs_error_t err = srs_success;
// Update stat when we received data.
network_->delta_->add_delta(nb_data, 0);
delta_->add_delta(nb_data, 0);
if ((err = conn_->on_rtp_cipher(data, nb_data)) != srs_success) {
return srs_error_wrap(err, "cipher=%d", nb_data);
}
int nb_unprotected_buf = nb_data;
if ((err = transport_->unprotect_rtp(data, &nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtp unprotect");
}
char* unprotected_buf = data;
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
return network_->conn_->on_rtcp(data, nb_data);
if ((err = conn_->on_rtp_plaintext(unprotected_buf, nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "cipher=%d", nb_data);
}
return err;
}
srs_error_t SrsRtcUdpNetwork::on_rtp(char* data, int nb_data)
void SrsRtcUdpNetwork::set_state(SrsRtcNetworkState state)
{
// Update stat when we received data.
network_->delta_->add_delta(nb_data, 0);
if (state_ > state) {
srs_warn("RTC: Ignore setting state=%d, now=%d", state, state_);
return;
}
return network_->conn_->on_rtp(data, nb_data);
state_ = state;
}
string SrsRtcUdpNetwork::get_peer_ip()
{
srs_assert(sendonly_skt);
return sendonly_skt->get_peer_ip();
srs_assert(sendonly_skt_);
return sendonly_skt_->get_peer_ip();
}
int SrsRtcUdpNetwork::get_peer_port()
{
srs_assert(sendonly_skt);
return sendonly_skt->get_peer_port();
srs_assert(sendonly_skt_);
return sendonly_skt_->get_peer_port();
}
void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket* skt)
{
// TODO: FIXME: Refine performance.
string prev_peer_id, peer_id = skt->peer_id();
if (sendonly_skt) {
prev_peer_id = sendonly_skt->peer_id();
if (sendonly_skt_) {
prev_peer_id = sendonly_skt_->peer_id();
}
// Ignore if same address.
@ -289,33 +283,100 @@ void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket* skt)
// If no cache, build cache and setup the relations in connection.
if (!addr_cache) {
peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly();
_srs_rtc_manager->add_with_id(peer_id, network_->conn_);
_srs_rtc_manager->add_with_id(peer_id, conn_);
uint64_t fast_id = skt->fast_id();
if (fast_id) {
_srs_rtc_manager->add_with_fast_id(fast_id, network_->conn_);
_srs_rtc_manager->add_with_fast_id(fast_id, conn_);
}
}
// Update the transport.
sendonly_skt = addr_cache;
sendonly_skt_ = addr_cache;
}
srs_error_t SrsRtcUdpNetwork::on_stun(SrsStunPacket* r, char* data, int nb_data)
{
srs_error_t err = srs_success;
// Write STUN messages to blackhole.
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(data, nb_data);
}
if (!r->is_binding_request()) {
return err;
}
string ice_pwd;
if ((err = conn_->on_binding_request(r, ice_pwd)) != srs_success) {
return srs_error_wrap(err, "udp");
}
if ((err = on_binding_request(r, ice_pwd)) != srs_success) {
return srs_error_wrap(err, "stun binding request failed");
}
return err;
}
srs_error_t SrsRtcUdpNetwork::on_binding_request(SrsStunPacket* r, string ice_pwd)
{
srs_error_t err = srs_success;
SrsStunPacket stun_binding_response;
char buf[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stun_binding_response.set_message_type(BindingResponse);
stun_binding_response.set_local_ufrag(r->get_remote_ufrag());
stun_binding_response.set_remote_ufrag(r->get_local_ufrag());
stun_binding_response.set_transcation_id(r->get_transcation_id());
// FIXME: inet_addr is deprecated, IPV6 support
stun_binding_response.set_mapped_address(be32toh(inet_addr(get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(get_peer_port());
if ((err = stun_binding_response.encode(ice_pwd, stream)) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}
if ((err = write(stream->data(), stream->pos(), NULL)) != srs_success) {
return srs_error_wrap(err, "stun binding response send failed");
}
if (state_ == SrsRtcNetworkStateWaitingStun) {
state_ = SrsRtcNetworkStateDtls;
// TODO: FIXME: Add cost.
srs_trace("RTC: session STUN done, waiting DTLS handshake.");
if((err = start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(stream->data(), stream->pos());
}
return err;
}
srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
{
// Update stat when we sending data.
network_->delta_->add_delta(0, size);
delta_->add_delta(0, size);
if (nwrite) *nwrite = size;
return sendonly_skt->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
}
SrsRtcTcpConn::SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm)
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
{
manager_ = cm;
ip_ = cip;
port_ = port;
skt_ = new SrsTcpConnection(fd);
skt_ = skt;
delta_ = new SrsNetworkDelta();
delta_->set_io(skt_, skt_);
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());

@ -28,84 +28,94 @@ class ISrsRtcTransport;
class SrsEphemeralDelta;
class ISrsKbpsDelta;
class SrsRtcUdpNetwork;
class ISrsRtcNetwork;
// For DTLS to call network service.
class ISrsRtcNetwork : public ISrsStreamWriter
// The network stat.
enum SrsRtcNetworkState
{
public:
ISrsRtcNetwork();
virtual ~ISrsRtcNetwork();
public:
// Callback when DTLS connected.
virtual srs_error_t on_connection_established() = 0;
// Callback when DTLS disconnected.
virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0;
SrsRtcNetworkStateInit = -1,
SrsRtcNetworkStateWaitingAnswer = 1,
SrsRtcNetworkStateWaitingStun = 2,
SrsRtcNetworkStateDtls = 3,
SrsRtcNetworkStateEstablished = 4,
SrsRtcNetworkStateClosed = 5,
};
// The UDP network, default for WebRTC.
class SrsRtcNetwork : public ISrsRtcNetwork
// A group of networks, each has its own DTLS and SRTP context.
class SrsRtcNetworks
{
private:
friend class SrsRtcUdpNetwork;
// Network over UDP.
SrsRtcUdpNetwork* udp_;
private:
// WebRTC session object.
SrsRtcConnection* conn_;
// Network over UDP.
SrsRtcUdpNetwork* udp_;
// Delta object for statistics.
SrsEphemeralDelta* delta_;
public:
SrsRtcNetwork(SrsRtcConnection* conn);
virtual ~SrsRtcNetwork();
SrsRtcNetworks(SrsRtcConnection* conn);
virtual ~SrsRtcNetworks();
// DTLS transport functions.
public:
srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp);
virtual srs_error_t start_active_handshake();
virtual srs_error_t on_dtls(char* data, int nb_data);
virtual srs_error_t on_dtls_alert(std::string type, std::string desc);
srs_error_t on_connection_established();
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
// When got data from socket.
public:
srs_error_t on_rtcp(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
// Other functions.
public:
// ICE reflexive address functions.
std::string get_peer_ip();
int get_peer_port();
// Connection level state machine, for ARQ of UDP packets.
void set_state(SrsRtcNetworkState state);
// Get the UDP network object.
SrsRtcUdpNetwork* udp();
// Get an available network.
ISrsRtcNetwork* available();
public:
// Get the delta object for statistics.
virtual ISrsKbpsDelta* delta();
// Interface ISrsStreamWriter.
};
// For DTLS or Session to call network service.
class ISrsRtcNetwork : public ISrsStreamWriter
{
public:
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
ISrsRtcNetwork();
virtual ~ISrsRtcNetwork();
public:
// Callback when DTLS connected.
virtual srs_error_t on_connection_established() = 0;
// Callback when DTLS disconnected.
virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0;
public:
// Protect RTP packet by SRTP context.
virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0;
// Protect RTCP packet by SRTP context.
virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0;
};
// The WebRTC over UDP network.
class SrsRtcUdpNetwork : public ISrsRtcNetwork
{
private:
SrsRtcNetwork* network_;
// WebRTC session object.
SrsRtcConnection* conn_;
// Delta object for statistics.
SrsEphemeralDelta* delta_;
SrsRtcNetworkState state_;
private:
// Pithy print for address change, use port as error code.
SrsErrorPithyPrint* pp_address_change_;
// The peer address, client maybe use more than one address, it's the current selected one.
SrsUdpMuxSocket* sendonly_skt;
SrsUdpMuxSocket* sendonly_skt_;
// The address list, client may use multiple addresses.
std::map<std::string, SrsUdpMuxSocket*> peer_addresses_;
// The DTLS transport over this network.
ISrsRtcTransport* transport_;
public:
SrsRtcUdpNetwork(SrsRtcNetwork* network);
SrsRtcUdpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta);
virtual ~SrsRtcUdpNetwork();
public:
// Update the UDP connection.
void update_sendonly_socket(SrsUdpMuxSocket* skt);
// When got STUN ping message. The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data);
private:
srs_error_t on_binding_request(SrsStunPacket* r, std::string ice_pwd);
// DTLS transport functions.
public:
srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp);
@ -115,14 +125,14 @@ public:
srs_error_t on_connection_established();
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
// When got data from socket.
public:
srs_error_t on_rtcp(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
// Other functions.
public:
// Connection level state machine, for ARQ of UDP packets.
void set_state(SrsRtcNetworkState state);
// ICE reflexive address functions.
std::string get_peer_ip();
int get_peer_port();
@ -145,9 +155,9 @@ private:
// The delta for statistic.
SrsNetworkDelta* delta_;
// TCP Transport object.
SrsTcpConnection* skt_;
ISrsProtocolReadWriter* skt_;
public:
SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm);
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm);
virtual ~SrsRtcTcpConn();
public:
ISrsKbpsDelta* delta();

@ -419,7 +419,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
session->udp()->update_sendonly_socket(skt);
}
return session->on_stun(&ping, data, size);
return session->udp()->on_stun(&ping, data, size);
}
// For DTLS, RTCP or RTP, which does not support peer address changing.
@ -432,7 +432,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
if (is_rtp_or_rtcp && !is_rtcp) {
++_srs_pps_rrtps->sugar;
err = session->on_rtp(data, size);
err = session->udp()->on_rtp(data, size);
if (err != srs_success) {
session->switch_to_context();
}
@ -443,12 +443,12 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
if (is_rtp_or_rtcp && is_rtcp) {
++_srs_pps_rrtcps->sugar;
return session->on_rtcp(data, size);
return session->udp()->on_rtcp(data, size);
}
if (srs_is_dtls((uint8_t*)data, size)) {
++_srs_pps_rstuns->sugar;
return session->on_dtls(data, size);
return session->udp()->on_dtls(data, size);
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
@ -546,17 +546,22 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local
// We allows to mock the eip of server.
if (true) {
int listen_port = _srs_config->get_rtc_server_listen();
int udp_port = _srs_config->get_rtc_server_listen();
int tcp_port = _srs_config->get_rtc_server_tcp_listen();
string protocol = _srs_config->get_rtc_server_protocol();
set<string> candidates = discover_candidates(ruc);
for (set<string>::iterator it = candidates.begin(); it != candidates.end(); ++it) {
string hostname; int port = listen_port;
srs_parse_hostport(*it, hostname, port);
if (protocol == "udp" || protocol == "tcp") {
local_sdp.add_candidate(protocol, hostname, port, "host");
string hostname;
int uport = udp_port; srs_parse_hostport(*it, hostname, uport);
int tport = tcp_port; srs_parse_hostport(*it, hostname, tport);
if (protocol == "udp") {
local_sdp.add_candidate("udp", hostname, uport, "host");
} else if (protocol == "tcp") {
local_sdp.add_candidate("tcp", hostname, tport, "host");
} else {
local_sdp.add_candidate("udp", hostname, port, "host");
local_sdp.add_candidate("tcp", hostname, port, "host");
local_sdp.add_candidate("udp", hostname, uport, "host");
local_sdp.add_candidate("tcp", hostname, tport, "host");
}
}
@ -585,7 +590,7 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local
session->set_remote_sdp(ruc->remote_sdp_);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state(WAITING_STUN);
session->set_state_as_waiting_stun();
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) {

@ -1444,10 +1444,6 @@ srs_error_t SrsRtmpConn::start()
{
srs_error_t err = srs_success;
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}

@ -36,7 +36,10 @@ using namespace std;
#include <srs_app_coworkers.hpp>
#include <srs_protocol_log.hpp>
#include <srs_app_latest_version.hpp>
#include <srs_app_conn.hpp>
#ifdef SRS_RTC
#include <srs_app_rtc_network.hpp>
#endif
std::string srs_listener_type2string(SrsListenerType type)
{
@ -533,6 +536,7 @@ SrsServer::SrsServer()
http_api_mux = new SrsHttpServeMux();
http_server = new SrsHttpServer(this);
reuse_api_over_server_ = false;
reuse_rtc_over_server_ = false;
http_heartbeat = new SrsHttpHeartbeat();
ingester = new SrsIngester();
@ -655,16 +659,35 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
return srs_error_wrap(err, "handler initialize");
}
bool stream = _srs_config->get_http_stream_enabled();
string http_listen = _srs_config->get_http_stream_listen();
string https_listen = _srs_config->get_https_stream_listen();
#ifdef SRS_RTC
bool rtc = _srs_config->get_rtc_server_enabled();
bool rtc_tcp = _srs_config->get_rtc_server_tcp_enabled();
string rtc_listen = srs_int2str(_srs_config->get_rtc_server_tcp_listen());
// If enabled and listen is the same value, resue port for WebRTC over TCP.
if (stream && rtc && rtc_tcp && http_listen == rtc_listen) {
srs_trace("WebRTC tcp=%s reuses http=%s server", rtc_listen.c_str(), http_listen.c_str());
reuse_rtc_over_server_ = true;
}
if (stream && rtc && rtc_tcp && https_listen == rtc_listen) {
srs_trace("WebRTC tcp=%s reuses https=%s server", rtc_listen.c_str(), https_listen.c_str());
reuse_rtc_over_server_ = true;
}
#endif
// If enabled and the listen is the same value, reuse port.
if (_srs_config->get_http_stream_enabled() && _srs_config->get_http_api_enabled()
&& _srs_config->get_http_api_listen() == _srs_config->get_http_stream_listen()
&& _srs_config->get_https_api_listen() == _srs_config->get_https_stream_listen()
) {
srs_trace("API reuse listen to https server at %s", _srs_config->get_https_stream_listen().c_str());
bool api = _srs_config->get_http_api_enabled();
string api_listen = _srs_config->get_http_api_listen();
string apis_listen = _srs_config->get_https_api_listen();
if (stream && api && api_listen == http_listen && apis_listen == https_listen) {
srs_trace("API reuses http=%s and https=%s server", http_listen.c_str(), https_listen.c_str());
reuse_api_over_server_ = true;
}
// If reuse port, use the same object as server.
// Only init HTTP API when not reusing HTTP server.
if (!reuse_api_over_server_) {
SrsHttpServeMux *api = dynamic_cast<SrsHttpServeMux*>(http_api_mux);
srs_assert(api);
@ -744,22 +767,26 @@ srs_error_t SrsServer::listen()
return srs_error_wrap(err, "stream caster listen");
}
// TODO: FIXME: Refine the listeners.
close_listeners(SrsListenerTcp);
if (_srs_config->get_rtc_server_tcp_enabled()) {
SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp);
listeners.push_back(listener);
#ifdef SRS_RTC
if (!reuse_rtc_over_server_) {
// TODO: FIXME: Refine the listeners.
close_listeners(SrsListenerTcp);
if (_srs_config->get_rtc_server_tcp_enabled()) {
SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp);
listeners.push_back(listener);
std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen());
std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen());
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port);
}
}
}
#endif
if ((err = conn_manager->start()) != srs_success) {
return srs_error_wrap(err, "connection manager");
@ -1376,11 +1403,13 @@ void SrsServer::resample_kbps()
continue;
}
#ifdef SRS_RTC
SrsRtcTcpConn* tcp = dynamic_cast<SrsRtcTcpConn*>(c);
if (tcp) {
stat->kbps_add_delta(c->get_id().c_str(), tcp->delta());
continue;
}
#endif
// Impossible path, because we only create these connections above.
srs_assert(false);
@ -1397,7 +1426,6 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
ISrsResource* resource = NULL;
if ((err = fd_to_resource(type, stfd, &resource)) != srs_success) {
//close fd on conn error, otherwise will lead to fd leak -gs
srs_close_stfd(stfd);
if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
srs_error_reset(err);
@ -1405,7 +1433,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
}
return srs_error_wrap(err, "fd to resource");
}
srs_assert(resource);
// Ignore if no resource found.
if (!resource) {
return err;
}
// directly enqueue, the cycle thread will remove the client.
conn_manager->add(resource);
@ -1423,7 +1455,7 @@ ISrsHttpServeMux* SrsServer::api_server()
return http_api_mux;
}
srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr)
srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr)
{
srs_error_t err = srs_success;
@ -1462,24 +1494,56 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS
}
}
// We will free the stfd from now on.
srs_netfd_t fd2 = stfd;
stfd = NULL;
// The context id may change during creating the bellow objects.
SrsContextRestore(_srs_context->get_id());
#ifdef SRS_RTC
// If reuse HTTP server with WebRTC TCP, peek to detect the client.
if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) {
SrsTcpConnection* skt = new SrsTcpConnection(fd2);
SrsBufferedReader* io = new SrsBufferedReader(skt);
uint8_t b[10]; int nn = sizeof(b);
if ((err = io->peek((char*)b, &nn)) != srs_success) {
srs_freep(io); srs_freep(skt);
return srs_error_wrap(err, "peek");
}
// If first message is BindingRequest(00 01), prefixed with length(2B), it's WebRTC client. Generally, the frame
// length minus message length should be 20, that is the header size of STUN is 20 bytes. For example:
// 00 6c # Frame length: 0x006c = 108
// 00 01 # Message Type: Binding Request(0x0001)
// 00 58 # Message Length: 0x005 = 88
// 21 12 a4 42 # Message Cookie: 0x2112a442
// 48 32 6c 61 6b 42 35 71 42 35 4a 71 # Message Transaction ID: 12 bytes
if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20
&& b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42
) {
*pr = new SrsRtcTcpConn(io, ip, port, this);
} else {
*pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, io, http_server, ip, port);
}
return err;
}
#endif
if (type == SrsListenerRtmpStream) {
*pr = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) {
*pr = new SrsHttpxConn(false, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpsApi) {
*pr = new SrsHttpxConn(true, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) {
*pr = new SrsHttpxConn(false, this, stfd, http_server, ip, port);
} else if (type == SrsListenerHttpsStream) {
*pr = new SrsHttpxConn(true, this, stfd, http_server, ip, port);
*pr = new SrsRtmpConn(this, fd2, ip, port);
} else if (type == SrsListenerHttpApi || type == SrsListenerHttpsApi) {
*pr = new SrsHttpxConn(type == SrsListenerHttpsApi, this, new SrsTcpConnection(fd2), http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream || type == SrsListenerHttpsStream) {
*pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, new SrsTcpConnection(fd2), http_server, ip, port);
#ifdef SRS_RTC
} else if (type == SrsListenerTcp) {
*pr = new SrsRtcTcpConn(stfd, ip, port, this);
*pr = new SrsRtcTcpConn(new SrsTcpConnection(fd2), ip, port, this);
#endif
} else {
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd);
srs_close_stfd(fd2);
return err;
}

@ -203,8 +203,10 @@ private:
// TODO: FIXME: Extract an HttpApiServer.
ISrsHttpServeMux* http_api_mux;
SrsHttpServer* http_server;
// If reuse, HTTP API use the same port of HTTP server.
// If reusing, HTTP API use the same port of HTTP server.
bool reuse_api_over_server_;
// If reusing, WebRTC TCP use the same port of HTTP server.
bool reuse_rtc_over_server_;
private:
SrsHttpHeartbeat* http_heartbeat;
SrsIngester* ingester;
@ -313,7 +315,7 @@ public:
// TODO: FIXME: Fetch from hybrid server manager.
virtual ISrsHttpServeMux* api_server();
private:
virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr);
virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr);
// Interface ISrsResourceManager
public:
// A callback for connection to remove itself.

@ -9,6 +9,6 @@
#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 59
#define VERSION_REVISION 60
#endif

@ -220,7 +220,7 @@ int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length)
{
SrsHttpParser* obj = (SrsHttpParser*)parser->data;
srs_assert(obj);
if (length > 0) {
// Note that this function might be called for multiple times, and we got pieces of content.
obj->url.append(at, (int)length);

@ -469,19 +469,23 @@ bool srs_is_never_timeout(srs_utime_t tm)
SrsStSocket::SrsStSocket()
{
stfd = NULL;
stm = rtm = SRS_UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
init(NULL);
}
SrsStSocket::SrsStSocket(srs_netfd_t fd)
{
init(fd);
}
SrsStSocket::~SrsStSocket()
{
}
srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
void SrsStSocket::init(srs_netfd_t fd)
{
stfd = fd;
return srs_success;
stfd_ = fd;
stm = rtm = SRS_UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
}
void SrsStSocket::set_recv_timeout(srs_utime_t tm)
@ -517,12 +521,14 @@ int64_t SrsStSocket::get_send_bytes()
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
srs_error_t err = srs_success;
srs_assert(stfd_);
ssize_t nb_read;
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_read = st_read((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);
nb_read = st_read((st_netfd_t)stfd_, buf, size, rtm);
}
if (nread) {
@ -552,12 +558,14 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
srs_error_t err = srs_success;
srs_assert(stfd_);
ssize_t nb_read;
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);
nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, rtm);
}
if (nread) {
@ -576,7 +584,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
errno = ECONNRESET;
}
return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully, size=%d, nn=%d", size, nb_read);
}
rbytes += nb_read;
@ -587,12 +595,14 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
srs_assert(stfd_);
ssize_t nb_write;
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
nb_write = st_write((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write((st_netfd_t)stfd, buf, size, stm);
nb_write = st_write((st_netfd_t)stfd_, buf, size, stm);
}
if (nwrite) {
@ -617,12 +627,14 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
srs_assert(stfd_);
ssize_t nb_write;
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm);
nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, stm);
}
if (nwrite) {
@ -646,7 +658,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
{
stfd = NULL;
stfd_ = NULL;
io = new SrsStSocket();
host = h;
@ -656,37 +668,26 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
SrsTcpClient::~SrsTcpClient()
{
close();
srs_freep(io);
srs_close_stfd(stfd_);
}
srs_error_t SrsTcpClient::connect()
{
srs_error_t err = srs_success;
close();
srs_assert(stfd == NULL);
srs_netfd_t stfd = NULL;
if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
if ((err = io->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: init socket object");
}
return err;
}
void SrsTcpClient::close()
{
// Ignore when already closed.
if (!io) {
return;
}
srs_freep(io);
io = new SrsStSocket(stfd);
srs_close_stfd(stfd_);
stfd_ = stfd;
srs_close_stfd(stfd);
return err;
}
void SrsTcpClient::set_recv_timeout(srs_utime_t tm)

@ -127,13 +127,13 @@ private:
int64_t rbytes;
int64_t sbytes;
// The underlayer st fd.
srs_netfd_t stfd;
srs_netfd_t stfd_;
public:
SrsStSocket();
SrsStSocket(srs_netfd_t fd);
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual srs_error_t initialize(srs_netfd_t fd);
private:
void init(srs_netfd_t fd);
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
@ -161,7 +161,7 @@ public:
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:
srs_netfd_t stfd;
srs_netfd_t stfd_;
SrsStSocket* io;
private:
std::string host;
@ -179,10 +179,6 @@ public:
// Connect to server over TCP.
// @remark We will close the exists connection before do connect.
virtual srs_error_t connect();
private:
// Close the connection to server.
// @remark User should never use the client when close it.
virtual void close();
// Interface ISrsProtocolReadWriter
public:
virtual void set_recv_timeout(srs_utime_t tm);

@ -113,12 +113,11 @@ VOID TEST(TCPServerTest, PingPong)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL));
@ -135,12 +134,11 @@ VOID TEST(TCPServerTest, PingPong)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL));
HELPER_EXPECT_SUCCESS(c.write((void*)" ", 1, NULL));
@ -159,12 +157,11 @@ VOID TEST(TCPServerTest, PingPong)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
HELPER_EXPECT_SUCCESS(c.write((void*)"Hello SRS", 9, NULL));
EXPECT_EQ(9, c.get_send_bytes());
@ -194,12 +191,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS);
char buf[16] = {0};
@ -216,12 +212,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS);
char buf[16] = {0};
@ -238,12 +233,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS);
HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL));
@ -418,12 +412,11 @@ VOID TEST(TCPServerTest, WritevIOVC)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
iovec iovs[3];
iovs[0].iov_base = (void*)"H";
@ -448,12 +441,11 @@ VOID TEST(TCPServerTest, WritevIOVC)
SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout);
HELPER_EXPECT_SUCCESS(c.connect());
SrsStSocket skt;
srs_usleep(30 * SRS_UTIME_MILLISECONDS);
#ifdef SRS_OSX
ASSERT_TRUE(h.fd != NULL);
#endif
HELPER_EXPECT_SUCCESS(skt.initialize(h.fd));
SrsStSocket skt(h.fd);
iovec iovs[3];
iovs[0].iov_base = (void*)"H";
@ -1012,11 +1004,7 @@ public:
virtual srs_error_t do_cycle(srs_netfd_t cfd) {
srs_error_t err = srs_success;
SrsStSocket skt;
if ((err = skt.initialize(cfd)) != srs_success) {
return err;
}
SrsStSocket skt(cfd);
skt.set_recv_timeout(1 * SRS_UTIME_SECONDS);
skt.set_send_timeout(1 * SRS_UTIME_SECONDS);
@ -1085,7 +1073,7 @@ VOID TEST(TCPServerTest, TCPClientServer)
HELPER_ASSERT_SUCCESS(c.write((void*)"Hello", 5, NULL));
char buf[6]; HELPER_ARRAY_INIT(buf, 6, 0);
ASSERT_EQ(5, srs_read(c.stfd, buf, 5, 1*SRS_UTIME_SECONDS));
ASSERT_EQ(5, srs_read(c.stfd_, buf, 5, 1*SRS_UTIME_SECONDS));
EXPECT_STREQ("Hello", buf);
}
}
@ -1263,11 +1251,7 @@ public:
virtual srs_error_t do_cycle(srs_netfd_t cfd) {
srs_error_t err = srs_success;
SrsStSocket skt;
if ((err = skt.initialize(cfd)) != srs_success) {
return err;
}
SrsStSocket skt(cfd);
skt.set_recv_timeout(1 * SRS_UTIME_SECONDS);
skt.set_send_timeout(1 * SRS_UTIME_SECONDS);

Loading…
Cancel
Save