For #307, refine GSO performance, alloc iovs

pull/1748/head
winlin 5 years ago
parent 8383f1b27a
commit f0015a7cc1

@ -704,8 +704,6 @@ srs_error_t SrsRtcSenderThread::cycle()
// For RTC, we always try to read messages, only wait when no message.
if (msg_count <= 0) {
srs_usleep(0);
#ifdef SRS_PERF_QUEUE_COND_WAIT
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
@ -887,43 +885,40 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets)
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
// For this message, select the first iovec.
iovec* iov = mhdr->msg_hdr.msg_iov;
mhdr->msg_hdr.msg_iovlen = 1;
if (!p->iov_len) {
break;
}
p->iov_len = 0;
if (!iov->iov_base) {
iov->iov_base = new char[kRtpPacketSize];
}
iov->iov_len = kRtpPacketSize;
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
int length = kRtpPacketSize;
// Marshal packet to bytes.
// Marshal packet to bytes in iovec.
if (true) {
SrsBuffer stream(buf, length);
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
length = stream.pos();
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (rtc_session->encrypt) {
if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) {
int nn_encrypt = (int)iov->iov_len;
if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
// Set the address and control information.
sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr();
socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_iov->iov_len = length;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
// When we send out a packet, we commit a RTP packet.
packets.nn_rtp_pkts++;
@ -996,23 +991,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
using_gso = true;
gso_final = (gso_size && gso_size != nn_packet);
mhdr = gso_mhdr;
// We need to increase the iov and cursor.
int nb_iovs = mhdr->msg_hdr.msg_iovlen;
if (gso_cursor >= nb_iovs - 1) {
int nn_new_iovs = 1;
mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs;
mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs));
memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs);
}
gso_cursor++;
// Create payload cache for RTP packet.
iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor;
if (!p->iov_base) {
p->iov_base = new char[kRtpPacketSize];
p->iov_len = kRtpPacketSize;
}
}
// Change the state according to the next packet.
@ -1037,16 +1015,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
if (!p->iov_len) {
break;
}
p->iov_len = 0;
}
// Now, GSO will use this message and size.
if (using_gso) {
gso_mhdr = mhdr;
@ -1054,10 +1022,17 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
}
}
// Marshal packet to bytes.
// For this message, select a new iovec.
iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor;
mhdr->msg_hdr.msg_iovlen = gso_cursor + 1;
gso_cursor++;
if (!iov->iov_base) {
iov->iov_base = new char[kRtpPacketSize];
}
iov->iov_len = kRtpPacketSize;
// Marshal packet to bytes in iovec.
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
@ -1085,7 +1060,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
}
// If exceed the max GSO size, set to final.
if (using_gso && gso_cursor > 64) {
if (using_gso && gso_cursor + 1 >= SRS_PERF_RTC_GSO_MAX) {
gso_final = true;
}
@ -1099,10 +1074,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
if (do_send) {
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* iov = mhdr->msg_hdr.msg_iov + j;
if (iov->iov_len <= 0) {
break;
}
srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", packets.debug_id, (using_gso? "GSO":"RAW"), j,
gso_cursor + 1, mhdr->msg_hdr.msg_iovlen, iov->iov_len, padding, gso_size, gso_encrypt);
}
@ -1110,13 +1081,13 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
#endif
if (do_send) {
// Set the address and control information.
sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr();
socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
#ifndef SRS_AUTO_OSX
if (using_gso) {
@ -1130,9 +1101,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t*)CMSG_DATA(cm)) = gso_encrypt;
// Private message, use it to store the cursor.
mhdr->msg_len = gso_cursor + 1;
}
#endif
@ -1870,18 +1838,21 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
void SrsUdpMuxSender::free_mhdrs(std::vector<mmsghdr>& mhdrs)
{
for (int i = 0; i < (int)mhdrs.size(); i++) {
int nn_mhdrs = (int)mhdrs.size();
for (int i = 0; i < nn_mhdrs; i++) {
// @see https://linux.die.net/man/2/sendmmsg
// @see https://linux.die.net/man/2/sendmsg
mmsghdr* hdr = &mhdrs[i];
// Free control for GSO.
char* msg_control = (char*)hdr->msg_hdr.msg_control;
srs_freep(msg_control);
srs_freepa(msg_control);
// Free iovec.
for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) {
for (int j = SRS_PERF_RTC_GSO_MAX - 1; j >= 0 ; j--) {
iovec* iov = hdr->msg_hdr.msg_iov + j;
char* data = (char*)iov->iov_base;
srs_freep(data);
srs_freepa(data);
srs_freepa(iov);
}
}
@ -1892,19 +1863,21 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)
{
// TODO: FIXME: Maybe need to shrink?
if (cache_pos >= (int)cache.size()) {
// @see https://linux.die.net/man/2/sendmmsg
// @see https://linux.die.net/man/2/sendmsg
mmsghdr mhdr;
memset(&mhdr, 0, sizeof(mmsghdr));
mhdr.msg_len = 0;
mhdr.msg_hdr.msg_flags = 0;
mhdr.msg_hdr.msg_control = NULL;
mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_IOVS;
mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_MAX;
mhdr.msg_hdr.msg_iov = new iovec[mhdr.msg_hdr.msg_iovlen];
memset((void*)mhdr.msg_hdr.msg_iov, 0, sizeof(iovec) * mhdr.msg_hdr.msg_iovlen);
for (int i = 0; i < (int)mhdr.msg_hdr.msg_iovlen; i++) {
for (int i = 0; i < SRS_PERF_RTC_GSO_IOVS; i++) {
iovec* p = mhdr.msg_hdr.msg_iov + i;
p->iov_base = new char[kRtpPacketSize];
p->iov_len = kRtpPacketSize;
}
cache.push_back(mhdr);
@ -1949,15 +1922,10 @@ srs_error_t SrsUdpMuxSender::cycle()
int pos = cache_pos;
int gso_iovs = 0;
if (pos <= 0) {
srs_usleep(0);
if (pos <= 0) {
waiting_msgs = true;
nn_wait++;
srs_cond_wait(cond);
}
if (pos <= 0) {
continue;
}
waiting_msgs = true;
nn_wait++;
srs_cond_wait(cond);
continue;
}
// We are working on hotspot now.
@ -1968,16 +1936,12 @@ srs_error_t SrsUdpMuxSender::cycle()
int gso_pos = 0;
if (pos > 0 && stat_enabled) {
// For shared GSO cache, stat the messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p++) {
if (!p->msg_len) {
continue;
}
// Private message, use it to store the cursor.
int real_iovs = p->msg_len;
p->msg_len = 0;
// @see https://linux.die.net/man/2/sendmmsg
// @see https://linux.die.net/man/2/sendmsg
for (int i = 0; i < pos; i++) {
mmsghdr* mhdr = &hotspot[i];
int real_iovs = mhdr->msg_hdr.msg_iovlen;
gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs;
gso_iovs += real_iovs;
}
@ -1986,6 +1950,8 @@ srs_error_t SrsUdpMuxSender::cycle()
// Send out all messages.
if (pos > 0) {
// Send out all messages.
// @see https://linux.die.net/man/2/sendmmsg
// @see https://linux.die.net/man/2/sendmsg
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (p = &hotspot[0]; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);

@ -201,7 +201,14 @@
// = 715MB # For SRS_PERF_RTC_GSO_IOVS = 1
// = 1402MB # For SRS_PERF_RTC_GSO_IOVS = 2
// = 2775MB # For SRS_PERF_RTC_GSO_IOVS = 4
#define SRS_PERF_RTC_GSO_IOVS 2
#if defined(__linux__)
#define SRS_PERF_RTC_GSO_IOVS 2
#else
#define SRS_PERF_RTC_GSO_IOVS 1
#endif
// For RTC, the max iovs in msghdr, the max packets sent in a msghdr.
#define SRS_PERF_RTC_GSO_MAX 64
#endif

@ -436,6 +436,8 @@ int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, in
}
msgvec->msg_len = r0;
#else
msgvec->msg_len = 0;
int tolen = (int)msgvec->msg_hdr.msg_namelen;
const struct sockaddr* to = (const struct sockaddr*)msgvec->msg_hdr.msg_name;
for (int i = 0; i < (int)msgvec->msg_hdr.msg_iovlen; i++) {

Loading…
Cancel
Save