diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 33afa2455..427c44a3d 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1534,27 +1534,6 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui return err; } -void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt) -{ - srs_trace("session %s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); - - srs_freep(sendonly_ukt); - sendonly_ukt = skt->copy_sendonly(); -} - -srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) -{ - if (sendonly_ukt) { - send_rtcp_rr(sendonly_ukt, video_ssrc, rtp_video_queue); - send_rtcp_rr(sendonly_ukt, audio_ssrc, rtp_audio_queue); - send_rtcp_xr_rrtr(sendonly_ukt, video_ssrc); - send_rtcp_xr_rrtr(sendonly_ukt, audio_ssrc); - } - - return srs_success; -} - srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -1576,48 +1555,6 @@ srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf) return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } -void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt) -{ - // If DTLS is not OK, drop all messages. - if (!rtc_session->dtls_session) { - return; - } - - vector nack_seqs; - rtp_queue->nack_.get_nack_seqs(nack_seqs); - vector::iterator iter = nack_seqs.begin(); - while (iter != nack_seqs.end()) { - char buf[kRtpPacketSize]; - SrsBuffer stream(buf, sizeof(buf)); - // FIXME: Replace magic number. - stream.write_1bytes(0x81); - stream.write_1bytes(kRtpFb); - stream.write_2bytes(3); - stream.write_4bytes(ssrc); - stream.write_4bytes(ssrc); - uint16_t pid = *iter; - uint16_t blp = 0; - while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { - blp |= (1 << (*(iter + 1) - pid - 1)); - ++iter; - } - - stream.write_2bytes(pid); - stream.write_2bytes(blp); - - char protected_buf[kRtpPacketSize]; - int nb_protected_buf = stream.pos(); - - // FIXME: Merge nack rtcp into one packets. - if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { - // TODO: FIXME: Check error. - skt->sendto(protected_buf, nb_protected_buf, 0); - } - - ++iter; - } -} - srs_error_t SrsRtcPublisher::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -1685,6 +1622,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ uint32_t sender_packet_count = stream->read_4bytes(); uint32_t sender_octec_count = stream->read_4bytes(); + (void)sender_packet_count; (void)sender_octec_count; (void)rtp_time; srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u", ssrc_of_sender, rtp_time, sender_packet_count, sender_octec_count); @@ -1756,7 +1694,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* int rtt_ntp = compact_ntp - lrr - dlrr; int rtt = ((rtt_ntp * 1000) >> 16) + ((rtt_ntp >> 16) * 1000); - srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d", + srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d", ssrc, compact_ntp, lrr, dlrr, rtt); if (ssrc == video_ssrc) { @@ -1771,6 +1709,48 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* return err; } +void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt) +{ + // If DTLS is not OK, drop all messages. + if (!rtc_session->dtls_session) { + return; + } + + vector nack_seqs; + rtp_queue->nack_.get_nack_seqs(nack_seqs); + vector::iterator iter = nack_seqs.begin(); + while (iter != nack_seqs.end()) { + char buf[kRtpPacketSize]; + SrsBuffer stream(buf, sizeof(buf)); + // FIXME: Replace magic number. + stream.write_1bytes(0x81); + stream.write_1bytes(kRtpFb); + stream.write_2bytes(3); + stream.write_4bytes(ssrc); + stream.write_4bytes(ssrc); + uint16_t pid = *iter; + uint16_t blp = 0; + while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) { + blp |= (1 << (*(iter + 1) - pid - 1)); + ++iter; + } + + stream.write_2bytes(pid); + stream.write_2bytes(blp); + + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = stream.pos(); + + // FIXME: Merge nack rtcp into one packets. + if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) { + // TODO: FIXME: Check error. + skt->sendto(protected_buf, nb_protected_buf, 0); + } + + ++iter; + } +} + srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue) { srs_error_t err = srs_success; @@ -1938,27 +1918,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* return collect_audio_frame(); } -srs_error_t SrsRtcPublisher::collect_audio_frame() -{ - srs_error_t err = srs_success; - - std::vector > frames; - rtp_audio_queue->get_and_clean_collected_frames(frames); - - for (size_t i = 0; i < frames.size(); ++i) { - if (! frames[i].empty()) { - srs_verbose("collect %d audio frames, seq range %u,%u", - frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); - } - - for (size_t n = 0; n < frames[i].size(); ++n) { - srs_freep(frames[i][n]); - } - } - - return err; -} - srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) { srs_error_t err = srs_success; @@ -1980,6 +1939,27 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* return collect_video_frame(); } +srs_error_t SrsRtcPublisher::collect_audio_frame() +{ + srs_error_t err = srs_success; + + std::vector > frames; + rtp_audio_queue->get_and_clean_collected_frames(frames); + + for (size_t i = 0; i < frames.size(); ++i) { + if (! frames[i].empty()) { + srs_verbose("collect %d audio frames, seq range %u,%u", + frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); + } + + for (size_t n = 0; n < frames[i].size(); ++n) { + srs_freep(frames[i][n]); + } + } + + return err; +} + srs_error_t SrsRtcPublisher::collect_video_frame() { srs_error_t err = srs_success; @@ -2093,6 +2073,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame() SrsMessageHeader header; header.message_type = 9; + // TODO: FIXME: Maybe the tbn is not 90k. header.timestamp = timestamp / 90; SrsCommonMessage* shared_video = new SrsCommonMessage(); SrsAutoFree(SrsCommonMessage, shared_video); @@ -2106,7 +2087,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame() srs_verbose("rtp on video header"); } - if (! sps.empty() && ! pps.empty()) { + if (!sps.empty() && !pps.empty()) { if (source == NULL) { // TODO: FIXME: Should refactor it, directly use http server as handler. ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); @@ -2128,6 +2109,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame() SrsMessageHeader header; header.message_type = 9; + // TODO: FIXME: Maybe the tbn is not 90k. header.timestamp = timestamp / 90; SrsCommonMessage* shared_video = new SrsCommonMessage(); SrsAutoFree(SrsCommonMessage, shared_video); @@ -2142,6 +2124,27 @@ srs_error_t SrsRtcPublisher::collect_video_frame() return err; } +void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt) +{ + srs_trace("session %s address changed, update %s -> %s", + rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); + + srs_freep(sendonly_ukt); + sendonly_ukt = skt->copy_sendonly(); +} + +srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) +{ + if (sendonly_ukt) { + // TODO: FIXME: Check error. + send_rtcp_rr(sendonly_ukt, video_ssrc, rtp_video_queue); + send_rtcp_rr(sendonly_ukt, audio_ssrc, rtp_audio_queue); + send_rtcp_xr_rrtr(sendonly_ukt, video_ssrc); + send_rtcp_xr_rrtr(sendonly_ukt, audio_ssrc); + } + + return srs_success; +} SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) { diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 47f8587a9..942c244aa 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -186,6 +186,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) SrsRtpNackInfo* nack_info = NULL; if ((nack_info = nack_.find(seq)) != NULL) { int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; + (void)nack_rtt; srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); nack_.remove(seq);