pull/1753/head
xiaozhihong 5 years ago
commit e0cd148ad6

@ -157,6 +157,7 @@ For previous versions, please read:
## V4 changes
* v4.0, 2020-04-14, For [#307][bug #307], support sendmmsg, GSO and reuseport. 4.0.23
* v4.0, 2020-04-05, For [#307][bug #307], SRTP ASM only works with openssl-1.0, auto detect it. 4.0.22
* v4.0, 2020-04-04, Merge RTC and GB28181, with bugs fixed. 4.0.21
* v4.0, 2020-04-04, For [#307][bug #307], refine RTC latency from 600ms to 200ms. 4.0.20

@ -171,6 +171,12 @@ else
srs_undefine_macro "SRS_AUTO_HAS_SENDMMSG" $SRS_AUTO_HEADERS_H
fi
if [ $SRS_DEBUG = YES ]; then
srs_define_macro "SRS_AUTO_DEBUG" $SRS_AUTO_HEADERS_H
else
srs_undefine_macro "SRS_AUTO_DEBUG" $SRS_AUTO_HEADERS_H
fi
# prefix
echo "" >> $SRS_AUTO_HEADERS_H
echo "#define SRS_AUTO_PREFIX \"${SRS_PREFIX}\"" >> $SRS_AUTO_HEADERS_H

@ -110,7 +110,11 @@ function Ubuntu_prepare()
echo "The valgrind-dev is installed."
fi
fi
pkg-config --version >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "Please install pkg-config"; exit -1;
fi
echo "Tools for Ubuntu are installed."
return 0
}
@ -191,6 +195,10 @@ function Centos_prepare()
echo "The valgrind-devel is installed."
fi
fi
pkg-config --version --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then
echo "Please install pkg-config"; exit -1;
fi
echo "Tools for Centos are installed."
return 0

@ -122,6 +122,7 @@ SRS_NASM=YES
SRS_SRTP_ASM=YES
SRS_SENDMMSG=YES
SRS_HAS_SENDMMSG=YES
SRS_DEBUG=NO
#####################################################################################
# menu
@ -162,6 +163,7 @@ Features:
--prefix=<path> The absolute installation path for srs. Default: $SRS_PREFIX
--static Whether add '-static' to link options.
--gcov Whether enable the GCOV compiler options.
--debug Whether enable the debug code, may hurt performance.
--jobs[=N] Allow N jobs at once; infinite jobs with no arg.
Used for make in the configure, for example, to make ffmpeg.
--log-verbose Whether enable the log verbose level. default: no.
@ -293,6 +295,7 @@ function parse_user_option() {
--log-info) SRS_LOG_INFO=YES ;;
--log-trace) SRS_LOG_TRACE=YES ;;
--gcov) SRS_GCOV=YES ;;
--debug) SRS_DEBUG=YES ;;
--arm) SRS_CROSS_BUILD=YES ;;
--mips) SRS_CROSS_BUILD=YES ;;
@ -623,6 +626,7 @@ function regenerate_options() {
if [ $SRS_LOG_INFO = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --log-info"; fi
if [ $SRS_LOG_TRACE = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --log-trace"; fi
if [ $SRS_GCOV = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --gcov"; fi
if [ $SRS_DEBUG = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --debug"; fi
if [[ $SRS_EXTRA_FLAGS != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --extra-flags=\\\"$SRS_EXTRA_FLAGS\\\""; fi
if [[ $SRS_BUILD_TAG != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --build-tag=\\\"$SRS_BUILD_TAG\\\""; fi
if [[ $SRS_TOOL_CC != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --cc=$SRS_TOOL_CC"; fi

@ -435,6 +435,14 @@ rtc_server {
# and net.core.rmem_default or just increase this to get larger UDP recv and send buffer.
# default: 4
reuseport 4;
# Whether merge multiple NALUs into one.
# @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318
# default: on
merge_nalus on;
# Whether enable GSO to send out RTP packets.
# @remark Linux 4.18+ only, for other OS always disabled.
# default: on
gso on;
}
vhost rtc.vhost.srs.com {

@ -207,6 +207,14 @@ function build_default_hls_url() {
}
function build_default_rtc_url(query) {
// Use target to overwrite server, vhost and eip.
console.log('?target=x.x.x.x to overwrite server, vhost and eip.');
if (query.target) {
query.server = query.vhost = query.eip = query.target;
query.user_query.eip = query.target;
delete query.target;
}
var server = (!query.server)? window.location.hostname:query.server;
var vhost = (!query.vhost)? window.location.hostname:query.vhost;
var app = (!query.app)? "live":query.app;

@ -0,0 +1,90 @@
#!/usr/bin/python
'''
The MIT License (MIT)
Copyright (c) 2013-2016 SRS(ossrs)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''
import urllib, sys, json
url = "http://localhost:1985/api/v1/perf"
if len(sys.argv) < 2:
print "Usage: %s <url>"%(sys.argv[0])
print "For example:"
print " %s http://localhost:1985/api/v1/perf"%(sys.argv[0])
sys.exit(-1)
url = sys.argv[1]
print "Open %s"%(url)
f = urllib.urlopen(url)
s = f.read()
f.close()
print "Repsonse %s"%(s)
obj = json.loads(s)
# 2, 3, 5, 9, 16, 32, 64, 128, 256
keys = ['lt_2', 'lt_3', 'lt_5', 'lt_9', 'lt_16', 'lt_32', 'lt_64', 'lt_128', 'lt_256', 'gt_256']
print ""
print("AV---Frames"),
p = obj['data']['avframes']
for k in keys:
k2 = '%s'%(k)
if k2 in p:
print(p[k2]),
else:
print(0),
print(p['nn']),
print ""
print("RTC--Frames"),
p = obj['data']['rtc']
for k in keys:
k2 = '%s'%(k)
if k2 in p:
print(p[k2]),
else:
print(0),
print(p['nn']),
print ""
print("RTP-Packets"),
p = obj['data']['rtp']
for k in keys:
k2 = '%s'%(k)
if k2 in p:
print(p[k2]),
else:
print(0),
print(p['nn']),
print ""
print("GSO-Packets"),
p = obj['data']['gso']
for k in keys:
k2 = '%s'%(k)
if k2 in p:
print(p[k2]),
else:
print(0),
print(p['nn']),

@ -33,6 +33,10 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef __linux__
#include <linux/version.h>
#include <sys/utsname.h>
#endif
#include <vector>
#include <algorithm>
@ -3614,7 +3618,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport") {
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
}
}
@ -4747,12 +4751,20 @@ int SrsConfig::get_rtc_server_sendmmsg()
int SrsConfig::get_rtc_server_reuseport()
{
#if defined(SO_REUSEPORT)
static int DEFAULT = 4;
#else
static int DEFAULT = 1;
int v = get_rtc_server_reuseport2();
#if !defined(SO_REUSEPORT)
srs_warn("REUSEPORT not supported, reset %d to %d", reuseport, DEFAULT);
v = 1
#endif
return v;
}
int SrsConfig::get_rtc_server_reuseport2()
{
static int DEFAULT = 4;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
@ -4763,13 +4775,69 @@ int SrsConfig::get_rtc_server_reuseport()
return DEFAULT;
}
int reuseport = ::atoi(conf->arg0().c_str());
#if !defined(SO_REUSEPORT)
srs_warn("REUSEPORT not supported, reset %d to %d", reuseport, DEFAULT);
reuseport = DEFAULT
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_rtc_server_merge_nalus()
{
static int DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("merge_nalus");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
bool SrsConfig::get_rtc_server_gso()
{
bool v = get_rtc_server_gso2();
bool gso_disabled = false;
#if !defined(__linux__)
gso_disabled = true;
if (v) {
srs_warn("GSO is disabled, for Linux 4.18+ only");
}
#elif LINUX_VERSION_CODE < KERNEL_VERSION(4,18,0)
if (v) {
utsname un = {0};
int r0 = uname(&un);
if (r0 || strcmp(un.release, "4.18.0") < 0) {
gso_disabled = true;
srs_warn("GSO is disabled, for Linux 4.18+ only, r0=%d, kernel=%s", r0, un.release);
}
}
#endif
return reuseport;
if (v && gso_disabled) {
v = false;
}
return v;
}
bool SrsConfig::get_rtc_server_gso2()
{
static int DEFAULT = true;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("gso");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)

@ -528,7 +528,14 @@ public:
virtual int get_rtc_server_sendmmsg();
virtual bool get_rtc_server_encrypt();
virtual int get_rtc_server_reuseport();
private:
virtual int get_rtc_server_reuseport2();
public:
virtual bool get_rtc_server_merge_nalus();
virtual bool get_rtc_server_gso();
private:
virtual bool get_rtc_server_gso2();
public:
SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost);
bool get_rtc_bframe_discard(std::string vhost);

@ -1622,13 +1622,40 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
data->set("query", p);
p->set("target", SrsJsonAny::str(target.c_str()));
p->set("help", SrsJsonAny::str("?target=writev|sendmmsg"));
p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg"));
}
if (target.empty() || target == "writev") {
if (target.empty() || target == "avframes") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("writev", p);
if ((err = stat->dumps_perf_writev(p)) != srs_success) {
data->set("avframes", p);
if ((err = stat->dumps_perf_msgs(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
if (target.empty() || target == "rtc") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("rtc", p);
if ((err = stat->dumps_perf_rtc_packets(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
if (target.empty() || target == "rtp") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("rtp", p);
if ((err = stat->dumps_perf_rtp_packets(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
if (target.empty() || target == "gso") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("gso", p);
if ((err = stat->dumps_perf_gso(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
@ -1643,6 +1670,15 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
}
}
if (target.empty() || target == "writev_iovs") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("writev_iovs", p);
if ((err = stat->dumps_perf_writev_iovs(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
return srs_api_response(w, r, obj->dumps());
}

@ -680,6 +680,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
err = streaming_send_messages(enc, msgs.msgs, count);
}
// TODO: FIXME: Update the stat.
// free the messages.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];

@ -499,7 +499,7 @@ srs_error_t SrsUdpMuxListener::cycle()
pps_unit = "(k)"; pps_last /= 10000; pps_average /= 10000;
}
srs_trace("<- RTC #%d RECV %" PRId64 ", pps %d/%d%s, schedule %" PRId64,
srs_trace("<- RTC RECV #%d, udp %" PRId64 ", pps %d/%d%s, schedule %" PRId64,
srs_netfd_fileno(lfd), nn_msgs_stage, pps_average, pps_last, pps_unit.c_str(), nn_loop);
nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = 0; nn_msgs_stage = 0;

@ -33,6 +33,11 @@ using namespace std;
#include <fcntl.h>
#include <unistd.h>
#include <netinet/udp.h>
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#include <sstream>
#include <srs_core_autofree.hpp>
@ -386,7 +391,7 @@ srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int&
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
}
srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt)
srs_error_t SrsDtlsSession::protect_rtp2(void* rtp_hdr, int* len_ptr)
{
srs_error_t err = srs_success;
@ -394,14 +399,7 @@ srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2*
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
}
SrsBuffer stream(buf, *pnn_buf);
if ((err = pkt->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
*pnn_buf = stream.pos();
if (srtp_protect(srtp_send, buf, pnn_buf) != 0) {
if (srtp_protect(srtp_send, rtp_hdr, len_ptr) != 0) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
}
@ -456,6 +454,26 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed");
}
SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus)
{
use_gso = gso;
should_merge_nalus = merge_nalus;
nn_rtp_pkts = 0;
nn_audios = nn_extras = 0;
nn_videos = nn_samples = 0;
}
SrsRtcPackets::~SrsRtcPackets()
{
vector<SrsRtpPacket2*>::iterator it;
for (it = packets.begin(); it != packets.end(); ++it ) {
SrsRtpPacket2* packet = *it;
srs_freep(packet);
}
packets.clear();
}
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid)
: sendonly_ukt(NULL)
{
@ -464,15 +482,21 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int
rtc_session = s;
sendonly_ukt = u->copy_sendonly();
gso = false;
merge_nalus = false;
audio_timestamp = 0;
audio_sequence = 0;
video_sequence = 0;
_srs_config->subscribe(this);
}
SrsRtcSenderThread::~SrsRtcSenderThread()
{
_srs_config->unsubscribe(this);
srs_freep(trd);
srs_freep(sendonly_ukt);
}
@ -487,9 +511,35 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t
video_payload_type = v_pt;
audio_payload_type = a_pt;
gso = _srs_config->get_rtc_server_gso();
merge_nalus = _srs_config->get_rtc_server_merge_nalus();
srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d)",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus);
return err;
}
srs_error_t SrsRtcSenderThread::on_reload_rtc_server()
{
if (true) {
bool v = _srs_config->get_rtc_server_gso();
if (gso != v) {
srs_trace("Reload gso %d=>%d", gso, v);
gso = v;
}
}
if (true) {
bool v = _srs_config->get_rtc_server_merge_nalus();
if (merge_nalus != v) {
srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v);
merge_nalus = v;
}
}
return srs_success;
}
int SrsRtcSenderThread::cid()
{
return trd->cid();
@ -560,6 +610,7 @@ srs_error_t SrsRtcSenderThread::cycle()
SrsAutoFree(SrsPithyPrint, pprint);
srs_trace("rtc session=%s, start play", rtc_session->id().c_str());
SrsStatistic* stat = SrsStatistic::instance();
while (true) {
if ((err = trd->pull()) != srs_success) {
@ -569,7 +620,7 @@ srs_error_t SrsRtcSenderThread::cycle()
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(0, mw_sleep);
consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME, mw_sleep);
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC, mw_sleep);
@ -589,9 +640,8 @@ srs_error_t SrsRtcSenderThread::cycle()
continue;
}
int nn = 0;
int nn_rtp_pkts = 0;
if ((err = send_messages(source, msgs.msgs, msg_count, sendonly_ukt, &nn, &nn_rtp_pkts)) != srs_success) {
SrsRtcPackets pkts(gso, merge_nalus);
if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
}
@ -600,17 +650,33 @@ srs_error_t SrsRtcSenderThread::cycle()
srs_freep(msg);
}
// Stat the original RAW AV frame, maybe h264+aac.
stat->perf_on_msgs(msg_count);
// Stat the RTC packets, RAW AV frame, maybe h.264+opus.
int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos;
stat->perf_on_rtc_packets(nn_rtc_packets);
// Stat the RAW RTP packets, which maybe group by GSO.
stat->perf_on_rtp_packets(pkts.packets.size());
// Stat the RTP packets going into kernel.
stat->perf_on_gso_packets(pkts.nn_rtp_pkts);
#if defined(SRS_DEBUG)
srs_trace("RTC PLAY packets, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes",
msg_count, nn_rtc_packets, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos,
pkts.nn_samples, pkts.nn_bytes);
#endif
pprint->elapse();
if (pprint->can_print()) {
// TODO: FIXME: Print stat like frame/s, packet/s, loss_packets.
srs_trace("-> RTC PLAY %d msgs, %d packets, %d bytes", msg_count, nn_rtp_pkts, nn);
srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes",
msg_count, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos,
pkts.nn_samples, pkts.nn_bytes);
}
}
}
srs_error_t SrsRtcSenderThread::send_messages(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs,
SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts
SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
@ -619,34 +685,81 @@ srs_error_t SrsRtcSenderThread::send_messages(
}
// Covert kernel messages to RTP packets.
vector<SrsRtpPacket2*> packets;
if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) {
return srs_error_wrap(err, "messages to packets");
}
#ifndef SRS_AUTO_OSX
// If enabled GSO, send out some packets in a msghdr.
if (packets.use_gso) {
if ((err = send_packets_gso(skt, packets)) != srs_success) {
return srs_error_wrap(err, "gso send");
}
return err;
}
#endif
// By default, we send packets by sendmmsg.
if ((err = send_packets(skt, packets)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}
srs_error_t SrsRtcSenderThread::messages_to_packets(
SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) {
srs_error_t err = srs_success;
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
*pnn += msg->size;
// Update stats.
packets.nn_bytes += msg->size;
int nn_extra_payloads = msg->nn_extra_payloads();
packets.nn_extras += nn_extra_payloads;
int nn_samples = msg->nn_samples();
packets.nn_samples += nn_samples;
// For audio, we transcoded AAC to opus in extra payloads.
SrsRtpPacket2* packet = NULL;
if (msg->is_audio()) {
for (int i = 0; i < msg->nn_extra_payloads(); i++) {
packets.nn_audios++;
for (int i = 0; i < nn_extra_payloads; i++) {
SrsSample* sample = msg->extra_payloads() + i;
if ((err = packet_opus(sample, &packet)) != srs_success) {
return srs_error_wrap(err, "opus package");
}
packets.push_back(packet);
packets.packets.push_back(packet);
}
continue;
}
// For video, we should process all NALUs in samples.
packets.nn_videos++;
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (msg->has_idr()) {
if ((err = packet_stap_a(source, msg, &packet)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
packets.push_back(packet);
packets.packets.push_back(packet);
}
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
if (packets.should_merge_nalus && nn_samples > 1) {
if ((err = packet_nalus(msg, packets)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
continue;
}
for (int i = 0; i < msg->nn_samples(); i++) {
// By default, we package each NALU(sample) to a RTP or FUA packet.
for (int i = 0; i < nn_samples; i++) {
SrsSample* sample = msg->samples() + i;
// We always ignore bframe here, if config to discard bframe,
@ -660,76 +773,339 @@ srs_error_t SrsRtcSenderThread::send_messages(
if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) {
return srs_error_wrap(err, "packet single nalu");
}
if (i == msg->nn_samples() - 1) {
packet->rtp_header.set_marker(true);
}
packets.push_back(packet);
packets.packets.push_back(packet);
} else {
if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
}
if (i == msg->nn_samples() - 1) {
packets.back()->rtp_header.set_marker(true);
}
if (i == nn_samples - 1) {
packets.packets.back()->rtp_header.set_marker(true);
}
}
}
*pnn_rtp_pkts += (int)packets.size();
return err;
}
for (int j = 0; j < (int)packets.size(); j++) {
SrsRtpPacket2* packet = packets[j];
if ((err = send_packet(packet, skt)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
ISrsUdpSender* sender = skt->sender();
vector<SrsRtpPacket2*>::iterator it;
for (it = packets.packets.begin(); it != packets.packets.end(); ++it) {
SrsRtpPacket2* packet = *it;
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
mmsghdr* mhdr = NULL;
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
int length = kRtpPacketSize;
// Marshal packet to bytes.
if (true) {
SrsBuffer stream(buf, length);
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
length = stream.pos();
}
// Whether encrypt the RTP bytes.
if (rtc_session->encrypt) {
if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
}
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_iov->iov_len = length;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
// When we send out a packet, we commit a RTP packet.
packets.nn_rtp_pkts++;
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
srs_freep(packet);
}
return err;
}
srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt)
// TODO: FIXME: We can gather and pad audios, because they have similar size.
srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
// Previous handler, if has the same size, we can use GSO.
mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0;
// GSO, N packets has same length, the final one may not.
bool use_gso = false; bool gso_final = false;
ISrsUdpSender* sender = skt->sender();
int nn_packets = (int)packets.packets.size();
for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.packets[i];
// The handler to send message.
mmsghdr* mhdr = NULL;
// Check whether we can use GSO to send it.
int nn_packet = packet->nb_bytes();
if ((gso_size && gso_size == nn_packet) || (use_gso && !gso_final)) {
use_gso = true;
gso_final = (gso_size && gso_size != nn_packet);
mhdr = gso_mhdr;
// We need to increase the iov and cursor.
int nb_iovs = mhdr->msg_hdr.msg_iovlen;
if (gso_cursor >= nb_iovs - 1) {
int nn_new_iovs = nb_iovs;
mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs;
mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs));
memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs);
}
gso_cursor++;
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
mmsghdr* mhdr = NULL;
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
// Create payload cache for RTP packet.
iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor;
if (!p->iov_base) {
p->iov_base = new char[kRtpPacketSize];
p->iov_len = kRtpPacketSize;
}
}
// Change the state according to the next packet.
if (i < nn_packets - 1) {
SrsRtpPacket2* next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL;
int nn_next_packet = next_packet? next_packet->nb_bytes() : 0;
// Length of iov, default size.
int length = kRtpPacketSize;
// If GSO, but next is bigger than this one, we must enter the final state.
if (use_gso && !gso_final) {
gso_final = (nn_packet < nn_next_packet);
}
if (rtc_session->encrypt) {
if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
// If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one,
// if smaller, we can still enter GSO.
if (!use_gso) {
use_gso = (nn_packet >= nn_next_packet);
}
}
} else {
SrsBuffer stream(buf, length);
if ((err = pkt->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
// Now, we fetch the msg from cache.
if (!mhdr) {
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
p->iov_len = 0;
}
// Now, GSO will use this message and size.
if (use_gso) {
gso_mhdr = mhdr;
gso_size = nn_packet;
}
}
// Marshal packet to bytes.
iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor;
iov->iov_len = kRtpPacketSize;
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (rtc_session->encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
// If GSO, they must has same size, except the final one.
if (use_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len);
}
if (use_gso && !gso_final) {
gso_encrypt = iov->iov_len;
}
// If exceed the max GSO size, set to final.
if (use_gso && gso_cursor > 64) {
gso_final = true;
}
// For last message, or final gso, or determined not using GSO, send it now.
bool do_send = (i == nn_packets - 1 || gso_final || !use_gso);
#if defined(SRS_DEBUG)
bool is_video = packet->rtp_header.get_payload_type() == video_payload_type;
srs_trace("Packet %s SSRC=%d, SN=%d, %d bytes", is_video? "Video":"Audio", packet->rtp_header.get_ssrc(),
packet->rtp_header.get_sequence(), nn_packet);
if (do_send) {
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* iov = mhdr->msg_hdr.msg_iov + j;
if (iov->iov_len <= 0) {
break;
}
srs_trace("%s #%d/%d/%d, %d bytes, size %d/%d", (use_gso? "GSO":"RAW"), j, gso_cursor + 1,
mhdr->msg_hdr.msg_iovlen, iov->iov_len, gso_size, gso_encrypt);
}
}
#endif
if (do_send) {
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
#ifndef SRS_AUTO_OSX
if (use_gso) {
mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
if (!mhdr->msg_hdr.msg_control) {
mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen];
}
cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t*)CMSG_DATA(cm)) = gso_encrypt;
// Private message, use it to store the cursor.
mhdr->msg_len = gso_cursor + 1;
}
#endif
// When we send out a packet, we commit a RTP packet.
packets.nn_rtp_pkts++;
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
// Reset the GSO flag.
gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0;
use_gso = gso_final = false;
}
length = stream.pos();
}
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen();
#if defined(SRS_DEBUG)
srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d", packets.packets.size(),
packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras);
#endif
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_iov->iov_len = length;
mhdr->msg_len = 0;
return err;
}
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
SrsRtpRawNALUs* raw = new SrsRtpRawNALUs();
for (int i = 0; i < msg->nn_samples(); i++) {
SrsSample* sample = msg->samples() + i;
// We always ignore bframe here, if config to discard bframe,
// the bframe flag will not be set.
if (sample->bframe) {
continue;
}
raw->push_back(sample->copy());
}
// Ignore empty.
int nn_bytes = raw->nb_bytes();
if (nn_bytes <= 0) {
srs_freep(raw);
return err;
}
const int kRtpMaxPayloadSize = 1200;
if (nn_bytes < kRtpMaxPayloadSize) {
// Package NALUs in a single RTP packet.
SrsRtpPacket2* packet = new SrsRtpPacket2();
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
packet->payload = raw;
packets.packets.push_back(packet);
} else {
SrsAutoFree(SrsRtpRawNALUs, raw);
// Package NALUs in FU-A RTP packets.
int fu_payload_size = kRtpMaxPayloadSize;
// The first byte is store in FU-A header.
uint8_t header = raw->skip_first_byte();
uint8_t nal_type = header & kNalTypeMask;
int nb_left = nn_bytes - 1;
int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size;
for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packets.packets.push_back(packet);
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
packet->payload = fua;
fua->nri = (SrsAvcNaluType)header;
fua->nalu_type = (SrsAvcNaluType)nal_type;
fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1);
if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
nb_left -= packet_size;
}
}
if (!packets.packets.empty()) {
packets.packets.back()->rtp_header.set_marker(true);
}
return err;
}
@ -757,7 +1133,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p
return err;
}
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& packets)
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
@ -771,7 +1147,7 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample*
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packets.push_back(packet);
packets.packets.push_back(packet);
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
@ -786,10 +1162,10 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample*
fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1);
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->size = packet_size;
fua->nalus.push_back(sample);
SrsSample* fragment_sample = new SrsSample();
fragment_sample->bytes = p;
fragment_sample->size = packet_size;
fua->nalus.push_back(fragment_sample);
p += packet_size;
nb_left -= packet_size;
@ -809,11 +1185,13 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
SrsRtpRawPayload* raw = new SrsRtpRawPayload();
SrsRtpRawNALUs* raw = new SrsRtpRawNALUs();
packet->payload = raw;
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
SrsSample* p = new SrsSample();
p->bytes = sample->bytes;
p->size = sample->size;
raw->push_back(p);
*ppacket = packet;
@ -1372,7 +1750,8 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
}
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d", srs_netfd_fileno(fd), max_sendmmsg);
bool gso = _srs_config->get_rtc_server_gso();
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d", srs_netfd_fileno(fd), max_sendmmsg, gso);
return err;
}
@ -1382,6 +1761,11 @@ void SrsUdpMuxSender::free_mhdrs(std::vector<mmsghdr>& mhdrs)
for (int i = 0; i < (int)mhdrs.size(); i++) {
mmsghdr* hdr = &mhdrs[i];
// Free control for GSO.
char* msg_control = (char*)hdr->msg_hdr.msg_control;
srs_freep(msg_control);
// Free iovec.
for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) {
iovec* iov = hdr->msg_hdr.msg_iov + j;
char* data = (char*)iov->iov_base;
@ -1389,6 +1773,7 @@ void SrsUdpMuxSender::free_mhdrs(std::vector<mmsghdr>& mhdrs)
srs_freep(iov);
}
}
mhdrs.clear();
}
srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)
@ -1425,11 +1810,9 @@ srs_error_t SrsUdpMuxSender::cycle()
{
srs_error_t err = srs_success;
uint64_t nn_msgs = 0;
uint64_t nn_msgs_last = 0;
int nn_msgs_max = 0;
int nn_loop = 0;
int nn_wait = 0;
uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; int nn_msgs_max = 0;
uint64_t nn_gso_msgs = 0; uint64_t nn_gso_iovs = 0; int nn_gso_msgs_max = 0; int nn_gso_iovs_max = 0;
int nn_loop = 0; int nn_wait = 0;
srs_utime_t time_last = srs_get_system_time();
SrsStatistic* stat = SrsStatistic::instance();
@ -1444,7 +1827,9 @@ srs_error_t SrsUdpMuxSender::cycle()
nn_loop++;
int pos = cache_pos;
if (pos <= 0) {
int gso_pos = 0;
int gso_iovs = 0;
if (pos <= 0 && gso_pos == 0) {
waiting_msgs = true;
nn_wait++;
srs_cond_wait(cond);
@ -1455,22 +1840,45 @@ srs_error_t SrsUdpMuxSender::cycle()
cache.swap(hotspot);
cache_pos = 0;
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
// Collect informations for GSO
if (pos > 0) {
// For shared GSO cache, stat the messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p++) {
if (!p->msg_len) {
continue;
}
// Private message, use it to store the cursor.
int real_iovs = p->msg_len;
p->msg_len = 0;
int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs;
}
}
stat->perf_mw_on_packets(vlen);
// Send out all messages, may GSO if shared cache.
if (pos > 0) {
// Send out all messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
srs_warn("sendmmsg %d msgs, %d done", vlen, r0);
}
stat->perf_sendmmsg_on_packets(vlen);
}
}
// Increase total messages.
nn_msgs += pos;
nn_msgs_max = srs_max(pos, nn_msgs_max);
nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max);
nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max);
pprint->elapse();
if (pprint->can_print()) {
@ -1492,11 +1900,12 @@ srs_error_t SrsUdpMuxSender::cycle()
pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000;
}
srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d by sendmmsg %d",
srs_netfd_fileno(lfd), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait,
(int)server->nn_sessions(), (int)cache.size(), (int)hotspot.size(), max_sendmmsg);
srs_trace("-> RTC SEND #%d, sessions %d, udp %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", iovs %d/%d/%" PRId64 ", pps %d/%d%s",
srs_netfd_fileno(lfd), (int)server->nn_sessions(), pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs,
pps_average, pps_last, pps_unit.c_str());
nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = nn_wait = nn_msgs_max = 0;
nn_gso_msgs_max = 0; nn_gso_iovs_max = 0;
}
}
@ -1505,10 +1914,12 @@ srs_error_t SrsUdpMuxSender::cycle()
srs_error_t SrsUdpMuxSender::on_reload_rtc_server()
{
int v = _srs_config->get_rtc_server_sendmmsg();
if (max_sendmmsg != v) {
max_sendmmsg = v;
srs_trace("Reload max_sendmmsg=%d", max_sendmmsg);
if (true) {
int v = _srs_config->get_rtc_server_sendmmsg();
if (max_sendmmsg != v) {
srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v);
max_sendmmsg = v;
}
}
return srs_success;

@ -105,7 +105,7 @@ public:
srs_error_t on_dtls_application_data(const char* data, const int len);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt);
srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
@ -117,7 +117,35 @@ private:
srs_error_t srtp_recv_init();
};
class SrsRtcSenderThread : public ISrsCoroutineHandler
// A group of RTP packets.
class SrsRtcPackets
{
public:
bool use_gso;
bool should_merge_nalus;
public:
// The total bytes of RTP packets.
int nn_bytes;
// The RTP packets send out by sendmmsg or sendmsg. Note that if many packets group to
// one msghdr by GSO, it's only one RTP packet, because we only send once.
int nn_rtp_pkts;
// For video, the samples or NALUs.
int nn_samples;
// For audio, the generated extra audio packets.
// For example, when transcoding AAC to opus, may many extra payloads for a audio.
int nn_extras;
// The original audio messages.
int nn_audios;
// The original video messages.
int nn_videos;
public:
std::vector<SrsRtpPacket2*> packets;
public:
SrsRtcPackets(bool gso, bool merge_nalus);
virtual ~SrsRtcPackets();
};
class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
{
protected:
SrsCoroutine* trd;
@ -136,11 +164,16 @@ private:
uint16_t video_sequence;
public:
SrsUdpMuxSocket* sendonly_ukt;
bool merge_nalus;
bool gso;
public:
SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid);
virtual ~SrsRtcSenderThread();
public:
srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_rtc_server();
public:
virtual int cid();
public:
@ -152,12 +185,15 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts);
srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt);
srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& packets);
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets);
srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket);
};

@ -732,7 +732,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// @see https://github.com/ossrs/srs/issues/257
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(0, mw_sleep);
consumer->wait(SRS_PERF_MW_MIN_MSGS_REALTIME, mw_sleep);
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);

@ -236,6 +236,8 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj)
SrsStatisticCategory::SrsStatisticCategory()
{
nn = 0;
a = 0;
b = 0;
c = 0;
@ -265,8 +267,10 @@ SrsStatistic::SrsStatistic()
perf_iovs = new SrsStatisticCategory();
perf_msgs = new SrsStatisticCategory();
perf_sys = new SrsStatisticCategory();
perf_sendmmsg = new SrsStatisticCategory();
perf_gso = new SrsStatisticCategory();
perf_rtp = new SrsStatisticCategory();
perf_rtc = new SrsStatisticCategory();
}
SrsStatistic::~SrsStatistic()
@ -303,8 +307,10 @@ SrsStatistic::~SrsStatistic()
srs_freep(perf_iovs);
srs_freep(perf_msgs);
srs_freep(perf_sys);
srs_freep(perf_sendmmsg);
srs_freep(perf_gso);
srs_freep(perf_rtp);
srs_freep(perf_rtc);
}
SrsStatistic* SrsStatistic::instance()
@ -585,225 +591,135 @@ srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count)
return err;
}
void SrsStatistic::perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs)
{
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
if (nb_msgs == 1) {
perf_msgs->a++;
} else if (nb_msgs < 10) {
perf_msgs->b++;
} else if (nb_msgs < 100) {
perf_msgs->c++;
} else if (nb_msgs < 200) {
perf_msgs->d++;
} else if (nb_msgs < 300) {
perf_msgs->e++;
} else if (nb_msgs < 400) {
perf_msgs->f++;
} else if (nb_msgs < 500) {
perf_msgs->g++;
} else if (nb_msgs < 600) {
perf_msgs->h++;
} else if (nb_msgs < 1000) {
perf_msgs->i++;
} else {
perf_msgs->j++;
}
void SrsStatistic::perf_on_msgs(int nb_msgs)
{
perf_on_packets(perf_msgs, nb_msgs);
}
// For perf iovs, the nb_iovs stat.
// a: <=2
// b: <10
// c: <20
// d: <200
// e: <300
// f: <500
// g: <700
// h: <900
// i: <1024
// j: >=1024
if (nb_iovs <= 2) {
perf_iovs->a++;
} else if (nb_iovs < 10) {
perf_iovs->b++;
} else if (nb_iovs < 20) {
perf_iovs->c++;
} else if (nb_iovs < 200) {
perf_iovs->d++;
} else if (nb_iovs < 300) {
perf_iovs->e++;
} else if (nb_iovs < 500) {
perf_iovs->f++;
} else if (nb_iovs < 700) {
perf_iovs->g++;
} else if (nb_iovs < 900) {
perf_iovs->h++;
} else if (nb_iovs < 1024) {
perf_iovs->i++;
} else {
perf_iovs->j++;
}
srs_error_t SrsStatistic::dumps_perf_msgs(SrsJsonObject* obj)
{
return dumps_perf(perf_msgs, obj);
}
// Stat the syscalls.
// a: number of syscalls of msgs.
perf_sys->a++;
void SrsStatistic::perf_on_rtc_packets(int nb_packets)
{
perf_on_packets(perf_rtc, nb_packets);
}
void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs)
srs_error_t SrsStatistic::dumps_perf_rtc_packets(SrsJsonObject* obj)
{
// Stat the syscalls.
// a: number of syscalls of msgs.
// b: number of syscalls of pkts.
perf_sys->b++;
return dumps_perf(perf_rtc, obj);
}
srs_error_t SrsStatistic::dumps_perf_writev(SrsJsonObject* obj)
void SrsStatistic::perf_on_rtp_packets(int nb_packets)
{
srs_error_t err = srs_success;
perf_on_packets(perf_rtp, nb_packets);
}
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
obj->set("msgs", p);
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
p->set("lt_2", SrsJsonAny::integer(perf_msgs->a));
p->set("lt_10", SrsJsonAny::integer(perf_msgs->b));
p->set("lt_100", SrsJsonAny::integer(perf_msgs->c));
p->set("lt_200", SrsJsonAny::integer(perf_msgs->d));
p->set("lt_300", SrsJsonAny::integer(perf_msgs->e));
p->set("lt_400", SrsJsonAny::integer(perf_msgs->f));
p->set("lt_500", SrsJsonAny::integer(perf_msgs->g));
p->set("lt_600", SrsJsonAny::integer(perf_msgs->h));
p->set("lt_1000", SrsJsonAny::integer(perf_msgs->i));
p->set("gt_1000", SrsJsonAny::integer(perf_msgs->j));
}
srs_error_t SrsStatistic::dumps_perf_rtp_packets(SrsJsonObject* obj)
{
return dumps_perf(perf_rtp, obj);
}
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
obj->set("iovs", p);
// For perf iovs, the nb_iovs stat.
// a: <=2
// b: <10
// c: <20
// d: <200
// e: <300
// f: <500
// g: <700
// h: <900
// i: <1024
// j: >=1024
p->set("lt_3", SrsJsonAny::integer(perf_iovs->a));
p->set("lt_10", SrsJsonAny::integer(perf_iovs->b));
p->set("lt_20", SrsJsonAny::integer(perf_iovs->c));
p->set("lt_200", SrsJsonAny::integer(perf_iovs->d));
p->set("lt_300", SrsJsonAny::integer(perf_iovs->e));
p->set("lt_500", SrsJsonAny::integer(perf_iovs->f));
p->set("lt_700", SrsJsonAny::integer(perf_iovs->g));
p->set("lt_900", SrsJsonAny::integer(perf_iovs->h));
p->set("lt_1024", SrsJsonAny::integer(perf_iovs->i));
p->set("gt_1024", SrsJsonAny::integer(perf_iovs->j));
}
void SrsStatistic::perf_on_gso_packets(int nb_packets)
{
perf_on_packets(perf_gso, nb_packets);
}
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
obj->set("sys", p);
// Stat the syscalls.
// a: number of syscalls of msgs.
// b: number of syscalls of pkts.
p->set("msgs", SrsJsonAny::integer(perf_sys->a));
p->set("pkts", SrsJsonAny::integer(perf_sys->b));
}
srs_error_t SrsStatistic::dumps_perf_gso(SrsJsonObject* obj)
{
return dumps_perf(perf_gso, obj);
}
return err;
void SrsStatistic::perf_on_writev_iovs(int nb_iovs)
{
perf_on_packets(perf_iovs, nb_iovs);
}
srs_error_t SrsStatistic::dumps_perf_writev_iovs(SrsJsonObject* obj)
{
return dumps_perf(perf_iovs, obj);
}
void SrsStatistic::perf_mw_on_packets(int nb_msgs)
{
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
if (nb_msgs == 1) {
perf_sendmmsg->a++;
} else if (nb_msgs < 10) {
perf_sendmmsg->b++;
} else if (nb_msgs < 100) {
perf_sendmmsg->c++;
} else if (nb_msgs < 200) {
perf_sendmmsg->d++;
} else if (nb_msgs < 300) {
perf_sendmmsg->e++;
} else if (nb_msgs < 400) {
perf_sendmmsg->f++;
} else if (nb_msgs < 500) {
perf_sendmmsg->g++;
} else if (nb_msgs < 600) {
perf_sendmmsg->h++;
} else if (nb_msgs < 1000) {
perf_sendmmsg->i++;
void SrsStatistic::perf_sendmmsg_on_packets(int nb_packets)
{
perf_on_packets(perf_sendmmsg, nb_packets);
}
srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj)
{
return dumps_perf(perf_sendmmsg, obj);
}
void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs)
{
// The range for stat:
// 2, 3, 5, 9, 16, 32, 64, 128, 256
// that is:
// a: <2
// b: <3
// c: <5
// d: <9
// e: <16
// f: <32
// g: <64
// h: <128
// i: <256
// j: >=256
if (nb_msgs < 2) {
p->a++;
} else if (nb_msgs < 3) {
p->b++;
} else if (nb_msgs < 5) {
p->c++;
} else if (nb_msgs < 9) {
p->d++;
} else if (nb_msgs < 16) {
p->e++;
} else if (nb_msgs < 32) {
p->f++;
} else if (nb_msgs < 64) {
p->g++;
} else if (nb_msgs < 128) {
p->h++;
} else if (nb_msgs < 256) {
p->i++;
} else {
perf_sendmmsg->j++;
p->j++;
}
p->nn += nb_msgs;
}
srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj)
srs_error_t SrsStatistic::dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj)
{
srs_error_t err = srs_success;
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
obj->set("msgs", p);
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
p->set("lt_2", SrsJsonAny::integer(perf_sendmmsg->a));
p->set("lt_10", SrsJsonAny::integer(perf_sendmmsg->b));
p->set("lt_100", SrsJsonAny::integer(perf_sendmmsg->c));
p->set("lt_200", SrsJsonAny::integer(perf_sendmmsg->d));
p->set("lt_300", SrsJsonAny::integer(perf_sendmmsg->e));
p->set("lt_400", SrsJsonAny::integer(perf_sendmmsg->f));
p->set("lt_500", SrsJsonAny::integer(perf_sendmmsg->g));
p->set("lt_600", SrsJsonAny::integer(perf_sendmmsg->h));
p->set("lt_1000", SrsJsonAny::integer(perf_sendmmsg->i));
p->set("gt_1000", SrsJsonAny::integer(perf_sendmmsg->j));
}
// The range for stat:
// 2, 3, 5, 9, 16, 32, 64, 128, 256
// that is:
// a: <2
// b: <3
// c: <5
// d: <9
// e: <16
// f: <32
// g: <64
// h: <128
// i: <256
// j: >=256
if (p->a) obj->set("lt_2", SrsJsonAny::integer(p->a));
if (p->b) obj->set("lt_3", SrsJsonAny::integer(p->b));
if (p->c) obj->set("lt_5", SrsJsonAny::integer(p->c));
if (p->d) obj->set("lt_9", SrsJsonAny::integer(p->d));
if (p->e) obj->set("lt_16", SrsJsonAny::integer(p->e));
if (p->f) obj->set("lt_32", SrsJsonAny::integer(p->f));
if (p->g) obj->set("lt_64", SrsJsonAny::integer(p->g));
if (p->h) obj->set("lt_128", SrsJsonAny::integer(p->h));
if (p->i) obj->set("lt_256", SrsJsonAny::integer(p->i));
if (p->j) obj->set("gt_256", SrsJsonAny::integer(p->j));
obj->set("nn", SrsJsonAny::integer(p->nn));
return err;
}

@ -124,6 +124,8 @@ public:
class SrsStatisticCategory
{
public:
uint64_t nn;
public:
uint64_t a;
uint64_t b;
@ -168,8 +170,10 @@ private:
// The perf stat for mw(merged write).
SrsStatisticCategory* perf_iovs;
SrsStatisticCategory* perf_msgs;
SrsStatisticCategory* perf_sys;
SrsStatisticCategory* perf_sendmmsg;
SrsStatisticCategory* perf_gso;
SrsStatisticCategory* perf_rtp;
SrsStatisticCategory* perf_rtc;
private:
SrsStatistic();
virtual ~SrsStatistic();
@ -228,19 +232,39 @@ public:
// @param count the max count of clients to dump.
virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count);
public:
// Stat for packets merged written, nb_msgs is the number of RTMP messages,
// bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec.
virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs);
// Stat for packets merged written, nb_pkts is the number of or chunk packets,
// bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec.
virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs);
// Dumps the perf statistic data for TCP writev, for performance analysis.
virtual srs_error_t dumps_perf_writev(SrsJsonObject* obj);
public:
// Stat for packets UDP sendmmsg, nb_msgs is the vlen for sendmmsg.
virtual void perf_mw_on_packets(int nb_msgs);
// Stat for packets merged written, nb_msgs is the number of RTMP messages.
// For example, publish by FFMPEG, Audio and Video frames.
virtual void perf_on_msgs(int nb_msgs);
virtual srs_error_t dumps_perf_msgs(SrsJsonObject* obj);
public:
// Stat for packets merged written, nb_packets is the number of RTC packets.
// For example, a RTMP/AAC audio packet maybe transcoded to two RTC/opus packets.
virtual void perf_on_rtc_packets(int nb_packets);
virtual srs_error_t dumps_perf_rtc_packets(SrsJsonObject* obj);
public:
// Stat for packets merged written, nb_packets is the number of RTP packets.
// For example, a RTC/opus packet maybe package to three RTP packets.
virtual void perf_on_rtp_packets(int nb_packets);
// Dumps the perf statistic data for RTP packets, for performance analysis.
virtual srs_error_t dumps_perf_rtp_packets(SrsJsonObject* obj);
public:
// Stat for packets UDP GSO, nb_packets is the merged RTP packets.
// For example, three RTP/audio packets maybe GSO to one msghdr.
virtual void perf_on_gso_packets(int nb_packets);
// Dumps the perf statistic data for UDP GSO, for performance analysis.
virtual srs_error_t dumps_perf_gso(SrsJsonObject* obj);
public:
// Stat for TCP writev, nb_iovs is the total number of iovec.
virtual void perf_on_writev_iovs(int nb_iovs);
virtual srs_error_t dumps_perf_writev_iovs(SrsJsonObject* obj);
public:
// Stat for packets UDP sendmmsg, nb_packets is the vlen for sendmmsg.
virtual void perf_sendmmsg_on_packets(int nb_packets);
// Dumps the perf statistic data for UDP sendmmsg, for performance analysis.
virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj);
private:
virtual void perf_on_packets(SrsStatisticCategory* p, int nb_msgs);
virtual srs_error_t dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj);
private:
virtual SrsStatisticVhost* create_vhost(SrsRequest* req);
virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req);

@ -33,6 +33,11 @@
// The macros generated by configure script.
#include <srs_auto_headers.hpp>
// Alias for debug.
#ifdef SRS_AUTO_DEBUG
#define SRS_DEBUG
#endif
// To convert macro values to string.
// @see https://gcc.gnu.org/onlinedocs/cpp/Stringification.html#Stringification
#define SRS_INTERNAL_STR(v) #v

@ -127,8 +127,12 @@
*/
#define SRS_PERF_QUEUE_COND_WAIT
#ifdef SRS_PERF_QUEUE_COND_WAIT
// For RTMP, use larger wait queue.
#define SRS_PERF_MW_MIN_MSGS 8
#define SRS_PERF_MW_MIN_MSGS_FOR_RTC 4
#define SRS_PERF_MW_MIN_MSGS_REALTIME 0
// For RTC, use smaller wait queue.
#define SRS_PERF_MW_MIN_MSGS_FOR_RTC 2
#define SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME 0
#endif
/**
* the default value of vhost for

@ -24,6 +24,6 @@
#ifndef SRS_CORE_VERSION4_HPP
#define SRS_CORE_VERSION4_HPP
#define SRS_VERSION4_REVISION 22
#define SRS_VERSION4_REVISION 23
#endif

@ -409,6 +409,14 @@ srs_error_t SrsSample::parse_bframe()
return err;
}
SrsSample* SrsSample::copy()
{
SrsSample* p = new SrsSample();
p->bytes = bytes;
p->size = size;
return p;
}
SrsCodecConfig::SrsCodecConfig()
{
}

@ -542,6 +542,8 @@ public:
public:
// If we need to know whether sample is bframe, we have to parse the NALU payload.
srs_error_t parse_bframe();
// Copy sample, share the bytes pointer.
SrsSample* copy();
};
/**

@ -383,7 +383,7 @@ void SrsSharedPtrMessage::set_extra_payloads(SrsSample* payloads, int nn_payload
ptr->nn_extra_payloads = nn_payloads;
ptr->extra_payloads = new SrsSample[nn_payloads];
memcpy(ptr->extra_payloads, payloads, nn_payloads * sizeof(SrsSample));
memcpy((void*)ptr->extra_payloads, payloads, nn_payloads * sizeof(SrsSample));
}
void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples)
@ -394,7 +394,7 @@ void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples)
ptr->nn_samples = nn_samples;
ptr->samples = new SrsSample[nn_samples];
memcpy(ptr->samples, samples, nn_samples * sizeof(SrsSample));
memcpy((void*)ptr->samples, samples, nn_samples * sizeof(SrsSample));
}
#endif

@ -229,6 +229,112 @@ srs_error_t SrsRtpRawPayload::encode(SrsBuffer* buf)
return srs_success;
}
SrsRtpRawNALUs::SrsRtpRawNALUs()
{
cursor = 0;
nn_bytes = 0;
}
SrsRtpRawNALUs::~SrsRtpRawNALUs()
{
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
srs_freep(p);
}
nalus.clear();
}
void SrsRtpRawNALUs::push_back(SrsSample* sample)
{
if (sample->size <= 0) {
return;
}
if (!nalus.empty()) {
SrsSample* p = new SrsSample();
p->bytes = (char*)"\0\0\1";
p->size = 3;
nn_bytes += 3;
nalus.push_back(p);
}
nn_bytes += sample->size;
nalus.push_back(sample);
}
uint8_t SrsRtpRawNALUs::skip_first_byte()
{
srs_assert (cursor >= 0 && nn_bytes > 0 && cursor < nn_bytes);
cursor++;
return uint8_t(nalus[0]->bytes[0]);
}
srs_error_t SrsRtpRawNALUs::read_samples(vector<SrsSample*>& samples, int size)
{
if (cursor + size < 0 || cursor + size > nn_bytes) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, size);
}
int pos = cursor;
cursor += size;
int left = size;
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end() && left > 0; ++it) {
SrsSample* p = *it;
// Ignore previous consumed samples.
if (pos && pos - p->size >= 0) {
pos -= p->size;
continue;
}
// Now, we are working at the sample.
int nn = srs_min(left, p->size - pos);
srs_assert(nn > 0);
SrsSample* sample = new SrsSample();
sample->bytes = p->bytes + pos;
sample->size = nn;
samples.push_back(sample);
left -= nn;
pos = 0;
}
return srs_success;
}
int SrsRtpRawNALUs::nb_bytes()
{
int size = 0;
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
size += p->size;
}
return size;
}
srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf)
{
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
if (!buf->require(p->size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size);
}
buf->write_bytes(p->bytes, p->size);
}
return srs_success;
}
SrsRtpSTAPPayload::SrsRtpSTAPPayload()
{
nri = (SrsAvcNaluType)0;
@ -339,7 +445,7 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf)
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
if (!buf->require(p->size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size);
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size);
}
buf->write_bytes(p->bytes, p->size);

@ -95,6 +95,7 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
};
// Single payload data.
class SrsRtpRawPayload : public ISrsEncoder
{
public:
@ -110,6 +111,28 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
};
// Multiple NALUs, automatically insert 001 between NALUs.
class SrsRtpRawNALUs : public ISrsEncoder
{
private:
std::vector<SrsSample*> nalus;
int nn_bytes;
int cursor;
public:
SrsRtpRawNALUs();
virtual ~SrsRtpRawNALUs();
public:
void push_back(SrsSample* sample);
public:
uint8_t skip_first_byte();
srs_error_t read_samples(std::vector<SrsSample*>& samples, int size);
// interface ISrsEncoder
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
};
// STAP-A, for multiple NALUs.
class SrsRtpSTAPPayload : public ISrsEncoder
{
public:
@ -127,6 +150,7 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
};
// FU-A, for one NALU with multiple fragments.
class SrsRtpFUAPayload : public ISrsEncoder
{
public:

@ -548,7 +548,8 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
// Notify about perf stat.
if (perf) {
perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index);
perf->perf_on_msgs(nb_msgs_merged_written);
perf->perf_on_writev_iovs(iov_index);
nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0;
}
@ -576,7 +577,8 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
// Notify about perf stat.
if (perf) {
perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index);
perf->perf_on_msgs(nb_msgs_merged_written);
perf->perf_on_writev_iovs(iov_index);
nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0;
}
@ -627,11 +629,6 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
if ((er = skt->writev(iovs, 2, NULL)) != srs_success) {
return srs_error_wrap(err, "writev");
}
// Notify about perf stat.
if (perf) {
perf->perf_mw_on_packets(1, payload_size, 2);
}
}
}

@ -155,11 +155,9 @@ public:
virtual ~ISrsProtocolPerf();
public:
// Stat for packets merged written, nb_msgs is the number of RTMP messages,
// bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec.
virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) = 0;
// Stat for packets merged written, nb_pkts is the number of or chunk packets,
// bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec.
virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) = 0;
virtual void perf_on_msgs(int nb_msgs) = 0;
// Stat for TCP writev, nb_iovs is the total number of iovec.
virtual void perf_on_writev_iovs(int nb_iovs) = 0;
};
// The protocol provides the rtmp-message-protocol services,

Loading…
Cancel
Save