diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 3294c79f5..72d62fffe 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -460,6 +460,10 @@ srs_error_t SrsUdpMuxListener::cycle() nn_loop++; + // TODO: FIXME: Refactor the memory cache for receiver. + // Because we have to decrypt the cipher of received packet payload, + // and the size is not determined, so we think there is at least one copy, + // and we can reuse the plaintext h264/opus with players when got plaintext. SrsUdpMuxSocket skt(sender, lfd); int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 5c1458d9d..c2fbba7c6 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -70,17 +70,17 @@ using namespace std; // which reserves 100 bytes for SRTP or paddings. const int kRtpMaxPayloadSize = kRtpPacketSize - 200; -static bool is_stun(const uint8_t* data, const int size) +static bool is_stun(const uint8_t* data, const int size) { return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); } -static bool is_dtls(const uint8_t* data, size_t len) +static bool is_dtls(const uint8_t* data, size_t len) { return (len >= 13 && (data[0] > 19 && data[0] < 64)); } -static bool is_rtp_or_rtcp(const uint8_t* data, size_t len) +static bool is_rtp_or_rtcp(const uint8_t* data, size_t len) { return (len >= 12 && (data[0] & 0xC0) == 0x80); } @@ -260,7 +260,7 @@ srs_error_t SrsDtlsSession::initialize(SrsRequest* r) return err; } -srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) +srs_error_t SrsDtlsSession::handshake() { srs_error_t err = srs_success; @@ -272,7 +272,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) int ssl_err = SSL_get_error(dtls, ret); switch(ssl_err) { case SSL_ERROR_NONE: { - if ((err = on_dtls_handshake_done(skt)) != srs_success) { + if ((err = on_dtls_handshake_done()) != srs_success) { return srs_error_wrap(err, "dtls handshake done handle"); } break; @@ -292,7 +292,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) } if (out_bio_len) { - if ((err = skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { + if ((err = rtc_session->sendonly_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { return srs_error_wrap(err, "send dtls packet"); } } @@ -306,7 +306,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) return err; } -srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt) +srs_error_t SrsDtlsSession::on_dtls(char* data, int nb_data) { srs_error_t err = srs_success; if (BIO_reset(bio_in) != 1) { @@ -316,19 +316,19 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt) return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset"); } - if (BIO_write(bio_in, skt->data(), skt->size()) <= 0) { + if (BIO_write(bio_in, data, nb_data) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write"); } if (rtc_session->blackhole && rtc_session->blackhole_addr && rtc_session->blackhole_stfd) { // Ignore any error for black-hole. - void* p = skt->data(); int len = skt->size(); SrsRtcSession* s = rtc_session; + void* p = data; int len = nb_data; SrsRtcSession* s = rtc_session; srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } if (!handshake_done) { - err = handshake(skt); + err = handshake(); } else { while (BIO_ctrl_pending(bio_in) > 0) { char dtls_read_buf[8092]; @@ -345,7 +345,7 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt) return err; } -srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* skt) +srs_error_t SrsDtlsSession::on_dtls_handshake_done() { srs_error_t err = srs_success; srs_trace("rtc session=%s, DTLS handshake done.", rtc_session->id().c_str()); @@ -355,7 +355,7 @@ srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* skt) return srs_error_wrap(err, "srtp init failed"); } - return rtc_session->on_connection_established(skt); + return rtc_session->on_connection_established(); } srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf) @@ -367,7 +367,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int return err; } -srs_error_t SrsDtlsSession::srtp_initialize() +srs_error_t SrsDtlsSession::srtp_initialize() { srs_error_t err = srs_success; @@ -375,7 +375,7 @@ srs_error_t SrsDtlsSession::srtp_initialize() static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL_export_keying_material failed"); - } + } size_t offset = 0; @@ -415,7 +415,7 @@ srs_error_t SrsDtlsSession::srtp_send_init() srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp); policy.ssrc.type = ssrc_any_outbound; - + policy.ssrc.value = 0; // TODO: adjust window_size policy.window_size = 8192; @@ -629,15 +629,13 @@ SrsRtpPacket2* SrsRtcPackets::at(int index) return cache + index; } -SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) - : sendonly_ukt(NULL) +SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, int parent_cid) { _parent_cid = parent_cid; trd = new SrsDummyCoroutine(); rtc_session = s; - sendonly_ukt = u->copy_sendonly(); - sender = u->sender(); + sender = NULL; gso = false; merge_nalus = false; @@ -660,7 +658,6 @@ SrsRtcSenderThread::~SrsRtcSenderThread() _srs_config->unsubscribe(this); srs_freep(trd); - srs_freep(sendonly_ukt); } srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt) @@ -723,14 +720,14 @@ int SrsRtcSenderThread::cid() srs_error_t SrsRtcSenderThread::start() { srs_error_t err = srs_success; - + srs_freep(trd); trd = new SrsSTCoroutine("rtc_sender", this, _parent_cid); - + if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "rtc_sender"); } - + return err; } @@ -744,16 +741,6 @@ void SrsRtcSenderThread::stop_loop() trd->interrupt(); } -void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) -{ - srs_trace("session %s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); - - srs_freep(sendonly_ukt); - sendonly_ukt = skt->copy_sendonly(); - sender = skt->sender(); -} - srs_error_t SrsRtcSenderThread::cycle() { srs_error_t err = srs_success; @@ -1043,8 +1030,8 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) packets.nn_rtp_bytes += (int)iov->iov_len; // Set the address and control information. - sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); - socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); + sockaddr_in* addr = (sockaddr_in*)rtc_session->sendonly_skt->peer_addr(); + socklen_t addrlen = (socklen_t)rtc_session->sendonly_skt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; @@ -1220,8 +1207,8 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) if (do_send) { // Set the address and control information. - sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); - socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); + sockaddr_in* addr = (sockaddr_in*)rtc_session->sendonly_skt->peer_addr(); + socklen_t addrlen = (socklen_t)rtc_session->sendonly_skt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; @@ -1506,8 +1493,6 @@ srs_error_t SrsRtcSenderThread::package_stap_a(SrsSource* source, SrsSharedPtrMe SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) { - sendonly_ukt = NULL; - report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); rtc_session = session; @@ -1535,12 +1520,10 @@ SrsRtcPublisher::~SrsRtcPublisher() srs_freep(rtp_audio_queue); } -srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* r) +srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsRequest* r) { srs_error_t err = srs_success; - sendonly_ukt = skt; - video_ssrc = vssrc; audio_ssrc = assrc; req = r; @@ -1570,7 +1553,7 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui return err; } -srs_error_t SrsRtcPublisher::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcPublisher::on_rtcp_sender_report(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1661,7 +1644,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf) { srs_error_t err = srs_success; /* @@ -1724,7 +1707,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* return err; } -void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt) +void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc) { // If DTLS is not OK, drop all messages. if (!rtc_session->dtls_session) { @@ -1759,14 +1742,14 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, Sr // FIXME: Merge nack rtcp into one packets. if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { // TODO: FIXME: Check error. - skt->sendto(protected_buf, nb_protected_buf, 0); + rtc_session->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); } ++iter; } } -srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue) +srs_error_t SrsRtcPublisher::send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue) { srs_error_t err = srs_success; @@ -1781,12 +1764,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, S stream.write_1bytes(kRR); stream.write_2bytes(7); stream.write_4bytes(ssrc); - + uint8_t fraction_lost = rtp_queue->get_fraction_lost(); uint32_t cumulative_number_of_packets_lost = rtp_queue->get_cumulative_number_of_packets_lost() & 0x7FFFFF; uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence(); uint32_t interarrival_jitter = rtp_queue->get_interarrival_jitter(); - + uint32_t rr_lsr = 0; uint32_t rr_dlsr = 0; @@ -1798,7 +1781,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, S uint32_t dlsr = (srs_update_system_time() - lsr_systime) / 1000; rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000); } - + stream.write_4bytes(ssrc); stream.write_1bytes(fraction_lost); stream.write_3bytes(cumulative_number_of_packets_lost); @@ -1806,10 +1789,10 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, S stream.write_4bytes(interarrival_jitter); stream.write_4bytes(rr_lsr); stream.write_4bytes(rr_dlsr); - + srs_verbose("RR ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u", ssrc, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter); - + char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); if ((err = rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) { @@ -1817,11 +1800,11 @@ srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, S } // TDOO: FIXME: Check error. - skt->sendto(protected_buf, nb_protected_buf, 0); + rtc_session->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); return err; } -srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ssrc) +srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(uint32_t ssrc) { srs_error_t err = srs_success; @@ -1877,12 +1860,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ss } // TDOO: FIXME: Check error. - skt->sendto(protected_buf, nb_protected_buf, 0); + rtc_session->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); return err; } -srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc) +srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(uint32_t ssrc) { srs_error_t err = srs_success; @@ -1898,9 +1881,9 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssr stream.write_2bytes(2); stream.write_4bytes(ssrc); stream.write_4bytes(ssrc); - + srs_trace("RTC PLI ssrc=%u", ssrc); - + char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); if ((err = rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf)) != srs_success) { @@ -1908,12 +1891,12 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssr } // TDOO: FIXME: Check error. - skt->sendto(protected_buf, nb_protected_buf, 0); + rtc_session->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); return err; } -srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf) +srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1926,15 +1909,15 @@ srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf) uint32_t ssrc = pkt->rtp_header.get_ssrc(); if (ssrc == audio_ssrc) { - return on_audio(skt, pkt); + return on_audio(pkt); } else if (ssrc == video_ssrc) { - return on_video(skt, pkt); + return on_video(pkt); } return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } -srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt) +srs_error_t SrsRtcPublisher::on_audio(SrsRtpSharedPacket* pkt) { srs_error_t err = srs_success; @@ -1948,10 +1931,10 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* if (rtp_audio_queue->should_request_key_frame()) { // TODO: FIXME: Check error. - send_rtcp_fb_pli(skt, audio_ssrc); + send_rtcp_fb_pli(audio_ssrc); } - check_send_nacks(rtp_audio_queue, audio_ssrc, skt); + check_send_nacks(rtp_audio_queue, audio_ssrc); return collect_audio_frame(); } @@ -1969,6 +1952,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() for (size_t j = 0; j < frames.size(); ++j) { SrsRtpSharedPacket* pkt = frames[j]; + // TODO: FIXME: Transcode OPUS to AAC. if (pkt->rtp_payload_size() > 0) { SrsMessageHeader header; header.message_type = RTMP_MSG_AudioMessage; @@ -1996,7 +1980,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() return err; } -srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt) +srs_error_t SrsRtcPublisher::on_video(SrsRtpSharedPacket* pkt) { srs_error_t err = srs_success; @@ -2011,10 +1995,10 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* if (rtp_video_queue->should_request_key_frame()) { // TODO: FIXME: Check error. - send_rtcp_fb_pli(skt, video_ssrc); + send_rtcp_fb_pli(video_ssrc); } - check_send_nacks(rtp_video_queue, video_ssrc, skt); + check_send_nacks(rtp_video_queue, video_ssrc); return collect_video_frame(); } @@ -2028,8 +2012,8 @@ srs_error_t SrsRtcPublisher::collect_video_frame() for (size_t i = 0; i < frames.size(); ++i) { if (!frames[i].empty()) { - srs_verbose("collect %d video frames, seq range %u,%u", - frames.size(), frames[i].front()->rtp_header.get_sequence(), + srs_verbose("collect %d video frames, seq range %u,%u", + frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); } @@ -2085,7 +2069,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame() idr = true; } } - memcpy(frame_buffer + frame_buffer_index, frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first, + memcpy(frame_buffer + frame_buffer_index, frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first, rtp_h264_header->nalu_offset[j].second); frame_buffer_index += rtp_h264_header->nalu_offset[j].second; } @@ -2167,15 +2151,6 @@ srs_error_t SrsRtcPublisher::collect_video_frame() return err; } -void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt) -{ - srs_trace("session %s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); - - srs_freep(sendonly_ukt); - sendonly_ukt = skt->copy_sendonly(); -} - void SrsRtcPublisher::request_keyframe() { int scid = _srs_context->get_id(); @@ -2187,15 +2162,15 @@ void SrsRtcPublisher::request_keyframe() srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) { - if (sendonly_ukt) { - // TODO: FIXME: Check error. - send_rtcp_rr(sendonly_ukt, video_ssrc, rtp_video_queue); - send_rtcp_rr(sendonly_ukt, audio_ssrc, rtp_audio_queue); - send_rtcp_xr_rrtr(sendonly_ukt, video_ssrc); - send_rtcp_xr_rrtr(sendonly_ukt, audio_ssrc); - } + srs_error_t err = srs_success; - return srs_success; + // TODO: FIXME: Check error. + send_rtcp_rr(video_ssrc, rtp_video_queue); + send_rtcp_rr(audio_ssrc, rtp_audio_queue); + send_rtcp_xr_rrtr(video_ssrc); + send_rtcp_xr_rrtr(audio_ssrc); + + return err; } SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id) @@ -2208,6 +2183,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& source = NULL; publisher = NULL; sender = NULL; + sendonly_skt = NULL; rtc_server = s; dtls_session = new SrsDtlsSession(this); @@ -2228,6 +2204,7 @@ SrsRtcSession::~SrsRtcSession() srs_freep(req); srs_close_stfd(blackhole_stfd); srs_freep(blackhole_addr); + srs_freep(sendonly_skt); } void SrsRtcSession::set_local_sdp(const SrsSdp& sdp) @@ -2279,32 +2256,31 @@ srs_error_t SrsRtcSession::initialize() return err; } -srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) { srs_error_t err = srs_success; - if (stun_req->is_binding_request()) { - if ((err = on_binding_request(skt, stun_req)) != srs_success) { - return srs_error_wrap(err, "stun binding request failed"); - } + if (!r->is_binding_request()) { + return err; + } - last_stun_time = srs_get_system_time(); + last_stun_time = srs_get_system_time(); - if (sender && sender->sendonly_ukt) { - // We are running in the ice-lite(server) mode. If client have multi network interface, - // we only choose one candidate pair which is determined by client. - if (stun_req->get_use_candidate() && sender->sendonly_ukt->get_peer_id() != skt->get_peer_id()) { - sender->update_sendonly_socket(skt); - } - } + // We are running in the ice-lite(server) mode. If client have multi network interface, + // we only choose one candidate pair which is determined by client. + if (!sendonly_skt || sendonly_skt->get_peer_id() != skt->get_peer_id()) { + update_sendonly_socket(skt); + } - if (publisher && publisher->sendonly_ukt) { - // We are running in the ice-lite(server) mode. If client have multi network interface, - // we only choose one candidate pair which is determined by client. - if (stun_req->get_use_candidate() && publisher->sendonly_ukt->get_peer_id() != skt->get_peer_id()) { - publisher->update_sendonly_socket(skt); - } - } + // Write STUN messages to blackhole. + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = skt->data(); int len = skt->size(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + if ((err = on_binding_request(r)) != srs_success) { + return srs_error_wrap(err, "stun binding request failed"); } return err; @@ -2317,12 +2293,12 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req #define be32toh ntohl #endif -srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r) { srs_error_t err = srs_success; bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); - if (strict_check && stun_req->get_ice_controlled()) { + if (strict_check && r->get_ice_controlled()) { // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 // TODO: Send 487 (Role Conflict) error response. return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); @@ -2334,38 +2310,31 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke SrsAutoFree(SrsBuffer, stream); stun_binding_response.set_message_type(BindingResponse); - stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag()); - stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag()); - stun_binding_response.set_transcation_id(stun_req->get_transcation_id()); + stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); + stun_binding_response.set_transcation_id(r->get_transcation_id()); // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(skt->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(skt->get_peer_port()); + stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } - if ((err = skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { + if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { return srs_error_wrap(err, "stun binding response send failed"); } if (get_session_state() == WAITING_STUN) { set_session_state(DOING_DTLS_HANDSHAKE); - peer_id = skt->get_peer_id(); + peer_id = sendonly_skt->get_peer_id(); rtc_server->insert_into_id_sessions(peer_id, this); set_session_state(DOING_DTLS_HANDSHAKE); srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str()); } - // Write STUN messages to blackhole. - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = skt->data(); int len = skt->size(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); - } - if (blackhole && blackhole_addr && blackhole_stfd) { // Ignore any error for black-hole. void* p = stream->data(); int len = stream->pos(); @@ -2375,7 +2344,7 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke return err; } -srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -2457,14 +2426,14 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock // TODO: FIXME: Check error. dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); - skt->sendto(protected_buf, nb_protected_buf, 0); + sendonly_skt->sendto(protected_buf, nb_protected_buf, 0); } } return err; } -srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -2511,25 +2480,25 @@ srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxS return err; } -srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf) { if (publisher == NULL) { return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); } - return publisher->on_rtcp_xr(buf, nb_buf, skt); + return publisher->on_rtcp_xr(buf, nb_buf); } -srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf) { if (publisher == NULL) { return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); } - return publisher->on_rtcp_sender_report(buf, nb_buf, skt); + return publisher->on_rtcp_sender_report(buf, nb_buf); } -srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -2598,7 +2567,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_connection_established() { srs_error_t err = srs_success; @@ -2606,14 +2575,14 @@ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) if (!local_sdp.media_descs_.empty() && (local_sdp.media_descs_.back().recvonly_ || local_sdp.media_descs_.back().sendrecv_)) { - if ((err = start_publish(skt)) != srs_success) { + if ((err = start_publish()) != srs_success) { return srs_error_wrap(err, "start publish"); } } if (!local_sdp.media_descs_.empty() && (local_sdp.media_descs_.back().sendonly_ || local_sdp.media_descs_.back().sendrecv_)) { - if ((err = start_play(skt)) != srs_success) { + if ((err = start_play()) != srs_success) { return srs_error_wrap(err, "start play"); } } @@ -2621,12 +2590,13 @@ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) return err; } -srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::start_play() { srs_error_t err = srs_success; srs_freep(sender); - sender = new SrsRtcSenderThread(this, skt, _srs_context->get_id()); + sender = new SrsRtcSenderThread(this, _srs_context->get_id()); + sender->update_sender(sendonly_skt->sender()); uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; @@ -2654,7 +2624,7 @@ srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* skt) return err; } -srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::start_publish() { srs_error_t err = srs_success; @@ -2677,7 +2647,7 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt) } // FIXME: err process. - if ((err = publisher->initialize(skt, video_ssrc, audio_ssrc, req)) != srs_success) { + if ((err = publisher->initialize(video_ssrc, audio_ssrc, req)) != srs_success) { return srs_error_wrap(err, "rtc publisher init"); } @@ -2689,12 +2659,12 @@ bool SrsRtcSession::is_stun_timeout() return last_stun_time + sessionStunTimeout < srs_get_system_time(); } -srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data) { - return dtls_session->on_dtls(skt); + return dtls_session->on_dtls(data, nb_data); } -srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -2703,8 +2673,8 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) } char unprotected_buf[kRtpPacketSize]; - int nb_unprotected_buf = skt->size(); - if ((err = dtls_session->unprotect_rtcp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) { + int nb_unprotected_buf = nb_data; + if ((err = dtls_session->unprotect_rtcp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtcp unprotect failed"); } @@ -2730,11 +2700,11 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) switch (payload_type) { case kSR: { - err = on_rtcp_sender_report(ph, length, skt); + err = on_rtcp_sender_report(ph, length); break; } case kRR: { - err = on_rtcp_receiver_report(ph, length, skt); + err = on_rtcp_receiver_report(ph, length); break; } case kSDES: { @@ -2747,15 +2717,15 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) break; } case kRtpFb: { - err = on_rtcp_feedback(ph, length, skt); + err = on_rtcp_feedback(ph, length); break; } case kPsFb: { - err = on_rtcp_ps_feedback(ph, length, skt); + err = on_rtcp_ps_feedback(ph, length); break; } case kXR: { - err = on_rtcp_xr(ph, length, skt); + err = on_rtcp_xr(ph, length); break; } default:{ @@ -2775,7 +2745,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) return err; } -srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data) { srs_error_t err = srs_success; @@ -2788,8 +2758,8 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt) } char* unprotected_buf = new char[kRtpPacketSize]; - int nb_unprotected_buf = skt->size(); - if ((err = dtls_session->unprotect_rtp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) { + int nb_unprotected_buf = nb_data; + if ((err = dtls_session->unprotect_rtp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtp unprotect failed"); } @@ -2799,7 +2769,21 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt) srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } - return publisher->on_rtp(skt, unprotected_buf, nb_unprotected_buf); + return publisher->on_rtp(unprotected_buf, nb_unprotected_buf); +} + +void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt) +{ + if (sendonly_skt) { + srs_trace("session %s address changed, update %s -> %s", + id().c_str(), sendonly_skt->get_peer_id().c_str(), skt->get_peer_id().c_str()); + } + + srs_freep(sendonly_skt); + sendonly_skt = skt->copy_sendonly(); + if (sender) { + sender->update_sender(skt->sender()); + } } SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) @@ -3166,13 +3150,54 @@ srs_error_t SrsRtcServer::listen_udp() srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) { - if (is_stun(reinterpret_cast(skt->data()), skt->size())) { - return on_stun(skt); - } else if (is_dtls(reinterpret_cast(skt->data()), skt->size())) { - return on_dtls(skt); - } else if (is_rtp_or_rtcp(reinterpret_cast(skt->data()), skt->size())) { - return on_rtp_or_rtcp(skt); - } + srs_error_t err = srs_success; + + char* data = skt->data(); int size = skt->size(); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(skt->get_peer_id()); + + if (rtc_session) { + // Now, we got the RTC session to handle the packet, switch to its context + // to make all logs write to the "correct" pid+cid. + rtc_session->switch_to_context(); + } + + if (is_stun((uint8_t*)data, size)) { + SrsStunPacket sr; + if ((err = sr.decode(data, size)) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); + } + srs_verbose("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", + skt->get_peer_id().c_str(), sr.get_use_candidate(), sr.get_ice_controlled(), sr.get_ice_controlling()); + + if (!rtc_session) { + rtc_session = find_rtc_session_by_username(sr.get_username()); + if (rtc_session) { + rtc_session->switch_to_context(); + } + } + if (rtc_session == NULL) { + return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, stun username=%s, peer_id=%s", + sr.get_username().c_str(), skt->get_peer_id().c_str()); + } + + return rtc_session->on_stun(skt, &sr); + } else if (is_dtls((uint8_t*)data, size)) { + if (rtc_session == NULL) { + return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, peer_id=%s", skt->get_peer_id().c_str()); + } + + return rtc_session->on_dtls(data, size); + } else if (is_rtp_or_rtcp((uint8_t*)data, size)) { + if (rtc_session == NULL) { + return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, peer_id=%s", skt->get_peer_id().c_str()); + } + + if (is_rtcp((uint8_t*)data, size)) { + return rtc_session->on_rtcp(data, size); + } else { + return rtc_session->on_rtp(data, size); + } + } return srs_error_new(ERROR_RTC_UDP, "unknown udp packet type"); } @@ -3267,69 +3292,6 @@ SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) return iter->second; } -srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* skt) -{ - srs_error_t err = srs_success; - - SrsStunPacket stun_req; - if ((err = stun_req.decode(skt->data(), skt->size())) != srs_success) { - return srs_error_wrap(err, "decode stun packet failed"); - } - - srs_verbose("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", - skt->get_peer_id().c_str(), stun_req.get_use_candidate(), stun_req.get_ice_controlled(), stun_req.get_ice_controlling()); - - std::string username = stun_req.get_username(); - SrsRtcSession* rtc_session = find_rtc_session_by_username(username); - if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, stun username=%s", username.c_str()); - } - - // Now, we got the RTC session to handle the packet, switch to its context - // to make all logs write to the "correct" pid+cid. - rtc_session->switch_to_context(); - - return rtc_session->on_stun(skt, &stun_req); -} - -srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* skt) -{ - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(skt->get_peer_id()); - - if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_DTLS, "can not find rtc session by peer_id=%s", skt->get_peer_id().c_str()); - } - - // Now, we got the RTC session to handle the packet, switch to its context - // to make all logs write to the "correct" pid+cid. - rtc_session->switch_to_context(); - - return rtc_session->on_dtls(skt); -} - -srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* skt) -{ - srs_error_t err = srs_success; - - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(skt->get_peer_id()); - - if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_RTP, "can not find rtc session by peer_id=%s", skt->get_peer_id().c_str()); - } - - // Now, we got the RTC session to handle the packet, switch to its context - // to make all logs write to the "correct" pid+cid. - rtc_session->switch_to_context(); - - if (is_rtcp(reinterpret_cast(skt->data()), skt->size())) { - err = rtc_session->on_rtcp(skt); - } else { - err = rtc_session->on_rtp(skt); - } - - return err; -} - SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username) { map::iterator iter = map_username_session.find(username); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index b4eb22cfb..3441e5f4d 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -122,8 +122,8 @@ public: srs_error_t initialize(SrsRequest* r); - srs_error_t on_dtls(SrsUdpMuxSocket* skt); - srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* skt); + srs_error_t on_dtls(char* data, int nb_data); + srs_error_t on_dtls_handshake_done(); srs_error_t on_dtls_application_data(const char* data, const int len); public: srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); @@ -132,7 +132,7 @@ public: srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); private: - srs_error_t handshake(SrsUdpMuxSocket* skt); + srs_error_t handshake(); private: srs_error_t srtp_initialize(); srs_error_t srtp_send_init(); @@ -208,8 +208,6 @@ private: uint16_t audio_sequence; private: uint16_t video_sequence; -public: - SrsUdpMuxSocket* sendonly_ukt; private: ISrsUdpSender* sender; private: @@ -221,10 +219,11 @@ private: int mw_msgs; bool realtime; public: - SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); + SrsRtcSenderThread(SrsRtcSession* s, int parent_cid); virtual ~SrsRtcSenderThread(); public: srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt); + void update_sender(ISrsUdpSender* s) { sender = s; } // interface ISrsReloadHandler public: virtual srs_error_t on_reload_rtc_server(); @@ -236,8 +235,6 @@ public: virtual srs_error_t start(); virtual void stop(); virtual void stop_loop(); -public: - void update_sendonly_socket(SrsUdpMuxSocket* skt); public: virtual srs_error_t cycle(); private: @@ -258,8 +255,6 @@ class SrsRtcPublisher : virtual public ISrsHourGlass { private: SrsHourGlass* report_timer; -public: - SrsUdpMuxSocket* sendonly_ukt; private: SrsRtcSession* rtc_session; uint32_t video_ssrc; @@ -281,23 +276,22 @@ public: SrsRtcPublisher(SrsRtcSession* session); virtual ~SrsRtcPublisher(); public: - srs_error_t initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* req); - srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); - srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t initialize(uint32_t vssrc, uint32_t assrc, SrsRequest* req); + srs_error_t on_rtcp_sender_report(char* buf, int nb_buf); + srs_error_t on_rtcp_xr(char* buf, int nb_buf); private: - void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt); - srs_error_t send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue); - srs_error_t send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ssrc); - srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc); + void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc); + srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue); + srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); + srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: - srs_error_t on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf); + srs_error_t on_rtp(char* buf, int nb_buf); private: - srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt); + srs_error_t on_audio(SrsRtpSharedPacket* pkt); srs_error_t collect_audio_frame(); - srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt); + srs_error_t on_video(SrsRtpSharedPacket* pkt); srs_error_t collect_video_frame(); public: - void update_sendonly_socket(SrsUdpMuxSocket* skt); void request_keyframe(); // interface ISrsHourGlass public: @@ -311,13 +305,17 @@ class SrsRtcSession friend class SrsRtcPublisher; private: SrsRtcServer* rtc_server; - SrsSdp remote_sdp; - SrsSdp local_sdp; SrsRtcSessionStateType session_state; SrsDtlsSession* dtls_session; SrsRtcSenderThread* sender; + SrsRtcPublisher* publisher; +private: + SrsUdpMuxSocket* sendonly_skt; std::string username; std::string peer_id; +private: + // The timeout of session, keep alive by STUN ping pong. + srs_utime_t sessionStunTimeout; srs_utime_t last_stun_time; private: // For each RTC session, we use a specified cid for debugging logs. @@ -327,17 +325,14 @@ private: // Sepcifies by HTTP API, query encrypt, optional. // TODO: FIXME: Support reload. bool encrypt; - // The timeout of session, keep alive by STUN ping pong. - srs_utime_t sessionStunTimeout; + SrsRequest* req; + SrsSource* source; + SrsSdp remote_sdp; + SrsSdp local_sdp; private: bool blackhole; sockaddr_in* blackhole_addr; srs_netfd_t blackhole_stfd; -public: - SrsRequest* req; - SrsSource* source; -private: - SrsRtcPublisher* publisher; public: SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id); virtual ~SrsRtcSession(); @@ -360,27 +355,28 @@ public: void switch_to_context(); int context_id() { return cid; } + public: srs_error_t initialize(); - srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); - srs_error_t on_dtls(SrsUdpMuxSocket* skt); - srs_error_t on_rtcp(SrsUdpMuxSocket* skt); - srs_error_t on_rtp(SrsUdpMuxSocket* skt); -public: - srs_error_t send_client_hello(SrsUdpMuxSocket* skt); - srs_error_t on_connection_established(SrsUdpMuxSocket* skt); - srs_error_t start_play(SrsUdpMuxSocket* skt); - srs_error_t start_publish(SrsUdpMuxSocket* skt); -public: + // The peer address may change, we can identify that by STUN messages. + srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); + srs_error_t on_dtls(char* data, int nb_data); + srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtp(char* data, int nb_data); +public: + srs_error_t send_client_hello(); + srs_error_t on_connection_established(); + srs_error_t start_play(); + srs_error_t start_publish(); bool is_stun_timeout(); + void update_sendonly_socket(SrsUdpMuxSocket* skt); private: - srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); -private: - srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); - srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); - srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt); - srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); - srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t on_binding_request(SrsStunPacket* r); + srs_error_t on_rtcp_feedback(char* data, int nb_data); + srs_error_t on_rtcp_ps_feedback(char* data, int nb_data); + srs_error_t on_rtcp_xr(char* data, int nb_data); + srs_error_t on_rtcp_sender_report(char* data, int nb_data); + srs_error_t on_rtcp_receiver_report(char* data, int nb_data); }; class SrsUdpMuxSender : virtual public ISrsUdpSender, virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler @@ -455,10 +451,6 @@ public: bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); int nn_sessions() { return (int)map_username_session.size(); } -private: - srs_error_t on_stun(SrsUdpMuxSocket* skt); - srs_error_t on_dtls(SrsUdpMuxSocket* skt); - srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* skt); private: SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);