For #370, use round-trip to send on all fds

pull/1753/head
winlin 5 years ago
parent 37e2f8896c
commit 82579e4b0c

@ -1334,7 +1334,6 @@ SrsRtcServer::SrsRtcServer()
{
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
mmstfd = NULL;
waiting_msgs = false;
cond = srs_cond_new();
trd = new SrsDummyCoroutine();
@ -1415,6 +1414,9 @@ srs_error_t SrsRtcServer::listen_udp()
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
// We will use all FDs to sendmmsg.
stfds.push_back(listener->stfd());
srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd());
listeners.push_back(listener);
}
@ -1642,8 +1644,6 @@ mmsghdr* SrsRtcServer::fetch()
void SrsRtcServer::sendmmsg(srs_netfd_t stfd, mmsghdr* /*hdr*/)
{
mmstfd = stfd;
if (waiting_msgs) {
waiting_msgs = false;
srs_cond_signal(cond);
@ -1670,12 +1670,15 @@ srs_error_t SrsRtcServer::cycle()
uint64_t nn_msgs = 0;
uint64_t nn_msgs_last = 0;
uint64_t nn_msgs_max = 0;
uint64_t nn_loop = 0;
uint64_t nn_wait = 0;
int nn_msgs_max = 0;
int nn_loop = 0;
int nn_wait = 0;
srs_utime_t time_last = srs_get_system_time();
SrsStatistic* stat = SrsStatistic::instance();
// We use FDs to send out messages, by round-trip algorithm.
uint32_t fd_index = 0;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send();
SrsAutoFree(SrsPithyPrint, pprint);
@ -1698,12 +1701,12 @@ srs_error_t SrsRtcServer::cycle()
cache.swap(hotspot);
cache_pos = 0;
mmsghdr* p = &hotspot[0];
mmsghdr* end = p + pos;
srs_netfd_t stfd = mmstfd;
srs_netfd_t stfd = NULL;
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
stfd = stfds.at((fd_index++) % stfds.size());
int r0 = srs_sendmmsg(stfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
@ -1719,6 +1722,7 @@ srs_error_t SrsRtcServer::cycle()
pprint->elapse();
if (pprint->can_print()) {
// TODO: FIXME: Extract a PPS calculator.
int pps_average = 0; int pps_last = 0;
if (true) {
if (srs_get_system_time() > srs_get_system_startup_time()) {
@ -1736,7 +1740,7 @@ srs_error_t SrsRtcServer::cycle()
pps_unit = "(k)"; pps_last /= 10000; pps_average /= 10000;
}
srs_trace("-> RTC #%d SEND %d, pps %d/%d%s, schedule %" PRId64 "/%" PRId64 "/%" PRId64 ", sessions %d by sendmmsg %d",
srs_trace("-> RTC #%d SEND %d, pps %d/%d%s, schedule %d/%d/%d, sessions %d by sendmmsg %d",
srs_netfd_fileno(stfd), pos, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait, nn_msgs_max, (int)map_username_session.size(), max_sendmmsg);
nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = nn_wait = nn_msgs_max = 0;

@ -240,7 +240,7 @@ private:
srs_cond_t cond;
bool waiting_msgs;
// TODO: FIXME: Support multiple stfd.
srs_netfd_t mmstfd;
std::vector<srs_netfd_t> stfds;
// Hotspot msgs, we are working on it.
// @remark We will wait util all messages are ready.
std::vector<mmsghdr> hotspot;

Loading…
Cancel
Save