For RTC publisher, support black-hole

pull/1753/head
winlin 5 years ago
parent e4329fd1a0
commit 7692e589ed

@ -461,6 +461,16 @@ rtc_server {
# then system queue is 2000*4 = 8k, user can incrase reuseport to incrase the queue.
# default: 2000
queue_length 2000;
# The black-hole to copy packet to, for debugging.
# For example, when debugging Chrome publish stream, the received packets are encrypted cipher,
# we can set the publisher black-hole, SRS will copy the plaintext packets to black-hole, and
# we are able to capture the plaintext packets by wireshark.
black_hole {
# Whether enable the black-hole.
enabled off;
# The black-hole address for publisher, or SRS as receiver.
publisher 127.0.0.1:10000;
}
}
vhost rtc.vhost.srs.com {

@ -3645,7 +3645,7 @@ srs_error_t SrsConfig::check_normal_config()
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus"
&& n != "padding" && n != "perf_stat" && n != "queue_length") {
&& n != "padding" && n != "perf_stat" && n != "queue_length" && n != "black_hole") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
}
}
@ -4742,7 +4742,7 @@ std::string SrsConfig::get_rtc_server_candidates()
return DEFAULT;
}
return (conf->arg0().c_str());
return conf->arg0();
}
bool SrsConfig::get_rtc_server_ecdsa()
@ -4943,6 +4943,50 @@ int SrsConfig::get_rtc_server_queue_length()
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_rtc_server_black_hole()
{
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("black_hole");
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
std::string SrsConfig::get_rtc_server_black_hole_publisher()
{
static string DEFAULT = "";
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("black_hole");
if (!conf) {
return DEFAULT;
}
conf = conf->get("publisher");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return conf->arg0();
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

@ -534,6 +534,8 @@ public:
virtual int get_rtc_server_padding();
virtual bool get_rtc_server_perf_stat();
virtual int get_rtc_server_queue_length();
virtual bool get_rtc_server_black_hole();
virtual std::string get_rtc_server_black_hole_publisher();
private:
virtual int get_rtc_server_reuseport2();
virtual bool get_rtc_server_gso2();

@ -62,6 +62,7 @@ using namespace std;
#include <srs_app_http_api.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_service_st.hpp>
// The RTP payload max size, reserved some paddings for SRTP as such:
// kRtpPacketSize = kRtpMaxPayloadSize + paddings
@ -290,12 +291,18 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt)
}
}
if (out_bio_len) {
if (out_bio_len) {
if ((err = skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
}
if (rtc_session->blackhole && rtc_session->blackhole_addr && rtc_session->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = out_bio_data; int len = out_bio_len; SrsRtcSession* s = rtc_session;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
return err;
}
@ -314,7 +321,13 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt)
return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write");
}
if (! handshake_done) {
if (rtc_session->blackhole && rtc_session->blackhole_addr && rtc_session->blackhole_stfd) {
// Ignore any error for black-hole.
void* p = skt->data(); int len = skt->size(); SrsRtcSession* s = rtc_session;
srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
if (!handshake_done) {
err = handshake(skt);
} else {
while (BIO_ctrl_pending(bio_in) > 0) {
@ -360,7 +373,7 @@ srs_error_t SrsDtlsSession::srtp_initialize()
unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server
static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp";
if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) {
return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL_export_keying_material failed");
}
@ -1962,7 +1975,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frame()
rtp_audio_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
if (!frames[i].empty()) {
srs_verbose("collect %d audio frames, seq range %u,%u",
frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
@ -1983,7 +1996,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
rtp_video_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
if (!frames[i].empty()) {
srs_verbose("collect %d video frames, seq range %u,%u",
frames.size(), frames[i].front()->rtp_header.get_sequence(),
frames[i].back()->rtp_header.get_sequence());
@ -2011,13 +2024,13 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
if (rtp_h264_header->nalu_type != kFuA) {
if ((p[0] & kNalTypeMask) == SrsAvcNaluTypeSPS) {
string cur_sps = string((char*)p, rtp_h264_header->nalu_offset[j].second);
if (! cur_sps.empty() && sps != cur_sps) {
if (!cur_sps.empty() && sps != cur_sps) {
video_header_change = true;
sps = cur_sps;
}
} else if ((p[0] & kNalTypeMask) == SrsAvcNaluTypePPS) {
string cur_pps = string((char*)p, rtp_h264_header->nalu_offset[j].second);
if (! cur_pps.empty() && pps != cur_pps) {
if (!cur_pps.empty() && pps != cur_pps) {
video_header_change = true;
pps = cur_pps;
}
@ -2159,6 +2172,10 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string&
session_state = INIT;
last_stun_time = 0;
sessionStunTimeout = 0;
blackhole = false;
blackhole_addr = NULL;
blackhole_stfd = NULL;
}
SrsRtcSession::~SrsRtcSession()
@ -2167,6 +2184,8 @@ SrsRtcSession::~SrsRtcSession()
srs_freep(publisher);
srs_freep(dtls_session);
srs_freep(req);
srs_close_stfd(blackhole_stfd);
srs_freep(blackhole_addr);
}
void SrsRtcSession::set_local_sdp(const SrsSdp& sdp)
@ -2191,6 +2210,30 @@ srs_error_t SrsRtcSession::initialize()
sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost);
last_stun_time = srs_get_system_time();
blackhole = _srs_config->get_rtc_server_black_hole();
srs_trace("RTC init session, timeout=%dms, blackhole=%d", srsu2msi(sessionStunTimeout), blackhole);
if (blackhole) {
string blackhole_ep = _srs_config->get_rtc_server_black_hole_publisher();
if (!blackhole_ep.empty()) {
string host; int port;
srs_parse_hostport(blackhole_ep, host, port);
srs_freep(blackhole_addr);
blackhole_addr = new sockaddr_in();
blackhole_addr->sin_family = AF_INET;
blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str());
blackhole_addr->sin_port = htons(port);
int fd = socket(AF_INET, SOCK_DGRAM, 0);
blackhole_stfd = srs_netfd_open_socket(fd);
srs_assert(blackhole_stfd);
srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd);
}
}
return err;
}
@ -2274,6 +2317,19 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke
srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str());
}
// Write STUN messages to blackhole.
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = skt->data(); int len = skt->size();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = stream->data(); int len = stream->pos();
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
return err;
}
@ -2334,7 +2390,7 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock
uint16_t mask = 0x01;
for (int i = 1; i < 16 && blp; ++i, mask <<= 1) {
if (! (blp & mask)) {
if (!(blp & mask)) {
continue;
}
@ -2343,7 +2399,7 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock
// TODO: FIXME: Support ARQ.
(void)loss_seq;
SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(loss_seq);
if (! pkt) {
if (!pkt) {
continue;
}
@ -2506,14 +2562,14 @@ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt)
srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout));
if (! local_sdp.media_descs_.empty() &&
if (!local_sdp.media_descs_.empty() &&
(local_sdp.media_descs_.back().recvonly_ || local_sdp.media_descs_.back().sendrecv_)) {
if ((err = start_publish(skt)) != srs_success) {
return srs_error_wrap(err, "start publish");
}
}
if (! local_sdp.media_descs_.empty() &&
if (!local_sdp.media_descs_.empty() &&
(local_sdp.media_descs_.back().sendonly_ || local_sdp.media_descs_.back().sendrecv_)) {
if ((err = start_play(skt)) != srs_success) {
return srs_error_wrap(err, "start play");
@ -2568,11 +2624,11 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt)
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i];
if (media_desc.is_audio()) {
if (! media_desc.ssrc_infos_.empty()) {
if (!media_desc.ssrc_infos_.empty()) {
audio_ssrc = media_desc.ssrc_infos_[0].ssrc_;
}
} else if (media_desc.is_video()) {
if (! media_desc.ssrc_infos_.empty()) {
if (!media_desc.ssrc_infos_.empty()) {
video_ssrc = media_desc.ssrc_infos_[0].ssrc_;
}
}
@ -2610,6 +2666,12 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt)
return srs_error_wrap(err, "rtcp unprotect failed");
}
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = unprotected_buf; int len = nb_unprotected_buf;
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
char* ph = unprotected_buf;
int nb_left = nb_unprotected_buf;
while (nb_left) {
@ -2683,12 +2745,18 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt)
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
}
char* unprotected_buf = new char[1460];
char* unprotected_buf = new char[kRtpPacketSize];
int nb_unprotected_buf = skt->size();
if ((err = dtls_session->unprotect_rtp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtp unprotect failed");
}
if (blackhole && blackhole_addr && blackhole_stfd) {
// Ignore any error for black-hole.
void* p = unprotected_buf; int len = nb_unprotected_buf;
srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
}
return publisher->on_rtp(skt, unprotected_buf, nb_unprotected_buf);
}
@ -3110,7 +3178,7 @@ srs_error_t SrsRtcServer::create_rtc_session(
local_ufrag = gen_random_str(8);
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (! map_username_session.count(username))
if (!map_username_session.count(username))
break;
}

@ -305,6 +305,7 @@ public:
class SrsRtcSession
{
friend class SrsDtlsSession;
friend class SrsRtcSenderThread;
friend class SrsRtcPublisher;
private:
@ -327,6 +328,10 @@ private:
bool encrypt;
// The timeout of session, keep alive by STUN ping pong.
srs_utime_t sessionStunTimeout;
private:
bool blackhole;
sockaddr_in* blackhole_addr;
srs_netfd_t blackhole_stfd;
public:
SrsRequest* req;
SrsSource* source;

Loading…
Cancel
Save