diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 2e1826be2..90d46d8e7 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -93,7 +93,7 @@ vhost __defaultVhost__ { vhost dev { enabled on; gop_cache on; - queue_length 30; + queue_length 10; #forward 127.0.0.1:19350; hls { enabled off; diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index fea6d9dd2..3e487d2b5 100644 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -1303,7 +1303,7 @@ double SrsConfig::get_queue_length(string vhost) } conf = conf->get("queue_length"); - if (conf || conf->arg0().empty()) { + if (!conf || conf->arg0().empty()) { return SRS_CONF_DEFAULT_QUEUE_LENGTH; } diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 332f164f0..025b99642 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -171,6 +171,10 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in } else { count = srs_min(max_count, (int)msgs.size()); } + + if (count <= 0) { + return ret; + } pmsgs = new SrsSharedPtrMessage*[count]; @@ -178,6 +182,9 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in pmsgs[i] = msgs[i]; } + SrsSharedPtrMessage* last = msgs[count - 1]; + av_start_time = last->header.timestamp; + if (count == (int)msgs.size()) { msgs.clear(); } else { @@ -217,15 +224,15 @@ void SrsMessageQueue::shrink() return; } + srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0); + // remove the first gop from the front for (int i = 0; i < iframe_index; i++) { SrsSharedPtrMessage* msg = msgs[i]; srs_freep(msg); } msgs.erase(msgs.begin(), msgs.begin() + iframe_index); - - srs_trace("shrink the cache queue, " - "size=%d, removed=%d", (int)msgs.size(), iframe_index); } void SrsMessageQueue::clear() @@ -893,9 +900,12 @@ void SrsSource::on_unpublish() { int ret = ERROR_SUCCESS; + double queue_size = config->get_queue_length(req->vhost); + consumer = new SrsConsumer(this); - consumer->set_queue_size(config->get_queue_length(req->vhost)); + consumers.push_back(consumer); + consumer->set_queue_size(queue_size); if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch metadata failed. ret=%d", ret); @@ -919,6 +929,8 @@ void SrsSource::on_unpublish() return ret; } + srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate); + return ret; }