Refactor RTC publish, reorder functions.

pull/1753/head
winlin 5 years ago
parent ebdc03416a
commit eace693ae9

@ -1534,27 +1534,6 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui
return err;
}
void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt)
{
srs_trace("session %s address changed, update %s -> %s",
rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str());
srs_freep(sendonly_ukt);
sendonly_ukt = skt->copy_sendonly();
}
srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
if (sendonly_ukt) {
send_rtcp_rr(sendonly_ukt, video_ssrc, rtp_video_queue);
send_rtcp_rr(sendonly_ukt, audio_ssrc, rtp_audio_queue);
send_rtcp_xr_rrtr(sendonly_ukt, video_ssrc);
send_rtcp_xr_rrtr(sendonly_ukt, audio_ssrc);
}
return srs_success;
}
srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -1576,48 +1555,6 @@ srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf)
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
}
void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt)
{
// If DTLS is not OK, drop all messages.
if (!rtc_session->dtls_session) {
return;
}
vector<uint16_t> nack_seqs;
rtp_queue->nack_.get_nack_seqs(nack_seqs);
vector<uint16_t>::iterator iter = nack_seqs.begin();
while (iter != nack_seqs.end()) {
char buf[kRtpPacketSize];
SrsBuffer stream(buf, sizeof(buf));
// FIXME: Replace magic number.
stream.write_1bytes(0x81);
stream.write_1bytes(kRtpFb);
stream.write_2bytes(3);
stream.write_4bytes(ssrc);
stream.write_4bytes(ssrc);
uint16_t pid = *iter;
uint16_t blp = 0;
while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) {
blp |= (1 << (*(iter + 1) - pid - 1));
++iter;
}
stream.write_2bytes(pid);
stream.write_2bytes(blp);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
// FIXME: Merge nack rtcp into one packets.
if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
// TODO: FIXME: Check error.
skt->sendto(protected_buf, nb_protected_buf, 0);
}
++iter;
}
}
srs_error_t SrsRtcPublisher::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
@ -1685,6 +1622,7 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
uint32_t sender_packet_count = stream->read_4bytes();
uint32_t sender_octec_count = stream->read_4bytes();
(void)sender_packet_count; (void)sender_octec_count; (void)rtp_time;
srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u",
ssrc_of_sender, rtp_time, sender_packet_count, sender_octec_count);
@ -1756,7 +1694,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket*
int rtt_ntp = compact_ntp - lrr - dlrr;
int rtt = ((rtt_ntp * 1000) >> 16) + ((rtt_ntp >> 16) * 1000);
srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d",
srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d",
ssrc, compact_ntp, lrr, dlrr, rtt);
if (ssrc == video_ssrc) {
@ -1771,6 +1709,48 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket*
return err;
}
void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, SrsUdpMuxSocket* skt)
{
// If DTLS is not OK, drop all messages.
if (!rtc_session->dtls_session) {
return;
}
vector<uint16_t> nack_seqs;
rtp_queue->nack_.get_nack_seqs(nack_seqs);
vector<uint16_t>::iterator iter = nack_seqs.begin();
while (iter != nack_seqs.end()) {
char buf[kRtpPacketSize];
SrsBuffer stream(buf, sizeof(buf));
// FIXME: Replace magic number.
stream.write_1bytes(0x81);
stream.write_1bytes(kRtpFb);
stream.write_2bytes(3);
stream.write_4bytes(ssrc);
stream.write_4bytes(ssrc);
uint16_t pid = *iter;
uint16_t blp = 0;
while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) {
blp |= (1 << (*(iter + 1) - pid - 1));
++iter;
}
stream.write_2bytes(pid);
stream.write_2bytes(blp);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
// FIXME: Merge nack rtcp into one packets.
if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
// TODO: FIXME: Check error.
skt->sendto(protected_buf, nb_protected_buf, 0);
}
++iter;
}
}
srs_error_t SrsRtcPublisher::send_rtcp_rr(SrsUdpMuxSocket* skt, uint32_t ssrc, SrsRtpQueue* rtp_queue)
{
srs_error_t err = srs_success;
@ -1938,27 +1918,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket*
return collect_audio_frame();
}
srs_error_t SrsRtcPublisher::collect_audio_frame()
{
srs_error_t err = srs_success;
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_audio_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
srs_verbose("collect %d audio frames, seq range %u,%u",
frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
for (size_t n = 0; n < frames[i].size(); ++n) {
srs_freep(frames[i][n]);
}
}
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
@ -1980,6 +1939,27 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket*
return collect_video_frame();
}
srs_error_t SrsRtcPublisher::collect_audio_frame()
{
srs_error_t err = srs_success;
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_audio_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
srs_verbose("collect %d audio frames, seq range %u,%u",
frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
for (size_t n = 0; n < frames[i].size(); ++n) {
srs_freep(frames[i][n]);
}
}
return err;
}
srs_error_t SrsRtcPublisher::collect_video_frame()
{
srs_error_t err = srs_success;
@ -2093,6 +2073,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
SrsMessageHeader header;
header.message_type = 9;
// TODO: FIXME: Maybe the tbn is not 90k.
header.timestamp = timestamp / 90;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
@ -2106,7 +2087,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
srs_verbose("rtp on video header");
}
if (! sps.empty() && ! pps.empty()) {
if (!sps.empty() && !pps.empty()) {
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
@ -2128,6 +2109,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
SrsMessageHeader header;
header.message_type = 9;
// TODO: FIXME: Maybe the tbn is not 90k.
header.timestamp = timestamp / 90;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
@ -2142,6 +2124,27 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
return err;
}
void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt)
{
srs_trace("session %s address changed, update %s -> %s",
rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str());
srs_freep(sendonly_ukt);
sendonly_ukt = skt->copy_sendonly();
}
srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
if (sendonly_ukt) {
// TODO: FIXME: Check error.
send_rtcp_rr(sendonly_ukt, video_ssrc, rtp_video_queue);
send_rtcp_rr(sendonly_ukt, audio_ssrc, rtp_audio_queue);
send_rtcp_xr_rrtr(sendonly_ukt, video_ssrc);
send_rtcp_xr_rrtr(sendonly_ukt, audio_ssrc);
}
return srs_success;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
{

@ -186,6 +186,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
SrsRtpNackInfo* nack_info = NULL;
if ((nack_info = nack_.find(seq)) != NULL) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt;
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
nack_.remove(seq);

Loading…
Cancel
Save