|
|
|
@ -574,59 +574,61 @@ void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* ukt)
|
|
|
|
|
|
|
|
|
|
void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
if (!rtc_session->dtls_session) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vector<mmsghdr> mhdrs;
|
|
|
|
|
for (int i = 0; i < nb_msgs; i++) {
|
|
|
|
|
SrsSharedPtrMessage* msg = msgs[i];
|
|
|
|
|
bool is_video = msg->is_video();
|
|
|
|
|
bool is_audio = msg->is_audio();
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < (int)msg->rtp_packets.size(); ++i) {
|
|
|
|
|
if (!rtc_session->dtls_session) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsRtpSharedPacket* pkt = msg->rtp_packets[i];
|
|
|
|
|
|
|
|
|
|
if (msg->is_video()) {
|
|
|
|
|
pkt->modify_rtp_header_payload_type(video_payload_type);
|
|
|
|
|
pkt->modify_rtp_header_ssrc(video_ssrc);
|
|
|
|
|
srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (msg->is_audio()) {
|
|
|
|
|
pkt->modify_rtp_header_payload_type(audio_payload_type);
|
|
|
|
|
pkt->modify_rtp_header_ssrc(audio_ssrc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int length = pkt->size;
|
|
|
|
|
char* buf = new char[kRtpPacketSize];
|
|
|
|
|
if (rtc_session->encrypt) {
|
|
|
|
|
if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) {
|
|
|
|
|
srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
memcpy(buf, pkt->payload, length);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mmsghdr mhdr;
|
|
|
|
|
memset(&mhdr, 0, sizeof(mmsghdr));
|
|
|
|
|
mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr();
|
|
|
|
|
mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen();
|
|
|
|
|
mhdr.msg_hdr.msg_iovlen = 1;
|
|
|
|
|
mhdr.msg_hdr.msg_iov = new iovec();
|
|
|
|
|
mhdr.msg_hdr.msg_iov->iov_base = buf;
|
|
|
|
|
mhdr.msg_hdr.msg_iov->iov_len = length;
|
|
|
|
|
mhdrs.push_back(mhdr);
|
|
|
|
|
for (vector<SrsRtpSharedPacket*>::iterator it = msg->rtp_packets.begin(); it != msg->rtp_packets.end(); ++it) {
|
|
|
|
|
SrsRtpSharedPacket* pkt = *it;
|
|
|
|
|
send_and_free_message(msg, is_video, is_audio, pkt, udp_mux_skt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_freep(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) {
|
|
|
|
|
srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str());
|
|
|
|
|
srs_freep(err);
|
|
|
|
|
void SrsRtcSenderThread::send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* udp_mux_skt)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
if (is_video) {
|
|
|
|
|
pkt->modify_rtp_header_payload_type(video_payload_type);
|
|
|
|
|
pkt->modify_rtp_header_ssrc(video_ssrc);
|
|
|
|
|
srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp());
|
|
|
|
|
} else if (is_audio) {
|
|
|
|
|
pkt->modify_rtp_header_payload_type(audio_payload_type);
|
|
|
|
|
pkt->modify_rtp_header_ssrc(audio_ssrc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int length = pkt->size;
|
|
|
|
|
// Fetch a cached message from queue.
|
|
|
|
|
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
|
|
|
|
|
mmsghdr* mhdr = rtc_session->rtc_server->fetch();
|
|
|
|
|
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
|
|
|
|
|
|
|
|
|
|
if (rtc_session->encrypt) {
|
|
|
|
|
if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) {
|
|
|
|
|
srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
memcpy(buf, pkt->payload, length);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sockaddr_in* addr = (sockaddr_in*)udp_mux_skt->peer_addr();
|
|
|
|
|
socklen_t addrlen = (socklen_t)udp_mux_skt->peer_addrlen();
|
|
|
|
|
|
|
|
|
|
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
|
|
|
|
|
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
|
|
|
|
|
mhdr->msg_hdr.msg_iov->iov_len = length;
|
|
|
|
|
mhdr->msg_len = 0;
|
|
|
|
|
|
|
|
|
|
rtc_session->rtc_server->sendmmsg(udp_mux_skt->stfd(), mhdr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
|
|
|
|
@ -728,7 +730,7 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsStunPacket stun_binding_response;
|
|
|
|
|
char buf[1460];
|
|
|
|
|
char buf[kRtpPacketSize];
|
|
|
|
|
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
|
|
|
|
|
SrsAutoFree(SrsBuffer, stream);
|
|
|
|
|
|
|
|
|
@ -1023,7 +1025,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
|
|
|
|
|
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char unprotected_buf[1460];
|
|
|
|
|
char unprotected_buf[kRtpPacketSize];
|
|
|
|
|
int nb_unprotected_buf = udp_mux_skt->size();
|
|
|
|
|
if ((err = dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "rtcp unprotect failed");
|
|
|
|
@ -1094,6 +1096,8 @@ SrsRtcServer::SrsRtcServer()
|
|
|
|
|
waiting_msgs = false;
|
|
|
|
|
cond = srs_cond_new();
|
|
|
|
|
trd = new SrsDummyCoroutine();
|
|
|
|
|
|
|
|
|
|
cache_pos = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsRtcServer::~SrsRtcServer()
|
|
|
|
@ -1104,8 +1108,11 @@ SrsRtcServer::~SrsRtcServer()
|
|
|
|
|
srs_freep(trd);
|
|
|
|
|
srs_cond_destroy(cond);
|
|
|
|
|
|
|
|
|
|
free_messages(mmhdrs);
|
|
|
|
|
mmhdrs.clear();
|
|
|
|
|
free_mhdrs(hotspot);
|
|
|
|
|
hotspot.clear();
|
|
|
|
|
|
|
|
|
|
free_mhdrs(cache);
|
|
|
|
|
cache.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsRtcServer::initialize()
|
|
|
|
@ -1344,27 +1351,42 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
|
|
|
|
|
return srs_success;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector<mmsghdr>& msgs)
|
|
|
|
|
mmsghdr* SrsRtcServer::fetch()
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
// TODO: FIXME: Maybe need to shrink?
|
|
|
|
|
if (cache_pos >= (int)cache.size()) {
|
|
|
|
|
mmsghdr mhdr;
|
|
|
|
|
memset(&mhdr, 0, sizeof(mmsghdr));
|
|
|
|
|
|
|
|
|
|
mhdr.msg_hdr.msg_iovlen = 1;
|
|
|
|
|
mhdr.msg_hdr.msg_iov = new iovec();
|
|
|
|
|
mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize];
|
|
|
|
|
mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize;
|
|
|
|
|
mhdr.msg_len = 0;
|
|
|
|
|
|
|
|
|
|
cache.push_back(mhdr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &cache[cache_pos++];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsRtcServer::sendmmsg(srs_netfd_t stfd, mmsghdr* /*hdr*/)
|
|
|
|
|
{
|
|
|
|
|
mmstfd = stfd;
|
|
|
|
|
mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end());
|
|
|
|
|
|
|
|
|
|
if (waiting_msgs) {
|
|
|
|
|
waiting_msgs = false;
|
|
|
|
|
srs_cond_signal(cond);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsRtcServer::free_messages(vector<mmsghdr>& hdrs)
|
|
|
|
|
void SrsRtcServer::free_mhdrs(std::vector<mmsghdr>& mhdrs)
|
|
|
|
|
{
|
|
|
|
|
for (int i = 0; i < (int)hdrs.size(); i++) {
|
|
|
|
|
msghdr* hdr = &hdrs[i].msg_hdr;
|
|
|
|
|
for (int j = (int)hdr->msg_iovlen - 1; j >= 0 ; j--) {
|
|
|
|
|
iovec* iov = hdr->msg_iov + j;
|
|
|
|
|
for (int i = 0; i < (int)mhdrs.size(); i++) {
|
|
|
|
|
mmsghdr* hdr = &mhdrs[i];
|
|
|
|
|
|
|
|
|
|
for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) {
|
|
|
|
|
iovec* iov = hdr->msg_hdr.msg_iov + j;
|
|
|
|
|
char* data = (char*)iov->iov_base;
|
|
|
|
|
srs_freep(data);
|
|
|
|
|
srs_freep(iov);
|
|
|
|
@ -1389,17 +1411,19 @@ srs_error_t SrsRtcServer::cycle()
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: FIXME: Use cond trigger.
|
|
|
|
|
if (mmhdrs.empty()) {
|
|
|
|
|
int pos = cache_pos;
|
|
|
|
|
if (pos <= 0) {
|
|
|
|
|
waiting_msgs = true;
|
|
|
|
|
srs_cond_wait(cond);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vector<mmsghdr> mhdrs;
|
|
|
|
|
mmhdrs.swap(mhdrs);
|
|
|
|
|
// We are working on hotspot now.
|
|
|
|
|
cache.swap(hotspot);
|
|
|
|
|
cache_pos = 0;
|
|
|
|
|
|
|
|
|
|
mmsghdr* p = &mhdrs[0];
|
|
|
|
|
for (mmsghdr* end = p + mhdrs.size(); p < end; p += max_sendmmsg) {
|
|
|
|
|
mmsghdr* p = &hotspot[0];
|
|
|
|
|
for (mmsghdr* end = p + pos; p < end; p += max_sendmmsg) {
|
|
|
|
|
int vlen = (int)(end - p);
|
|
|
|
|
vlen = srs_min(max_sendmmsg, vlen);
|
|
|
|
|
|
|
|
|
@ -1415,10 +1439,8 @@ srs_error_t SrsRtcServer::cycle()
|
|
|
|
|
if ((cnt++ % 100) == 0) {
|
|
|
|
|
// TODO: FIXME: Support reload.
|
|
|
|
|
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
|
|
|
|
|
srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", mhdrs.size(), max_sendmmsg);
|
|
|
|
|
srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", pos, max_sendmmsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
free_messages(mhdrs);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err;
|
|
|
|
|