From ecef3e7f0a9c994953f679736314a32ee0825045 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 27 Feb 2021 17:40:19 +0800 Subject: [PATCH] Perf: Refine player cycle, use fast coroutine --- trunk/src/app/srs_app_rtc_conn.cpp | 184 ++++++++++++++++----------- trunk/src/app/srs_app_rtc_conn.hpp | 6 +- trunk/src/app/srs_app_rtc_source.cpp | 44 +++---- trunk/src/app/srs_app_rtc_source.hpp | 12 +- trunk/src/app/srs_app_st.cpp | 72 ++++++++--- trunk/src/app/srs_app_st.hpp | 63 ++++++--- 6 files changed, 238 insertions(+), 143 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 23e7b6dc6..85b134f6a 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -382,7 +382,7 @@ SrsRtcPlayStreamStatistic::~SrsRtcPlayStreamStatistic() SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) { cid_ = cid; - trd = new SrsDummyCoroutine(); + trd_ = NULL; req_ = NULL; source_ = NULL; @@ -412,7 +412,7 @@ SrsRtcPlayStream::~SrsRtcPlayStream() srs_freep(nack_epp); srs_freep(pli_worker_); - srs_freep(trd); + srs_freep(trd_); srs_freep(timer_); srs_freep(req_); @@ -499,10 +499,10 @@ srs_error_t SrsRtcPlayStream::start() return err; } - srs_freep(trd); - trd = new SrsSTCoroutine("rtc_sender", this, cid_); + srs_freep(trd_); + trd_ = new SrsFastCoroutine("rtc_sender", this, cid_); - if ((err = trd->start()) != srs_success) { + if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "rtc_sender"); } @@ -527,7 +527,9 @@ srs_error_t SrsRtcPlayStream::start() void SrsRtcPlayStream::stop() { - trd->stop(); + if (trd_) { + trd_->stop(); + } } srs_error_t SrsRtcPlayStream::cycle() @@ -550,24 +552,14 @@ srs_error_t SrsRtcPlayStream::cycle() realtime = _srs_config->get_realtime_enabled(req_->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req_->vhost, realtime, true); - bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); - SrsStatistic* stat = SrsStatistic::instance(); - // TODO: FIXME: Add cost in ms. SrsContextId cid = source->source_id(); - srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d, stat=%d", req_->get_stream_url().c_str(), - cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs, stat_enabled); + srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d", req_->get_stream_url().c_str(), + cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs); SrsErrorPithyPrint* epp = new SrsErrorPithyPrint(); SrsAutoFree(SrsErrorPithyPrint, epp); - SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play(); - SrsAutoFree(SrsPithyPrint, pprint); - - // TODO: FIXME: Use cache for performance? - vector pkts; - uint64_t total_pkts = 0; - if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) { return srs_error_wrap(err, "on start consuming"); @@ -575,64 +567,30 @@ srs_error_t SrsRtcPlayStream::cycle() } while (true) { - if ((err = trd->pull()) != srs_success) { + if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "rtc sender thread"); } // Wait for amount of packets. - consumer->wait(mw_msgs); - - // TODO: FIXME: Handle error. - consumer->dump_packets(pkts); - - int msg_count = (int)pkts.size(); - if (!msg_count) { - continue; - } - - // Update stats for session. - session_->stat_->nn_out_rtp += msg_count; - total_pkts += msg_count; - - // Send-out all RTP packets and do cleanup - if (true) { - if ((err = send_packets(source, pkts, info)) != srs_success) { - uint32_t nn = 0; - if (epp->can_print(err, &nn)) { - srs_warn("play send packets=%u, nn=%u/%u, err: %s", pkts.size(), epp->nn_count, nn, srs_error_desc(err).c_str()); - } - srs_freep(err); - } - - for (int i = 0; i < msg_count; i++) { - SrsRtpPacket2* pkt = pkts[i]; - _srs_rtp_cache->recycle(pkt); +try_dump_again: + SrsRtpPacket2* pkt = NULL; + consumer->dump_packet(&pkt); + if (!pkt) { + // TODO: FIXME: We should check the quit event. + consumer->wait(mw_msgs); + goto try_dump_again; + } + + // Send-out the RTP packet and do cleanup + if ((err = send_packet(pkt)) != srs_success) { + uint32_t nn = 0; + if (epp->can_print(err, &nn)) { + srs_warn("play send packets=%u, nn=%u/%u, err: %s", 1, epp->nn_count, nn, srs_error_desc(err).c_str()); } - pkts.clear(); - } - - // Stat for performance analysis. - if (!stat_enabled) { - continue; + srs_freep(err); } - // Stat the original RAW AV frame, maybe h264+aac. - stat->perf_on_msgs(msg_count); - // Stat the RTC packets, RAW AV frame, maybe h.264+opus. - int nn_rtc_packets = srs_max(info.nn_audios, info.nn_extras) + info.nn_videos; - stat->perf_on_rtc_packets(nn_rtc_packets); - // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(msg_count); - // Stat the bytes and paddings. - stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes); - - pprint->elapse(); - if (pprint->can_print()) { - // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", - total_pkts, msg_count, info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes, - info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, msg_count, msg_count); - } + _srs_rtp_cache->recycle(pkt); } } @@ -640,7 +598,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector send_pkts; // Covert kernel messages to RTP packets. for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket2* pkt = pkts[i]; @@ -656,7 +613,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_ssrc()]; - if ((err = audio_track->on_rtp(pkt, info)) != srs_success) { + if ((err = audio_track->on_rtp(pkt)) != srs_success) { return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); } @@ -665,7 +622,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_ssrc()]; - if ((err = video_track->on_rtp(pkt, info)) != srs_success) { + if ((err = video_track->on_rtp(pkt)) != srs_success) { return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); } } @@ -678,6 +635,42 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vectorheader.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { + srs_warn("ssrc %u not found", pkt->header.get_ssrc()); + return err; + } + + // For audio, we transcoded AAC to opus in extra payloads. + if (pkt->is_audio()) { + // TODO: FIXME: Any simple solution? + SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()]; + + if ((err = audio_track->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); + } + + // TODO: FIXME: Padding audio to the max payload in RTP packets. + } else { + // TODO: FIXME: Any simple solution? + SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()]; + + if ((err = video_track->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); + } + } + + // Detail log, should disable it in release version. + srs_info("RTC: Update PT=%u, SSRC=%#x, Time=%u, %u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(), + pkt->header.get_timestamp(), pkt->nb_bytes()); + + return err; +} + void SrsRtcPlayStream::set_all_tracks_status(bool status) { std::ostringstream merged_log; @@ -805,7 +798,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp) } vector seqs = rtcp->get_lost_sns(); - if((err = target->on_recv_nack(seqs, info)) != srs_success) { + if((err = target->on_recv_nack(seqs)) != srs_success) { return srs_error_wrap(err, "track response nack. id:%s, ssrc=%u", target->get_track_id().c_str(), ssrc); } @@ -2569,6 +2562,51 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector& return err; } +srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + // For this message, select the first iovec. + iovec* iov = cache_iov_; + iov->iov_len = kRtpPacketSize; + cache_buffer_->skip(-1 * cache_buffer_->pos()); + + // Marshal packet to bytes in iovec. + if (true) { + if ((err = pkt->encode(cache_buffer_)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + iov->iov_len = cache_buffer_->pos(); + } + + // Cipher RTP to SRTP packet. + if (true) { + int nn_encrypt = (int)iov->iov_len; + if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + iov->iov_len = (size_t)nn_encrypt; + } + + // For NACK simulator, drop packet. + if (nn_simulate_player_nack_drop) { + simulate_player_drop_packet(&pkt->header, (int)iov->iov_len); + iov->iov_len = 0; + return err; + } + + ++_srs_pps_srtps->sugar; + + // TODO: FIXME: Handle error. + sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); + + // Detail log, should disable it in release version. + srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(), + pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len); + + return err; +} + void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_publish, bool status) { // For publishers. diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 437857833..69c620b03 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -62,6 +62,8 @@ class SrsRtcConsumer; class SrsRtcAudioSendTrack; class SrsRtcVideoSendTrack; class SrsErrorPithyPrint; +class SrsPithyPrint; +class SrsStatistic; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -245,7 +247,7 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr { private: SrsContextId cid_; - SrsCoroutine* trd; + SrsFastCoroutine* trd_; SrsRtcConnection* session_; SrsRtcPLIWorker* pli_worker_; private: @@ -285,6 +287,7 @@ public: virtual srs_error_t cycle(); private: srs_error_t send_packets(SrsRtcStream* source, const std::vector& pkts, SrsRtcPlayStreamStatistic& info); + srs_error_t send_packet(SrsRtpPacket2* pkt); public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); @@ -549,6 +552,7 @@ public: void simulate_nack_drop(int nn); void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes); srs_error_t do_send_packets(const std::vector& pkts, SrsRtcPlayStreamStatistic& info); + srs_error_t do_send_packet(SrsRtpPacket2* pkt); // Directly set the status of play track, generally for init to set the default value. void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status); private: diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 21c7f8c7b..7d2c149b7 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -197,7 +197,7 @@ srs_error_t SrsRtcConsumer::enqueue(SrsRtpPacket2* pkt) return err; } -srs_error_t SrsRtcConsumer::dump_packets(std::vector& pkts) +srs_error_t SrsRtcConsumer::dump_packet(SrsRtpPacket2** ppkt) { srs_error_t err = srs_success; @@ -206,7 +206,11 @@ srs_error_t SrsRtcConsumer::dump_packets(std::vector& pkts) should_update_source_id = false; } - queue.swap(pkts); + // TODO: FIXME: Refine performance by ring buffer. + if (!queue.empty()) { + *ppkt = queue.front(); + queue.erase(queue.begin()); + } return err; } @@ -2049,7 +2053,7 @@ std::string SrsRtcSendTrack::get_track_id() return track_desc_->id_; } -srs_error_t SrsRtcSendTrack::on_recv_nack(const vector& lost_seqs, SrsRtcPlayStreamStatistic& info) +srs_error_t SrsRtcSendTrack::on_recv_nack(const vector& lost_seqs) { srs_error_t err = srs_success; @@ -2066,18 +2070,14 @@ srs_error_t SrsRtcSendTrack::on_recv_nack(const vector& lost_seqs, Srs continue; } - info.nn_bytes += pkt->nb_bytes(); uint32_t nn = 0; if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { srs_trace("RTC: NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(), pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); } - vector resend_pkts; - resend_pkts.push_back(pkt); - // By default, we send packets by sendmmsg. - if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) { + if ((err = session_->do_send_packet(pkt)) != srs_success) { return srs_error_wrap(err, "raw send"); } } @@ -2094,7 +2094,7 @@ SrsRtcAudioSendTrack::~SrsRtcAudioSendTrack() { } -srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) +srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -2122,8 +2122,6 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta } // Update stats. - info.nn_bytes += pkt->nb_bytes(); - info.nn_audios++; session_->stat_->nn_out_audios++; // track level statistic @@ -2131,13 +2129,8 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta statistic_->packets++; statistic_->bytes += pkt->nb_bytes(); - if (true) { - std::vector pkts; - pkts.push_back(pkt); - - if ((err = session_->do_send_packets(pkts, info)) != srs_success) { - return srs_error_wrap(err, "raw send"); - } + if ((err = session_->do_send_packet(pkt)) != srs_success) { + return srs_error_wrap(err, "raw send"); } return err; @@ -2159,7 +2152,7 @@ SrsRtcVideoSendTrack::~SrsRtcVideoSendTrack() { } -srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) +srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -2189,8 +2182,6 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta } // Update stats. - info.nn_bytes += pkt->nb_bytes(); - info.nn_videos++; session_->stat_->nn_out_videos++; // track level statistic @@ -2198,15 +2189,10 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta statistic->packets++; statistic->bytes += pkt->nb_bytes(); - if (true) { - std::vector pkts; - pkts.push_back(pkt); - - if ((err = session_->do_send_packets(pkts, info)) != srs_success) { - return srs_error_wrap(err, "raw send"); - } + if ((err = session_->do_send_packet(pkt)) != srs_success) { + return srs_error_wrap(err, "raw send"); } - + return err; } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index d5155c263..5bb7c42d9 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -97,8 +97,8 @@ public: // Put RTP packet into queue. // @note We do not drop packet here, but drop it in sender. srs_error_t enqueue(SrsRtpPacket2* pkt); - // Get all RTP packets from queue. - virtual srs_error_t dump_packets(std::vector& pkts); + // For RTC, we only got one packet, because there is not many packets in queue. + virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt); // Wait for at-least some messages incoming in queue. virtual void wait(int nb_msgs); }; @@ -591,9 +591,9 @@ public: bool get_track_status(); std::string get_track_id(); public: - virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) = 0; + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt) = 0; virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt) = 0; - virtual srs_error_t on_recv_nack(const std::vector& lost_seqs, SrsRtcPlayStreamStatistic& info); + virtual srs_error_t on_recv_nack(const std::vector& lost_seqs); }; class SrsRtcAudioSendTrack : public SrsRtcSendTrack @@ -602,7 +602,7 @@ public: SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); virtual ~SrsRtcAudioSendTrack(); public: - virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info); + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); }; @@ -612,7 +612,7 @@ public: SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); virtual ~SrsRtcVideoSendTrack(); public: - virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info); + virtual srs_error_t on_rtp(SrsRtpPacket2* pkt); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); }; diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index b11c50fd1..5a1954a4b 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -87,9 +87,54 @@ const SrsContextId& SrsDummyCoroutine::cid() return _srs_context->get_id(); } +SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) +{ + impl_ = new SrsFastCoroutine(n, h); +} + +SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid) +{ + impl_ = new SrsFastCoroutine(n, h, cid); +} + +SrsSTCoroutine::~SrsSTCoroutine() +{ + srs_freep(impl_); +} + +void SrsSTCoroutine::set_stack_size(int v) +{ + impl_->set_stack_size(v); +} + +srs_error_t SrsSTCoroutine::start() +{ + return impl_->start(); +} + +void SrsSTCoroutine::stop() +{ + impl_->stop(); +} + +void SrsSTCoroutine::interrupt() +{ + impl_->interrupt(); +} + +srs_error_t SrsSTCoroutine::pull() +{ + return impl_->pull(); +} + +const SrsContextId& SrsSTCoroutine::cid() +{ + return impl_->cid(); +} + _ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create; -SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) +SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h) { // TODO: FIXME: Reduce duplicated code. name = n; @@ -102,7 +147,7 @@ SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) stack_size = 0; } -SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid) +SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid) { name = n; handler = h; @@ -115,19 +160,19 @@ SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId c stack_size = 0; } -SrsSTCoroutine::~SrsSTCoroutine() +SrsFastCoroutine::~SrsFastCoroutine() { stop(); srs_freep(trd_err); } -void SrsSTCoroutine::set_stack_size(int v) +void SrsFastCoroutine::set_stack_size(int v) { stack_size = v; } -srs_error_t SrsSTCoroutine::start() +srs_error_t SrsFastCoroutine::start() { srs_error_t err = srs_success; @@ -159,7 +204,7 @@ srs_error_t SrsSTCoroutine::start() return err; } -void SrsSTCoroutine::stop() +void SrsFastCoroutine::stop() { if (disposed) { return; @@ -190,7 +235,7 @@ void SrsSTCoroutine::stop() return; } -void SrsSTCoroutine::interrupt() +void SrsFastCoroutine::interrupt() { if (!started || interrupted || cycle_done) { return; @@ -204,17 +249,12 @@ void SrsSTCoroutine::interrupt() st_thread_interrupt((st_thread_t)trd); } -srs_error_t SrsSTCoroutine::pull() -{ - return srs_error_copy(trd_err); -} - -const SrsContextId& SrsSTCoroutine::cid() +const SrsContextId& SrsFastCoroutine::cid() { return cid_; } -srs_error_t SrsSTCoroutine::cycle() +srs_error_t SrsFastCoroutine::cycle() { if (_srs_context) { if (cid_.empty()) { @@ -234,9 +274,9 @@ srs_error_t SrsSTCoroutine::cycle() return err; } -void* SrsSTCoroutine::pfn(void* arg) +void* SrsFastCoroutine::pfn(void* arg) { - SrsSTCoroutine* p = (SrsSTCoroutine*)arg; + SrsFastCoroutine* p = (SrsFastCoroutine*)arg; srs_error_t err = p->cycle(); diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 42313ed7a..89d118ce9 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -29,9 +29,12 @@ #include #include +#include #include #include +class SrsFastCoroutine; + // Each ST-coroutine must implements this interface, // to do the cycle job and handle some events. // @@ -108,10 +111,6 @@ public: virtual const SrsContextId& cid(); }; -// For utest to mock the thread create. -typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size); -extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create; - // A ST-coroutine is a lightweight thread, just like the goroutine. // But the goroutine maybe run on different thread, while ST-coroutine only // run in single thread, because it use setjmp and longjmp, so it may cause @@ -127,19 +126,7 @@ extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create; class SrsSTCoroutine : public SrsCoroutine { private: - std::string name; - int stack_size; - ISrsCoroutineHandler* handler; -private: - srs_thread_t trd; - SrsContextId cid_; - srs_error_t trd_err; -private: - bool started; - bool interrupted; - bool disposed; - // Cycle done, no need to interrupt it. - bool cycle_done; + SrsFastCoroutine* impl_; public: // Create a thread with name n and handler h. // @remark User can specify a cid for thread to use, or we will allocate a new one. @@ -170,8 +157,48 @@ public: virtual srs_error_t pull(); // Get the context id of thread. virtual const SrsContextId& cid(); +}; + +// For utest to mock the thread create. +typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size); +extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create; + +// High performance coroutine. +class SrsFastCoroutine +{ +private: + std::string name; + int stack_size; + ISrsCoroutineHandler* handler; +private: + srs_thread_t trd; + SrsContextId cid_; + srs_error_t trd_err; +private: + bool started; + bool interrupted; + bool disposed; + // Cycle done, no need to interrupt it. + bool cycle_done; +public: + SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h); + SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid); + ~SrsFastCoroutine(); +public: + void set_stack_size(int v); +public: + srs_error_t start(); + void stop(); + void interrupt(); + inline srs_error_t pull() { + if (trd_err == srs_success) { + return srs_success; + } + return srs_error_copy(trd_err); + } + const SrsContextId& cid(); private: - virtual srs_error_t cycle(); + srs_error_t cycle(); static void* pfn(void* arg); };