Refactor RTC publisher code

pull/1753/head
winlin 5 years ago
parent 7692e589ed
commit 89cdfe2f50

@ -1935,9 +1935,12 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket*
return srs_error_wrap(err, "rtp opus demux failed");
}
// TODO: FIXME: Rename it.
// TODO: FIXME: Error check.
rtp_audio_queue->insert(rtp_pkt);
if (rtp_audio_queue->get_and_clean_if_needed_rqeuest_key_frame()) {
if (rtp_audio_queue->get_and_clean_if_needed_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(skt, audio_ssrc);
}
@ -1946,27 +1949,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket*
return collect_audio_frame();
}
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
rtp_pkt->rtp_payload_header = new SrsRtpH264Header();
if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) {
return srs_error_wrap(err, "rtp h264 demux failed");
}
rtp_video_queue->insert(rtp_pkt);
if (rtp_video_queue->get_and_clean_if_needed_rqeuest_key_frame()) {
send_rtcp_fb_pli(skt, video_ssrc);
}
check_send_nacks(rtp_video_queue, video_ssrc, skt);
return collect_video_frame();
}
srs_error_t SrsRtcPublisher::collect_audio_frame()
{
srs_error_t err = srs_success;
@ -1980,6 +1962,8 @@ srs_error_t SrsRtcPublisher::collect_audio_frame()
frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
// TODO: FIXME: Write audio frame to source.
for (size_t n = 0; n < frames[i].size(); ++n) {
srs_freep(frames[i][n]);
}
@ -1988,6 +1972,27 @@ srs_error_t SrsRtcPublisher::collect_audio_frame()
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
rtp_pkt->rtp_payload_header = new SrsRtpH264Header();
if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) {
return srs_error_wrap(err, "rtp h264 demux failed");
}
rtp_video_queue->insert(rtp_pkt);
if (rtp_video_queue->get_and_clean_if_needed_request_key_frame()) {
send_rtcp_fb_pli(skt, video_ssrc);
}
check_send_nacks(rtp_video_queue, video_ssrc, skt);
return collect_video_frame();
}
srs_error_t SrsRtcPublisher::collect_video_frame()
{
srs_error_t err = srs_success;

@ -292,10 +292,11 @@ private:
srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc);
private:
srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
srs_error_t collect_audio_frame();
private:
srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
private:
srs_error_t collect_video_frame();
srs_error_t collect_audio_frame();
public:
void update_sendonly_socket(SrsUdpMuxSocket* skt);
// interface ISrsHourGlass

@ -171,16 +171,18 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
uint16_t seq = rtp_pkt->rtp_header.get_sequence();
// TODO: FIXME: Update time for each packet, may hurt performance.
srs_utime_t now = srs_update_system_time();
// First packet recv, init head_sequence and highest_sequence.
if (! initialized_) {
if (!initialized_) {
initialized_ = true;
head_sequence_ = seq;
highest_sequence_ = seq;
++num_of_packet_received_;
// TODO: FIXME: Covert time to srs_utime_t.
last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90;
} else {
SrsRtpNackInfo* nack_info = NULL;
@ -219,7 +221,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
} else {
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sented.
if (! start_collected_) {
if (!start_collected_) {
if (seq_cmp(seq, head_sequence_)) {
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq);
head_sequence_ = seq;
@ -267,6 +269,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
delete old_pkt;
}
// TODO: FIXME: Change to ptr of ptr.
old_pkt = rtp_pkt->copy();
// Marker bit means the last packet of frame received.
@ -295,7 +298,7 @@ void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpS
frames.swap(frames_);
}
bool SrsRtpQueue::get_and_clean_if_needed_rqeuest_key_frame()
bool SrsRtpQueue::get_and_clean_if_needed_request_key_frame()
{
if (request_key_frame_) {
request_key_frame_ = false;
@ -334,7 +337,7 @@ void SrsRtpQueue::notify_nack_list_full()
++head_sequence_;
}
if (! found_key_frame) {
if (!found_key_frame) {
srs_verbose("no found first packet of key frame, request key frame");
request_key_frame_ = true;
head_sequence_ = highest_sequence_;
@ -398,13 +401,13 @@ void SrsRtpQueue::collect_packet()
}
// We must collect frame from first packet to last packet.
if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && ! pkt->rtp_payload_header->is_first_packet_of_frame) {
if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) {
break;
}
frame.push_back(pkt->copy());
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
if (! start_collected_) {
if (!start_collected_) {
start_collected_ = true;
}
frames_.push_back(frame);

@ -142,7 +142,7 @@ public:
srs_error_t remove(uint16_t seq);
public:
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
bool get_and_clean_if_needed_rqeuest_key_frame();
bool get_and_clean_if_needed_request_key_frame();
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();
public:

Loading…
Cancel
Save