diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 2570b91a7..00cd9928b 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -794,6 +794,9 @@ srs_error_t SrsRtcSenderThread::cycle() srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), ::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); + // For RTC, notify the source to fetch keyframe for this client. + source->request_keyframe(); + SrsMessageArray msgs(SRS_PERF_MW_MSGS); SrsRtcPackets pkts(SRS_PERF_RTC_RTP_PACKETS); @@ -1517,6 +1520,8 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) SrsRtcPublisher::~SrsRtcPublisher() { + source->set_rtc_publisher(NULL); + // TODO: FIXME: Do unpublish when session timeout. if (source) { source->on_unpublish(); @@ -1559,6 +1564,8 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui return srs_error_wrap(err, "on publish"); } + source->set_rtc_publisher(this); + return err; } @@ -1745,7 +1752,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc, Sr } vector nack_seqs; - rtp_queue->nack_.get_nack_seqs(nack_seqs); + rtp_queue->get_nack_seqs(nack_seqs); vector::iterator iter = nack_seqs.begin(); while (iter != nack_seqs.end()) { char buf[kRtpPacketSize]; @@ -1912,7 +1919,7 @@ srs_error_t SrsRtcPublisher::send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssr stream.write_4bytes(ssrc); stream.write_4bytes(ssrc); - srs_verbose("PLI ssrc=%u", ssrc); + srs_trace("RTC PLI ssrc=%u", ssrc); char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); @@ -1985,6 +1992,7 @@ srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_video_queue->insert(rtp_pkt); if (rtp_video_queue->get_and_clean_if_needed_request_key_frame()) { + // TODO: FIXME: Check error. send_rtcp_fb_pli(skt, video_ssrc); } @@ -2148,6 +2156,15 @@ void SrsRtcPublisher::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_ukt = skt->copy_sendonly(); } +void SrsRtcPublisher::request_keyframe() +{ + int scid = _srs_context->get_id(); + int pcid = rtc_session->context_id(); + srs_trace("RTC play=[%d][%d] request keyframe from publish=[%d][%d]", ::getpid(), scid, ::getpid(), pcid); + + rtp_video_queue->request_keyframe(); +} + srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t tick) { if (sendonly_ukt) { @@ -3172,7 +3189,7 @@ srs_error_t SrsRtcServer::create_rtc_session( } // TODO: FIXME: Refine the API for stream status manage. - if (!source->can_publish(false)) { + if (publish && !source->can_publish(false)) { return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 793813abb..42bbe0f6e 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -299,6 +299,7 @@ private: srs_error_t collect_video_frame(); public: void update_sendonly_socket(SrsUdpMuxSocket* skt); + void request_keyframe(); // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); @@ -359,6 +360,7 @@ public: void set_encrypt(bool v) { encrypt = v; } void switch_to_context(); + int context_id() { return cid; } public: srs_error_t initialize(); srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 28637e738..3e5d3a8cb 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -41,7 +41,7 @@ SrsRtpNackInfo::SrsRtpNackInfo() req_nack_count_ = 0; } -SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size) +SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size) { max_queue_size_ = queue_size; rtp_queue_ = rtp_queue; @@ -51,23 +51,23 @@ SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size) max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); } -SrsRtpNackList::~SrsRtpNackList() +SrsRtpNackForReceiver::~SrsRtpNackForReceiver() { } -void SrsRtpNackList::insert(uint16_t seq) +void SrsRtpNackForReceiver::insert(uint16_t seq) { // FIXME: full, drop packet, and request key frame. SrsRtpNackInfo& nack_info = queue_[seq]; (void)nack_info; } -void SrsRtpNackList::remove(uint16_t seq) +void SrsRtpNackForReceiver::remove(uint16_t seq) { queue_.erase(seq); } -SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq) +SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq) { std::map::iterator iter = queue_.find(seq); @@ -78,7 +78,7 @@ SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq) return &(iter->second); } -void SrsRtpNackList::check_queue_size() +void SrsRtpNackForReceiver::check_queue_size() { if (queue_.size() >= max_queue_size_) { srs_verbose("NACK list full, queue size=%u, max_queue_size=%u", queue_.size(), max_queue_size_); @@ -86,7 +86,7 @@ void SrsRtpNackList::check_queue_size() } } -void SrsRtpNackList::get_nack_seqs(vector& seqs) +void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) { srs_utime_t now = srs_update_system_time(); int interval = now - pre_check_time_; @@ -126,7 +126,7 @@ void SrsRtpNackList::get_nack_seqs(vector& seqs) } } -void SrsRtpNackList::update_rtt(int rtt) +void SrsRtpNackForReceiver::update_rtt(int rtt) { rtt_ = rtt * SRS_UTIME_MILLISECONDS; srs_verbose("NACK, update rtt from %ld to %d", opts_.nack_interval, rtt_); @@ -244,10 +244,10 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui } SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) - : nack_(this, capacity * 2 / 3) { nn_collected_frames = 0; queue_ = new SrsRtpRingBuffer(capacity); + nack_ = new SrsRtpNackForReceiver(this, capacity * 2 / 3); jitter_ = 0; last_trans_time_ = -1; @@ -266,6 +266,7 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) SrsRtpQueue::~SrsRtpQueue() { srs_freep(queue_); + srs_freep(nack_); } srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) @@ -276,13 +277,13 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) srs_utime_t now = srs_update_system_time(); uint16_t seq = rtp_pkt->rtp_header.get_sequence(); - SrsRtpNackInfo* nack_info = nack_.find(seq); + SrsRtpNackInfo* nack_info = nack_->find(seq); if (nack_info) { int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; (void)nack_rtt; srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); - nack_.remove(seq); + nack_->remove(seq); } // Calc jitter time, ignore nack packets. @@ -328,7 +329,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next); for (uint16_t s = queue_->low(); s != next; ++s) { - nack_.remove(s); + nack_->remove(s); queue_->remove(s); } @@ -359,6 +360,7 @@ bool SrsRtpQueue::get_and_clean_if_needed_request_key_frame() { if (request_key_frame_) { request_key_frame_ = false; + return true; } return request_key_frame_; @@ -421,20 +423,25 @@ uint32_t SrsRtpQueue::get_interarrival_jitter() return static_cast(jitter_); } +void SrsRtpQueue::get_nack_seqs(vector& seqs) +{ + nack_->get_nack_seqs(seqs); +} + void SrsRtpQueue::update_rtt(int rtt) { - nack_.update_rtt(rtt); + nack_->update_rtt(rtt); } void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) { for (uint16_t s = seq_start; s != seq_end; ++s) { srs_verbose("loss seq=%u, insert into nack list", s); - nack_.insert(s); + nack_->insert(s); ++number_of_packet_lossed_; } - nack_.check_queue_size(); + nack_->check_queue_size(); } void SrsRtpQueue::collect_packet() @@ -443,7 +450,7 @@ void SrsRtpQueue::collect_packet() for (uint16_t s = queue_->low(); s != queue_->high(); ++s) { SrsRtpSharedPacket* pkt = queue_->at(s); - if (nack_.find(s) != NULL) { + if (nack_->find(s) != NULL) { srs_verbose("seq=%u, found in nack list when collect frame", s); break; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 70631e801..7483f297b 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -71,7 +71,7 @@ inline bool srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) return ((int16_t)(high - low)) > 0; } -class SrsRtpNackList +class SrsRtpNackForReceiver { private: struct SeqComp { @@ -91,8 +91,8 @@ private: private: int rtt_; public: - SrsRtpNackList(SrsRtpQueue* rtp_queue, size_t queue_size); - virtual ~SrsRtpNackList(); + SrsRtpNackForReceiver(SrsRtpQueue* rtp_queue, size_t queue_size); + virtual ~SrsRtpNackForReceiver(); public: void insert(uint16_t seq); void remove(uint16_t seq); @@ -159,6 +159,7 @@ class SrsRtpQueue private: uint64_t nn_collected_frames; SrsRtpRingBuffer* queue_; + SrsRtpNackForReceiver* nack_; private: double jitter_; // TODO: FIXME: Covert time to srs_utime_t. @@ -169,8 +170,6 @@ private: uint64_t number_of_packet_lossed_; private: bool one_packet_per_frame_; -public: - SrsRtpNackList nack_; private: std::vector > frames_; bool request_key_frame_; @@ -184,12 +183,14 @@ public: bool get_and_clean_if_needed_request_key_frame(); void notify_drop_seq(uint16_t seq); void notify_nack_list_full(); + void request_keyframe() { request_key_frame_ = true; } public: uint32_t get_extended_highest_sequence(); uint8_t get_fraction_lost(); uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_interarrival_jitter(); public: + void get_nack_seqs(std::vector& seqs); void update_rtt(int rtt); private: void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 3f543915a..989abbb12 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -52,6 +52,7 @@ using namespace std; #include #ifdef SRS_RTC #include +#include #endif #define CONST_MAX_JITTER_MS 250 @@ -1966,6 +1967,10 @@ SrsSource::SrsSource() _srs_config->subscribe(this); atc = false; + +#ifdef SRS_RTC + rtc_publisher = NULL; +#endif } SrsSource::~SrsSource() @@ -2749,4 +2754,11 @@ SrsMetaCache* SrsSource::cached_meta() { return meta; } + +void SrsSource::request_keyframe() +{ + if (rtc_publisher) { + rtc_publisher->request_keyframe(); + } +} #endif diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index fcc6c84d4..8194b46ff 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -63,6 +63,9 @@ class SrsBuffer; class SrsHds; #endif class SrsRtpSharedPacket; +#ifdef SRS_RTC +class SrsRtcPublisher; +#endif // The time jitter algorithm: // 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. @@ -577,6 +580,10 @@ private: // The last die time, when all consumers quit and no publisher, // We will remove the source when source die. srs_utime_t die_at; +#ifdef SRS_RTC +private: + SrsRtcPublisher* rtc_publisher; +#endif public: SrsSource(); virtual ~SrsSource(); @@ -639,12 +646,17 @@ public: virtual void on_edge_proxy_unpublish(); public: virtual std::string get_curr_origin(); -public: #ifdef SRS_RTC +public: // Find rtp packet by sequence SrsRtpSharedPacket* find_rtp_packet(const uint16_t& seq); // Get the cached meta, as such the sps/pps. SrsMetaCache* cached_meta(); + // Request keyframe for new client. + // TODO: FIXME: Maybe we could cache the keyframe. + // TODO: FIXME: Maybe we should only response for the new clients. + void request_keyframe(); + void set_rtc_publisher(SrsRtcPublisher* v) { rtc_publisher = v; } #endif };