RTC: Refine audio to RTP packet base.

pull/1804/head
winlin 5 years ago
parent 588d17c09d
commit 54d8c36905

@ -573,7 +573,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
video_sequence = 0;
mw_sleep = 0;
mw_msgs = 0;
realtime = true;
@ -638,9 +637,8 @@ srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost)
realtime = _srs_config->get_realtime_enabled(req->vhost, true);
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true);
mw_sleep = _srs_config->get_mw_sleep(req->vhost, true);
srs_trace("Reload play realtime=%d, mw_msgs=%d, mw_sleep=%d", realtime, mw_msgs, mw_sleep);
srs_trace("Reload play realtime=%d, mw_msgs=%d", realtime, mw_msgs);
return srs_success;
}
@ -692,7 +690,7 @@ srs_error_t SrsRtcPlayer::cycle()
SrsRtcConsumer* consumer = NULL;
SrsAutoFree(SrsRtcConsumer, consumer);
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str());
}
@ -702,7 +700,6 @@ srs_error_t SrsRtcPlayer::cycle()
}
realtime = _srs_config->get_realtime_enabled(req->vhost, true);
mw_sleep = _srs_config->get_mw_sleep(req->vhost, true);
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true);
// We merged write more messages, so we need larger queue.
@ -713,11 +710,8 @@ srs_error_t SrsRtcPlayer::cycle()
sender->set_extra_ratio(80);
}
srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(),
::getpid(), source->source_id(), session_->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
SrsRtcOutgoingPackets pkts(SRS_PERF_RTC_RTP_PACKETS);
srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_msgs=%d", req->get_stream_url().c_str(),
::getpid(), source->source_id(), session_->encrypt, realtime, mw_msgs);
SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play();
SrsAutoFree(SrsPithyPrint, pprint);
@ -726,41 +720,35 @@ srs_error_t SrsRtcPlayer::cycle()
bool stat_enabled = _srs_config->get_rtc_server_perf_stat();
SrsStatistic* stat = SrsStatistic::instance();
// TODO: FIXME: Use cache for performance?
vector<SrsRtpPacket2*> pkts;
SrsRtcOutgoingPackets info;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtc sender thread");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// Wait for amount of messages or a duration.
consumer->wait(mw_msgs, mw_sleep);
#endif
// Wait for amount of packets.
consumer->wait(mw_msgs);
// Try to read some messages.
int msg_count = 0;
if ((err = consumer->dump_packets(&msgs, msg_count)) != srs_success) {
continue;
}
// TODO: FIXME: Handle error.
consumer->dump_packets(pkts);
if (msg_count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_usleep(mw_sleep);
#endif
int msg_count = (int)pkts.size();
if (!msg_count) {
continue;
}
// Transmux and send out messages.
pkts.reset(gso, merge_nalus);
if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
}
// Send-out all RTP packets and do cleanup.
// TODO: FIXME: Handle error.
send_messages(source, pkts, info);
// Do cleanup messages.
for (int i = 0; i < msg_count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_freep(msg);
SrsRtpPacket2* pkt = pkts[i];
srs_freep(pkt);
}
pkts.clear();
// Stat for performance analysis.
if (!stat_enabled) {
@ -770,36 +758,29 @@ srs_error_t SrsRtcPlayer::cycle()
// Stat the original RAW AV frame, maybe h264+aac.
stat->perf_on_msgs(msg_count);
// Stat the RTC packets, RAW AV frame, maybe h.264+opus.
int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos;
int nn_rtc_packets = srs_max(info.nn_audios, info.nn_extras) + info.nn_videos;
stat->perf_on_rtc_packets(nn_rtc_packets);
// Stat the RAW RTP packets, which maybe group by GSO.
stat->perf_on_rtp_packets(pkts.size());
stat->perf_on_rtp_packets(info.size());
// Stat the RTP packets going into kernel.
stat->perf_on_gso_packets(pkts.nn_rtp_pkts);
stat->perf_on_gso_packets(info.nn_rtp_pkts);
// Stat the bytes and paddings.
stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes);
stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes);
// Stat the messages and dropped count.
stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped);
#if defined(SRS_DEBUG)
srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes",
msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos,
pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes);
#endif
stat->perf_on_dropped(msg_count, nn_rtc_packets, info.nn_dropped);
pprint->elapse();
if (pprint->can_print()) {
// TODO: FIXME: Print stat like frame/s, packet/s, loss_packets.
srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache",
msg_count, pkts.nn_dropped, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes,
pkts.nn_rtp_bytes, pkts.nn_padding_bytes, pkts.nn_paddings, pkts.size(), pkts.capacity());
msg_count, info.nn_dropped, info.size(), info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes,
info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, info.size(), info.capacity());
}
}
}
srs_error_t SrsRtcPlayer::send_messages(
SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
) {
srs_error_t SrsRtcPlayer::send_messages(SrsRtcSource* source, vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info)
{
srs_error_t err = srs_success;
// If DTLS is not OK, drop all messages.
@ -808,15 +789,16 @@ srs_error_t SrsRtcPlayer::send_messages(
}
// Covert kernel messages to RTP packets.
if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) {
if ((err = messages_to_packets(source, pkts, info)) != srs_success) {
return srs_error_wrap(err, "messages to packets");
}
#ifndef SRS_OSX
// If enabled GSO, send out some packets in a msghdr.
// @remark When NACK simulator is on, we don't use GSO.
// TODO: FIXME: Support GSO.
if (packets.use_gso && !nn_simulate_nack_drop) {
if ((err = send_packets_gso(packets)) != srs_success) {
if ((err = send_packets_gso(info)) != srs_success) {
return srs_error_wrap(err, "gso send");
}
return err;
@ -824,64 +806,54 @@ srs_error_t SrsRtcPlayer::send_messages(
#endif
// By default, we send packets by sendmmsg.
if ((err = send_packets(packets)) != srs_success) {
if ((err = send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}
srs_error_t SrsRtcPlayer::messages_to_packets(
SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets
) {
srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info)
{
srs_error_t err = srs_success;
ISrsUdpSender* sender = session_->sendonly_skt->sender();
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
// If overflow, drop all messages.
if (sender->overflow()) {
packets.nn_dropped += nb_msgs - i;
info.nn_dropped += (int)pkts.size() - i;
return err;
}
// Update stats.
packets.nn_bytes += msg->size;
int nn_extra_payloads = msg->nn_extra_payloads();
packets.nn_extras += nn_extra_payloads;
int nn_samples = msg->nn_samples();
packets.nn_samples += nn_samples;
info.nn_bytes += pkt->nb_bytes();
// For audio, we transcoded AAC to opus in extra payloads.
if (msg->is_audio()) {
packets.nn_audios++;
if (pkt->is_audio()) {
info.nn_audios++;
for (int i = 0; i < nn_extra_payloads; i++) {
SrsSample* sample = msg->extra_payloads() + i;
if ((err = package_opus(sample, packets, msg->nn_max_extra_payloads())) != srs_success) {
return srs_error_wrap(err, "opus package");
}
if ((err = package_opus(pkt)) != srs_success) {
return srs_error_wrap(err, "opus package");
}
continue;
}
// For video, we should process all NALUs in samples.
packets.nn_videos++;
info.nn_videos++;
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (msg->has_idr()) {
if ((err = package_stap_a(source, msg, packets)) != srs_success) {
/*if (msg->has_idr()) {
if ((err = package_stap_a(source, msg, info)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
}
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
if (packets.should_merge_nalus && nn_samples > 1) {
if ((err = package_nalus(msg, packets)) != srs_success) {
if (info.should_merge_nalus && nn_samples > 1) {
if ((err = package_nalus(msg, info)) != srs_success) {
return srs_error_wrap(err, "packet stap-a");
}
continue;
@ -898,25 +870,25 @@ srs_error_t SrsRtcPlayer::messages_to_packets(
}
if (sample->size <= kRtpMaxPayloadSize) {
if ((err = package_single_nalu(msg, sample, packets)) != srs_success) {
if ((err = package_single_nalu(msg, sample, info)) != srs_success) {
return srs_error_wrap(err, "packet single nalu");
}
} else {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, info)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
}
if (i == nn_samples - 1) {
packets.back()->rtp_header.set_marker(true);
info.back()->rtp_header.set_marker(true);
}
}
}*/
}
return err;
}
srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
srs_error_t SrsRtcPlayer::send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info)
{
srs_error_t err = srs_success;
@ -924,9 +896,8 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
bool encrypt = session_->encrypt;
ISrsUdpSender* sender = session_->sendonly_skt->sender();
int nn_packets = packets.size();
for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.at(i);
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts.at(i);
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
@ -947,7 +918,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
// Marshal packet to bytes in iovec.
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
if ((err = pkt->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
@ -965,8 +936,8 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
// Put final RTP packet to NACK/ARQ queue.
if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->rtp_header = packet->rtp_header;
nack->padding = packet->padding;
nack->rtp_header = pkt->rtp_header;
nack->padding = pkt->padding;
// TODO: FIXME: Should avoid memory copying.
SrsRtpRawPayload* payload = nack->reuse_raw();
@ -981,7 +952,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
}
}
packets.nn_rtp_bytes += (int)iov->iov_len;
info.nn_rtp_bytes += (int)iov->iov_len;
// Set the address and control information.
sockaddr_in* addr = (sockaddr_in*)session_->sendonly_skt->peer_addr();
@ -992,11 +963,11 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets)
mhdr->msg_hdr.msg_controllen = 0;
// When we send out a packet, increase the stat counter.
packets.nn_rtp_pkts++;
info.nn_rtp_pkts++;
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len);
simulate_drop_packet(&pkt->rtp_header, (int)iov->iov_len);
iov->iov_len = 0;
continue;
}
@ -1322,34 +1293,18 @@ srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoing
return err;
}
srs_error_t SrsRtcPlayer::package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload)
srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
SrsRtpPacket2* packet = packets.fetch();
if (!packet) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
}
packet->rtp_header.set_marker(true);
packet->rtp_header.set_timestamp(audio_timestamp);
packet->rtp_header.set_sequence(audio_sequence++);
packet->rtp_header.set_ssrc(audio_ssrc);
packet->rtp_header.set_payload_type(audio_payload_type);
SrsRtpRawPayload* raw = packet->reuse_raw();
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
pkt->rtp_header.set_marker(true);
pkt->rtp_header.set_timestamp(audio_timestamp);
pkt->rtp_header.set_sequence(audio_sequence++);
pkt->rtp_header.set_ssrc(audio_ssrc);
pkt->rtp_header.set_payload_type(audio_payload_type);
// TODO: FIXME: Padding audio to the max payload in RTP packets.
if (max_padding > 0) {
if (sample->size < nn_max_payload && nn_max_payload - sample->size < max_padding) {
int padding = nn_max_payload - sample->size;
packet->set_padding(padding);
#if defined(SRS_DEBUG)
srs_trace("#%d, Fast Padding %d bytes %d=>%d, SN=%d, max_payload %d, max_padding %d", packets.debug_id,
padding, sample->size, sample->size + padding, packet->rtp_header.get_sequence(), nn_max_payload, max_padding);
#endif
}
}
// TODO: FIXME: Why 960? Need Refactoring?

@ -149,6 +149,7 @@ private:
};
// A group of RTP packets for outgoing(send to players).
// TODO: FIXME: Rename to stat for RTP packets.
class SrsRtcOutgoingPackets
{
public:
@ -171,9 +172,11 @@ public:
// one msghdr by GSO, it's only one RTP packet, because we only send once.
int nn_rtp_pkts;
// For video, the samples or NALUs.
// TODO: FIXME: Remove it because we may don't know.
int nn_samples;
// For audio, the generated extra audio packets.
// For example, when transcoding AAC to opus, may many extra payloads for a audio.
// TODO: FIXME: Remove it because we may don't know.
int nn_extras;
// The original audio messages.
int nn_audios;
@ -184,11 +187,13 @@ public:
// The number of dropped messages.
int nn_dropped;
private:
// TODO: FIXME: Remove the cache.
int cursor;
int nn_cache;
SrsRtpPacket2* cache;
public:
SrsRtcOutgoingPackets(int nn_cache_max);
// TODO: FIXME: Remove the cache.
SrsRtcOutgoingPackets(int nn_cache_max = 8);
virtual ~SrsRtcOutgoingPackets();
public:
void reset(bool gso, bool merge_nalus);
@ -227,7 +232,6 @@ private:
bool gso;
int max_padding;
// For merged-write messages.
srs_utime_t mw_sleep;
int mw_msgs;
bool realtime;
// Whether enabled nack.
@ -251,12 +255,12 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t messages_to_packets(SrsRtcSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t send_packets(SrsRtcOutgoingPackets& packets);
srs_error_t send_messages(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t messages_to_packets(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets);
private:
srs_error_t package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload);
srs_error_t package_opus(SrsRtpPacket2* pkt);
private:
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);

@ -80,30 +80,27 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFor
return err;
}
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c)
SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s)
{
source = s;
conn = c;
should_update_source_id = false;
queue = new SrsMessageQueue();
#ifdef SRS_PERF_QUEUE_COND_WAIT
mw_wait = srs_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
#endif
}
SrsRtcConsumer::~SrsRtcConsumer()
{
source->on_consumer_destroy(this);
srs_freep(queue);
vector<SrsRtpPacket2*>::iterator it;
for (it = queue.begin(); it != queue.end(); ++it) {
SrsRtpPacket2* pkt = *it;
srs_freep(pkt);
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
srs_cond_destroy(mw_wait);
#endif
}
void SrsRtcConsumer::update_source_id()
@ -114,66 +111,46 @@ void SrsRtcConsumer::update_source_id()
srs_error_t SrsRtcConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
return err;
}
SrsSharedPtrMessage* msg = shared_msg->copy();
srs_error_t SrsRtcConsumer::enqueue2(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
queue.push_back(pkt);
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
if (queue->size() > mw_min_msgs) {
if ((int)queue.size() > mw_min_msgs) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
return err;
}
#endif
return err;
}
srs_error_t SrsRtcConsumer::dump_packets(SrsMessageArray* msgs, int& count)
srs_error_t SrsRtcConsumer::dump_packets(std::vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
srs_assert(count >= 0);
srs_assert(msgs->max > 0);
// the count used as input to reset the max if positive.
int max = count? srs_min(count, msgs->max) : msgs->max;
// the count specifies the max acceptable count,
// here maybe 1+, and we must set to 0 when got nothing.
count = 0;
if (should_update_source_id) {
srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
should_update_source_id = false;
}
// pump msgs from queue.
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}
queue.swap(pkts);
return err;
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
void SrsRtcConsumer::wait(int nb_msgs)
{
mw_min_msgs = nb_msgs;
mw_duration = msgs_duration;
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration) {
if ((int)queue.size() > mw_min_msgs) {
return;
}
@ -183,7 +160,6 @@ void SrsRtcConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
// use cond block wait for high performance mode.
srs_cond_wait(mw_wait);
}
#endif
SrsRtcSourceManager::SrsRtcSourceManager()
{
@ -347,11 +323,11 @@ SrsMetaCache* SrsRtcSource::cached_meta()
return meta;
}
srs_error_t SrsRtcSource::create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer)
srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsRtcConsumer(this, conn);
consumer = new SrsRtcConsumer(this);
consumers.push_back(consumer);
// TODO: FIXME: Implements edge cluster.
@ -441,7 +417,6 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
// copy to all consumer
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
@ -455,6 +430,14 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
srs_error_t SrsRtcSource::on_audio2(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue2(pkt)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
return err;
}
@ -686,7 +669,7 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packet->rtp_header.set_marker(true);
packet->frame_type = SrsFrameTypeAudio;
SrsRtpRawPayload* raw = packet->reuse_raw();
raw->payload = new char[p->size];

@ -48,41 +48,26 @@ class SrsRtcConsumer : public ISrsConsumerQueue
{
private:
SrsRtcSource* source;
// The owner connection for debug, maybe NULL.
SrsConnection* conn;
SrsMessageQueue* queue;
std::vector<SrsRtpPacket2*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
#ifdef SRS_PERF_QUEUE_COND_WAIT
// The cond wait for mw.
// @see https://github.com/ossrs/srs/issues/251
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
srs_utime_t mw_duration;
#endif
public:
SrsRtcConsumer(SrsRtcSource* s, SrsConnection* c);
SrsRtcConsumer(SrsRtcSource* s);
virtual ~SrsRtcConsumer();
public:
// when source id changed, notice client to print.
// When source id changed, notice client to print.
virtual void update_source_id();
// Enqueue an shared ptr message.
// @param shared_msg, directly ptr, copy it if need to save it.
// @param whether atc, donot use jitter correct if true.
// @param ag the algorithm of time jitter.
// Put or get RTP packet in queue.
virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag);
// Get packets in consumer queue.
// @param msgs the msgs array to dump packets to send.
// @param count the count in array, intput and output param.
// @remark user can specifies the count to get specified msgs; 0 to get all if possible.
virtual srs_error_t dump_packets(SrsMessageArray* msgs, int& count);
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for messages incomming, atleast nb_msgs and in duration.
// @param nb_msgs the messages count to wait.
// @param msgs_duration the messages duration to wait.
virtual void wait(int nb_msgs, srs_utime_t msgs_duration);
#endif
srs_error_t enqueue2(SrsRtpPacket2* pkt);
virtual srs_error_t dump_packets(std::vector<SrsRtpPacket2*>& pkts);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
};
class SrsRtcSourceManager
@ -147,7 +132,7 @@ public:
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer);
virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.

@ -125,6 +125,7 @@
* @remark this improve performance for large connectios.
* @see https://github.com/ossrs/srs/issues/251
*/
// TODO: FIXME: Should always enable it.
#define SRS_PERF_QUEUE_COND_WAIT
#ifdef SRS_PERF_QUEUE_COND_WAIT
// For RTMP, use larger wait queue.
@ -212,6 +213,7 @@
#define SRS_PERF_RTC_GSO_MAX 64
// For RTC, the max count of RTP packets we process in one loop.
// TODO: FIXME: Remove it.
#define SRS_PERF_RTC_RTP_PACKETS 1024
#endif

@ -280,6 +280,7 @@ SrsRtpPacket2::SrsRtpPacket2()
nalu_type = SrsAvcNaluTypeReserved;
original_bytes = NULL;
frame_type = SrsFrameTypeReserved;
cache_raw = new SrsRtpRawPayload();
cache_fua = new SrsRtpFUAPayload2();
@ -350,6 +351,11 @@ void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h)
decode_handler = h;
}
bool SrsRtpPacket2::is_audio()
{
return frame_type == SrsFrameTypeAudio;
}
int SrsRtpPacket2::nb_bytes()
{
if (!cache_payload) {

@ -115,12 +115,14 @@ public:
ISrsCodec* payload;
// TODO: FIXME: Merge into rtp_header.
int padding;
// Decoder helper.
// Helper fields.
public:
// The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type;
// The original bytes for decoder or bridger only, we will free it.
char* original_bytes;
// The frame type, for RTMP bridger or SFU source.
SrsFrameType frame_type;
// Fast cache for performance.
private:
// Cache frequently used payload for performance.
@ -145,6 +147,8 @@ public:
SrsRtpFUAPayload2* reuse_fua();
// Set the decode handler.
void set_decode_handler(ISrsRtpPacketDecodeHandler* h);
// Whether the packet is Audio packet.
bool is_audio();
// interface ISrsEncoder
public:
virtual int nb_bytes();

Loading…
Cancel
Save