RTC: Refine code

pull/1908/head
winlin 5 years ago
parent c115f77038
commit be951b17f1

@ -133,6 +133,7 @@ public:
virtual srs_error_t cycle();
};
// TODO: FIXME: Rename it. Refine it for performance issue.
class SrsUdpMuxSocket
{
private:

@ -93,10 +93,8 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size)
}
}
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = data; int len = size; SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(data, size);
}
return err;
@ -929,10 +927,8 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
return err;
}
if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = unprotected_buf; int len = nb_unprotected_buf; SrsRtcConnection* s = session_;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
char* buf = unprotected_buf;
@ -1481,9 +1477,6 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, SrsContextId context_id)
sessionStunTimeout = 0;
disposing_ = false;
blackhole = false;
blackhole_addr = NULL;
blackhole_stfd = NULL;
twcc_id_ = 0;
nn_simulate_player_nack_drop = 0;
}
@ -1494,8 +1487,6 @@ SrsRtcConnection::~SrsRtcConnection()
srs_freep(publisher_);
srs_freep(transport_);
srs_freep(req);
srs_close_stfd(blackhole_stfd);
srs_freep(blackhole_addr);
srs_freep(sendonly_skt);
}
@ -1540,11 +1531,6 @@ string SrsRtcConnection::peer_id()
return peer_id_;
}
void SrsRtcConnection::set_peer_id(string v)
{
peer_id_ = v;
}
string SrsRtcConnection::username()
{
return username_;
@ -1572,7 +1558,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsAutoFree(SrsRtcStreamDescription, stream_desc);
if ((err = negotiate_publish_capability(req, remote_sdp, stream_desc)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
return srs_error_wrap(err, "publish negotiate");
}
if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc)) != srs_success) {
@ -1593,16 +1579,17 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot
return err;
}
// TODO: FIXME: Error when play before publishing.
srs_error_t SrsRtcConnection::add_player(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{
srs_error_t err = srs_success;
std::map<uint32_t, SrsRtcTrackDescription*> play_sub_relations;
if ((err = negotiate_play_capability(req, remote_sdp, play_sub_relations)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
return srs_error_wrap(err, "play negotiate");
}
if (!play_sub_relations.size()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "cannot negotiate sub relations");
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations");
}
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
@ -1638,11 +1625,11 @@ srs_error_t SrsRtcConnection::add_player2(SrsRequest* req, SrsSdp& local_sdp)
std::map<uint32_t, SrsRtcTrackDescription*> play_sub_relations;
if ((err = fetch_source_capability(req, play_sub_relations)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
return srs_error_wrap(err, "play negotiate");
}
if (!play_sub_relations.size()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "cannot negotiate sub relations");
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no play relations");
}
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
@ -1691,30 +1678,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRtcStream* source, SrsRequest* r, bo
sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost);
last_stun_time = srs_get_system_time();
blackhole = _srs_config->get_rtc_server_black_hole();
srs_trace("RTC init session, DTLS(role=%s, version=%s), timeout=%dms, blackhole=%d",
cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(sessionStunTimeout), blackhole);
if (blackhole) {
string blackhole_ep = _srs_config->get_rtc_server_black_hole_addr();
if (!blackhole_ep.empty()) {
string host; int port;
srs_parse_hostport(blackhole_ep, host, port);
srs_freep(blackhole_addr);
blackhole_addr = new sockaddr_in();
blackhole_addr->sin_family = AF_INET;
blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str());
blackhole_addr->sin_port = htons(port);
int fd = socket(AF_INET, SOCK_DGRAM, 0);
blackhole_stfd = srs_netfd_open_socket(fd);
srs_assert(blackhole_stfd);
srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd);
}
}
srs_trace("RTC init session, DTLS(role=%s, version=%s), timeout=%dms",
cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(sessionStunTimeout));
return err;
}
@ -1736,10 +1701,8 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
}
// Write STUN messages to blackhole.
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = skt->data(); int len = skt->size();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(skt->data(), skt->size());
}
if ((err = on_binding_request(r)) != srs_success) {
@ -1768,10 +1731,8 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
return srs_error_wrap(err, "rtcp unprotect failed");
}
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = unprotected_buf; int len = nb_unprotected_buf;
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
if (player_) {
@ -1866,7 +1827,8 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
sendonly_skt = skt->copy_sendonly();
// Update the sessions to handle packets from the new address.
server_->insert_into_id_sessions(sendonly_skt->peer_id().c_str(), this);
peer_id_ = sendonly_skt->peer_id();
server_->insert_into_id_sessions(peer_id_, this);
// Remove the old address.
if (!old_peer_id.empty()) {
@ -1905,10 +1867,8 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss
stream.write_2bytes(pid);
stream.write_2bytes(blp);
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(stream.data(), stream.pos());
}
char protected_buf[kRtpPacketSize];
@ -2059,10 +2019,8 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc)
srs_trace("RTC PLI ssrc=%u", ssrc);
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream.data(); int len = stream.pos();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(stream.data(), stream.pos());
}
char protected_buf[kRtpPacketSize];
@ -2194,22 +2152,15 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
if (state_ == WAITING_STUN) {
state_ = DOING_DTLS_HANDSHAKE;
peer_id_ = sendonly_skt->peer_id();
server_->insert_into_id_sessions(peer_id_, this);
state_ = DOING_DTLS_HANDSHAKE;
srs_trace("RTC session=%s, STUN done, waitting DTLS handshake.", id().c_str());
srs_trace("RTC session=%s, STUN done, waiting DTLS handshake.", id().c_str());
if((err = transport_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream->data(); int len = stream->pos();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(stream->data(), stream->pos());
}
return err;

@ -327,10 +327,6 @@ public:
std::string sequence_startup;
std::string sequence_delta;
std::string sequence_keep;
private:
bool blackhole;
sockaddr_in* blackhole_addr;
srs_netfd_t blackhole_stfd;
private:
// twcc handler
int twcc_id_;
@ -345,15 +341,20 @@ public:
void set_local_sdp(const SrsSdp& sdp);
SrsSdp* get_remote_sdp();
void set_remote_sdp(const SrsSdp& sdp);
// Connection level state machine, for ARQ of UDP packets.
SrsRtcConnectionStateType state();
void set_state(SrsRtcConnectionStateType state);
// TODO: FIXME: Rename it.
std::string id();
// TODO: FIXME: Rename it.
std::string peer_id();
void set_peer_id(std::string v);
// TODO: FIXME: Rename it.
std::string username();
public:
void set_encrypt(bool v);
void switch_to_context();
SrsContextId context_id();
public:
srs_error_t add_publisher(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t add_player(SrsRequest* request, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
// server send offer sdp to client, local sdp derivate from source stream desc.

@ -23,6 +23,8 @@
#include <srs_app_rtc_server.hpp>
using namespace std;
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
@ -42,11 +44,67 @@
#include <srs_app_rtc_api.hpp>
#include <srs_protocol_utility.hpp>
SrsRtcBlackhole::SrsRtcBlackhole()
{
blackhole = false;
blackhole_addr = NULL;
blackhole_stfd = NULL;
}
SrsRtcBlackhole::~SrsRtcBlackhole()
{
srs_close_stfd(blackhole_stfd);
srs_freep(blackhole_addr);
}
srs_error_t SrsRtcBlackhole::initialize()
{
srs_error_t err = srs_success;
blackhole = _srs_config->get_rtc_server_black_hole();
if (!blackhole) {
return err;
}
string blackhole_ep = _srs_config->get_rtc_server_black_hole_addr();
if (blackhole_ep.empty()) {
blackhole = false;
srs_warn("disable black hole for no endpoint");
return err;
}
string host; int port;
srs_parse_hostport(blackhole_ep, host, port);
srs_freep(blackhole_addr);
blackhole_addr = new sockaddr_in();
blackhole_addr->sin_family = AF_INET;
blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str());
blackhole_addr->sin_port = htons(port);
int fd = socket(AF_INET, SOCK_DGRAM, 0);
blackhole_stfd = srs_netfd_open_socket(fd);
srs_assert(blackhole_stfd);
srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd);
return err;
}
void SrsRtcBlackhole::sendto(void* data, int len)
{
if (!blackhole) {
return;
}
srs_sendto(blackhole_stfd, data, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
SrsRtcBlackhole* _srs_blackhole = new SrsRtcBlackhole();
// @global dtls certficate for rtc module.
SrsDtlsCertificate* _srs_rtc_dtls_certificate = new SrsDtlsCertificate();
using namespace std;
static bool is_stun(const uint8_t* data, const int size)
{
return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1);
@ -176,6 +234,10 @@ srs_error_t SrsRtcServer::initialize()
return srs_error_wrap(err, "start timer");
}
if ((err = _srs_blackhole->initialize()) != srs_success) {
return srs_error_wrap(err, "black hole");
}
srs_trace("RTC server init ok");
return err;

@ -41,6 +41,27 @@ class SrsRequest;
class SrsSdp;
class SrsRtcStream;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
// we can use wireshark to capture the plaintext.
class SrsRtcBlackhole
{
public:
bool blackhole;
private:
sockaddr_in* blackhole_addr;
srs_netfd_t blackhole_stfd;
public:
SrsRtcBlackhole();
virtual ~SrsRtcBlackhole();
public:
srs_error_t initialize();
void sendto(void* data, int len);
};
extern SrsRtcBlackhole* _srs_blackhole;
// The handler for RTC server to call.
class ISrsRtcServerHandler
{
public:
@ -51,6 +72,7 @@ public:
virtual void on_timeout(SrsRtcConnection* session) = 0;
};
// The RTC server instance, listen UDP port, handle UDP packet, manage RTC connections.
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass
{
private:
@ -58,7 +80,9 @@ private:
std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler;
private:
// TODO: FIXME: Rename it.
std::map<std::string, SrsRtcConnection*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
// TODO: FIXME: Rename it.
std::map<std::string, SrsRtcConnection*> map_id_session; // key: peerip(ip + ":" + port)
// The zombie sessions, we will free them.
std::vector<SrsRtcConnection*> zombies_;

Loading…
Cancel
Save