From 54d8c36905686be20bf09aa61547073f912b31dd Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 13 May 2020 20:13:25 +0800 Subject: [PATCH] RTC: Refine audio to RTP packet base. --- trunk/src/app/srs_app_rtc_conn.cpp | 181 +++++++++--------------- trunk/src/app/srs_app_rtc_conn.hpp | 16 ++- trunk/src/app/srs_app_rtc_source.cpp | 73 ++++------ trunk/src/app/srs_app_rtc_source.hpp | 33 ++--- trunk/src/core/srs_core_performance.hpp | 2 + trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 6 + trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 6 +- 7 files changed, 128 insertions(+), 189 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 1bcb07359..c7e11ed22 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -573,7 +573,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) video_sequence = 0; - mw_sleep = 0; mw_msgs = 0; realtime = true; @@ -638,9 +637,8 @@ srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost) realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); - mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); - srs_trace("Reload play realtime=%d, mw_msgs=%d, mw_sleep=%d", realtime, mw_msgs, mw_sleep); + srs_trace("Reload play realtime=%d, mw_msgs=%d", realtime, mw_msgs); return srs_success; } @@ -692,7 +690,7 @@ srs_error_t SrsRtcPlayer::cycle() SrsRtcConsumer* consumer = NULL; SrsAutoFree(SrsRtcConsumer, consumer); - if ((err = source->create_consumer(NULL, consumer)) != srs_success) { + if ((err = source->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } @@ -702,7 +700,6 @@ srs_error_t SrsRtcPlayer::cycle() } realtime = _srs_config->get_realtime_enabled(req->vhost, true); - mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); // We merged write more messages, so we need larger queue. @@ -713,11 +710,8 @@ srs_error_t SrsRtcPlayer::cycle() sender->set_extra_ratio(80); } - srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), - ::getpid(), source->source_id(), session_->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); - - SrsMessageArray msgs(SRS_PERF_MW_MSGS); - SrsRtcOutgoingPackets pkts(SRS_PERF_RTC_RTP_PACKETS); + srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_msgs=%d", req->get_stream_url().c_str(), + ::getpid(), source->source_id(), session_->encrypt, realtime, mw_msgs); SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play(); SrsAutoFree(SrsPithyPrint, pprint); @@ -726,41 +720,35 @@ srs_error_t SrsRtcPlayer::cycle() bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); SrsStatistic* stat = SrsStatistic::instance(); + // TODO: FIXME: Use cache for performance? + vector pkts; + SrsRtcOutgoingPackets info; + while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "rtc sender thread"); } -#ifdef SRS_PERF_QUEUE_COND_WAIT - // Wait for amount of messages or a duration. - consumer->wait(mw_msgs, mw_sleep); -#endif + // Wait for amount of packets. + consumer->wait(mw_msgs); - // Try to read some messages. - int msg_count = 0; - if ((err = consumer->dump_packets(&msgs, msg_count)) != srs_success) { - continue; - } + // TODO: FIXME: Handle error. + consumer->dump_packets(pkts); - if (msg_count <= 0) { -#ifndef SRS_PERF_QUEUE_COND_WAIT - srs_usleep(mw_sleep); -#endif + int msg_count = (int)pkts.size(); + if (!msg_count) { continue; } - // Transmux and send out messages. - pkts.reset(gso, merge_nalus); - - if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) { - srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); - } + // Send-out all RTP packets and do cleanup. + // TODO: FIXME: Handle error. + send_messages(source, pkts, info); - // Do cleanup messages. for (int i = 0; i < msg_count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - srs_freep(msg); + SrsRtpPacket2* pkt = pkts[i]; + srs_freep(pkt); } + pkts.clear(); // Stat for performance analysis. if (!stat_enabled) { @@ -770,36 +758,29 @@ srs_error_t SrsRtcPlayer::cycle() // Stat the original RAW AV frame, maybe h264+aac. stat->perf_on_msgs(msg_count); // Stat the RTC packets, RAW AV frame, maybe h.264+opus. - int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; + int nn_rtc_packets = srs_max(info.nn_audios, info.nn_extras) + info.nn_videos; stat->perf_on_rtc_packets(nn_rtc_packets); // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(pkts.size()); + stat->perf_on_rtp_packets(info.size()); // Stat the RTP packets going into kernel. - stat->perf_on_gso_packets(pkts.nn_rtp_pkts); + stat->perf_on_gso_packets(info.nn_rtp_pkts); // Stat the bytes and paddings. - stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); + stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes); // Stat the messages and dropped count. - stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); - -#if defined(SRS_DEBUG) - srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes", - msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, - pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); -#endif + stat->perf_on_dropped(msg_count, nn_rtc_packets, info.nn_dropped); pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", - msg_count, pkts.nn_dropped, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, - pkts.nn_rtp_bytes, pkts.nn_padding_bytes, pkts.nn_paddings, pkts.size(), pkts.capacity()); + msg_count, info.nn_dropped, info.size(), info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes, + info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, info.size(), info.capacity()); } } } -srs_error_t SrsRtcPlayer::send_messages( - SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets -) { +srs_error_t SrsRtcPlayer::send_messages(SrsRtcSource* source, vector& pkts, SrsRtcOutgoingPackets& info) +{ srs_error_t err = srs_success; // If DTLS is not OK, drop all messages. @@ -808,15 +789,16 @@ srs_error_t SrsRtcPlayer::send_messages( } // Covert kernel messages to RTP packets. - if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { + if ((err = messages_to_packets(source, pkts, info)) != srs_success) { return srs_error_wrap(err, "messages to packets"); } #ifndef SRS_OSX // If enabled GSO, send out some packets in a msghdr. // @remark When NACK simulator is on, we don't use GSO. + // TODO: FIXME: Support GSO. if (packets.use_gso && !nn_simulate_nack_drop) { - if ((err = send_packets_gso(packets)) != srs_success) { + if ((err = send_packets_gso(info)) != srs_success) { return srs_error_wrap(err, "gso send"); } return err; @@ -824,64 +806,54 @@ srs_error_t SrsRtcPlayer::send_messages( #endif // By default, we send packets by sendmmsg. - if ((err = send_packets(packets)) != srs_success) { + if ((err = send_packets(pkts, info)) != srs_success) { return srs_error_wrap(err, "raw send"); } return err; } -srs_error_t SrsRtcPlayer::messages_to_packets( - SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets -) { +srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vector& pkts, SrsRtcOutgoingPackets& info) +{ srs_error_t err = srs_success; ISrsUdpSender* sender = session_->sendonly_skt->sender(); - for (int i = 0; i < nb_msgs; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + for (int i = 0; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts[i]; // If overflow, drop all messages. if (sender->overflow()) { - packets.nn_dropped += nb_msgs - i; + info.nn_dropped += (int)pkts.size() - i; return err; } // Update stats. - packets.nn_bytes += msg->size; - - int nn_extra_payloads = msg->nn_extra_payloads(); - packets.nn_extras += nn_extra_payloads; - - int nn_samples = msg->nn_samples(); - packets.nn_samples += nn_samples; + info.nn_bytes += pkt->nb_bytes(); // For audio, we transcoded AAC to opus in extra payloads. - if (msg->is_audio()) { - packets.nn_audios++; + if (pkt->is_audio()) { + info.nn_audios++; - for (int i = 0; i < nn_extra_payloads; i++) { - SrsSample* sample = msg->extra_payloads() + i; - if ((err = package_opus(sample, packets, msg->nn_max_extra_payloads())) != srs_success) { - return srs_error_wrap(err, "opus package"); - } + if ((err = package_opus(pkt)) != srs_success) { + return srs_error_wrap(err, "opus package"); } continue; } // For video, we should process all NALUs in samples. - packets.nn_videos++; + info.nn_videos++; // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. - if (msg->has_idr()) { - if ((err = package_stap_a(source, msg, packets)) != srs_success) { + /*if (msg->has_idr()) { + if ((err = package_stap_a(source, msg, info)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } } // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. - if (packets.should_merge_nalus && nn_samples > 1) { - if ((err = package_nalus(msg, packets)) != srs_success) { + if (info.should_merge_nalus && nn_samples > 1) { + if ((err = package_nalus(msg, info)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } continue; @@ -898,25 +870,25 @@ srs_error_t SrsRtcPlayer::messages_to_packets( } if (sample->size <= kRtpMaxPayloadSize) { - if ((err = package_single_nalu(msg, sample, packets)) != srs_success) { + if ((err = package_single_nalu(msg, sample, info)) != srs_success) { return srs_error_wrap(err, "packet single nalu"); } } else { - if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { + if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, info)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); } } if (i == nn_samples - 1) { - packets.back()->rtp_header.set_marker(true); + info.back()->rtp_header.set_marker(true); } - } + }*/ } return err; } -srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) +srs_error_t SrsRtcPlayer::send_packets(std::vector& pkts, SrsRtcOutgoingPackets& info) { srs_error_t err = srs_success; @@ -924,9 +896,8 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) bool encrypt = session_->encrypt; ISrsUdpSender* sender = session_->sendonly_skt->sender(); - int nn_packets = packets.size(); - for (int i = 0; i < nn_packets; i++) { - SrsRtpPacket2* packet = packets.at(i); + for (int i = 0; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts.at(i); // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -947,7 +918,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) // Marshal packet to bytes in iovec. if (true) { SrsBuffer stream((char*)iov->iov_base, iov->iov_len); - if ((err = packet->encode(&stream)) != srs_success) { + if ((err = pkt->encode(&stream)) != srs_success) { return srs_error_wrap(err, "encode packet"); } iov->iov_len = stream.pos(); @@ -965,8 +936,8 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) // Put final RTP packet to NACK/ARQ queue. if (nack_enabled_) { SrsRtpPacket2* nack = new SrsRtpPacket2(); - nack->rtp_header = packet->rtp_header; - nack->padding = packet->padding; + nack->rtp_header = pkt->rtp_header; + nack->padding = pkt->padding; // TODO: FIXME: Should avoid memory copying. SrsRtpRawPayload* payload = nack->reuse_raw(); @@ -981,7 +952,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) } } - packets.nn_rtp_bytes += (int)iov->iov_len; + info.nn_rtp_bytes += (int)iov->iov_len; // Set the address and control information. sockaddr_in* addr = (sockaddr_in*)session_->sendonly_skt->peer_addr(); @@ -992,11 +963,11 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) mhdr->msg_hdr.msg_controllen = 0; // When we send out a packet, increase the stat counter. - packets.nn_rtp_pkts++; + info.nn_rtp_pkts++; // For NACK simulator, drop packet. if (nn_simulate_nack_drop) { - simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len); + simulate_drop_packet(&pkt->rtp_header, (int)iov->iov_len); iov->iov_len = 0; continue; } @@ -1322,34 +1293,18 @@ srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoing return err; } -srs_error_t SrsRtcPlayer::package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload) +srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - packet->rtp_header.set_marker(true); - packet->rtp_header.set_timestamp(audio_timestamp); - packet->rtp_header.set_sequence(audio_sequence++); - packet->rtp_header.set_ssrc(audio_ssrc); - packet->rtp_header.set_payload_type(audio_payload_type); - - SrsRtpRawPayload* raw = packet->reuse_raw(); - raw->payload = sample->bytes; - raw->nn_payload = sample->size; + pkt->rtp_header.set_marker(true); + pkt->rtp_header.set_timestamp(audio_timestamp); + pkt->rtp_header.set_sequence(audio_sequence++); + pkt->rtp_header.set_ssrc(audio_ssrc); + pkt->rtp_header.set_payload_type(audio_payload_type); + // TODO: FIXME: Padding audio to the max payload in RTP packets. if (max_padding > 0) { - if (sample->size < nn_max_payload && nn_max_payload - sample->size < max_padding) { - int padding = nn_max_payload - sample->size; - packet->set_padding(padding); - -#if defined(SRS_DEBUG) - srs_trace("#%d, Fast Padding %d bytes %d=>%d, SN=%d, max_payload %d, max_padding %d", packets.debug_id, - padding, sample->size, sample->size + padding, packet->rtp_header.get_sequence(), nn_max_payload, max_padding); -#endif - } } // TODO: FIXME: Why 960? Need Refactoring? diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index f7303dd2c..435e11d72 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -149,6 +149,7 @@ private: }; // A group of RTP packets for outgoing(send to players). +// TODO: FIXME: Rename to stat for RTP packets. class SrsRtcOutgoingPackets { public: @@ -171,9 +172,11 @@ public: // one msghdr by GSO, it's only one RTP packet, because we only send once. int nn_rtp_pkts; // For video, the samples or NALUs. + // TODO: FIXME: Remove it because we may don't know. int nn_samples; // For audio, the generated extra audio packets. // For example, when transcoding AAC to opus, may many extra payloads for a audio. + // TODO: FIXME: Remove it because we may don't know. int nn_extras; // The original audio messages. int nn_audios; @@ -184,11 +187,13 @@ public: // The number of dropped messages. int nn_dropped; private: + // TODO: FIXME: Remove the cache. int cursor; int nn_cache; SrsRtpPacket2* cache; public: - SrsRtcOutgoingPackets(int nn_cache_max); + // TODO: FIXME: Remove the cache. + SrsRtcOutgoingPackets(int nn_cache_max = 8); virtual ~SrsRtcOutgoingPackets(); public: void reset(bool gso, bool merge_nalus); @@ -227,7 +232,6 @@ private: bool gso; int max_padding; // For merged-write messages. - srs_utime_t mw_sleep; int mw_msgs; bool realtime; // Whether enabled nack. @@ -251,12 +255,12 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); - srs_error_t messages_to_packets(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); - srs_error_t send_packets(SrsRtcOutgoingPackets& packets); + srs_error_t send_messages(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingPackets& info); + srs_error_t messages_to_packets(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingPackets& info); + srs_error_t send_packets(std::vector& pkts, SrsRtcOutgoingPackets& info); srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets); private: - srs_error_t package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload); + srs_error_t package_opus(SrsRtpPacket2* pkt); private: srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets); srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 0f39a19f5..1f396a6c9 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -80,30 +80,27 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor return err; } -SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c) +SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) { source = s; - conn = c; should_update_source_id = false; - queue = new SrsMessageQueue(); -#ifdef SRS_PERF_QUEUE_COND_WAIT mw_wait = srs_cond_new(); mw_min_msgs = 0; - mw_duration = 0; mw_waiting = false; -#endif } SrsRtcConsumer::~SrsRtcConsumer() { source->on_consumer_destroy(this); - srs_freep(queue); + vector::iterator it; + for (it = queue.begin(); it != queue.end(); ++it) { + SrsRtpPacket2* pkt = *it; + srs_freep(pkt); + } -#ifdef SRS_PERF_QUEUE_COND_WAIT srs_cond_destroy(mw_wait); -#endif } void SrsRtcConsumer::update_source_id() @@ -114,66 +111,46 @@ void SrsRtcConsumer::update_source_id() srs_error_t SrsRtcConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; + return err; +} - SrsSharedPtrMessage* msg = shared_msg->copy(); +srs_error_t SrsRtcConsumer::enqueue2(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; - if ((err = queue->enqueue(msg, NULL)) != srs_success) { - return srs_error_wrap(err, "enqueue message"); - } + queue.push_back(pkt); -#ifdef SRS_PERF_QUEUE_COND_WAIT - // fire the mw when msgs is enough. if (mw_waiting) { - if (queue->size() > mw_min_msgs) { + if ((int)queue.size() > mw_min_msgs) { srs_cond_signal(mw_wait); mw_waiting = false; return err; } - return err; } -#endif return err; } -srs_error_t SrsRtcConsumer::dump_packets(SrsMessageArray* msgs, int& count) +srs_error_t SrsRtcConsumer::dump_packets(std::vector& pkts) { srs_error_t err = srs_success; - srs_assert(count >= 0); - srs_assert(msgs->max > 0); - - // the count used as input to reset the max if positive. - int max = count? srs_min(count, msgs->max) : msgs->max; - - // the count specifies the max acceptable count, - // here maybe 1+, and we must set to 0 when got nothing. - count = 0; - if (should_update_source_id) { srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id()); should_update_source_id = false; } - // pump msgs from queue. - if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) { - return srs_error_wrap(err, "dump packets"); - } + queue.swap(pkts); return err; } -#ifdef SRS_PERF_QUEUE_COND_WAIT -void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) +void SrsRtcConsumer::wait(int nb_msgs) { mw_min_msgs = nb_msgs; - mw_duration = msgs_duration; - - srs_utime_t duration = queue->duration(); - bool match_min_msgs = queue->size() > mw_min_msgs; // when duration ok, signal to flush. - if (match_min_msgs && duration > mw_duration) { + if ((int)queue.size() > mw_min_msgs) { return; } @@ -183,7 +160,6 @@ void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) // use cond block wait for high performance mode. srs_cond_wait(mw_wait); } -#endif SrsRtcSourceManager::SrsRtcSourceManager() { @@ -347,11 +323,11 @@ SrsMetaCache* SrsRtcSource::cached_meta() return meta; } -srs_error_t SrsRtcSource::create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer) +srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; - consumer = new SrsRtcConsumer(this, conn); + consumer = new SrsRtcConsumer(this); consumers.push_back(consumer); // TODO: FIXME: Implements edge cluster. @@ -441,7 +417,6 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; - // copy to all consumer for (int i = 0; i < (int)consumers.size(); i++) { SrsRtcConsumer* consumer = consumers.at(i); if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) { @@ -455,6 +430,14 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) srs_error_t SrsRtcSource::on_audio2(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; + + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue2(pkt)) != srs_success) { + return srs_error_wrap(err, "consume message"); + } + } + return err; } @@ -686,7 +669,7 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size); SrsRtpPacket2* packet = new SrsRtpPacket2(); - packet->rtp_header.set_marker(true); + packet->frame_type = SrsFrameTypeAudio; SrsRtpRawPayload* raw = packet->reuse_raw(); raw->payload = new char[p->size]; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index e838f522b..fcff72d59 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -48,41 +48,26 @@ class SrsRtcConsumer : public ISrsConsumerQueue { private: SrsRtcSource* source; - // The owner connection for debug, maybe NULL. - SrsConnection* conn; - SrsMessageQueue* queue; + std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; -#ifdef SRS_PERF_QUEUE_COND_WAIT // The cond wait for mw. // @see https://github.com/ossrs/srs/issues/251 srs_cond_t mw_wait; bool mw_waiting; int mw_min_msgs; - srs_utime_t mw_duration; -#endif public: - SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c); + SrsRtcConsumer(SrsRtcSource* s); virtual ~SrsRtcConsumer(); public: - // when source id changed, notice client to print. + // When source id changed, notice client to print. virtual void update_source_id(); - // Enqueue an shared ptr message. - // @param shared_msg, directly ptr, copy it if need to save it. - // @param whether atc, donot use jitter correct if true. - // @param ag the algorithm of time jitter. + // Put or get RTP packet in queue. virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag); - // Get packets in consumer queue. - // @param msgs the msgs array to dump packets to send. - // @param count the count in array, intput and output param. - // @remark user can specifies the count to get specified msgs; 0 to get all if possible. - virtual srs_error_t dump_packets(SrsMessageArray* msgs, int& count); -#ifdef SRS_PERF_QUEUE_COND_WAIT - // wait for messages incomming, atleast nb_msgs and in duration. - // @param nb_msgs the messages count to wait. - // @param msgs_duration the messages duration to wait. - virtual void wait(int nb_msgs, srs_utime_t msgs_duration); -#endif + srs_error_t enqueue2(SrsRtpPacket2* pkt); + virtual srs_error_t dump_packets(std::vector& pkts); + // Wait for at-least some messages incoming in queue. + virtual void wait(int nb_msgs); }; class SrsRtcSourceManager @@ -147,7 +132,7 @@ public: public: // Create consumer // @param consumer, output the create consumer. - virtual srs_error_t create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer); + virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer); // Dumps packets in cache to consumer. // @param ds, whether dumps the sequence header. // @param dm, whether dumps the metadata. diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 39493646a..3a426c3c7 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -125,6 +125,7 @@ * @remark this improve performance for large connectios. * @see https://github.com/ossrs/srs/issues/251 */ +// TODO: FIXME: Should always enable it. #define SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT // For RTMP, use larger wait queue. @@ -212,6 +213,7 @@ #define SRS_PERF_RTC_GSO_MAX 64 // For RTC, the max count of RTP packets we process in one loop. +// TODO: FIXME: Remove it. #define SRS_PERF_RTC_RTP_PACKETS 1024 #endif diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index c880b6fc0..3304952a6 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -280,6 +280,7 @@ SrsRtpPacket2::SrsRtpPacket2() nalu_type = SrsAvcNaluTypeReserved; original_bytes = NULL; + frame_type = SrsFrameTypeReserved; cache_raw = new SrsRtpRawPayload(); cache_fua = new SrsRtpFUAPayload2(); @@ -350,6 +351,11 @@ void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h) decode_handler = h; } +bool SrsRtpPacket2::is_audio() +{ + return frame_type == SrsFrameTypeAudio; +} + int SrsRtpPacket2::nb_bytes() { if (!cache_payload) { diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index b927c03c4..b09ab3b35 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -115,12 +115,14 @@ public: ISrsCodec* payload; // TODO: FIXME: Merge into rtp_header. int padding; -// Decoder helper. +// Helper fields. public: // The first byte as nalu type, for video decoder only. SrsAvcNaluType nalu_type; // The original bytes for decoder or bridger only, we will free it. char* original_bytes; + // The frame type, for RTMP bridger or SFU source. + SrsFrameType frame_type; // Fast cache for performance. private: // Cache frequently used payload for performance. @@ -145,6 +147,8 @@ public: SrsRtpFUAPayload2* reuse_fua(); // Set the decode handler. void set_decode_handler(ISrsRtpPacketDecodeHandler* h); + // Whether the packet is Audio packet. + bool is_audio(); // interface ISrsEncoder public: virtual int nb_bytes();