Perf: Refine player cycle, use fast coroutine

pull/2204/head
winlin 4 years ago
parent 29b33e6303
commit ecef3e7f0a

@ -382,7 +382,7 @@ SrsRtcPlayStreamStatistic::~SrsRtcPlayStreamStatistic()
SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
{ {
cid_ = cid; cid_ = cid;
trd = new SrsDummyCoroutine(); trd_ = NULL;
req_ = NULL; req_ = NULL;
source_ = NULL; source_ = NULL;
@ -412,7 +412,7 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
srs_freep(nack_epp); srs_freep(nack_epp);
srs_freep(pli_worker_); srs_freep(pli_worker_);
srs_freep(trd); srs_freep(trd_);
srs_freep(timer_); srs_freep(timer_);
srs_freep(req_); srs_freep(req_);
@ -499,10 +499,10 @@ srs_error_t SrsRtcPlayStream::start()
return err; return err;
} }
srs_freep(trd); srs_freep(trd_);
trd = new SrsSTCoroutine("rtc_sender", this, cid_); trd_ = new SrsFastCoroutine("rtc_sender", this, cid_);
if ((err = trd->start()) != srs_success) { if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "rtc_sender"); return srs_error_wrap(err, "rtc_sender");
} }
@ -527,7 +527,9 @@ srs_error_t SrsRtcPlayStream::start()
void SrsRtcPlayStream::stop() void SrsRtcPlayStream::stop()
{ {
trd->stop(); if (trd_) {
trd_->stop();
}
} }
srs_error_t SrsRtcPlayStream::cycle() srs_error_t SrsRtcPlayStream::cycle()
@ -550,24 +552,14 @@ srs_error_t SrsRtcPlayStream::cycle()
realtime = _srs_config->get_realtime_enabled(req_->vhost, true); realtime = _srs_config->get_realtime_enabled(req_->vhost, true);
mw_msgs = _srs_config->get_mw_msgs(req_->vhost, realtime, true); mw_msgs = _srs_config->get_mw_msgs(req_->vhost, realtime, true);
bool stat_enabled = _srs_config->get_rtc_server_perf_stat();
SrsStatistic* stat = SrsStatistic::instance();
// TODO: FIXME: Add cost in ms. // TODO: FIXME: Add cost in ms.
SrsContextId cid = source->source_id(); SrsContextId cid = source->source_id();
srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d, stat=%d", req_->get_stream_url().c_str(), srs_trace("RTC: start play url=%s, source_id=%s/%s, realtime=%d, mw_msgs=%d", req_->get_stream_url().c_str(),
cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs, stat_enabled); cid.c_str(), source->pre_source_id().c_str(), realtime, mw_msgs);
SrsErrorPithyPrint* epp = new SrsErrorPithyPrint(); SrsErrorPithyPrint* epp = new SrsErrorPithyPrint();
SrsAutoFree(SrsErrorPithyPrint, epp); SrsAutoFree(SrsErrorPithyPrint, epp);
SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play();
SrsAutoFree(SrsPithyPrint, pprint);
// TODO: FIXME: Use cache for performance?
vector<SrsRtpPacket2*> pkts;
uint64_t total_pkts = 0;
if (_srs_rtc_hijacker) { if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) { if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) {
return srs_error_wrap(err, "on start consuming"); return srs_error_wrap(err, "on start consuming");
@ -575,64 +567,30 @@ srs_error_t SrsRtcPlayStream::cycle()
} }
while (true) { while (true) {
if ((err = trd->pull()) != srs_success) { if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc sender thread"); return srs_error_wrap(err, "rtc sender thread");
} }
// Wait for amount of packets. // Wait for amount of packets.
consumer->wait(mw_msgs); try_dump_again:
SrsRtpPacket2* pkt = NULL;
// TODO: FIXME: Handle error. consumer->dump_packet(&pkt);
consumer->dump_packets(pkts); if (!pkt) {
// TODO: FIXME: We should check the quit event.
int msg_count = (int)pkts.size(); consumer->wait(mw_msgs);
if (!msg_count) { goto try_dump_again;
continue; }
}
// Send-out the RTP packet and do cleanup
// Update stats for session. if ((err = send_packet(pkt)) != srs_success) {
session_->stat_->nn_out_rtp += msg_count; uint32_t nn = 0;
total_pkts += msg_count; if (epp->can_print(err, &nn)) {
srs_warn("play send packets=%u, nn=%u/%u, err: %s", 1, epp->nn_count, nn, srs_error_desc(err).c_str());
// Send-out all RTP packets and do cleanup
if (true) {
if ((err = send_packets(source, pkts, info)) != srs_success) {
uint32_t nn = 0;
if (epp->can_print(err, &nn)) {
srs_warn("play send packets=%u, nn=%u/%u, err: %s", pkts.size(), epp->nn_count, nn, srs_error_desc(err).c_str());
}
srs_freep(err);
}
for (int i = 0; i < msg_count; i++) {
SrsRtpPacket2* pkt = pkts[i];
_srs_rtp_cache->recycle(pkt);
} }
pkts.clear(); srs_freep(err);
}
// Stat for performance analysis.
if (!stat_enabled) {
continue;
} }
// Stat the original RAW AV frame, maybe h264+aac. _srs_rtp_cache->recycle(pkt);
stat->perf_on_msgs(msg_count);
// Stat the RTC packets, RAW AV frame, maybe h.264+opus.
int nn_rtc_packets = srs_max(info.nn_audios, info.nn_extras) + info.nn_videos;
stat->perf_on_rtc_packets(nn_rtc_packets);
// Stat the RAW RTP packets, which maybe group by GSO.
stat->perf_on_rtp_packets(msg_count);
// Stat the bytes and paddings.
stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes);
pprint->elapse();
if (pprint->can_print()) {
// TODO: FIXME: Print stat like frame/s, packet/s, loss_packets.
srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache",
total_pkts, msg_count, info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes,
info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, msg_count, msg_count);
}
} }
} }
@ -640,7 +598,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
vector<SrsRtpPacket2*> send_pkts;
// Covert kernel messages to RTP packets. // Covert kernel messages to RTP packets.
for (int i = 0; i < (int)pkts.size(); i++) { for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i]; SrsRtpPacket2* pkt = pkts[i];
@ -656,7 +613,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
// TODO: FIXME: Any simple solution? // TODO: FIXME: Any simple solution?
SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()]; SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()];
if ((err = audio_track->on_rtp(pkt, info)) != srs_success) { if ((err = audio_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
} }
@ -665,7 +622,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
// TODO: FIXME: Any simple solution? // TODO: FIXME: Any simple solution?
SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()]; SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()];
if ((err = video_track->on_rtp(pkt, info)) != srs_success) { if ((err = video_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
} }
} }
@ -678,6 +635,42 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
return err; return err;
} }
srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Maybe refine for performance issue.
if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) {
srs_warn("ssrc %u not found", pkt->header.get_ssrc());
return err;
}
// For audio, we transcoded AAC to opus in extra payloads.
if (pkt->is_audio()) {
// TODO: FIXME: Any simple solution?
SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()];
if ((err = audio_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
}
// TODO: FIXME: Padding audio to the max payload in RTP packets.
} else {
// TODO: FIXME: Any simple solution?
SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()];
if ((err = video_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
}
}
// Detail log, should disable it in release version.
srs_info("RTC: Update PT=%u, SSRC=%#x, Time=%u, %u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
pkt->header.get_timestamp(), pkt->nb_bytes());
return err;
}
void SrsRtcPlayStream::set_all_tracks_status(bool status) void SrsRtcPlayStream::set_all_tracks_status(bool status)
{ {
std::ostringstream merged_log; std::ostringstream merged_log;
@ -805,7 +798,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp)
} }
vector<uint16_t> seqs = rtcp->get_lost_sns(); vector<uint16_t> seqs = rtcp->get_lost_sns();
if((err = target->on_recv_nack(seqs, info)) != srs_success) { if((err = target->on_recv_nack(seqs)) != srs_success) {
return srs_error_wrap(err, "track response nack. id:%s, ssrc=%u", target->get_track_id().c_str(), ssrc); return srs_error_wrap(err, "track response nack. id:%s, ssrc=%u", target->get_track_id().c_str(), ssrc);
} }
@ -2569,6 +2562,51 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector<SrsRtpPacket2*>&
return err; return err;
} }
srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// For this message, select the first iovec.
iovec* iov = cache_iov_;
iov->iov_len = kRtpPacketSize;
cache_buffer_->skip(-1 * cache_buffer_->pos());
// Marshal packet to bytes in iovec.
if (true) {
if ((err = pkt->encode(cache_buffer_)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = cache_buffer_->pos();
}
// Cipher RTP to SRTP packet.
if (true) {
int nn_encrypt = (int)iov->iov_len;
if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
// For NACK simulator, drop packet.
if (nn_simulate_player_nack_drop) {
simulate_player_drop_packet(&pkt->header, (int)iov->iov_len);
iov->iov_len = 0;
return err;
}
++_srs_pps_srtps->sugar;
// TODO: FIXME: Handle error.
sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
// Detail log, should disable it in release version.
srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len);
return err;
}
void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_publish, bool status) void SrsRtcConnection::set_all_tracks_status(std::string stream_uri, bool is_publish, bool status)
{ {
// For publishers. // For publishers.

@ -62,6 +62,8 @@ class SrsRtcConsumer;
class SrsRtcAudioSendTrack; class SrsRtcAudioSendTrack;
class SrsRtcVideoSendTrack; class SrsRtcVideoSendTrack;
class SrsErrorPithyPrint; class SrsErrorPithyPrint;
class SrsPithyPrint;
class SrsStatistic;
const uint8_t kSR = 200; const uint8_t kSR = 200;
const uint8_t kRR = 201; const uint8_t kRR = 201;
@ -245,7 +247,7 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr
{ {
private: private:
SrsContextId cid_; SrsContextId cid_;
SrsCoroutine* trd; SrsFastCoroutine* trd_;
SrsRtcConnection* session_; SrsRtcConnection* session_;
SrsRtcPLIWorker* pli_worker_; SrsRtcPLIWorker* pli_worker_;
private: private:
@ -285,6 +287,7 @@ public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
private: private:
srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info); srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info);
srs_error_t send_packet(SrsRtpPacket2* pkt);
public: public:
// Directly set the status of track, generally for init to set the default value. // Directly set the status of track, generally for init to set the default value.
void set_all_tracks_status(bool status); void set_all_tracks_status(bool status);
@ -549,6 +552,7 @@ public:
void simulate_nack_drop(int nn); void simulate_nack_drop(int nn);
void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes); void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes);
srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info); srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info);
srs_error_t do_send_packet(SrsRtpPacket2* pkt);
// Directly set the status of play track, generally for init to set the default value. // Directly set the status of play track, generally for init to set the default value.
void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status); void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status);
private: private:

@ -197,7 +197,7 @@ srs_error_t SrsRtcConsumer::enqueue(SrsRtpPacket2* pkt)
return err; return err;
} }
srs_error_t SrsRtcConsumer::dump_packets(std::vector<SrsRtpPacket2*>& pkts) srs_error_t SrsRtcConsumer::dump_packet(SrsRtpPacket2** ppkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -206,7 +206,11 @@ srs_error_t SrsRtcConsumer::dump_packets(std::vector<SrsRtpPacket2*>& pkts)
should_update_source_id = false; should_update_source_id = false;
} }
queue.swap(pkts); // TODO: FIXME: Refine performance by ring buffer.
if (!queue.empty()) {
*ppkt = queue.front();
queue.erase(queue.begin());
}
return err; return err;
} }
@ -2049,7 +2053,7 @@ std::string SrsRtcSendTrack::get_track_id()
return track_desc_->id_; return track_desc_->id_;
} }
srs_error_t SrsRtcSendTrack::on_recv_nack(const vector<uint16_t>& lost_seqs, SrsRtcPlayStreamStatistic& info) srs_error_t SrsRtcSendTrack::on_recv_nack(const vector<uint16_t>& lost_seqs)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2066,18 +2070,14 @@ srs_error_t SrsRtcSendTrack::on_recv_nack(const vector<uint16_t>& lost_seqs, Srs
continue; continue;
} }
info.nn_bytes += pkt->nb_bytes();
uint32_t nn = 0; uint32_t nn = 0;
if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) { if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) {
srs_trace("RTC: NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(), srs_trace("RTC: NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes()); pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes());
} }
vector<SrsRtpPacket2*> resend_pkts;
resend_pkts.push_back(pkt);
// By default, we send packets by sendmmsg. // By default, we send packets by sendmmsg.
if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) { if ((err = session_->do_send_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "raw send"); return srs_error_wrap(err, "raw send");
} }
} }
@ -2094,7 +2094,7 @@ SrsRtcAudioSendTrack::~SrsRtcAudioSendTrack()
{ {
} }
srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2122,8 +2122,6 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta
} }
// Update stats. // Update stats.
info.nn_bytes += pkt->nb_bytes();
info.nn_audios++;
session_->stat_->nn_out_audios++; session_->stat_->nn_out_audios++;
// track level statistic // track level statistic
@ -2131,13 +2129,8 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta
statistic_->packets++; statistic_->packets++;
statistic_->bytes += pkt->nb_bytes(); statistic_->bytes += pkt->nb_bytes();
if (true) { if ((err = session_->do_send_packet(pkt)) != srs_success) {
std::vector<SrsRtpPacket2*> pkts; return srs_error_wrap(err, "raw send");
pkts.push_back(pkt);
if ((err = session_->do_send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
} }
return err; return err;
@ -2159,7 +2152,7 @@ SrsRtcVideoSendTrack::~SrsRtcVideoSendTrack()
{ {
} }
srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2189,8 +2182,6 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta
} }
// Update stats. // Update stats.
info.nn_bytes += pkt->nb_bytes();
info.nn_videos++;
session_->stat_->nn_out_videos++; session_->stat_->nn_out_videos++;
// track level statistic // track level statistic
@ -2198,15 +2189,10 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamSta
statistic->packets++; statistic->packets++;
statistic->bytes += pkt->nb_bytes(); statistic->bytes += pkt->nb_bytes();
if (true) { if ((err = session_->do_send_packet(pkt)) != srs_success) {
std::vector<SrsRtpPacket2*> pkts; return srs_error_wrap(err, "raw send");
pkts.push_back(pkt);
if ((err = session_->do_send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
} }
return err; return err;
} }

@ -97,8 +97,8 @@ public:
// Put RTP packet into queue. // Put RTP packet into queue.
// @note We do not drop packet here, but drop it in sender. // @note We do not drop packet here, but drop it in sender.
srs_error_t enqueue(SrsRtpPacket2* pkt); srs_error_t enqueue(SrsRtpPacket2* pkt);
// Get all RTP packets from queue. // For RTC, we only got one packet, because there is not many packets in queue.
virtual srs_error_t dump_packets(std::vector<SrsRtpPacket2*>& pkts); virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt);
// Wait for at-least some messages incoming in queue. // Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs); virtual void wait(int nb_msgs);
}; };
@ -591,9 +591,9 @@ public:
bool get_track_status(); bool get_track_status();
std::string get_track_id(); std::string get_track_id();
public: public:
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) = 0; virtual srs_error_t on_rtp(SrsRtpPacket2* pkt) = 0;
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt) = 0; virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt) = 0;
virtual srs_error_t on_recv_nack(const std::vector<uint16_t>& lost_seqs, SrsRtcPlayStreamStatistic& info); virtual srs_error_t on_recv_nack(const std::vector<uint16_t>& lost_seqs);
}; };
class SrsRtcAudioSendTrack : public SrsRtcSendTrack class SrsRtcAudioSendTrack : public SrsRtcSendTrack
@ -602,7 +602,7 @@ public:
SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc);
virtual ~SrsRtcAudioSendTrack(); virtual ~SrsRtcAudioSendTrack();
public: public:
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info); virtual srs_error_t on_rtp(SrsRtpPacket2* pkt);
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt);
}; };
@ -612,7 +612,7 @@ public:
SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc); SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc);
virtual ~SrsRtcVideoSendTrack(); virtual ~SrsRtcVideoSendTrack();
public: public:
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info); virtual srs_error_t on_rtp(SrsRtpPacket2* pkt);
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt); virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt);
}; };

@ -87,9 +87,54 @@ const SrsContextId& SrsDummyCoroutine::cid()
return _srs_context->get_id(); return _srs_context->get_id();
} }
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
{
impl_ = new SrsFastCoroutine(n, h);
}
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{
impl_ = new SrsFastCoroutine(n, h, cid);
}
SrsSTCoroutine::~SrsSTCoroutine()
{
srs_freep(impl_);
}
void SrsSTCoroutine::set_stack_size(int v)
{
impl_->set_stack_size(v);
}
srs_error_t SrsSTCoroutine::start()
{
return impl_->start();
}
void SrsSTCoroutine::stop()
{
impl_->stop();
}
void SrsSTCoroutine::interrupt()
{
impl_->interrupt();
}
srs_error_t SrsSTCoroutine::pull()
{
return impl_->pull();
}
const SrsContextId& SrsSTCoroutine::cid()
{
return impl_->cid();
}
_ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create; _ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create;
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h) SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h)
{ {
// TODO: FIXME: Reduce duplicated code. // TODO: FIXME: Reduce duplicated code.
name = n; name = n;
@ -102,7 +147,7 @@ SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
stack_size = 0; stack_size = 0;
} }
SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid) SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{ {
name = n; name = n;
handler = h; handler = h;
@ -115,19 +160,19 @@ SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId c
stack_size = 0; stack_size = 0;
} }
SrsSTCoroutine::~SrsSTCoroutine() SrsFastCoroutine::~SrsFastCoroutine()
{ {
stop(); stop();
srs_freep(trd_err); srs_freep(trd_err);
} }
void SrsSTCoroutine::set_stack_size(int v) void SrsFastCoroutine::set_stack_size(int v)
{ {
stack_size = v; stack_size = v;
} }
srs_error_t SrsSTCoroutine::start() srs_error_t SrsFastCoroutine::start()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -159,7 +204,7 @@ srs_error_t SrsSTCoroutine::start()
return err; return err;
} }
void SrsSTCoroutine::stop() void SrsFastCoroutine::stop()
{ {
if (disposed) { if (disposed) {
return; return;
@ -190,7 +235,7 @@ void SrsSTCoroutine::stop()
return; return;
} }
void SrsSTCoroutine::interrupt() void SrsFastCoroutine::interrupt()
{ {
if (!started || interrupted || cycle_done) { if (!started || interrupted || cycle_done) {
return; return;
@ -204,17 +249,12 @@ void SrsSTCoroutine::interrupt()
st_thread_interrupt((st_thread_t)trd); st_thread_interrupt((st_thread_t)trd);
} }
srs_error_t SrsSTCoroutine::pull() const SrsContextId& SrsFastCoroutine::cid()
{
return srs_error_copy(trd_err);
}
const SrsContextId& SrsSTCoroutine::cid()
{ {
return cid_; return cid_;
} }
srs_error_t SrsSTCoroutine::cycle() srs_error_t SrsFastCoroutine::cycle()
{ {
if (_srs_context) { if (_srs_context) {
if (cid_.empty()) { if (cid_.empty()) {
@ -234,9 +274,9 @@ srs_error_t SrsSTCoroutine::cycle()
return err; return err;
} }
void* SrsSTCoroutine::pfn(void* arg) void* SrsFastCoroutine::pfn(void* arg)
{ {
SrsSTCoroutine* p = (SrsSTCoroutine*)arg; SrsFastCoroutine* p = (SrsFastCoroutine*)arg;
srs_error_t err = p->cycle(); srs_error_t err = p->cycle();

@ -29,9 +29,12 @@
#include <string> #include <string>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_service_st.hpp> #include <srs_service_st.hpp>
#include <srs_protocol_io.hpp> #include <srs_protocol_io.hpp>
class SrsFastCoroutine;
// Each ST-coroutine must implements this interface, // Each ST-coroutine must implements this interface,
// to do the cycle job and handle some events. // to do the cycle job and handle some events.
// //
@ -108,10 +111,6 @@ public:
virtual const SrsContextId& cid(); virtual const SrsContextId& cid();
}; };
// For utest to mock the thread create.
typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size);
extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;
// A ST-coroutine is a lightweight thread, just like the goroutine. // A ST-coroutine is a lightweight thread, just like the goroutine.
// But the goroutine maybe run on different thread, while ST-coroutine only // But the goroutine maybe run on different thread, while ST-coroutine only
// run in single thread, because it use setjmp and longjmp, so it may cause // run in single thread, because it use setjmp and longjmp, so it may cause
@ -127,19 +126,7 @@ extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;
class SrsSTCoroutine : public SrsCoroutine class SrsSTCoroutine : public SrsCoroutine
{ {
private: private:
std::string name; SrsFastCoroutine* impl_;
int stack_size;
ISrsCoroutineHandler* handler;
private:
srs_thread_t trd;
SrsContextId cid_;
srs_error_t trd_err;
private:
bool started;
bool interrupted;
bool disposed;
// Cycle done, no need to interrupt it.
bool cycle_done;
public: public:
// Create a thread with name n and handler h. // Create a thread with name n and handler h.
// @remark User can specify a cid for thread to use, or we will allocate a new one. // @remark User can specify a cid for thread to use, or we will allocate a new one.
@ -170,8 +157,48 @@ public:
virtual srs_error_t pull(); virtual srs_error_t pull();
// Get the context id of thread. // Get the context id of thread.
virtual const SrsContextId& cid(); virtual const SrsContextId& cid();
};
// For utest to mock the thread create.
typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size);
extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;
// High performance coroutine.
class SrsFastCoroutine
{
private:
std::string name;
int stack_size;
ISrsCoroutineHandler* handler;
private:
srs_thread_t trd;
SrsContextId cid_;
srs_error_t trd_err;
private:
bool started;
bool interrupted;
bool disposed;
// Cycle done, no need to interrupt it.
bool cycle_done;
public:
SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h);
SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);
~SrsFastCoroutine();
public:
void set_stack_size(int v);
public:
srs_error_t start();
void stop();
void interrupt();
inline srs_error_t pull() {
if (trd_err == srs_success) {
return srs_success;
}
return srs_error_copy(trd_err);
}
const SrsContextId& cid();
private: private:
virtual srs_error_t cycle(); srs_error_t cycle();
static void* pfn(void* arg); static void* pfn(void* arg);
}; };

Loading…
Cancel
Save