diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp
index c7e11ed22..84d265b94 100644
--- a/trunk/src/app/srs_app_rtc_conn.cpp
+++ b/trunk/src/app/srs_app_rtc_conn.cpp
@@ -65,12 +65,6 @@ using namespace std;
 #include
 #include
 
-// The RTP payload max size, reserved some paddings for SRTP as such:
-//      kRtpPacketSize = kRtpMaxPayloadSize + paddings
-// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400,
-// which reserves 100 bytes for SRTP or paddings.
-const int kRtpMaxPayloadSize = kRtpPacketSize - 200;
-
 string gen_random_str(int len)
 {
     static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
@@ -565,7 +559,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
     session_ = s;
 
     gso = false;
-    merge_nalus = false;
     max_padding = 0;
 
     audio_timestamp = 0;
@@ -606,12 +599,11 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
     audio_payload_type = a_pt;
 
     gso = _srs_config->get_rtc_server_gso();
-    merge_nalus = _srs_config->get_rtc_server_merge_nalus();
    max_padding = _srs_config->get_rtc_server_padding();
 
     // TODO: FIXME: Support reload.
     nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
-    srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d, nack=%d",
-        video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding, nack_enabled_);
+    srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), gso=%d, padding=%d, nack=%d",
+        video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, max_padding, nack_enabled_);
 
     return err;
 }
@@ -619,10 +611,9 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
 srs_error_t SrsRtcPlayer::on_reload_rtc_server()
 {
     gso = _srs_config->get_rtc_server_gso();
-    merge_nalus = _srs_config->get_rtc_server_merge_nalus();
     max_padding = _srs_config->get_rtc_server_padding();
 
-    srs_trace("Reload rtc_server gso=%d, merge_nalus=%d, max_padding=%d", gso, merge_nalus, max_padding);
+    srs_trace("Reload rtc_server gso=%d, max_padding=%d", gso, max_padding);
 
     return srs_success;
 }
@@ -836,7 +827,7 @@ srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vector
-        if (msg->has_idr()) {
-            if ((err = package_stap_a(source, msg, info)) != srs_success) {
-                return srs_error_wrap(err, "packet stap-a");
-            }
-        }
-
-        // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
-        if (info.should_merge_nalus && nn_samples > 1) {
-            if ((err = package_nalus(msg, info)) != srs_success) {
-                return srs_error_wrap(err, "packet stap-a");
-            }
-            continue;
+        // For video, we should set the RTP packet information about this consumer.
+        if ((err = package_video(pkt)) != srs_success) {
+            return srs_error_wrap(err, "package video");
         }
-
-        // By default, we package each NALU(sample) to a RTP or FUA packet.
-        for (int i = 0; i < nn_samples; i++) {
-            SrsSample* sample = msg->samples() + i;
-
-            // We always ignore bframe here, if config to discard bframe,
-            // the bframe flag will not be set.
-            if (sample->bframe) {
-                continue;
-            }
-
-            if (sample->size <= kRtpMaxPayloadSize) {
-                if ((err = package_single_nalu(msg, sample, info)) != srs_success) {
-                    return srs_error_wrap(err, "packet single nalu");
-                }
-            } else {
-                if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, info)) != srs_success) {
-                    return srs_error_wrap(err, "packet fu-a");
-                }
-            }
-
-            if (i == nn_samples - 1) {
-                info.back()->rtp_header.set_marker(true);
-            }
-        }*/
     }
 
     return err;
 }
@@ -1203,101 +1159,10 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets)
     return err;
 }
 
-srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets)
-{
-    srs_error_t err = srs_success;
-
-    SrsRtpRawNALUs* raw = new SrsRtpRawNALUs();
-
-    for (int i = 0; i < msg->nn_samples(); i++) {
-        SrsSample* sample = msg->samples() + i;
-
-        // We always ignore bframe here, if config to discard bframe,
-        // the bframe flag will not be set.
-        if (sample->bframe) {
-            continue;
-        }
-
-        raw->push_back(sample->copy());
-    }
-
-    // Ignore empty.
-    int nn_bytes = raw->nb_bytes();
-    if (nn_bytes <= 0) {
-        srs_freep(raw);
-        return err;
-    }
-
-    if (nn_bytes < kRtpMaxPayloadSize) {
-        // Package NALUs in a single RTP packet.
-        SrsRtpPacket2* packet = packets.fetch();
-        if (!packet) {
-            srs_freep(raw);
-            return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
-        }
-
-        packet->rtp_header.set_timestamp(msg->timestamp * 90);
-        packet->rtp_header.set_sequence(video_sequence++);
-        packet->rtp_header.set_ssrc(video_ssrc);
-        packet->rtp_header.set_payload_type(video_payload_type);
-
-        packet->payload = raw;
-    } else {
-        // We must free it, should never use RTP packets to free it,
-        // because more than one RTP packet will refer to it.
-        SrsAutoFree(SrsRtpRawNALUs, raw);
-
-        // Package NALUs in FU-A RTP packets.
-        int fu_payload_size = kRtpMaxPayloadSize;
-
-        // The first byte is store in FU-A header.
-        uint8_t header = raw->skip_first_byte();
-        uint8_t nal_type = header & kNalTypeMask;
-        int nb_left = nn_bytes - 1;
-
-        int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size;
-        for (int i = 0; i < num_of_packet; ++i) {
-            int packet_size = srs_min(nb_left, fu_payload_size);
-
-            SrsRtpPacket2* packet = packets.fetch();
-            if (!packet) {
-                srs_freep(raw);
-                return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
-            }
-
-            packet->rtp_header.set_timestamp(msg->timestamp * 90);
-            packet->rtp_header.set_sequence(video_sequence++);
-            packet->rtp_header.set_ssrc(video_ssrc);
-            packet->rtp_header.set_payload_type(video_payload_type);
-
-            SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
-            packet->payload = fua;
-
-            fua->nri = (SrsAvcNaluType)header;
-            fua->nalu_type = (SrsAvcNaluType)nal_type;
-            fua->start = bool(i == 0);
-            fua->end = bool(i == num_of_packet - 1);
-
-            if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
-                return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
-            }
-
-            nb_left -= packet_size;
-        }
-    }
-
-    if (packets.size() > 0) {
-        packets.back()->rtp_header.set_marker(true);
-    }
-
-    return err;
-}
-
 srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt)
 {
     srs_error_t err = srs_success;
 
-    pkt->rtp_header.set_marker(true);
     pkt->rtp_header.set_timestamp(audio_timestamp);
     pkt->rtp_header.set_sequence(audio_sequence++);
     pkt->rtp_header.set_ssrc(audio_ssrc);
@@ -1313,118 +1178,13 @@ srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt)
     return err;
 }
 
-srs_error_t SrsRtcPlayer::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets)
-{
-    srs_error_t err = srs_success;
-
-    char* p = sample->bytes + 1;
-    int nb_left = sample->size - 1;
-    uint8_t header = sample->bytes[0];
-    uint8_t nal_type = header & kNalTypeMask;
-
-    int num_of_packet = 1 + (sample->size - 1) / fu_payload_size;
-    for (int i = 0; i < num_of_packet; ++i) {
-        int packet_size = srs_min(nb_left, fu_payload_size);
-
-        SrsRtpPacket2* packet = packets.fetch();
-        if (!packet) {
-            return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
-        }
-
-        packet->rtp_header.set_timestamp(msg->timestamp * 90);
-        packet->rtp_header.set_sequence(video_sequence++);
-        packet->rtp_header.set_ssrc(video_ssrc);
-        packet->rtp_header.set_payload_type(video_payload_type);
-
-        SrsRtpFUAPayload2* fua = packet->reuse_fua();
-
-        fua->nri = (SrsAvcNaluType)header;
-        fua->nalu_type = (SrsAvcNaluType)nal_type;
-        fua->start = bool(i == 0);
-        fua->end = bool(i == num_of_packet - 1);
-
-        fua->payload = p;
-        fua->size = packet_size;
-
-        p += packet_size;
-        nb_left -= packet_size;
-    }
-
-    return err;
-}
-
-// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
-srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets)
-{
-    srs_error_t err = srs_success;
-
-    SrsRtpPacket2* packet = packets.fetch();
-    if (!packet) {
-        return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
-    }
-    packet->rtp_header.set_timestamp(msg->timestamp * 90);
-    packet->rtp_header.set_sequence(video_sequence++);
-    packet->rtp_header.set_ssrc(video_ssrc);
-    packet->rtp_header.set_payload_type(video_payload_type);
-
-    SrsRtpRawPayload* raw = packet->reuse_raw();
-    raw->payload = sample->bytes;
-    raw->nn_payload = sample->size;
-
-    return err;
-}
-
-srs_error_t SrsRtcPlayer::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets)
+srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt)
 {
     srs_error_t err = srs_success;
 
-    SrsMetaCache* meta = source->cached_meta();
-    if (!meta) {
-        return err;
-    }
-
-    SrsFormat* format = meta->vsh_format();
-    if (!format || !format->vcodec) {
-        return err;
-    }
-
-    const vector& sps = format->vcodec->sequenceParameterSetNALUnit;
-    const vector& pps = format->vcodec->pictureParameterSetNALUnit;
-    if (sps.empty() || pps.empty()) {
-        return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
-    }
-
-    SrsRtpPacket2* packet = packets.fetch();
-    if (!packet) {
-        return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty");
-    }
-    packet->rtp_header.set_marker(false);
-    packet->rtp_header.set_timestamp(msg->timestamp * 90);
-    packet->rtp_header.set_sequence(video_sequence++);
-    packet->rtp_header.set_ssrc(video_ssrc);
-    packet->rtp_header.set_payload_type(video_payload_type);
-
-    SrsRtpSTAPPayload* stap = new SrsRtpSTAPPayload();
-    packet->payload = stap;
-
-    uint8_t header = sps[0];
-    stap->nri = (SrsAvcNaluType)header;
-
-    if (true) {
-        SrsSample* sample = new SrsSample();
-        sample->bytes = (char*)&sps[0];
-        sample->size = (int)sps.size();
-        stap->nalus.push_back(sample);
-    }
-
-    if (true) {
-        SrsSample* sample = new SrsSample();
-        sample->bytes = (char*)&pps[0];
-        sample->size = (int)pps.size();
-        stap->nalus.push_back(sample);
-    }
-
-    srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", packet->rtp_header.get_sequence(), sps.size(), pps.size());
+    pkt->rtp_header.set_sequence(video_sequence++);
+    pkt->rtp_header.set_ssrc(video_ssrc);
+    pkt->rtp_header.set_payload_type(video_payload_type);
 
     return err;
 }
diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp
index 435e11d72..c6c032fb8 100644
--- a/trunk/src/app/srs_app_rtc_conn.hpp
+++ b/trunk/src/app/srs_app_rtc_conn.hpp
@@ -154,6 +154,7 @@ class SrsRtcOutgoingPackets
 {
 public:
     bool use_gso;
+    // TODO: FIXME: Remove it.
     bool should_merge_nalus;
 public:
 #if defined(SRS_DEBUG)
@@ -228,7 +229,6 @@ private:
     int nn_simulate_nack_drop;
 private:
     // For merged-write and GSO.
-    bool merge_nalus;
     bool gso;
     int max_padding;
     // For merged-write messages.
@@ -261,11 +261,7 @@ private:
     srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets);
 private:
     srs_error_t package_opus(SrsRtpPacket2* pkt);
-private:
-    srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets);
-    srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
-    srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets);
-    srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
+    srs_error_t package_video(SrsRtpPacket2* pkt);
 public:
     void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
     void simulate_nack_drop(int nn);
diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp
index 1f396a6c9..331d350b6 100644
--- a/trunk/src/app/srs_app_rtc_source.cpp
+++ b/trunk/src/app/srs_app_rtc_source.cpp
@@ -35,6 +35,7 @@
 #include
 #include
 #include
+#include
 
 const int kChannel = 2;
 const int kSamplerate = 48000;
@@ -44,6 +45,12 @@ const int kMaxOpusPackets = 8;
 // The max size for each OPUS packet.
 const int kMaxOpusPacketSize = 4096;
 
+// The RTP payload max size, reserved some paddings for SRTP as such:
+//      kRtpPacketSize = kRtpMaxPayloadSize + paddings
+// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400,
+// which reserves 100 bytes for SRTP or paddings.
+const int kRtpMaxPayloadSize = kRtpPacketSize - 200;
+
 using namespace std;
 
 // TODO: Add this function into SrsRtpMux class.
@@ -413,13 +420,13 @@ void SrsRtcSource::set_rtc_publisher(SrsRtcPublisher* v)
     rtc_publisher_ = v;
 }
 
-srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
+srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt)
 {
     srs_error_t err = srs_success;
 
     for (int i = 0; i < (int)consumers.size(); i++) {
         SrsRtcConsumer* consumer = consumers.at(i);
-        if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
+        if ((err = consumer->enqueue2(pkt)) != srs_success) {
             return srs_error_wrap(err, "consume message");
         }
     }
 
     return err;
 }
@@ -427,13 +434,13 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
 
-srs_error_t SrsRtcSource::on_audio2(SrsRtpPacket2* pkt)
+srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
 {
     srs_error_t err = srs_success;
 
     for (int i = 0; i < (int)consumers.size(); i++) {
         SrsRtcConsumer* consumer = consumers.at(i);
-        if ((err = consumer->enqueue2(pkt)) != srs_success) {
+        if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
             return srs_error_wrap(err, "consume message");
         }
     }
@@ -532,6 +539,7 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source)
     codec = new SrsAudioRecode(kChannel, kSamplerate);
     discard_aac = false;
     discard_bframe = false;
+    merge_nalus = false;
 }
 
 SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger()
@@ -554,10 +562,12 @@ srs_error_t SrsRtcFromRtmpBridger::initialize(SrsRequest* r)
         return srs_error_wrap(err, "init codec");
     }
 
-    // TODO: FIXME: Support reload and log it.
+    // TODO: FIXME: Support reload.
     discard_aac = _srs_config->get_rtc_aac_discard(req->vhost);
     discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost);
-    srs_trace("RTC bridge from RTMP, discard_aac=%d, discard_bframe=%d", discard_aac, discard_bframe);
+    merge_nalus = _srs_config->get_rtc_server_merge_nalus();
+    srs_trace("RTC bridge from RTMP, discard_aac=%d, discard_bframe=%d, merge_nalus=%d",
+        discard_aac, discard_bframe, merge_nalus);
 
     return err;
 }
@@ -661,25 +671,18 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
     int nn_max_extra_payload = 0;
     SrsSample samples[nn_opus_packets];
     for (int i = 0; i < nn_opus_packets; i++) {
-        SrsSample* p = samples + i;
-        p->size = opus_sizes[i];
-        p->bytes = new char[p->size];
-        memcpy(p->bytes, opus_payloads[i], p->size);
-
-        nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size);
+        char* data = (char*)opus_payloads[i];
+        int size = (int)opus_sizes[i];
 
-        SrsRtpPacket2* packet = new SrsRtpPacket2();
-        packet->frame_type = SrsFrameTypeAudio;
+        // TODO: FIXME: Use it to padding audios.
+        nn_max_extra_payload = srs_max(nn_max_extra_payload, size);
 
-        SrsRtpRawPayload* raw = packet->reuse_raw();
-        raw->payload = new char[p->size];
-        raw->nn_payload = p->size;
-        memcpy(raw->payload, opus_payloads[i], p->size);
-
-        // When free the RTP packet, should free the bytes allocated here.
-        packet->original_bytes = raw->payload;
+        SrsRtpPacket2* pkt = NULL;
+        if ((err = package_opus(data, size, &pkt)) != srs_success) {
+            return srs_error_wrap(err, "package opus");
+        }
 
-        if ((err = source_->on_audio2(packet)) != srs_success) {
+        if ((err = source_->on_rtp(pkt)) != srs_success) {
             return srs_error_wrap(err, "consume opus");
         }
     }
@@ -687,6 +690,27 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
     return err;
 }
 
+srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacket2** ppkt)
+{
+    srs_error_t err = srs_success;
+
+    SrsRtpPacket2* pkt = new SrsRtpPacket2();
+    pkt->rtp_header.set_marker(true);
+    pkt->frame_type = SrsFrameTypeAudio;
+
+    SrsRtpRawPayload* raw = pkt->reuse_raw();
+    raw->payload = new char[size];
+    raw->nn_payload = size;
+    memcpy(raw->payload, data, size);
+
+    // When free the RTP packet, should free the bytes allocated here.
+    pkt->original_bytes = raw->payload;
+
+    *ppkt = pkt;
+
+    return err;
+}
+
 srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
 {
     srs_error_t err = srs_success;
@@ -709,13 +733,13 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
     return source_->on_video_imp(msg);
 }
 
-srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
+srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format)
 {
     srs_error_t err = srs_success;
 
     // If IDR, we will insert SPS/PPS before IDR frame.
     if (format->video && format->video->has_idr) {
-        shared_frame->set_has_idr(true);
+        msg->set_has_idr(true);
     }
 
     // Update samples to shared frame.
@@ -738,7 +762,266 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* shared_frame, Srs
         return err;
     }
 
-    shared_frame->set_samples(format->video->samples, format->video->nb_samples);
+    // TODO: FIXME: Directly convert samples to RTP packets.
+    msg->set_samples(format->video->samples, format->video->nb_samples);
+    int nn_samples = format->video->nb_samples;
+
+    // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
+    if (msg->has_idr()) {
+        SrsRtpPacket2* pkt = NULL;
+        if ((err = package_stap_a(source_, msg, &pkt)) != srs_success) {
+            return srs_error_wrap(err, "package stap-a");
+        }
+
+        if ((err = source_->on_rtp(pkt)) != srs_success) {
+            return srs_error_wrap(err, "consume sps/pps");
+        }
+    }
+
+    // If merge Nalus, we package all NALUs(samples) as one NALU, in a RTP or FUA packet.
+    vector<SrsRtpPacket2*> pkts;
+    if (merge_nalus && nn_samples > 1) {
+        if ((err = package_nalus(msg, pkts)) != srs_success) {
+            return srs_error_wrap(err, "package nalus as one");
+        }
+    }
+
+    // By default, we package each NALU(sample) to a RTP or FUA packet.
+    for (int i = 0; i < nn_samples; i++) {
+        SrsSample* sample = msg->samples() + i;
+
+        // We always ignore bframe here, if config to discard bframe,
+        // the bframe flag will not be set.
+        if (sample->bframe) {
+            continue;
+        }
+
+        if (sample->size <= kRtpMaxPayloadSize) {
+            if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) {
+                return srs_error_wrap(err, "package single nalu");
+            }
+        } else {
+            if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) {
+                return srs_error_wrap(err, "package fu-a");
+            }
+        }
+    }
+
+    if (pkts.size() > 0) {
+        pkts.back()->rtp_header.set_marker(true);
+    }
+
+    return consume_packets(pkts);
+}
+
+srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt)
+{
+    srs_error_t err = srs_success;
+
+    SrsMetaCache* meta = source->cached_meta();
+    if (!meta) {
+        return err;
+    }
+
+    SrsFormat* format = meta->vsh_format();
+    if (!format || !format->vcodec) {
+        return err;
+    }
+
+    // Note that the sps/pps may change, so we should copy it.
+    const vector& sps = format->vcodec->sequenceParameterSetNALUnit;
+    const vector& pps = format->vcodec->pictureParameterSetNALUnit;
+    if (sps.empty() || pps.empty()) {
+        return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
+    }
+
+    SrsRtpPacket2* pkt = new SrsRtpPacket2();
+    pkt->rtp_header.set_marker(false);
+    pkt->rtp_header.set_timestamp(msg->timestamp * 90);
+
+    SrsRtpSTAPPayload* stap = new SrsRtpSTAPPayload();
+    pkt->payload = stap;
+
+    uint8_t header = sps[0];
+    stap->nri = (SrsAvcNaluType)header;
+
+    // Copy the SPS/PPS bytes, because it may change.
+    char* p = new char[sps.size() + pps.size()];
+    pkt->original_bytes = p;
+
+    if (true) {
+        SrsSample* sample = new SrsSample();
+        sample->bytes = p;
+        sample->size = (int)sps.size();
+        stap->nalus.push_back(sample);
+
+        memcpy(p, (char*)&sps[0], sps.size());
+        p += (int)sps.size();
+    }
+
+    if (true) {
+        SrsSample* sample = new SrsSample();
+        sample->bytes = p;
+        sample->size = (int)pps.size();
+        stap->nalus.push_back(sample);
+
+        memcpy(p, (char*)&pps[0], pps.size());
+        p += (int)pps.size();
+    }
+
+    *ppkt = pkt;
+    srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", pkt->rtp_header.get_sequence(), sps.size(), pps.size());
+
+    return err;
+}
+
+srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, vector<SrsRtpPacket2*>& pkts)
+{
+    srs_error_t err = srs_success;
+
+    SrsRtpRawNALUs* raw = new SrsRtpRawNALUs();
+
+    for (int i = 0; i < msg->nn_samples(); i++) {
+        SrsSample* sample = msg->samples() + i;
+
+        // We always ignore bframe here, if config to discard bframe,
+        // the bframe flag will not be set.
+        if (sample->bframe) {
+            continue;
+        }
+
+        raw->push_back(sample->copy());
+    }
+
+    // Ignore empty.
+    int nn_bytes = raw->nb_bytes();
+    if (nn_bytes <= 0) {
+        srs_freep(raw);
+        return err;
+    }
+
+    if (nn_bytes < kRtpMaxPayloadSize) {
+        // Package NALUs in a single RTP packet.
+        SrsRtpPacket2* pkt = new SrsRtpPacket2();
+        pkt->rtp_header.set_timestamp(msg->timestamp * 90);
+        pkt->payload = raw;
+        pkt->original_msg = msg->copy();
+        pkts.push_back(pkt);
+    } else {
+        // We must free it, should never use RTP packets to free it,
+        // because more than one RTP packet will refer to it.
+        SrsAutoFree(SrsRtpRawNALUs, raw);
+
+        // Package NALUs in FU-A RTP packets.
+        int fu_payload_size = kRtpMaxPayloadSize;
+
+        // The first byte is stored in FU-A header.
+        uint8_t header = raw->skip_first_byte();
+        uint8_t nal_type = header & kNalTypeMask;
+        int nb_left = nn_bytes - 1;
+
+        int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size;
+        for (int i = 0; i < num_of_packet; ++i) {
+            int packet_size = srs_min(nb_left, fu_payload_size);
+
+            SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
+            if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
+                srs_freep(fua);
+                return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
+            }
+
+            SrsRtpPacket2* pkt = new SrsRtpPacket2();
+            pkt->rtp_header.set_timestamp(msg->timestamp * 90);
+
+            fua->nri = (SrsAvcNaluType)header;
+            fua->nalu_type = (SrsAvcNaluType)nal_type;
+            fua->start = bool(i == 0);
+            fua->end = bool(i == num_of_packet - 1);
+
+            pkt->payload = fua;
+            pkt->original_msg = msg->copy();
+            pkts.push_back(pkt);
+
+            nb_left -= packet_size;
+        }
+    }
+
+    return err;
+}
+
+// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
+srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacket2*>& pkts)
+{
+    srs_error_t err = srs_success;
+
+    SrsRtpPacket2* pkt = new SrsRtpPacket2();
+    pkt->rtp_header.set_timestamp(msg->timestamp * 90);
+
+    SrsRtpRawPayload* raw = pkt->reuse_raw();
+    raw->payload = sample->bytes;
+    raw->nn_payload = sample->size;
+
+    pkt->original_msg = msg->copy();
+    pkts.push_back(pkt);
+
+    return err;
+}
+
+srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& pkts)
+{
+    srs_error_t err = srs_success;
+
+    char* p = sample->bytes + 1;
+    int nb_left = sample->size - 1;
+    uint8_t header = sample->bytes[0];
+    uint8_t nal_type = header & kNalTypeMask;
+
+    int num_of_packet = 1 + (sample->size - 1) / fu_payload_size;
+    for (int i = 0; i < num_of_packet; ++i) {
+        int packet_size = srs_min(nb_left, fu_payload_size);
+
+        SrsRtpPacket2* pkt = new SrsRtpPacket2();
+        pkt->rtp_header.set_timestamp(msg->timestamp * 90);
+
+        SrsRtpFUAPayload2* fua = pkt->reuse_fua();
+
+        fua->nri = (SrsAvcNaluType)header;
+        fua->nalu_type = (SrsAvcNaluType)nal_type;
+        fua->start = bool(i == 0);
+        fua->end = bool(i == num_of_packet - 1);
+
+        fua->payload = p;
+        fua->size = packet_size;
+
+        pkt->original_msg = msg->copy();
+        pkts.push_back(pkt);
+
+        p += packet_size;
+        nb_left -= packet_size;
+    }
+
+    return err;
+}
+
+srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacket2*>& pkts)
+{
+    srs_error_t err = srs_success;
+
+    // TODO: FIXME: Consume a range of packets.
+    int i = 0;
+    for (; i < (int)pkts.size(); i++) {
+        SrsRtpPacket2* pkt = pkts[i];
+
+        if ((err = source_->on_rtp(pkt)) != srs_success) {
+            err = srs_error_wrap(err, "consume sps/pps");
+            break;
+        }
+    }
+
+    for (; i < (int)pkts.size(); i++) {
+        SrsRtpPacket2* pkt = pkts[i];
+        srs_freep(pkt);
+    }
 
     return err;
 }
diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp
index fcff72d59..ea1c5ac48 100644
--- a/trunk/src/app/srs_app_rtc_source.hpp
+++ b/trunk/src/app/srs_app_rtc_source.hpp
@@ -43,6 +43,7 @@ class SrsRtcSource;
 class SrsRtcFromRtmpBridger;
 class SrsAudioRecode;
 class SrsRtpPacket2;
+class SrsSample;
 
 class SrsRtcConsumer : public ISrsConsumerQueue
 {
@@ -149,8 +150,8 @@ public:
     // Get and set the publisher, passed to consumer to process requests such as PLI.
     SrsRtcPublisher* rtc_publisher();
     void set_rtc_publisher(SrsRtcPublisher* v);
+    srs_error_t on_rtp(SrsRtpPacket2* pkt);
     virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio);
-    srs_error_t on_audio2(SrsRtpPacket2* pkt);
     // When got RTC audio message, which is encoded in opus.
     // TODO: FIXME: Merge with on_audio.
     virtual srs_error_t on_video(SrsCommonMessage* video);
@@ -173,6 +174,7 @@ private:
     bool discard_aac;
     SrsAudioRecode* codec;
     bool discard_bframe;
+    bool merge_nalus;
 public:
     SrsRtcFromRtmpBridger(SrsRtcSource* source);
     virtual ~SrsRtcFromRtmpBridger();
@@ -180,13 +182,19 @@ public:
     virtual srs_error_t initialize(SrsRequest* r);
     virtual srs_error_t on_publish();
     virtual void on_unpublish();
-    virtual srs_error_t on_audio(SrsSharedPtrMessage* audio);
+    virtual srs_error_t on_audio(SrsSharedPtrMessage* msg);
 private:
     srs_error_t transcode(char* adts_audio, int nn_adts_audio);
+    srs_error_t package_opus(char* data, int size, SrsRtpPacket2** ppkt);
 public:
-    virtual srs_error_t on_video(SrsSharedPtrMessage* video);
+    virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
 private:
-    srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format);
+    srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format);
+    srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt);
+    srs_error_t package_nalus(SrsSharedPtrMessage* msg, std::vector<SrsRtpPacket2*>& pkts);
+    srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket2*>& pkts);
+    srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& pkts);
+    srs_error_t consume_packets(std::vector<SrsRtpPacket2*>& pkts);
 };
 
 #endif
diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp
index 9c74e1387..a5bb960cb 100644
--- a/trunk/src/kernel/srs_kernel_codec.hpp
+++ b/trunk/src/kernel/srs_kernel_codec.hpp
@@ -636,6 +636,7 @@ public:
     SrsAvcLevel avc_level;
     // lengthSizeMinusOne, ISO_IEC_14496-15-AVC-format-2012.pdf, page 16
     int8_t NAL_unit_length;
+    // Note that we may resize the vector, so the under-layer bytes may change.
     std::vector sequenceParameterSetNALUnit;
     std::vector pictureParameterSetNALUnit;
 public:
diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp
index 3304952a6..d3af5de6c 100644
--- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp
+++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp
@@ -31,6 +31,7 @@ using namespace std;
 #include
 #include
 #include
+#include
 
 SrsRtpHeader::SrsRtpHeader()
 {
@@ -280,6 +281,7 @@ SrsRtpPacket2::SrsRtpPacket2()
     nalu_type = SrsAvcNaluTypeReserved;
 
     original_bytes = NULL;
+    original_msg = NULL;
     frame_type = SrsFrameTypeReserved;
 
     cache_raw = new SrsRtpRawPayload();
@@ -299,6 +301,7 @@ SrsRtpPacket2::~SrsRtpPacket2()
     srs_freep(cache_fua);
 
     srs_freepa(original_bytes);
+    srs_freep(original_msg);
 }
 
 void SrsRtpPacket2::set_padding(int size)
diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp
index b09ab3b35..e348631aa 100644
--- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp
+++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp
@@ -56,6 +56,7 @@ const uint8_t kEnd = 0x40; // Fu-header end bit
 class SrsBuffer;
 class SrsRtpRawPayload;
 class SrsRtpFUAPayload2;
+class SrsSharedPtrMessage;
 
 class SrsRtpHeader
 {
@@ -121,6 +122,8 @@ public:
     SrsAvcNaluType nalu_type;
     // The original bytes for decoder or bridger only, we will free it.
     char* original_bytes;
+    // The original msg for bridger only, we will free it.
+    SrsSharedPtrMessage* original_msg;
     // The frame type, for RTMP bridger or SFU source.
     SrsFrameType frame_type;
     // Fast cache for performance.