Rename functions for RTC publisher

pull/1753/head
winlin 5 years ago
parent f37ffdf740
commit 583ae52df8

@ -1573,18 +1573,18 @@ srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf)
{
srs_error_t err = srs_success;
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
SrsAutoFree(SrsRtpSharedPacket, rtp_shared_pkt);
if ((err = rtp_shared_pkt->decode(buf, nb_buf)) != srs_success) {
SrsRtpSharedPacket* pkt = new SrsRtpSharedPacket();
SrsAutoFree(SrsRtpSharedPacket, pkt);
if ((err = pkt->decode(buf, nb_buf)) != srs_success) {
return srs_error_wrap(err, "rtp packet decode failed");
}
uint32_t ssrc = rtp_shared_pkt->rtp_header.get_ssrc();
uint32_t ssrc = pkt->rtp_header.get_ssrc();
if (ssrc == audio_ssrc) {
return on_audio(skt, rtp_shared_pkt);
return on_audio(skt, pkt);
} else if (ssrc == video_ssrc) {
return on_video(skt, rtp_shared_pkt);
return on_video(skt, pkt);
}
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
@ -1933,20 +1933,19 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssr
return err;
}
srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt)
{
srs_error_t err = srs_success;
rtp_pkt->rtp_payload_header = new SrsRtpOpusHeader();
if ((err = rtp_opus_demuxer->parse(rtp_pkt)) != srs_success) {
pkt->rtp_payload_header = new SrsRtpOpusHeader();
if ((err = rtp_opus_demuxer->parse(pkt)) != srs_success) {
return srs_error_wrap(err, "rtp opus demux failed");
}
// TODO: FIXME: Rename it.
// TODO: FIXME: Error check.
rtp_audio_queue->insert(rtp_pkt);
rtp_audio_queue->consume(pkt);
if (rtp_audio_queue->get_and_clean_if_needed_request_key_frame()) {
if (rtp_audio_queue->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(skt, audio_ssrc);
}
@ -1961,7 +1960,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frame()
srs_error_t err = srs_success;
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_audio_queue->get_and_clean_collected_frames(frames);
rtp_audio_queue->collect_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (!frames[i].empty()) {
@ -1979,19 +1978,20 @@ srs_error_t SrsRtcPublisher::collect_audio_frame()
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt)
{
srs_error_t err = srs_success;
rtp_pkt->rtp_payload_header = new SrsRtpH264Header();
pkt->rtp_payload_header = new SrsRtpH264Header();
if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) {
if ((err = rtp_h264_demuxer->parse(pkt)) != srs_success) {
return srs_error_wrap(err, "rtp h264 demux failed");
}
rtp_video_queue->insert(rtp_pkt);
// TODO: FIXME: Error check.
rtp_video_queue->consume(pkt);
if (rtp_video_queue->get_and_clean_if_needed_request_key_frame()) {
if (rtp_video_queue->should_request_key_frame()) {
// TODO: FIXME: Check error.
send_rtcp_fb_pli(skt, video_ssrc);
}
@ -2006,7 +2006,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
srs_error_t err = srs_success;
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_video_queue->get_and_clean_collected_frames(frames);
rtp_video_queue->collect_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (!frames[i].empty()) {

@ -291,10 +291,10 @@ private:
srs_error_t send_rtcp_xr_rrtr(SrsUdpMuxSocket* skt, uint32_t ssrc);
srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc);
private:
srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt);
srs_error_t collect_audio_frame();
private:
srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* pkt);
private:
srs_error_t collect_video_frame();
public:

@ -269,14 +269,14 @@ SrsRtpQueue::~SrsRtpQueue()
srs_freep(nack_);
}
srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Update time for each packet, may hurt performance.
srs_utime_t now = srs_update_system_time();
uint16_t seq = rtp_pkt->rtp_header.get_sequence();
uint16_t seq = pkt->rtp_header.get_sequence();
SrsRtpNackInfo* nack_info = nack_->find(seq);
if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
@ -289,9 +289,9 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
// Calc jitter time, ignore nack packets.
// TODO: FIXME: Covert time to srs_utime_t.
if (last_trans_time_ == -1) {
last_trans_time_ = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90;
last_trans_time_ = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
} else if (!nack_info) {
int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90;
int trans_time = now / 1000 - pkt->rtp_header.get_timestamp() / 90;
int cur_jitter = trans_time - last_trans_time_;
if (cur_jitter < 0) {
@ -338,25 +338,25 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
}
// TODO: FIXME: Change to ptr of ptr.
queue_->set(seq, rtp_pkt->copy());
queue_->set(seq, pkt->copy());
// Collect packets to frame when:
// 1. Marker bit means the last packet of frame received.
// 2. Queue has lots of packets, the load is heavy.
// 3. The frame contains only one packet for each frame.
if (rtp_pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
collect_packet();
}
return err;
}
void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
{
frames.swap(frames_);
}
bool SrsRtpQueue::get_and_clean_if_needed_request_key_frame()
bool SrsRtpQueue::should_request_key_frame()
{
if (request_key_frame_) {
request_key_frame_ = false;

@ -160,6 +160,7 @@ private:
uint64_t nn_collected_frames;
SrsRtpRingBuffer* queue_;
SrsRtpNackForReceiver* nack_;
bool one_packet_per_frame_;
private:
double jitter_;
// TODO: FIXME: Covert time to srs_utime_t.
@ -168,8 +169,6 @@ private:
uint64_t pre_number_of_packet_lossed_;
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
private:
bool one_packet_per_frame_;
private:
std::vector<std::vector<SrsRtpSharedPacket*> > frames_;
bool request_key_frame_;
@ -177,10 +176,9 @@ public:
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
virtual ~SrsRtpQueue();
public:
srs_error_t insert(SrsRtpSharedPacket* rtp_pkt);
public:
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
bool get_and_clean_if_needed_request_key_frame();
srs_error_t consume(SrsRtpSharedPacket* pkt);
void collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
bool should_request_key_frame();
void notify_drop_seq(uint16_t seq);
void notify_nack_list_full();
void request_keyframe() { request_key_frame_ = true; }
@ -194,7 +192,6 @@ public:
void update_rtt(int rtt);
private:
void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end);
private:
void collect_packet();
};

Loading…
Cancel
Save