|
|
|
@ -44,6 +44,7 @@ using namespace std;
|
|
|
|
|
#include <srs_rtmp_msg_array.hpp>
|
|
|
|
|
#include <srs_app_hds.hpp>
|
|
|
|
|
#include <srs_app_statistic.hpp>
|
|
|
|
|
#include <srs_core_autofree.hpp>
|
|
|
|
|
|
|
|
|
|
#define CONST_MAX_JITTER_MS 500
|
|
|
|
|
#define DEFAULT_FRAME_TIME_MS 40
|
|
|
|
@ -768,10 +769,62 @@ void SrsSource::destroy()
|
|
|
|
|
pool.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsMixQueue::SrsMixQueue()
|
|
|
|
|
{
|
|
|
|
|
nb_videos = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsMixQueue::~SrsMixQueue()
|
|
|
|
|
{
|
|
|
|
|
clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsMixQueue::clear()
|
|
|
|
|
{
|
|
|
|
|
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it;
|
|
|
|
|
for (it = msgs.begin(); it != msgs.end(); ++it) {
|
|
|
|
|
SrsSharedPtrMessage* msg = it->second;
|
|
|
|
|
srs_freep(msg);
|
|
|
|
|
}
|
|
|
|
|
msgs.clear();
|
|
|
|
|
|
|
|
|
|
nb_videos = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsMixQueue::push(SrsSharedPtrMessage* msg)
|
|
|
|
|
{
|
|
|
|
|
msgs.insert(std::make_pair(msg->timestamp, msg));
|
|
|
|
|
|
|
|
|
|
if (msg->is_video()) {
|
|
|
|
|
nb_videos++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsSharedPtrMessage* SrsMixQueue::pop()
|
|
|
|
|
{
|
|
|
|
|
// always keep 2+ videos
|
|
|
|
|
if (nb_videos < 2) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pop the first msg.
|
|
|
|
|
std::multimap<int64_t, SrsSharedPtrMessage*>::iterator it = msgs.begin();
|
|
|
|
|
SrsSharedPtrMessage* msg = it->second;
|
|
|
|
|
msgs.erase(it);
|
|
|
|
|
|
|
|
|
|
if (msg->is_video()) {
|
|
|
|
|
nb_videos--;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return msg;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsSource::SrsSource()
|
|
|
|
|
{
|
|
|
|
|
_req = NULL;
|
|
|
|
|
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
|
|
|
|
|
mix_correct = false;
|
|
|
|
|
mix_queue = new SrsMixQueue();
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_HLS
|
|
|
|
|
hls = new SrsHls();
|
|
|
|
@ -818,6 +871,7 @@ SrsSource::~SrsSource()
|
|
|
|
|
forwarders.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_freep(mix_queue);
|
|
|
|
|
srs_freep(cache_metadata);
|
|
|
|
|
srs_freep(cache_sh_video);
|
|
|
|
|
srs_freep(cache_sh_audio);
|
|
|
|
@ -878,6 +932,7 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* h
|
|
|
|
|
publish_edge->set_queue_size(queue_size);
|
|
|
|
|
|
|
|
|
|
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
|
|
|
|
|
mix_correct = _srs_config->get_mix_correct(_req->vhost);
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -973,6 +1028,25 @@ int SrsSource::on_reload_vhost_time_jitter(string vhost)
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::on_reload_vhost_mix_correct(string vhost)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
if (_req->vhost != vhost) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool v = _srs_config->get_mix_correct(_req->vhost);
|
|
|
|
|
|
|
|
|
|
// when changed, clear the mix queue.
|
|
|
|
|
if (v != mix_correct) {
|
|
|
|
|
mix_queue->clear();
|
|
|
|
|
}
|
|
|
|
|
mix_correct = v;
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::on_reload_vhost_forward(string vhost)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
@ -1330,10 +1404,23 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
srs_error("initialize the audio failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
srs_verbose("initialize shared ptr audio success.");
|
|
|
|
|
srs_info("Audio dts=%"PRId64", size=%d", msg.timestamp, msg.size);
|
|
|
|
|
|
|
|
|
|
if (!mix_correct) {
|
|
|
|
|
return on_audio_imp(&msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return do_mix_correct(&msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
srs_info("Audio dts=%"PRId64", size=%d", msg->timestamp, msg->size);
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_HLS
|
|
|
|
|
if ((ret = hls->on_audio(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
// apply the error strategy for hls.
|
|
|
|
|
// @see https://github.com/winlinvip/simple-rtmp-server/issues/264
|
|
|
|
|
std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
|
|
|
|
@ -1347,7 +1434,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
ret = ERROR_SUCCESS;
|
|
|
|
|
} else if (hls_error_strategy == SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE) {
|
|
|
|
|
// compare the sequence header with audio, continue when it's actually an sequence header.
|
|
|
|
|
if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_audio && cache_sh_audio->size == msg.size) {
|
|
|
|
|
if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_audio && cache_sh_audio->size == msg->size) {
|
|
|
|
|
srs_warn("the audio is actually a sequence header, ignore this packet.");
|
|
|
|
|
ret = ERROR_SUCCESS;
|
|
|
|
|
} else {
|
|
|
|
@ -1362,7 +1449,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_DVR
|
|
|
|
|
if ((ret = dvr->on_audio(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
|
|
|
|
|
|
|
|
|
|
// unpublish, ignore ret.
|
|
|
|
@ -1374,7 +1461,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_HDS
|
|
|
|
|
if ((ret = hds->on_audio(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret);
|
|
|
|
|
|
|
|
|
|
// unpublish, ignore ret.
|
|
|
|
@ -1390,7 +1477,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
SrsConsumer** pconsumer = consumers.data();
|
|
|
|
|
for (int i = 0; i < nb_consumers; i++) {
|
|
|
|
|
SrsConsumer* consumer = pconsumer[i];
|
|
|
|
|
if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = consumer->enqueue(msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("dispatch the audio failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1403,7 +1490,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
std::vector<SrsForwarder*>::iterator it;
|
|
|
|
|
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
|
|
|
|
|
SrsForwarder* forwarder = *it;
|
|
|
|
|
if ((ret = forwarder->on_audio(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("forwarder process audio message failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1413,10 +1500,10 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
// cache the sequence header of aac, or first packet of mp3.
|
|
|
|
|
// for example, the mp3 is used for hls to write the "right" audio codec.
|
|
|
|
|
// TODO: FIXME: to refine the stream info system.
|
|
|
|
|
bool is_aac_sequence_header = SrsFlvCodec::audio_is_sequence_header(msg.payload, msg.size);
|
|
|
|
|
bool is_aac_sequence_header = SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size);
|
|
|
|
|
if (is_aac_sequence_header || !cache_sh_audio) {
|
|
|
|
|
srs_freep(cache_sh_audio);
|
|
|
|
|
cache_sh_audio = msg.copy();
|
|
|
|
|
cache_sh_audio = msg->copy();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cache the sequence header if aac
|
|
|
|
@ -1425,7 +1512,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
// parse detail audio codec
|
|
|
|
|
SrsAvcAacCodec codec;
|
|
|
|
|
SrsCodecSample sample;
|
|
|
|
|
if ((ret = codec.audio_aac_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("source codec demux audio failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1442,7 +1529,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
srs_trace("%dB audio sh, "
|
|
|
|
|
"codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), "
|
|
|
|
|
"flv(%dbits, %dchannels, %dHZ)",
|
|
|
|
|
msg.size, codec.audio_codec_id,
|
|
|
|
|
msg->size, codec.audio_codec_id,
|
|
|
|
|
srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels,
|
|
|
|
|
codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
|
|
|
|
|
flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
|
|
|
|
@ -1451,7 +1538,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cache the last gop packets
|
|
|
|
|
if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("shrink gop cache failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1460,10 +1547,10 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio)
|
|
|
|
|
// if atc, update the sequence header to abs time.
|
|
|
|
|
if (atc) {
|
|
|
|
|
if (cache_sh_audio) {
|
|
|
|
|
cache_sh_audio->timestamp = msg.timestamp;
|
|
|
|
|
cache_sh_audio->timestamp = msg->timestamp;
|
|
|
|
|
}
|
|
|
|
|
if (cache_metadata) {
|
|
|
|
|
cache_metadata->timestamp = msg.timestamp;
|
|
|
|
|
cache_metadata->timestamp = msg->timestamp;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1481,10 +1568,23 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
srs_error("initialize the video failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
srs_verbose("initialize shared ptr video success.");
|
|
|
|
|
srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size);
|
|
|
|
|
|
|
|
|
|
if (!mix_correct) {
|
|
|
|
|
return on_video_imp(&msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return do_mix_correct(&msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
srs_info("Video dts=%"PRId64", size=%d", msg->timestamp, msg->size);
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_HLS
|
|
|
|
|
if ((ret = hls->on_video(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = hls->on_video(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
// apply the error strategy for hls.
|
|
|
|
|
// @see https://github.com/winlinvip/simple-rtmp-server/issues/264
|
|
|
|
|
std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
|
|
|
|
@ -1498,7 +1598,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
ret = ERROR_SUCCESS;
|
|
|
|
|
} else if (hls_error_strategy == SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE) {
|
|
|
|
|
// compare the sequence header with video, continue when it's actually an sequence header.
|
|
|
|
|
if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_video && cache_sh_video->size == msg.size) {
|
|
|
|
|
if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_video && cache_sh_video->size == msg->size) {
|
|
|
|
|
srs_warn("the video is actually a sequence header, ignore this packet.");
|
|
|
|
|
ret = ERROR_SUCCESS;
|
|
|
|
|
} else {
|
|
|
|
@ -1513,7 +1613,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_DVR
|
|
|
|
|
if ((ret = dvr->on_video(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
|
|
|
|
|
|
|
|
|
|
// unpublish, ignore ret.
|
|
|
|
@ -1525,7 +1625,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_AUTO_HDS
|
|
|
|
|
if ((ret = hds->on_video(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret);
|
|
|
|
|
|
|
|
|
|
// unpublish, ignore ret.
|
|
|
|
@ -1539,7 +1639,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
if (true) {
|
|
|
|
|
for (int i = 0; i < (int)consumers.size(); i++) {
|
|
|
|
|
SrsConsumer* consumer = consumers.at(i);
|
|
|
|
|
if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = consumer->enqueue(msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("dispatch the video failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1552,7 +1652,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
std::vector<SrsForwarder*>::iterator it;
|
|
|
|
|
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
|
|
|
|
|
SrsForwarder* forwarder = *it;
|
|
|
|
|
if ((ret = forwarder->on_video(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("forwarder process video message failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1561,14 +1661,14 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
|
|
|
|
|
// cache the sequence header if h264
|
|
|
|
|
// donot cache the sequence header to gop_cache, return here.
|
|
|
|
|
if (SrsFlvCodec::video_is_sequence_header(msg.payload, msg.size)) {
|
|
|
|
|
if (SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) {
|
|
|
|
|
srs_freep(cache_sh_video);
|
|
|
|
|
cache_sh_video = msg.copy();
|
|
|
|
|
cache_sh_video = msg->copy();
|
|
|
|
|
|
|
|
|
|
// parse detail audio codec
|
|
|
|
|
SrsAvcAacCodec codec;
|
|
|
|
|
SrsCodecSample sample;
|
|
|
|
|
if ((ret = codec.video_avc_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("source codec demux video failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1581,7 +1681,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
|
|
|
|
|
srs_trace("%dB video sh, "
|
|
|
|
|
"codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
|
|
|
|
|
msg.size, codec.video_codec_id,
|
|
|
|
|
msg->size, codec.video_codec_id,
|
|
|
|
|
srs_codec_avc_profile2str(codec.avc_profile).c_str(),
|
|
|
|
|
srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
|
|
|
|
|
codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
|
|
|
|
@ -1589,7 +1689,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cache the last gop packets
|
|
|
|
|
if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) {
|
|
|
|
|
if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("gop cache msg failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
@ -1598,16 +1698,39 @@ int SrsSource::on_video(SrsCommonMessage* shared_video)
|
|
|
|
|
// if atc, update the sequence header to abs time.
|
|
|
|
|
if (atc) {
|
|
|
|
|
if (cache_sh_video) {
|
|
|
|
|
cache_sh_video->timestamp = msg.timestamp;
|
|
|
|
|
cache_sh_video->timestamp = msg->timestamp;
|
|
|
|
|
}
|
|
|
|
|
if (cache_metadata) {
|
|
|
|
|
cache_metadata->timestamp = msg.timestamp;
|
|
|
|
|
cache_metadata->timestamp = msg->timestamp;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::do_mix_correct(SrsSharedPtrMessage* msg)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
// insert msg to the queue.
|
|
|
|
|
mix_queue->push(msg->copy());
|
|
|
|
|
|
|
|
|
|
// fetch someone from mix queue.
|
|
|
|
|
SrsSharedPtrMessage* m = mix_queue->pop();
|
|
|
|
|
if (!m) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
SrsAutoFree(SrsSharedPtrMessage, m);
|
|
|
|
|
|
|
|
|
|
// consume the monotonically increase message.
|
|
|
|
|
if (m->is_audio()) {
|
|
|
|
|
return on_audio_imp(m);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_assert(m->is_video());
|
|
|
|
|
return on_video_imp(m);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsSource::on_aggregate(SrsCommonMessage* msg)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
@ -1748,6 +1871,9 @@ int SrsSource::on_publish()
|
|
|
|
|
// save its id to srouce id.
|
|
|
|
|
on_source_id_changed(_srs_context->get_id());
|
|
|
|
|
|
|
|
|
|
// reset the mix queue.
|
|
|
|
|
mix_queue->clear();
|
|
|
|
|
|
|
|
|
|
// create forwarders
|
|
|
|
|
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("create forwarders failed. ret=%d", ret);
|
|
|
|
|