diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d11d455ec..6e1d1a524 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -205,22 +205,10 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) } } -<<<<<<< HEAD - if (out_bio_len) { - srs_netfd_t stfd = udp_mux_skt->stfd(); - sockaddr_in* addr = udp_mux_skt->peer_addr(); - socklen_t addrlen = udp_mux_skt->peer_addrlen(); - - char* buf = new char[out_bio_len]; - memcpy(buf, out_bio_data, out_bio_len); - - rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, out_bio_len); -======= if (out_bio_len) { if ((err = skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { return srs_error_wrap(err, "send dtls packet"); } ->>>>>>> upstream/feature/rtc } return err; @@ -607,17 +595,10 @@ srs_error_t SrsRtcSenderThread::cycle() srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } -<<<<<<< HEAD -void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* ukt) -{ - srs_trace("rtc session=%s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), ukt->get_peer_id().c_str()); -======= for (int i = 0; i < msg_count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; srs_freep(msg); } ->>>>>>> upstream/feature/rtc pprint->elapse(); if (pprint->can_print()) { @@ -633,18 +614,12 @@ srs_error_t SrsRtcSenderThread::send_messages( ) { srs_error_t err = srs_success; -<<<<<<< HEAD - srs_netfd_t stfd = udp_mux_skt->stfd(); - sockaddr_in* addr = udp_mux_skt->peer_addr(); - socklen_t addrlen = udp_mux_skt->peer_addrlen(); -======= if (!rtc_session->dtls_session) { return err; } // Covert kernel messages to RTP packets. vector packets; ->>>>>>> upstream/feature/rtc for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; @@ -704,14 +679,10 @@ srs_error_t SrsRtcSenderThread::send_messages( *pnn_rtp_pkts += (int)packets.size(); -<<<<<<< HEAD - rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, length); -======= for (int j = 0; j < (int)packets.size(); j++) { SrsRtpPacket2* packet = packets[j]; if ((err = send_packet(packet, skt)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); ->>>>>>> upstream/feature/rtc } srs_freep(packet); } @@ -731,8 +702,6 @@ srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* if ((err = sender->fetch(&mhdr)) != srs_success) { return srs_error_wrap(err, "fetch msghdr"); } -<<<<<<< HEAD -======= char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; // Length of iov, default size. @@ -901,7 +870,6 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes *ppacket = packet; return err; ->>>>>>> upstream/feature/rtc } SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) @@ -1003,12 +971,9 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke } SrsStunPacket stun_binding_response; -<<<<<<< HEAD -======= char buf[kRtpPacketSize]; SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); SrsAutoFree(SrsBuffer, stream); ->>>>>>> upstream/feature/rtc stun_binding_response.set_message_type(BindingResponse); stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag()); @@ -1018,23 +983,10 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke stun_binding_response.set_mapped_address(be32toh(inet_addr(skt->get_peer_ip().c_str()))); stun_binding_response.set_mapped_port(skt->get_peer_port()); - char* buf = new char[1460]; - SrsBuffer* stream = new SrsBuffer(buf, 1460); - SrsAutoFree(SrsBuffer, stream); - if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } -<<<<<<< HEAD - srs_netfd_t stfd = udp_mux_skt->stfd(); - sockaddr_in* addr = udp_mux_skt->peer_addr(); - socklen_t addrlen = udp_mux_skt->peer_addrlen(); - send_and_free_messages(stfd, addr, addrlen, buf, stream->pos()); - - if (get_session_state() == WAITING_STUN) { - peer_id = udp_mux_skt->get_peer_id(); -======= if ((err = skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { return srs_error_wrap(err, "stun binding response send failed"); } @@ -1043,7 +995,6 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke set_session_state(DOING_DTLS_HANDSHAKE); peer_id = skt->get_peer_id(); ->>>>>>> upstream/feature/rtc rtc_server->insert_into_id_sessions(peer_id, this); set_session_state(DOING_DTLS_HANDSHAKE); @@ -1138,15 +1089,7 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock srs_verbose("resend pkt sequence=%u", resend_pkts[i]->rtp_header.get_sequence()); dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); -<<<<<<< HEAD - - srs_netfd_t stfd = udp_mux_skt->stfd(); - sockaddr_in* addr = udp_mux_skt->peer_addr(); - socklen_t addrlen = udp_mux_skt->peer_addrlen(); - send_and_free_messages(stfd, addr, addrlen, protected_buf, nb_protected_buf); -======= skt->sendto(protected_buf, nb_protected_buf, 0); ->>>>>>> upstream/feature/rtc } } @@ -1271,14 +1214,8 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) { -<<<<<<< HEAD - srs_trace("rtc session=%s, timeout=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); - set_session_state(ESTABLISHED); - return start_play(udp_mux_skt); -======= srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); return start_play(skt); ->>>>>>> upstream/feature/rtc } srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* skt) @@ -1394,16 +1331,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) return err; } -<<<<<<< HEAD -void SrsRtcSession::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length) -{ - rtc_server->send_and_free_messages(stfd, addr, addrlen, buf, length); -} - -SrsRtcServer::SrsRtcServer() -======= SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) ->>>>>>> upstream/feature/rtc { lfd = NULL; server = s; @@ -1855,95 +1783,6 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return srs_success; } -<<<<<<< HEAD -void SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length) -{ - mmstfd = stfd; - - mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mhdr)); - - mhdr.msg_hdr.msg_name = addr; - mhdr.msg_hdr.msg_namelen = addrlen; - mhdr.msg_hdr.msg_iovlen = 1; - mhdr.msg_hdr.msg_iov = new iovec(); - mhdr.msg_hdr.msg_iov->iov_base = buf; - mhdr.msg_hdr.msg_iov->iov_len = length; - mmhdrs.push_back(mhdr); - - if (waiting_msgs) { - waiting_msgs = false; - srs_cond_signal(cond); - } -} - -void SrsRtcServer::free_messages(vector& hdrs) -{ - for (int i = 0; i < (int)hdrs.size(); i++) { - msghdr* hdr = &hdrs[i].msg_hdr; - for (int j = (int)hdr->msg_iovlen - 1; j >= 0 ; j--) { - iovec* iov = hdr->msg_iov + j; - char* data = (char*)iov->iov_base; - srs_freep(data); - srs_freep(iov); - } - } -} - -srs_error_t SrsRtcServer::cycle() -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Use pithy print. - uint32_t cnt = 1; - - SrsStatistic* stat = SrsStatistic::instance(); - - // TODO: FIXME: Support reload. - int max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - - while (true) { - if ((err = trd->pull()) != srs_success) { - return err; - } - - // TODO: FIXME: Use cond trigger. - if (mmhdrs.empty()) { - waiting_msgs = true; - srs_cond_wait(cond); - } - - vector mhdrs; - mmhdrs.swap(mhdrs); - - mmsghdr* p = &mhdrs[0]; - for (mmsghdr* end = p + mhdrs.size(); p < end; p += max_sendmmsg) { - int vlen = (int)(end - p); - vlen = srs_min(max_sendmmsg, vlen); - - int r0 = srs_sendmmsg(mmstfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != vlen) { - srs_warn("sendmsg %d msgs, %d done", vlen, r0); - } - - stat->perf_mw_on_packets(vlen); - } - - // TODO: FIXME: Use pithy print. - if ((cnt++ % 100) == 0) { - // TODO: FIXME: Support reload. - max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", mhdrs.size(), max_sendmmsg); - } - - free_messages(mhdrs); - } - - return err; -} - -======= ->>>>>>> upstream/feature/rtc RtcServerAdapter::RtcServerAdapter() { rtc = new SrsRtcServer(); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index a244c0070..3fef015c6 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -224,18 +224,9 @@ private: private: srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); private: -<<<<<<< HEAD - srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); -// Internal only. -public: - void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length); -======= srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); ->>>>>>> upstream/feature/rtc }; class SrsUdpMuxSender : virtual public ISrsUdpSender, virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler @@ -306,14 +297,6 @@ private: // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); -<<<<<<< HEAD -// Internal only. -public: - void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length); - void free_messages(std::vector& hdrs); - virtual srs_error_t cycle(); -======= ->>>>>>> upstream/feature/rtc }; // The RTC server adapter.