For #307, remove dedicate GSO cache

pull/1753/head
winlin 5 years ago
parent 23c68a922c
commit feaf98eb69

@ -440,13 +440,9 @@ rtc_server {
# default: on # default: on
merge_nalus on; merge_nalus on;
# Whether enable GSO to send out RTP packets. # Whether enable GSO to send out RTP packets.
# @remark Linux only, for other OS always disabled. # @remark Linux 4.18+ only, for other OS always disabled.
# default: off # default: off
gso off; gso off;
# Whether GSO uses dedicated cache. Share the same mmsghdr cache if not dedicated.
# For Linux 3.*, we must use dedicated cache for GSO.
# default: off
gso_dedicated off;
} }
vhost rtc.vhost.srs.com { vhost rtc.vhost.srs.com {

@ -3614,8 +3614,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int i = 0; conf && i < (int)conf->directives.size(); i++) { for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name; string n = conf->at(i)->name;
if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa"
&& n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus" && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") {
&& n != "gso_dedicated") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str());
} }
} }
@ -4815,31 +4814,6 @@ bool SrsConfig::get_rtc_server_gso()
return v; return v;
} }
bool SrsConfig::get_rtc_server_gso_dedicated()
{
static int DEFAULT = false;
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("gso_dedicated");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
bool v = SRS_CONF_PERFER_FALSE(conf->arg0());
#ifdef SRS_AUTO_OSX
if (v != DEFAULT) {
srs_warn("GSO is for Linux only");
}
#endif
return v;
}
SrsConfDirective* SrsConfig::get_rtc(string vhost) SrsConfDirective* SrsConfig::get_rtc(string vhost)
{ {
SrsConfDirective* conf = get_vhost(vhost); SrsConfDirective* conf = get_vhost(vhost);

@ -530,7 +530,6 @@ public:
virtual int get_rtc_server_reuseport(); virtual int get_rtc_server_reuseport();
virtual bool get_rtc_server_merge_nalus(); virtual bool get_rtc_server_merge_nalus();
virtual bool get_rtc_server_gso(); virtual bool get_rtc_server_gso();
virtual bool get_rtc_server_gso_dedicated();
SrsConfDirective* get_rtc(std::string vhost); SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost); bool get_rtc_enabled(std::string vhost);

@ -139,10 +139,6 @@ public:
public: public:
// Fetch a mmsghdr from sender's cache. // Fetch a mmsghdr from sender's cache.
virtual srs_error_t fetch(mmsghdr** pphdr) = 0; virtual srs_error_t fetch(mmsghdr** pphdr) = 0;
// For Linux kernel 3.*, we must use isolate APIs for GSO,
// that is, sendmmsg does not work with GSO.
// Fetch a mmsghdr from sender's cache.
virtual srs_error_t gso_fetch(mmsghdr** pphdr) = 0;
// Notify the sender to send out the msg. // Notify the sender to send out the msg.
virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0; virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0;
}; };

@ -813,6 +813,7 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_iov->iov_len = length; mhdr->msg_hdr.msg_iov->iov_len = length;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0; mhdr->msg_len = 0;
if ((err = sender->sendmmsg(mhdr)) != srs_success) { if ((err = sender->sendmmsg(mhdr)) != srs_success) {
@ -889,12 +890,7 @@ srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPacket
if (!mhdr) { if (!mhdr) {
// Fetch a cached message from queue. // Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if (use_gso) { if ((err = sender->fetch(&mhdr)) != srs_success) {
err = sender->gso_fetch(&mhdr);
} else {
err = sender->fetch(&mhdr);
}
if (err != srs_success) {
return srs_error_wrap(err, "fetch msghdr"); return srs_error_wrap(err, "fetch msghdr");
} }
@ -1689,7 +1685,6 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s)
trd = new SrsDummyCoroutine(); trd = new SrsDummyCoroutine();
cache_pos = 0; cache_pos = 0;
gso_cache_pos = 0;
_srs_config->subscribe(this); _srs_config->subscribe(this);
} }
@ -1706,12 +1701,6 @@ SrsUdpMuxSender::~SrsUdpMuxSender()
free_mhdrs(cache); free_mhdrs(cache);
cache.clear(); cache.clear();
free_mhdrs(gso_hotspot);
gso_hotspot.clear();
free_mhdrs(gso_cache);
gso_cache.clear();
} }
srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
@ -1728,9 +1717,7 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
bool gso = _srs_config->get_rtc_server_gso(); bool gso = _srs_config->get_rtc_server_gso();
gso_dedicated = _srs_config->get_rtc_server_gso_dedicated(); srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d", srs_netfd_fileno(fd), max_sendmmsg, gso);
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d, gso-dedicated=%d",
srs_netfd_fileno(fd), max_sendmmsg, gso, gso_dedicated);
return err; return err;
} }
@ -1775,34 +1762,6 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)
return srs_success; return srs_success;
} }
srs_error_t SrsUdpMuxSender::gso_fetch(mmsghdr** pphdr)
{
// When GSO share cache, we use the same cache with non-GSO.
if (!gso_dedicated) {
return fetch(pphdr);
}
// TODO: FIXME: Maybe need to shrink?
if (gso_cache_pos >= (int)gso_cache.size()) {
mmsghdr mhdr;
memset(&mhdr, 0, sizeof(mmsghdr));
mhdr.msg_hdr.msg_iovlen = 1;
mhdr.msg_hdr.msg_iov = new iovec();
mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize];
mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize;
mhdr.msg_len = 0;
mhdr.msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
mhdr.msg_hdr.msg_control = new char[mhdr.msg_hdr.msg_controllen];
gso_cache.push_back(mhdr);
}
*pphdr = &gso_cache[gso_cache_pos++];
return srs_success;
}
srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr) srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr)
{ {
if (waiting_msgs) { if (waiting_msgs) {
@ -1834,7 +1793,7 @@ srs_error_t SrsUdpMuxSender::cycle()
nn_loop++; nn_loop++;
int pos = cache_pos; int pos = cache_pos;
int gso_pos = gso_cache_pos; int gso_pos = 0;
int gso_iovs = 0; int gso_iovs = 0;
if (pos <= 0 && gso_pos == 0) { if (pos <= 0 && gso_pos == 0) {
waiting_msgs = true; waiting_msgs = true;
@ -1847,13 +1806,8 @@ srs_error_t SrsUdpMuxSender::cycle()
cache.swap(hotspot); cache.swap(hotspot);
cache_pos = 0; cache_pos = 0;
if (gso_dedicated) {
gso_cache.swap(gso_hotspot);
gso_cache_pos = 0;
}
// Collect informations for GSO // Collect informations for GSO
if (!gso_dedicated && pos > 0) { if (pos > 0) {
// For shared GSO cache, stat the messages. // For shared GSO cache, stat the messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p++) { for (p = &hotspot[0]; p < end; p++) {
@ -1871,7 +1825,7 @@ srs_error_t SrsUdpMuxSender::cycle()
} }
// Send out all messages, may GSO if shared cache. // Send out all messages, may GSO if shared cache.
if (false && pos > 0) { if (pos > 0) {
// Send out all messages. // Send out all messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p += max_sendmmsg) { for (p = &hotspot[0]; p < end; p += max_sendmmsg) {
@ -1887,32 +1841,9 @@ srs_error_t SrsUdpMuxSender::cycle()
} }
} }
// Send out GSO in dedicated queue.
if (gso_dedicated && gso_pos > 0) {
mmsghdr* p = &gso_hotspot[0]; mmsghdr* end = p + gso_pos;
for (; p < end; p++) {
// Private message, use it to store the cursor.
int real_iovs = p->msg_len;
p->msg_len = 0;
// Send out GSO message.
int r0 = srs_sendmsg(lfd, &p->msg_hdr, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 < 0) {
srs_warn("sendmsg err, r0=%d", r0);
}
nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs;
stat->perf_gso_on_packets(real_iovs);
}
}
// Increase total messages. // Increase total messages.
int nn_pos = pos; nn_msgs += pos;
if (gso_dedicated) { nn_msgs_max = srs_max(pos, nn_msgs_max);
nn_pos = pos + gso_pos;
}
nn_msgs += nn_pos;
nn_msgs_max = srs_max(nn_pos, nn_msgs_max);
nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max); nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max);
nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max); nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max);
@ -1936,9 +1867,9 @@ srs_error_t SrsUdpMuxSender::cycle()
pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000;
} }
srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", gso-iovs %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d, sendmmsg %d", srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", gso-iovs %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d, sendmmsg %d",
srs_netfd_fileno(lfd), nn_pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs, pps_average, pps_last, pps_unit.c_str(), srs_netfd_fileno(lfd), pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs, pps_average, pps_last, pps_unit.c_str(),
nn_loop, nn_wait, (int)server->nn_sessions(), (int)cache.size(), (int)gso_cache.size(), max_sendmmsg); nn_loop, nn_wait, (int)server->nn_sessions(), (int)cache.size(), max_sendmmsg);
nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = nn_wait = nn_msgs_max = 0; nn_loop = nn_wait = nn_msgs_max = 0;
nn_gso_msgs_max = 0; nn_gso_iovs_max = 0; nn_gso_msgs_max = 0; nn_gso_iovs_max = 0;
@ -1958,15 +1889,6 @@ srs_error_t SrsUdpMuxSender::on_reload_rtc_server()
} }
} }
if (true) {
bool gso = _srs_config->get_rtc_server_gso();
bool v = _srs_config->get_rtc_server_gso_dedicated();
if (gso_dedicated != v) {
srs_trace("Reload gso=%d, gso-dedicated %d=>%d", gso, gso_dedicated, v);
gso_dedicated = v;
}
}
return srs_success; return srs_success;
} }

@ -273,15 +273,6 @@ private:
int cache_pos; int cache_pos;
// The max number of messages for sendmmsg. If 1, we use sendmsg to send. // The max number of messages for sendmmsg. If 1, we use sendmsg to send.
int max_sendmmsg; int max_sendmmsg;
// Whether GSO shares the same mmsghdr cache.
// If not shared, use dedicated cache for GSO.
bool gso_dedicated;
private:
// For Linux kernel 3.*, we must use isolate queue for GSO,
// that is, sendmmsg does not work with GSO.
std::vector<mmsghdr> gso_hotspot;
std::vector<mmsghdr> gso_cache;
int gso_cache_pos;
public: public:
SrsUdpMuxSender(SrsRtcServer* s); SrsUdpMuxSender(SrsRtcServer* s);
virtual ~SrsUdpMuxSender(); virtual ~SrsUdpMuxSender();
@ -291,7 +282,6 @@ private:
void free_mhdrs(std::vector<mmsghdr>& mhdrs); void free_mhdrs(std::vector<mmsghdr>& mhdrs);
public: public:
virtual srs_error_t fetch(mmsghdr** pphdr); virtual srs_error_t fetch(mmsghdr** pphdr);
virtual srs_error_t gso_fetch(mmsghdr** pphdr);
virtual srs_error_t sendmmsg(mmsghdr* hdr); virtual srs_error_t sendmmsg(mmsghdr* hdr);
virtual srs_error_t cycle(); virtual srs_error_t cycle();
// interface ISrsReloadHandler // interface ISrsReloadHandler

Loading…
Cancel
Save