Merge branch 'develop' into 4.0release

pull/1804/head
winlin 5 years ago
commit 96565acd0f

@ -159,6 +159,8 @@ For previous versions, please read:
## V4 changes
* v4.0, 2020-05-14, For [#307][bug #307], refine core structure, RTMP base on frame, RTC base on RTP. 4.0.26
* v4.0, 2020-05-11, For [#307][bug #307], refine RTC publisher structure. 4.0.25
* v4.0, 2020-04-30, For [#307][bug #307], support publish RTC with passing opus. 4.0.24
* v4.0, 2020-04-14, For [#307][bug #307], support sendmmsg, GSO and reuseport. 4.0.23
* v4.0, 2020-04-05, For [#307][bug #307], SRTP ASM only works with openssl-1.0, auto detect it. 4.0.22

4
trunk/configure vendored

@ -279,8 +279,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers" "srs_app_hybrid")
if [[ $SRS_RTC == YES ]]; then
MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp"
"srs_app_rtc_queue" "srs_app_rtc_server")
MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp"
"srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source")
fi
if [[ $SRS_GB28181 == YES ]]; then
MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip")

@ -4879,7 +4879,9 @@ bool SrsConfig::get_rtc_server_gso()
}
#elif LINUX_VERSION_CODE < KERNEL_VERSION(4,18,0)
if (v) {
utsname un = {0};
utsname un;
memset((void*)&un, 0, sizeof(utsname));
int r0 = uname(&un);
if (r0 || strcmp(un.release, "4.18.0") < 0) {
gso_disabled = true;

@ -1,370 +0,0 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_rtc.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
#include <algorithm>
#include <sstream>
using namespace std;
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_app_config.hpp>
#include <srs_app_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_codec.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_protocol_format.hpp>
#include <srs_rtmp_stack.hpp>
#include <openssl/rand.h>
#include <srs_app_rtc_codec.hpp>
// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)
{
srs_error_t err = srs_success;
if (format->is_aac_sequence_header()) {
return err;
}
if (format->audio->nb_samples != 1) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "adts");
}
int nb_buf = format->audio->samples[0].size + 7;
char* buf = new char[nb_buf];
SrsBuffer stream(buf, nb_buf);
// TODO: Add comment.
stream.write_1bytes(0xFF);
stream.write_1bytes(0xF9);
stream.write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2));
stream.write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03));
stream.write_1bytes((nb_buf >> 3) & 0xFF);
stream.write_1bytes(((nb_buf & 0x07) << 5) | 0x1F);
stream.write_1bytes(0xFC);
stream.write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size);
*pbuf = buf;
*pnn_buf = nb_buf;
return err;
}
SrsRtpH264Muxer::SrsRtpH264Muxer()
{
discard_bframe = false;
}
SrsRtpH264Muxer::~SrsRtpH264Muxer()
{
}
srs_error_t SrsRtpH264Muxer::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
{
srs_error_t err = srs_success;
// If IDR, we will insert SPS/PPS before IDR frame.
if (format->video && format->video->has_idr) {
shared_frame->set_has_idr(true);
}
// Update samples to shared frame.
for (int i = 0; i < format->video->nb_samples; ++i) {
SrsSample* sample = &format->video->samples[i];
// Because RTC does not support B-frame, so we will drop them.
// TODO: Drop B-frame in better way, which not cause picture corruption.
if (discard_bframe) {
if ((err = sample->parse_bframe()) != srs_success) {
return srs_error_wrap(err, "parse bframe");
}
if (sample->bframe) {
continue;
}
}
}
if (format->video->nb_samples <= 0) {
return err;
}
shared_frame->set_samples(format->video->samples, format->video->nb_samples);
return err;
}
SrsRtpOpusMuxer::SrsRtpOpusMuxer()
{
codec = NULL;
}
SrsRtpOpusMuxer::~SrsRtpOpusMuxer()
{
srs_freep(codec);
}
srs_error_t SrsRtpOpusMuxer::initialize()
{
srs_error_t err = srs_success;
codec = new SrsAudioRecode(kChannel, kSamplerate);
if (!codec) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "SrsAacOpus init failed");
}
if ((err = codec->initialize()) != srs_success) {
return srs_error_wrap(err, "init codec");
}
return err;
}
// An AAC packet may be transcoded to many OPUS packets.
const int kMaxOpusPackets = 8;
// The max size for each OPUS packet.
const int kMaxOpusPacketSize = 4096;
srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio)
{
srs_error_t err = srs_success;
// Opus packet cache.
static char* opus_payloads[kMaxOpusPackets];
static bool initialized = false;
if (!initialized) {
initialized = true;
static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize];
opus_payloads[0] = &opus_packets_cache[0][0];
for (int i = 1; i < kMaxOpusPackets; i++) {
opus_payloads[i] = opus_packets_cache[i];
}
}
// Transcode an aac packet to many opus packets.
SrsSample aac;
aac.bytes = adts_audio;
aac.size = nn_adts_audio;
int nn_opus_packets = 0;
int opus_sizes[kMaxOpusPackets];
if ((err = codec->recode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) {
return srs_error_wrap(err, "recode error");
}
// Save OPUS packets in shared message.
if (nn_opus_packets <= 0) {
return err;
}
int nn_max_extra_payload = 0;
SrsSample samples[nn_opus_packets];
for (int i = 0; i < nn_opus_packets; i++) {
SrsSample* p = samples + i;
p->size = opus_sizes[i];
p->bytes = new char[p->size];
memcpy(p->bytes, opus_payloads[i], p->size);
nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size);
}
shared_audio->set_extra_payloads(samples, nn_opus_packets);
shared_audio->set_max_extra_payload(nn_max_extra_payload);
return err;
}
SrsRtc::SrsRtc()
{
req = NULL;
hub = NULL;
enabled = false;
disposable = false;
last_update_time = 0;
discard_aac = false;
}
SrsRtc::~SrsRtc()
{
srs_freep(rtp_h264_muxer);
}
void SrsRtc::dispose()
{
if (enabled) {
on_unpublish();
}
}
// TODO: FIXME: Dead code?
srs_error_t SrsRtc::cycle()
{
srs_error_t err = srs_success;
return err;
}
srs_error_t SrsRtc::initialize(SrsOriginHub* h, SrsRequest* r)
{
srs_error_t err = srs_success;
hub = h;
req = r;
rtp_h264_muxer = new SrsRtpH264Muxer();
rtp_h264_muxer->discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost);
// TODO: FIXME: Support reload and log it.
discard_aac = _srs_config->get_rtc_aac_discard(req->vhost);
rtp_opus_muxer = new SrsRtpOpusMuxer();
if (!rtp_opus_muxer) {
return srs_error_wrap(err, "rtp_opus_muxer nullptr");
}
return rtp_opus_muxer->initialize();
}
srs_error_t SrsRtc::on_publish()
{
srs_error_t err = srs_success;
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// support multiple publish.
if (enabled) {
return err;
}
if (!_srs_config->get_rtc_enabled(req->vhost)) {
return err;
}
// if enabled, open the muxer.
enabled = true;
// ok, the hls can be dispose, or need to be dispose.
disposable = true;
return err;
}
void SrsRtc::on_unpublish()
{
// support multiple unpublish.
if (!enabled) {
return;
}
enabled = false;
}
srs_error_t SrsRtc::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
// Ignore if no format->acodec, it means the codec is not parsed, or unknown codec.
// @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474
if (!format->acodec) {
return err;
}
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// ts support audio codec: aac/mp3
SrsAudioCodecId acodec = format->acodec->id;
if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) {
return err;
}
// When drop aac audio packet, never transcode.
if (discard_aac && acodec == SrsAudioCodecIdAAC) {
return err;
}
// ignore sequence header
srs_assert(format->audio);
char* adts_audio = NULL;
int nn_adts_audio = 0;
// TODO: FIXME: Reserve 7 bytes header when create shared message.
if ((err = aac_raw_append_adts_header(shared_audio, format, &adts_audio, &nn_adts_audio)) != srs_success) {
return srs_error_wrap(err, "aac append header");
}
if (adts_audio) {
err = rtp_opus_muxer->transcode(shared_audio, adts_audio, nn_adts_audio);
srs_freep(adts_audio);
}
return err;
}
srs_error_t SrsRtc::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format)
{
srs_error_t err = srs_success;
// TODO: FIXME: Maybe it should config on vhost level.
if (!enabled) {
return err;
}
// Ignore if no format->vcodec, it means the codec is not parsed, or unknown codec.
// @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474
if (!format->vcodec) {
return err;
}
// update the hls time, for hls_dispose.
last_update_time = srs_get_system_time();
// ignore info frame,
// @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909
srs_assert(format->video);
return rtp_h264_muxer->filter(shared_video, format);
}

@ -1,105 +0,0 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTC_HPP
#define SRS_APP_RTC_HPP
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <map>
class SrsFormat;
class SrsSample;
class SrsSharedPtrMessage;
class SrsRequest;
class SrsOriginHub;
class SrsAudioRecode;
class SrsBuffer;
// The RTP packet max size, should never exceed this size.
const int kRtpPacketSize = 1500;
// Payload type will rewrite in srs_app_rtc_conn.cpp when send to client.
const uint8_t kOpusPayloadType = 111;
const uint8_t kH264PayloadType = 102;
const int kChannel = 2;
const int kSamplerate = 48000;
// SSRC will rewrite in srs_app_rtc_conn.cpp when send to client.
const uint32_t kAudioSSRC = 1;
const uint32_t kVideoSSRC = 2;
// TODO: Define interface class like ISrsRtpMuxer
class SrsRtpH264Muxer
{
public:
bool discard_bframe;
public:
SrsRtpH264Muxer();
virtual ~SrsRtpH264Muxer();
public:
srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format);
};
// TODO: FIXME: It's not a muxer, but a transcoder.
class SrsRtpOpusMuxer
{
private:
SrsAudioRecode* codec;
public:
SrsRtpOpusMuxer();
virtual ~SrsRtpOpusMuxer();
virtual srs_error_t initialize();
public:
srs_error_t transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio);
};
class SrsRtc
{
private:
SrsRequest* req;
bool enabled;
bool disposable;
bool discard_aac;
srs_utime_t last_update_time;
SrsRtpH264Muxer* rtp_h264_muxer;
SrsRtpOpusMuxer* rtp_opus_muxer;
SrsOriginHub* hub;
public:
SrsRtc();
virtual ~SrsRtc();
public:
virtual void dispose();
virtual srs_error_t cycle();
public:
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
};
#endif

@ -384,8 +384,7 @@ srs_error_t SrsAudioRecode::initialize()
return err;
}
// TODO: FIXME: Rename to transcode.
srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int &n)
srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int &n)
{
srs_error_t err = srs_success;

@ -121,7 +121,7 @@ public:
SrsAudioRecode(int channels, int samplerate);
virtual ~SrsAudioRecode();
srs_error_t initialize();
virtual srs_error_t recode(SrsSample *pkt, char **buf, int *buf_len, int &n);
virtual srs_error_t transcode(SrsSample *pkt, char **buf, int *buf_len, int &n);
};
#endif /* SRS_APP_AUDIO_RECODE_HPP */

File diff suppressed because it is too large Load Diff

@ -50,7 +50,7 @@ class SrsStunPacket;
class SrsRtcServer;
class SrsRtcSession;
class SrsSharedPtrMessage;
class SrsSource;
class SrsRtcSource;
class SrsRtpPacket2;
class ISrsUdpSender;
class SrsRtpQueue;
@ -149,11 +149,11 @@ private:
};
// A group of RTP packets for outgoing(send to players).
class SrsRtcOutgoingPackets
// TODO: FIXME: Rename to stat for RTP packets.
class SrsRtcOutgoingInfo
{
public:
bool use_gso;
bool should_merge_nalus;
public:
#if defined(SRS_DEBUG)
// Debug id.
@ -171,9 +171,11 @@ public:
// one msghdr by GSO, it's only one RTP packet, because we only send once.
int nn_rtp_pkts;
// For video, the samples or NALUs.
// TODO: FIXME: Remove it because we may don't know.
int nn_samples;
// For audio, the generated extra audio packets.
// For example, when transcoding AAC to opus, may many extra payloads for a audio.
// TODO: FIXME: Remove it because we may don't know.
int nn_extras;
// The original audio messages.
int nn_audios;
@ -183,20 +185,9 @@ public:
int nn_paddings;
// The number of dropped messages.
int nn_dropped;
private:
int cursor;
int nn_cache;
SrsRtpPacket2* cache;
public:
SrsRtcOutgoingPackets(int nn_cache_max);
virtual ~SrsRtcOutgoingPackets();
public:
void reset(bool gso, bool merge_nalus);
SrsRtpPacket2* fetch();
SrsRtpPacket2* back();
int size();
int capacity();
SrsRtpPacket2* at(int index);
SrsRtcOutgoingInfo();
virtual ~SrsRtcOutgoingInfo();
};
class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
@ -223,11 +214,9 @@ private:
int nn_simulate_nack_drop;
private:
// For merged-write and GSO.
bool merge_nalus;
bool gso;
int max_padding;
// For merged-write messages.
srs_utime_t mw_sleep;
int mw_msgs;
bool realtime;
// Whether enabled nack.
@ -251,17 +240,12 @@ public:
public:
virtual srs_error_t cycle();
private:
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets);
srs_error_t send_packets(SrsRtcOutgoingPackets& packets);
srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets);
private:
srs_error_t package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload);
private:
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets);
srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets);
srs_error_t send_messages(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t messages_to_packets(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t package_opus(SrsRtpPacket2* pkt);
srs_error_t package_video(SrsRtpPacket2* pkt);
srs_error_t send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t send_packets_gso(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn);
@ -293,7 +277,7 @@ private:
SrsRtpNackForReceiver* audio_nack_;
private:
SrsRequest* req;
SrsSource* source;
SrsRtcSource* source;
// Whether enabled nack.
bool nack_enabled_;
// Simulators.
@ -313,13 +297,14 @@ private:
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
public:
srs_error_t on_rtp(char* buf, int nb_buf);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload);
srs_error_t on_rtcp(char* data, int nb_data);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload);
private:
srs_error_t on_audio(SrsRtpPacket2* pkt);
srs_error_t on_audio_frame(SrsRtpPacket2* frame);
srs_error_t on_video(SrsRtpPacket2* pkt);
srs_error_t on_video_frame(SrsRtpPacket2* frame);
public:
srs_error_t on_rtcp(char* data, int nb_data);
private:
srs_error_t on_rtcp_sr(char* buf, int nb_buf);
srs_error_t on_rtcp_xr(char* buf, int nb_buf);
srs_error_t on_rtcp_feedback(char* data, int nb_data);
@ -365,7 +350,7 @@ private:
// TODO: FIXME: Support reload.
bool encrypt;
SrsRequest* req;
SrsSource* source_;
SrsRtcSource* source_;
SrsSdp remote_sdp;
SrsSdp local_sdp;
private:
@ -390,7 +375,7 @@ public:
void switch_to_context();
int context_id();
public:
srs_error_t initialize(SrsSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id);
srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id);
// The peer address may change, we can identify that by STUN messages.
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);
srs_error_t on_dtls(char* data, int nb_data);

@ -620,12 +620,13 @@ void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsR
// TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header = head->rtp_header;
pkt->padding = head->padding;
SrsRtpFUAPayload2* head_payload = dynamic_cast<SrsRtpFUAPayload2*>(head->payload);
pkt->nalu_type = head_payload->nalu_type;
SrsRtpRawPayload* payload = pkt->reuse_raw();
SrsRtpRawPayload* payload = new SrsRtpRawPayload();
pkt->payload = payload;
payload->nn_payload = nn_nalus + 1;
payload->payload = new char[payload->nn_payload];

@ -27,7 +27,6 @@
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_rtc.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_core_autofree.hpp>
@ -38,6 +37,7 @@
#include <srs_app_http_api.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_service_utility.hpp>
#include <srs_app_rtc_source.hpp>
using namespace std;
@ -569,11 +569,8 @@ srs_error_t SrsRtcServer::create_session(
) {
srs_error_t err = srs_success;
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -663,11 +660,8 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req
return err;
}
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
SrsRtcSource* source = NULL;
if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,201 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTC_SOURCE_HPP
#define SRS_APP_RTC_SOURCE_HPP
#include <srs_core.hpp>
#include <vector>
#include <map>
#include <srs_service_st.hpp>
#include <srs_app_source.hpp>
class SrsRequest;
class SrsConnection;
class SrsMetaCache;
class SrsRtcPublisher;
class SrsSharedPtrMessage;
class SrsCommonMessage;
class SrsMessageArray;
class SrsRtcSource;
class SrsRtcFromRtmpBridger;
class SrsAudioRecode;
class SrsRtpPacket2;
class SrsSample;
class SrsRtcConsumer : public ISrsConsumerQueue
{
private:
SrsRtcSource* source;
std::vector<SrsRtpPacket2*> queue;
// when source id changed, notice all consumers
bool should_update_source_id;
// The cond wait for mw.
// @see https://github.com/ossrs/srs/issues/251
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
public:
SrsRtcConsumer(SrsRtcSource* s);
virtual ~SrsRtcConsumer();
public:
// When source id changed, notice client to print.
virtual void update_source_id();
// Put or get RTP packet in queue.
virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag);
srs_error_t enqueue2(SrsRtpPacket2* pkt);
virtual srs_error_t dump_packets(std::vector<SrsRtpPacket2*>& pkts);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
};
class SrsRtcSourceManager
{
private:
srs_mutex_t lock;
std::map<std::string, SrsRtcSource*> pool;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
virtual SrsRtcSource* fetch(SrsRequest* r);
};
// Global singleton instance.
extern SrsRtcSourceManager* _srs_rtc_sources;
class SrsRtcSource
{
private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
int _source_id;
// previous source id.
int _pre_source_id;
SrsRequest* req;
SrsRtcPublisher* rtc_publisher_;
// Transmux RTMP to RTC.
SrsRtcFromRtmpBridger* bridger_;
// The metadata cache.
SrsMetaCache* meta;
private:
// To delivery stream to clients.
std::vector<SrsRtcConsumer*> consumers;
// Whether source is avaiable for publishing.
bool _can_publish;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
// The source id changed.
virtual srs_error_t on_source_id_changed(int id);
// Get current source id.
virtual int source_id();
virtual int pre_source_id();
// Get the bridger.
ISrsSourceBridger* bridger();
// For RTC, we need to package SPS/PPS(in cached meta) before each IDR.
SrsMetaCache* cached_meta();
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.
// @param dg, whether dumps the gop cache.
virtual srs_error_t consumer_dumps(SrsRtcConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsRtcConsumer* consumer);
// TODO: FIXME: Remove the param is_edge.
virtual bool can_publish(bool is_edge);
// When start publish stream.
virtual srs_error_t on_publish();
// When stop publish stream.
virtual void on_unpublish();
public:
// Get and set the publisher, passed to consumer to process requests such as PLI.
SrsRtcPublisher* rtc_publisher();
void set_rtc_publisher(SrsRtcPublisher* v);
srs_error_t on_rtp(SrsRtpPacket2* pkt);
virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio);
// When got RTC audio message, which is encoded in opus.
// TODO: FIXME: Merge with on_audio.
virtual srs_error_t on_video(SrsCommonMessage* video);
virtual srs_error_t on_video_imp(SrsSharedPtrMessage* video);
private:
// The format, codec information.
// TODO: FIXME: Remove it.
SrsRtmpFormat* format;
srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format);
};
class SrsRtcFromRtmpBridger : public ISrsSourceBridger
{
private:
SrsRequest* req;
SrsRtcSource* source_;
// The format, codec information.
SrsRtmpFormat* format;
private:
bool discard_aac;
SrsAudioRecode* codec;
bool discard_bframe;
bool merge_nalus;
public:
SrsRtcFromRtmpBridger(SrsRtcSource* source);
virtual ~SrsRtcFromRtmpBridger();
public:
virtual srs_error_t initialize(SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* msg);
private:
srs_error_t transcode(char* adts_audio, int nn_adts_audio);
srs_error_t package_opus(char* data, int size, SrsRtpPacket2** ppkt);
public:
virtual srs_error_t on_video(SrsSharedPtrMessage* msg);
private:
srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format);
srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt);
srs_error_t package_nalus(SrsSharedPtrMessage* msg, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector<SrsRtpPacket2*>& pkts);
srs_error_t consume_packets(std::vector<SrsRtpPacket2*>& pkts);
};
#endif

@ -50,10 +50,7 @@ using namespace std;
#include <srs_app_ng_exec.hpp>
#include <srs_app_dash.hpp>
#include <srs_protocol_format.hpp>
#ifdef SRS_RTC
#include <srs_app_rtc.hpp>
#include <srs_app_rtc_conn.hpp>
#endif
#include <srs_app_rtc_source.hpp>
#define CONST_MAX_JITTER_MS 250
#define CONST_MAX_JITTER_MS_NEG -250
@ -270,17 +267,11 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
max_queue_size = queue_size;
}
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp)
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
srs_error_t err = srs_success;
msgs.push_back(msg);
// For RTC, we never care about the timestamp and duration, so we never shrink queue here,
// but we will drop messages in each consumer coroutine.
if (pass_timestamp) {
return err;
}
if (msg->is_av()) {
if (av_start_time == -1) {
@ -289,6 +280,10 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
if (max_queue_size <= 0) {
return err;
}
while (av_end_time - av_start_time > max_queue_size) {
// notice the caller queue already overflow and shrinked.
@ -302,7 +297,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
return err;
}
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp)
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
srs_error_t err = srs_success;
@ -317,13 +312,9 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
SrsSharedPtrMessage** omsgs = msgs.data();
memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));
// For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration,
// so we do not have to update the start time here.
if (!pass_timestamp) {
SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
}
SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
if (count >= nb_msgs) {
// the pmsgs is big enough and clear msgs at most time.
msgs.clear();
@ -427,6 +418,14 @@ ISrsWakable::~ISrsWakable()
{
}
ISrsConsumerQueue::ISrsConsumerQueue()
{
}
ISrsConsumerQueue::~ISrsConsumerQueue()
{
}
SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
{
source = s;
@ -442,8 +441,6 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
mw_duration = 0;
mw_waiting = false;
#endif
pass_timestamp = false;
}
SrsConsumer::~SrsConsumer()
@ -457,11 +454,6 @@ SrsConsumer::~SrsConsumer()
#endif
}
void SrsConsumer::enable_pass_timestamp()
{
pass_timestamp = true;
}
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
{
queue->set_queue_size(queue_size);
@ -483,33 +475,19 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
SrsSharedPtrMessage* msg = shared_msg->copy();
// For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of
// timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here.
if (!pass_timestamp && !atc) {
if (!atc) {
if ((err = jitter->correct(msg, ag)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
// Put message in queue, here we may enable pass_timestamp mode.
if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) {
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
return srs_error_wrap(err, "enqueue message");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
// For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue,
// so we only check the messages in queue.
if (pass_timestamp) {
if (queue->size() > mw_min_msgs) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
return err;
}
// For RTMP, we wait for messages and duration.
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
@ -560,7 +538,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
}
// pump msgs from queue.
if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) {
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}
@ -859,9 +837,6 @@ SrsOriginHub::SrsOriginHub()
dash = new SrsDash();
dvr = new SrsDvr();
encoder = new SrsEncoder();
#ifdef SRS_RTC
rtc = new SrsRtc();
#endif
#ifdef SRS_HDS
hds = new SrsHds();
#endif
@ -905,12 +880,6 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r)
if ((err = format->initialize()) != srs_success) {
return srs_error_wrap(err, "format initialize");
}
#ifdef SRS_RTC
if ((err = rtc->initialize(this, req)) != srs_success) {
return srs_error_wrap(err, "rtc initialize");
}
#endif
if ((err = hls->initialize(this, req)) != srs_success) {
return srs_error_wrap(err, "hls initialize");
@ -1010,15 +979,6 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
srs_flv_srates[c->sound_rate]);
}
#ifdef SRS_RTC
// TODO: FIXME: Support parsing OPUS for RTC.
if ((err = rtc->on_audio(msg, format)) != srs_success) {
srs_warn("rtc: ignore audio error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
rtc->on_unpublish();
}
#endif
if ((err = hls->on_audio(msg, format)) != srs_success) {
// apply the error strategy for hls.
@ -1112,16 +1072,6 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
if (format->vcodec && !format->vcodec->is_avc_codec_ok()) {
return err;
}
#ifdef SRS_RTC
// Parse RTMP message to RTP packets, in FU-A if too large.
if ((err = rtc->on_video(msg, format)) != srs_success) {
// TODO: We should support more strategies.
srs_warn("rtc: ignore video error %s", srs_error_desc(err).c_str());
srs_error_reset(err);
rtc->on_unpublish();
}
#endif
if ((err = hls->on_video(msg, format)) != srs_success) {
// TODO: We should support more strategies.
@ -1191,12 +1141,6 @@ srs_error_t SrsOriginHub::on_publish()
return srs_error_wrap(err, "encoder publish");
}
#ifdef SRS_RTC
if ((err = rtc->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtc publish");
}
#endif
if ((err = hls->on_publish()) != srs_success) {
return srs_error_wrap(err, "hls publish");
}
@ -1234,9 +1178,6 @@ void SrsOriginHub::on_unpublish()
destroy_forwarders();
encoder->on_unpublish();
#ifdef SRS_RTC
rtc->on_unpublish();
#endif
hls->on_unpublish();
dash->on_unpublish();
dvr->on_unpublish();
@ -1629,7 +1570,7 @@ SrsFormat* SrsMetaCache::ash_format()
return aformat;
}
srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
srs_error_t SrsMetaCache::dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{
srs_error_t err = srs_success;
@ -1775,6 +1716,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller lock.
SrsLocker(lock);
SrsSource* source = NULL;
@ -1789,17 +1731,41 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
#ifdef SRS_RTC
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost);
// Get the RTC source and bridger.
SrsRtcSource* rtc = NULL;
if (rtc_server_enabled && rtc_enabled) {
if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) {
err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str());
goto failed;
}
}
#endif
srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsSource();
if ((err = source->initialize(r, h)) != srs_success) {
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed;
}
#ifdef SRS_RTC
// If rtc enabled, bridge RTMP source to RTC,
// all RTMP packets will be forwarded to RTC source.
if (source && rtc) {
source->bridge_to(rtc->bridger());
}
#endif
pool[stream_url] = source;
*pps = source;
return err;
failed:
srs_freep(source);
return err;
}
@ -1892,6 +1858,14 @@ void SrsSourceManager::destroy()
pool.clear();
}
ISrsSourceBridger::ISrsSourceBridger()
{
}
ISrsSourceBridger::~ISrsSourceBridger()
{
}
SrsSource::SrsSource()
{
req = NULL;
@ -1902,6 +1876,9 @@ SrsSource::SrsSource()
_can_publish = true;
_pre_source_id = _source_id = -1;
die_at = 0;
handler = NULL;
bridger = NULL;
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
@ -1914,10 +1891,6 @@ SrsSource::SrsSource()
_srs_config->subscribe(this);
atc = false;
#ifdef SRS_RTC
rtc_publisher_ = NULL;
#endif
}
SrsSource::~SrsSource()
@ -2012,6 +1985,11 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
return err;
}
void SrsSource::bridge_to(ISrsSourceBridger* v)
{
bridger = v;
}
srs_error_t SrsSource::on_reload_vhost_play(string vhost)
{
srs_error_t err = srs_success;
@ -2262,6 +2240,11 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "consume audio");
}
// For bridger to consume the message.
if (bridger && (err = bridger->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume audio");
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
@ -2290,7 +2273,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume audio");
}
// if atc, update the sequence header to abs time.
if (atc) {
if (meta->ash()) {
@ -2387,6 +2370,11 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "hub consume video");
}
// For bridger to consume the message.
if (bridger && (err = bridger->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume video");
}
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
@ -2546,6 +2534,11 @@ srs_error_t SrsSource::on_publish()
if ((err = handler->on_publish(this, req)) != srs_success) {
return srs_error_wrap(err, "handle publish");
}
if (bridger && (err = bridger->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger publish");
}
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(req, _source_id);
@ -2581,7 +2574,12 @@ void SrsSource::on_unpublish()
srs_assert(handler);
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);
handler->on_unpublish(this, req);
if (bridger) {
bridger->on_unpublish();
}
// no consumer, stream is die.
if (consumers.empty()) {
@ -2695,26 +2693,3 @@ string SrsSource::get_curr_origin()
return play_edge->get_curr_origin();
}
#ifdef SRS_RTC
SrsMetaCache* SrsSource::cached_meta()
{
return meta;
}
SrsRtcPublisher* SrsSource::rtc_publisher()
{
return rtc_publisher_;
}
void SrsSource::set_rtc_publisher(SrsRtcPublisher* v)
{
rtc_publisher_ = v;
}
srs_error_t SrsSource::on_rtc_audio(SrsSharedPtrMessage* audio)
{
// TODO: FIXME: Merge with on_audio.
// TODO: FIXME: Print key information.
return on_audio_imp(audio);
}
#endif

@ -62,9 +62,6 @@ class SrsBuffer;
#ifdef SRS_HDS
class SrsHds;
#endif
#ifdef SRS_RTC
class SrsRtcPublisher;
#endif
// The time jitter algorithm:
// 1. full, to ensure stream start at zero, and ensure stream monotonically increasing.
@ -153,13 +150,12 @@ public:
// Enqueue the message, the timestamp always monotonically.
// @param msg, the msg to enqueue, user never free it whatever the return code.
// @param is_overflow, whether overflow and shrinked. NULL to ignore.
// @remark If pass_timestamp, we never shrink and never care about the timestamp or duration.
virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL, bool pass_timestamp = false);
virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
// Get packets in consumer queue.
// @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
// @count the count in array, output param.
// @max_count the max count to dequeue, must be positive.
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp = false);
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
// Dumps packets to consumer, use specified args.
// @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue().
virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag);
@ -186,8 +182,18 @@ public:
virtual void wakeup() = 0;
};
// Enqueue the packet to consumer.
class ISrsConsumerQueue
{
public:
ISrsConsumerQueue();
virtual ~ISrsConsumerQueue();
public:
virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) = 0;
};
// The consumer for SrsSource, that is a play client.
class SrsConsumer : public ISrsWakable
class SrsConsumer : virtual public ISrsWakable, virtual public ISrsConsumerQueue
{
private:
SrsRtmpJitter* jitter;
@ -206,17 +212,10 @@ private:
int mw_min_msgs;
srs_utime_t mw_duration;
#endif
private:
// For RTC, we never use jitter to correct timestamp.
// But we should not change the atc or time_jitter for source or RTMP.
// @remark In this mode, we also never check the queue by timstamp, but only by count.
bool pass_timestamp;
public:
SrsConsumer(SrsSource* s, SrsConnection* c);
virtual ~SrsConsumer();
public:
// Use pass timestamp mode.
void enable_pass_timestamp();
// Set the size of queue.
virtual void set_queue_size(srs_utime_t queue_size);
// when source id changed, notice client to print.
@ -347,10 +346,6 @@ private:
private:
// The format, codec information.
SrsRtmpFormat* format;
#ifdef SRS_RTC
// rtc handler
SrsRtc* rtc;
#endif
// hls handler.
SrsHls* hls;
// The DASH encoder.
@ -450,7 +445,7 @@ public:
// Dumps cached metadata to consumer.
// @param dm Whether dumps the metadata.
// @param ds Whether dumps the sequence header.
virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
virtual srs_error_t dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
public:
// Previous exists sequence header.
virtual SrsSharedPtrMessage* previous_vsh();
@ -493,7 +488,7 @@ public:
private:
virtual srs_error_t do_cycle();
public:
// when system exit, destroy the sources,
// when system exit, destroy th`e sources,
// For gmc to analysis mem leaks.
virtual void destroy();
};
@ -501,6 +496,19 @@ public:
// Global singleton instance.
extern SrsSourceManager* _srs_sources;
// For two sources to bridge with each other.
class ISrsSourceBridger
{
public:
ISrsSourceBridger();
virtual ~ISrsSourceBridger();
public:
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0;
virtual srs_error_t on_video(SrsSharedPtrMessage* video) = 0;
virtual void on_unpublish() = 0;
};
// live streaming source.
class SrsSource : public ISrsReloadHandler
{
@ -534,6 +542,8 @@ private:
int64_t last_packet_time;
// The event handler.
ISrsSourceHandler* handler;
// The source bridger for other source.
ISrsSourceBridger* bridger;
// The edge control service
SrsPlayEdge* play_edge;
SrsPublishEdge* publish_edge;
@ -549,10 +559,6 @@ private:
// The last die time, when all consumers quit and no publisher,
// We will remove the source when source die.
srs_utime_t die_at;
#ifdef SRS_RTC
private:
SrsRtcPublisher* rtc_publisher_;
#endif
public:
SrsSource();
virtual ~SrsSource();
@ -564,6 +570,8 @@ public:
public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
// Bridge to other source, forward packets to it.
void bridge_to(ISrsSourceBridger* v);
// Interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -619,17 +627,6 @@ public:
virtual void on_edge_proxy_unpublish();
public:
virtual std::string get_curr_origin();
#ifdef SRS_RTC
public:
// For RTC, we need to package SPS/PPS(in cached meta) before each IDR.
SrsMetaCache* cached_meta();
// Get and set the publisher, passed to consumer to process requests such as PLI.
SrsRtcPublisher* rtc_publisher();
void set_rtc_publisher(SrsRtcPublisher* v);
// When got RTC audio message, which is encoded in opus.
// TODO: FIXME: Merge with on_audio.
srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio);
#endif
};
#endif

@ -125,6 +125,7 @@
* @remark this improve performance for large connectios.
* @see https://github.com/ossrs/srs/issues/251
*/
// TODO: FIXME: Should always enable it.
#define SRS_PERF_QUEUE_COND_WAIT
#ifdef SRS_PERF_QUEUE_COND_WAIT
// For RTMP, use larger wait queue.
@ -212,6 +213,7 @@
#define SRS_PERF_RTC_GSO_MAX 64
// For RTC, the max count of RTP packets we process in one loop.
// TODO: FIXME: Remove it.
#define SRS_PERF_RTC_RTP_PACKETS 1024
#endif

@ -24,6 +24,6 @@
#ifndef SRS_CORE_VERSION4_HPP
#define SRS_CORE_VERSION4_HPP
#define SRS_VERSION4_REVISION 25
#define SRS_VERSION4_REVISION 26
#endif

@ -414,6 +414,7 @@ SrsSample* SrsSample::copy()
SrsSample* p = new SrsSample();
p->bytes = bytes;
p->size = size;
p->bframe = bframe;
return p;
}

@ -636,6 +636,7 @@ public:
SrsAvcLevel avc_level;
// lengthSizeMinusOne, ISO_IEC_14496-15-AVC-format-2012.pdf, page 16
int8_t NAL_unit_length;
// Note that we may resize the vector, so the under-layer bytes may change.
std::vector<char> sequenceParameterSetNALUnit;
std::vector<char> pictureParameterSetNALUnit;
public:

@ -303,6 +303,18 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload
return err;
}
void SrsSharedPtrMessage::wrap(char* payload, int size)
{
srs_assert(!ptr);
ptr = new SrsSharedPtrPayload();
ptr->payload = payload;
ptr->size = size;
this->payload = ptr->payload;
this->size = ptr->size;
}
int SrsSharedPtrMessage::count()
{
srs_assert(ptr);

@ -304,6 +304,7 @@ private:
// The reference count
int shared_count;
#ifdef SRS_RTC
// TODO: FIXME: Remove it.
public:
// For RTC video, we need to know the NALU structures,
// because the RTP STAP-A or FU-A based on NALU.
@ -340,6 +341,9 @@ public:
// @remark user should never free the payload.
// @param pheader, the header to copy to the message. NULL to ignore.
virtual srs_error_t create(SrsMessageHeader* pheader, char* payload, int size);
// Create shared ptr message from RAW payload.
// @remark Note that the header is set to zero.
virtual void wrap(char* payload, int size);
// Get current reference count.
// when this object created, count set to 0.
// if copy() this object, count increase 1.

@ -31,10 +31,10 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_flv.hpp>
SrsRtpHeader::SrsRtpHeader()
{
padding = false;
padding_length = 0;
extension = false;
cc = 0;
@ -46,17 +46,6 @@ SrsRtpHeader::SrsRtpHeader()
extension_length = 0;
}
void SrsRtpHeader::reset()
{
// We only reset the optional fields, the required field such as ssrc
// will always be set by user.
padding = false;
extension = false;
cc = 0;
marker = false;
extension_length = 0;
}
SrsRtpHeader::~SrsRtpHeader()
{
}
@ -85,7 +74,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf)
*/
uint8_t first = buf->read_1bytes();
padding = (first & 0x20);
bool padding = (first & 0x20);
extension = (first & 0x10);
cc = (first & 0x0F);
@ -141,7 +130,7 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* buf)
// The version, padding, extension and cc, total 1 byte.
uint8_t v = 0x80 | cc;
if (padding) {
if (padding_length > 0) {
v |= 0x20;
}
if (extension) {
@ -249,19 +238,22 @@ uint32_t SrsRtpHeader::get_ssrc() const
return ssrc;
}
void SrsRtpHeader::set_padding(bool v)
void SrsRtpHeader::set_padding(uint8_t v)
{
padding = v;
padding_length = v;
}
void SrsRtpHeader::set_padding_length(uint8_t v)
uint8_t SrsRtpHeader::get_padding() const
{
padding_length = v;
return padding_length;
}
uint8_t SrsRtpHeader::get_padding_length() const
ISrsRtpPayloader::ISrsRtpPayloader()
{
}
ISrsRtpPayloader::~ISrsRtpPayloader()
{
return padding_length;
}
ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler()
@ -274,88 +266,71 @@ ISrsRtpPacketDecodeHandler::~ISrsRtpPacketDecodeHandler()
SrsRtpPacket2::SrsRtpPacket2()
{
padding = 0;
payload = NULL;
decode_handler = NULL;
nalu_type = SrsAvcNaluTypeReserved;
original_bytes = NULL;
cache_raw = new SrsRtpRawPayload();
cache_fua = new SrsRtpFUAPayload2();
cache_payload = 0;
shared_msg = NULL;
frame_type = SrsFrameTypeReserved;
cached_payload_size = 0;
}
SrsRtpPacket2::~SrsRtpPacket2()
{
// We may use the cache as payload.
if (payload == cache_raw || payload == cache_fua) {
payload = NULL;
}
srs_freep(payload);
srs_freep(cache_raw);
srs_freep(cache_fua);
srs_freepa(original_bytes);
srs_freep(shared_msg);
}
void SrsRtpPacket2::set_padding(int size)
{
rtp_header.set_padding(size > 0);
rtp_header.set_padding_length(size);
if (cache_payload) {
cache_payload += size - padding;
rtp_header.set_padding(size);
if (cached_payload_size) {
cached_payload_size += size - rtp_header.get_padding();
}
padding = size;
}
void SrsRtpPacket2::add_padding(int size)
{
rtp_header.set_padding(padding + size > 0);
rtp_header.set_padding_length(rtp_header.get_padding_length() + size);
if (cache_payload) {
cache_payload += size;
rtp_header.set_padding(rtp_header.get_padding() + size);
if (cached_payload_size) {
cached_payload_size += size;
}
padding += size;
}
void SrsRtpPacket2::reset()
void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h)
{
rtp_header.reset();
padding = 0;
cache_payload = 0;
// We may use the cache as payload.
if (payload == cache_raw || payload == cache_fua) {
payload = NULL;
}
srs_freep(payload);
decode_handler = h;
}
SrsRtpRawPayload* SrsRtpPacket2::reuse_raw()
bool SrsRtpPacket2::is_audio()
{
payload = cache_raw;
return cache_raw;
return frame_type == SrsFrameTypeAudio;
}
SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua()
SrsRtpPacket2* SrsRtpPacket2::copy()
{
payload = cache_fua;
return cache_fua;
}
SrsRtpPacket2* cp = new SrsRtpPacket2();
void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h)
{
decode_handler = h;
cp->rtp_header = rtp_header;
cp->payload = payload? payload->copy():NULL;
cp->nalu_type = nalu_type;
cp->shared_msg = shared_msg? shared_msg->copy():NULL;
cp->frame_type = frame_type;
cp->cached_payload_size = cached_payload_size;
cp->decode_handler = decode_handler;
return cp;
}
int SrsRtpPacket2::nb_bytes()
{
if (!cache_payload) {
cache_payload = rtp_header.nb_bytes() + (payload? payload->nb_bytes():0) + padding;
if (!cached_payload_size) {
int nn_payload = (payload? payload->nb_bytes():0);
cached_payload_size = rtp_header.nb_bytes() + nn_payload + rtp_header.get_padding();
}
return cache_payload;
return cached_payload_size;
}
srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf)
@ -370,7 +345,8 @@ srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf)
return srs_error_wrap(err, "rtp payload");
}
if (padding > 0) {
if (rtp_header.get_padding() > 0) {
uint8_t padding = rtp_header.get_padding();
if (!buf->require(padding)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", padding);
}
@ -390,7 +366,7 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
}
// We must skip the padding bytes before parsing payload.
padding = rtp_header.get_padding_length();
uint8_t padding = rtp_header.get_padding();
if (!buf->require(padding)) {
return srs_error_wrap(err, "requires padding %d bytes", padding);
}
@ -408,7 +384,7 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
// By default, we always use the RAW payload.
if (!payload) {
payload = reuse_raw();
payload = new SrsRtpRawPayload();
}
if ((err = payload->decode(buf)) != srs_success) {
@ -460,6 +436,16 @@ srs_error_t SrsRtpRawPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpRawPayload::copy()
{
SrsRtpRawPayload* cp = new SrsRtpRawPayload();
cp->payload = payload;
cp->nn_payload = nn_payload;
return cp;
}
SrsRtpRawNALUs::SrsRtpRawNALUs()
{
cursor = 0;
@ -582,6 +568,22 @@ srs_error_t SrsRtpRawNALUs::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpRawNALUs::copy()
{
SrsRtpRawNALUs* cp = new SrsRtpRawNALUs();
cp->nn_bytes = nn_bytes;
cp->cursor = cursor;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpSTAPPayload::SrsRtpSTAPPayload()
{
nri = (SrsAvcNaluType)0;
@ -706,6 +708,21 @@ srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpSTAPPayload::copy()
{
SrsRtpSTAPPayload* cp = new SrsRtpSTAPPayload();
cp->nri = nri;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpFUAPayload::SrsRtpFUAPayload()
{
start = end = false;
@ -800,6 +817,24 @@ srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpFUAPayload::copy()
{
SrsRtpFUAPayload* cp = new SrsRtpFUAPayload();
cp->nri = nri;
cp->start = start;
cp->end = end;
cp->nalu_type = nalu_type;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpFUAPayload2::SrsRtpFUAPayload2()
{
start = end = false;
@ -877,3 +912,17 @@ srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpFUAPayload2::copy()
{
SrsRtpFUAPayload2* cp = new SrsRtpFUAPayload2();
cp->nri = nri;
cp->start = start;
cp->end = end;
cp->nalu_type = nalu_type;
cp->payload = payload;
cp->size = size;
return cp;
}

@ -33,8 +33,11 @@
class SrsRtpPacket2;
const int kRtpHeaderFixedSize = 12;
const uint8_t kRtpMarker = 0x80;
// The RTP packet max size, should never exceed this size.
const int kRtpPacketSize = 1500;
const int kRtpHeaderFixedSize = 12;
const uint8_t kRtpMarker = 0x80;
// H.264 nalu header type mask.
const uint8_t kNalTypeMask = 0x1F;
@ -53,11 +56,11 @@ const uint8_t kEnd = 0x40; // Fu-header end bit
class SrsBuffer;
class SrsRtpRawPayload;
class SrsRtpFUAPayload2;
class SrsSharedPtrMessage;
class SrsRtpHeader
{
private:
bool padding;
uint8_t padding_length;
bool extension;
uint8_t cc;
@ -72,7 +75,6 @@ private:
public:
SrsRtpHeader();
virtual ~SrsRtpHeader();
void reset();
public:
virtual srs_error_t decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
@ -88,9 +90,17 @@ public:
uint32_t get_timestamp() const;
void set_ssrc(uint32_t v);
uint32_t get_ssrc() const;
void set_padding(bool v);
void set_padding_length(uint8_t v);
uint8_t get_padding_length() const;
void set_padding(uint8_t v);
uint8_t get_padding() const;
};
class ISrsRtpPayloader : public ISrsCodec
{
public:
ISrsRtpPayloader();
virtual ~ISrsRtpPayloader();
public:
virtual ISrsRtpPayloader* copy() = 0;
};
class ISrsRtpPacketDecodeHandler
@ -100,7 +110,7 @@ public:
virtual ~ISrsRtpPacketDecodeHandler();
public:
// We don't know the actual payload, so we depends on external handler.
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) = 0;
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) = 0;
};
class SrsRtpPacket2
@ -109,21 +119,19 @@ class SrsRtpPacket2
public:
// TODO: FIXME: Rename to header.
SrsRtpHeader rtp_header;
ISrsCodec* payload;
// TODO: FIXME: Merge into rtp_header.
int padding;
// Decoder helper.
ISrsRtpPayloader* payload;
// Helper fields.
public:
// The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type;
// The original bytes for decoder only, we will free it.
char* original_bytes;
// The original shared message, all RTP packets can refer to its data.
SrsSharedPtrMessage* shared_msg;
// The frame type, for RTMP bridger or SFU source.
SrsFrameType frame_type;
// Fast cache for performance.
private:
// Cache frequently used payload for performance.
SrsRtpRawPayload* cache_raw;
SrsRtpFUAPayload2* cache_fua;
int cache_payload;
// The cached payload size for packet.
int cached_payload_size;
// The helper handler for decoder, use RAW payload if NULL.
ISrsRtpPacketDecodeHandler* decode_handler;
public:
@ -134,14 +142,12 @@ public:
void set_padding(int size);
// Increase the padding of RTP packet.
void add_padding(int size);
// Reset RTP packet.
void reset();
// Reuse the cached raw message as payload.
SrsRtpRawPayload* reuse_raw();
// Reuse the cached fua message as payload.
SrsRtpFUAPayload2* reuse_fua();
// Set the decode handler.
void set_decode_handler(ISrsRtpPacketDecodeHandler* h);
// Whether the packet is Audio packet.
bool is_audio();
// Copy the RTP packet.
SrsRtpPacket2* copy();
// interface ISrsEncoder
public:
virtual int nb_bytes();
@ -150,7 +156,7 @@ public:
};
// Single payload data.
class SrsRtpRawPayload : public ISrsCodec
class SrsRtpRawPayload : public ISrsRtpPayloader
{
public:
// The RAW payload, directly point to the shared memory.
@ -160,15 +166,16 @@ public:
public:
SrsRtpRawPayload();
virtual ~SrsRtpRawPayload();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// Multiple NALUs, automatically insert 001 between NALUs.
class SrsRtpRawNALUs : public ISrsCodec
class SrsRtpRawNALUs : public ISrsRtpPayloader
{
private:
// We will manage the samples, but the sample itself point to the shared memory.
@ -184,15 +191,16 @@ public:
uint8_t skip_first_byte();
// We will manage the returned samples, if user want to manage it, please copy it.
srs_error_t read_samples(std::vector<SrsSample*>& samples, int packet_size);
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// STAP-A, for multiple NALUs.
class SrsRtpSTAPPayload : public ISrsCodec
class SrsRtpSTAPPayload : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -206,16 +214,17 @@ public:
public:
SrsSample* get_sps();
SrsSample* get_pps();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// FU-A, for one NALU with multiple fragments.
// With more than one payload.
class SrsRtpFUAPayload : public ISrsCodec
class SrsRtpFUAPayload : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -230,16 +239,17 @@ public:
public:
SrsRtpFUAPayload();
virtual ~SrsRtpFUAPayload();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// FU-A, for one NALU with multiple fragments.
// With only one payload.
class SrsRtpFUAPayload2 : public ISrsCodec
class SrsRtpFUAPayload2 : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -254,11 +264,12 @@ public:
public:
SrsRtpFUAPayload2();
virtual ~SrsRtpFUAPayload2();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
#endif

Loading…
Cancel
Save