merge from 2.0

pull/556/head
winlin 10 years ago
commit a7c4b786cf

@ -1045,6 +1045,7 @@ Winlin
[bug #367]: https://github.com/simple-rtmp-server/srs/issues/367
[bug #471]: https://github.com/simple-rtmp-server/srs/issues/471
[bug #380]: https://github.com/simple-rtmp-server/srs/issues/380
[bug #474]: https://github.com/simple-rtmp-server/srs/issues/474
[bug #475]: https://github.com/simple-rtmp-server/srs/issues/475
[bug #458]: https://github.com/simple-rtmp-server/srs/issues/458
[bug #454]: https://github.com/simple-rtmp-server/srs/issues/454

@ -498,6 +498,14 @@ vhost min.delay.com {
}
}
# whether disable the sps parse, for the resolution of video.
vhost no.parse.sps.com {
# @see publish.srs.com
publish {
parse_sps on;
}
}
# the vhost to control the stream delivery feature
vhost stream.control.com {
# @see scope.vhost.srs.com
@ -546,6 +554,11 @@ vhost publish.srs.com {
# the normal packet timeout in ms for encoder.
# default: 5000
normal_timeout 7000;
# whether parse the sps when publish stream.
# we can got the resolution of video for stat api.
# but we may failed to cause publish failed.
# default: on
parse_sps on;
}
}

@ -738,6 +738,9 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
srs_error("reload never supports mode changed. ret=%d", ret);
return ret;
}
// the auto reload configs:
// publish.parse_sps
// ENABLED => ENABLED (modified)
if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) {
@ -3096,7 +3099,7 @@ int SrsConfig::check_config()
} else if (n == "publish") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name.c_str();
if (m != "mr" && m != "mr_latency" && m != "firstpkt_timeout" && m != "normal_timeout") {
if (m != "mr" && m != "mr_latency" && m != "firstpkt_timeout" && m != "normal_timeout" && m != "parse_sps") {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost publish directive %s, ret=%d", m.c_str(), ret);
return ret;
@ -3841,6 +3844,29 @@ int SrsConfig::get_chunk_size(string vhost)
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_parse_sps(string vhost)
{
static bool DEFAULT = true;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("publish");
if (!conf) {
return DEFAULT;
}
conf = conf->get("parse_sps");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
bool SrsConfig::get_mr_enabled(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

@ -603,6 +603,10 @@ public:
* @remark, default 60000.
*/
virtual int get_chunk_size(std::string vhost);
/**
* whether parse the sps when publish stream to SRS.
*/
virtual bool get_parse_sps(std::string vhost);
/**
* whether mr is enabled for vhost.
* @param vhost, the vhost to get the mr.

@ -476,6 +476,8 @@ void SrsEdgeForwarder::stop()
close_underlayer_socket();
queue->clear();
srs_freep(client);
srs_freep(io);
kbps->set_io(NULL, NULL);

@ -1252,7 +1252,7 @@ int SrsHls::initialize(SrsSource* s, ISrsHlsHandler* h)
return ret;
}
int SrsHls::on_publish(SrsRequest* req)
int SrsHls::on_publish(SrsRequest* req, bool fetch_sequence_header)
{
int ret = ERROR_SUCCESS;
@ -1282,12 +1282,16 @@ int SrsHls::on_publish(SrsRequest* req)
// ok, the hls can be dispose, or need to be dispose.
hls_can_dispose = true;
// notice the source to get the cached sequence header.
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
srs_error("callback source hls start failed. ret=%d", ret);
return ret;
// when publish, don't need to fetch sequence header, which is old and maybe corrupt.
// when reload, we must fetch the sequence header from source cache.
if (fetch_sequence_header) {
// notice the source to get the cached sequence header.
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
srs_error("callback source hls start failed. ret=%d", ret);
return ret;
}
}
return ret;
@ -1391,7 +1395,7 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio)
return ret;
}
int SrsHls::on_video(SrsSharedPtrMessage* shared_video)
int SrsHls::on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps)
{
int ret = ERROR_SUCCESS;
@ -1405,6 +1409,12 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video)
SrsSharedPtrMessage* video = shared_video->copy();
SrsAutoFree(SrsSharedPtrMessage, video);
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/simple-rtmp-server/srs/issues/474
if (is_sps_pps) {
codec->avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);
}
sample->clear();
if ((ret = codec->video_avc_demux(video->payload, video->size, sample)) != ERROR_SUCCESS) {
srs_error("hls codec demux video failed. ret=%d", ret);

@ -424,10 +424,11 @@ public:
*/
virtual int initialize(SrsSource* s, ISrsHlsHandler* h);
/**
* publish stream event, continue to write the m3u8,
* for the muxer object not destroyed.
*/
virtual int on_publish(SrsRequest* req);
* publish stream event, continue to write the m3u8,
* for the muxer object not destroyed.
* @param fetch_sequence_header whether fetch sequence from source.
*/
virtual int on_publish(SrsRequest* req, bool fetch_sequence_header);
/**
* the unpublish event, only close the muxer, donot destroy the
* muxer, for when we continue to publish, the m3u8 will continue.
@ -443,10 +444,11 @@ public:
*/
virtual int on_audio(SrsSharedPtrMessage* shared_audio);
/**
* mux the video packets to ts.
* @param shared_video, directly ptr, copy it if need to save it.
*/
virtual int on_video(SrsSharedPtrMessage* shared_video);
* mux the video packets to ts.
* @param shared_video, directly ptr, copy it if need to save it.
* @param is_sps_pps whether the video is h.264 sps/pps.
*/
virtual int on_video(SrsSharedPtrMessage* shared_video, bool is_sps_pps);
private:
virtual void hls_show_mux_log();
};

@ -816,7 +816,14 @@ int SrsRtmpConn::publishing(SrsSource* source)
// stop isolate recv thread
trd.stop();
}
// whatever the acquire publish, always release publish.
// when the acquire error in the midlle-way, the publish state changed,
// but failed, so we must cleanup it.
// @see https://github.com/simple-rtmp-server/srs/issues/474
// @remark when stream is busy, should never release it.
if (ret != ERROR_SYSTEM_STREAM_BUSY) {
release_publish(source, vhost_is_edge);
}
@ -926,10 +933,12 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
if (is_edge) {
if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
srs_error("notice edge start publish stream failed. ret=%d", ret);
return ret;
}
} else {
if ((ret = source->on_publish()) != ERROR_SUCCESS) {
srs_error("notify publish failed. ret=%d", ret);
return ret;
}
}

@ -1161,7 +1161,7 @@ int SrsSource::on_reload_vhost_hls(string vhost)
#ifdef SRS_AUTO_HLS
hls->on_unpublish();
if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) {
if ((ret = hls->on_publish(_req, true)) != ERROR_SUCCESS) {
srs_error("hls publish failed. ret=%d", ret);
return ret;
}
@ -1297,7 +1297,7 @@ int SrsSource::on_hls_start()
// when reload to start hls, hls will never get the sequence header in stream,
// use the SrsSource.on_hls_start to push the sequence header to HLS.
// TODO: maybe need to decode the metadata?
if (cache_sh_video && (ret = hls->on_video(cache_sh_video)) != ERROR_SUCCESS) {
if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) {
srs_error("hls process video sequence header message failed. ret=%d", ret);
return ret;
}
@ -1558,6 +1558,44 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
bool is_aac_sequence_header = SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size);
bool is_sequence_header = is_aac_sequence_header;
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_audio && _srs_config->get_reduce_sequence_header(_req->vhost)) {
if (cache_sh_audio->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_audio->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
}
}
// cache the sequence header if aac
// donot cache the sequence header to gop_cache, return here.
if (is_aac_sequence_header) {
// parse detail audio codec
SrsAvcAacCodec codec;
SrsCodecSample sample;
if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux audio failed. ret=%d", ret);
return ret;
}
static int flv_sample_sizes[] = {8, 16, 0};
static int flv_sound_types[] = {1, 2, 0};
// when got audio stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_audio_info(_req, SrsCodecAudioAAC, sample.sound_rate, sample.sound_type, codec.aac_object)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), "
"flv(%dbits, %dchannels, %dHZ)",
msg->size, codec.audio_codec_id,
srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels,
codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
flv_sample_rates[sample.sound_rate]);
}
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
@ -1611,13 +1649,6 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
#endif
// copy to all consumer
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_audio && _srs_config->get_reduce_sequence_header(_req->vhost)) {
if (cache_sh_audio->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_audio->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
}
}
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
@ -1648,35 +1679,9 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
srs_freep(cache_sh_audio);
cache_sh_audio = msg->copy();
}
// cache the sequence header if aac
// donot cache the sequence header to gop_cache, return here.
if (is_aac_sequence_header) {
// parse detail audio codec
SrsAvcAacCodec codec;
SrsCodecSample sample;
if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux audio failed. ret=%d", ret);
return ret;
}
static int flv_sample_sizes[] = {8, 16, 0};
static int flv_sound_types[] = {1, 2, 0};
// when got audio stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_audio_info(_req, SrsCodecAudioAAC, sample.sound_rate, sample.sound_type, codec.aac_object)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB audio sh, "
"codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), "
"flv(%dbits, %dchannels, %dHZ)",
msg->size, codec.audio_codec_id,
srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels,
codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
flv_sample_rates[sample.sound_rate]);
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return ret;
}
@ -1768,8 +1773,49 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size);
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {
if (cache_sh_video->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
}
}
// cache the sequence header if h264
// donot cache the sequence header to gop_cache, return here.
if (is_sequence_header) {
srs_freep(cache_sh_video);
cache_sh_video = msg->copy();
// parse detail audio codec
SrsAvcAacCodec codec;
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/simple-rtmp-server/srs/issues/474
codec.avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);
SrsCodecSample sample;
if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux video failed. ret=%d", ret);
return ret;
}
// when got video stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level, codec.width, codec.height)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
msg->size, codec.video_codec_id,
srs_codec_avc_profile2str(codec.avc_profile).c_str(),
srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
}
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_video(msg)) != ERROR_SUCCESS) {
if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
// apply the error strategy for hls.
// @see https://github.com/simple-rtmp-server/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
@ -1821,13 +1867,6 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
#endif
// copy to all consumer
bool drop_for_reduce = false;
if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {
if (cache_sh_video->size == msg->size) {
drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
}
}
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
@ -1850,33 +1889,9 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
}
}
}
// cache the sequence header if h264
// donot cache the sequence header to gop_cache, return here.
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
srs_freep(cache_sh_video);
cache_sh_video = msg->copy();
// parse detail audio codec
SrsAvcAacCodec codec;
SrsCodecSample sample;
if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
srs_error("source codec demux video failed. ret=%d", ret);
return ret;
}
// when got video stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level, codec.width, codec.height)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("%dB video sh, "
"codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
msg->size, codec.video_codec_id,
srs_codec_avc_profile2str(codec.avc_profile).c_str(),
srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
return ret;
}
@ -2044,7 +2059,7 @@ int SrsSource::on_publish()
// TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) {
if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
srs_error("start hls failed. ret=%d", ret);
return ret;
}
@ -2079,6 +2094,7 @@ int SrsSource::on_publish()
}
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(_req, _source_id);
return ret;
}

@ -191,6 +191,10 @@ private:
* if no iframe found, clear it.
*/
virtual void shrink();
public:
/**
* clear all messages in queue.
*/
virtual void clear();
};

@ -66,6 +66,7 @@ namespace internal {
really_terminated = true;
_cid = -1;
_joinable = joinable;
disposed = false;
// in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
@ -115,38 +116,15 @@ namespace internal {
void SrsThread::stop()
{
if (tid) {
loop = false;
// the interrupt will cause the socket to read/write error,
// which will terminate the cycle thread.
st_thread_interrupt(tid);
// when joinable, wait util quit.
if (_joinable) {
// wait the thread to exit.
int ret = st_thread_join(tid, NULL);
if (ret) {
srs_warn("core: ignore join thread failed.");
}
}
// wait the thread actually terminated.
// sometimes the thread join return -1, for example,
// when thread use st_recvfrom, the thread join return -1.
// so here, we use a variable to ensure the thread stopped.
// @remark even the thread not joinable, we must ensure the thread stopped when stop.
while (!really_terminated) {
st_usleep(10 * 1000);
if (really_terminated) {
break;
}
srs_warn("core: wait thread to actually terminated");
}
tid = NULL;
if (!tid) {
return;
}
loop = false;
dispose();
tid = NULL;
}
bool SrsThread::can_loop()
@ -159,6 +137,42 @@ namespace internal {
loop = false;
}
void SrsThread::dispose()
{
if (disposed) {
return;
}
// the interrupt will cause the socket to read/write error,
// which will terminate the cycle thread.
st_thread_interrupt(tid);
// when joinable, wait util quit.
if (_joinable) {
// wait the thread to exit.
int ret = st_thread_join(tid, NULL);
if (ret) {
srs_warn("core: ignore join thread failed.");
}
}
// wait the thread actually terminated.
// sometimes the thread join return -1, for example,
// when thread use st_recvfrom, the thread join return -1.
// so here, we use a variable to ensure the thread stopped.
// @remark even the thread not joinable, we must ensure the thread stopped when stop.
while (!really_terminated) {
st_usleep(10 * 1000);
if (really_terminated) {
break;
}
srs_warn("core: wait thread to actually terminated");
}
disposed = true;
}
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
@ -217,6 +231,9 @@ namespace internal {
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
// when thread terminated normally, also disposed.
disposed = true;
}
void* SrsThread::thread_fun(void* arg)

@ -100,6 +100,7 @@ namespace internal {
bool really_terminated;
bool _joinable;
const char* _name;
bool disposed;
private:
ISrsThreadHandler* handler;
int64_t cycle_interval_us;
@ -154,6 +155,7 @@ namespace internal {
*/
virtual void stop_loop();
private:
virtual void dispose();
virtual void thread_cycle();
static void* thread_fun(void* arg);
};

@ -382,6 +382,8 @@ int SrsCodecSample::add_sample_unit(char* bytes, int size)
SrsAvcAacCodec::SrsAvcAacCodec()
{
avc_parse_sps = true;
width = 0;
height = 0;
duration = 0;
@ -939,6 +941,12 @@ int SrsAvcAacCodec::avc_demux_sps_rbsp(char* rbsp, int nb_rbsp)
{
int ret = ERROR_SUCCESS;
// we donot parse the detail of sps.
// @see https://github.com/simple-rtmp-server/srs/issues/474
if (!avc_parse_sps) {
return ret;
}
// reparse the rbsp.
SrsStream stream;
if ((ret = stream.initialize(rbsp, nb_rbsp)) != ERROR_SUCCESS) {

@ -605,6 +605,9 @@ public:
*/
int aac_extra_size;
char* aac_extra_data;
public:
// for sequence header, whether parse the h.264 sps.
bool avc_parse_sps;
public:
SrsAvcAacCodec();
virtual ~SrsAvcAacCodec();

@ -905,7 +905,7 @@ namespace _srs_internal
}
// client c1 time and version
time = ::time(NULL);
time = (int32_t)::time(NULL);
version = 0x80000702; // client c1 version
// generate signature by schema

Loading…
Cancel
Save