|
|
|
@ -266,17 +266,11 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
|
|
|
|
|
max_queue_size = queue_size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp)
|
|
|
|
|
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
msgs.push_back(msg);
|
|
|
|
|
|
|
|
|
|
// For RTC, we never care about the timestamp and duration, so we never shrink queue here,
|
|
|
|
|
// but we will drop messages in each consumer coroutine.
|
|
|
|
|
if (pass_timestamp) {
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (msg->is_av()) {
|
|
|
|
|
if (av_start_time == -1) {
|
|
|
|
@ -285,6 +279,10 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|
|
|
|
|
|
|
|
|
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (max_queue_size <= 0) {
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (av_end_time - av_start_time > max_queue_size) {
|
|
|
|
|
// notice the caller queue already overflow and shrinked.
|
|
|
|
@ -298,7 +296,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp)
|
|
|
|
|
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
@ -313,13 +311,9 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
|
|
|
|
|
SrsSharedPtrMessage** omsgs = msgs.data();
|
|
|
|
|
memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));
|
|
|
|
|
|
|
|
|
|
// For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration,
|
|
|
|
|
// so we do not have to update the start time here.
|
|
|
|
|
if (!pass_timestamp) {
|
|
|
|
|
SrsSharedPtrMessage* last = omsgs[count - 1];
|
|
|
|
|
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsSharedPtrMessage* last = omsgs[count - 1];
|
|
|
|
|
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
if (count >= nb_msgs) {
|
|
|
|
|
// the pmsgs is big enough and clear msgs at most time.
|
|
|
|
|
msgs.clear();
|
|
|
|
@ -446,8 +440,6 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
|
|
|
|
|
mw_duration = 0;
|
|
|
|
|
mw_waiting = false;
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
pass_timestamp = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsConsumer::~SrsConsumer()
|
|
|
|
@ -461,11 +453,6 @@ SrsConsumer::~SrsConsumer()
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsConsumer::enable_pass_timestamp()
|
|
|
|
|
{
|
|
|
|
|
pass_timestamp = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
|
|
|
|
|
{
|
|
|
|
|
queue->set_queue_size(queue_size);
|
|
|
|
@ -487,33 +474,19 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
|
|
|
|
|
|
|
|
|
|
SrsSharedPtrMessage* msg = shared_msg->copy();
|
|
|
|
|
|
|
|
|
|
// For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of
|
|
|
|
|
// timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here.
|
|
|
|
|
if (!pass_timestamp && !atc) {
|
|
|
|
|
if (!atc) {
|
|
|
|
|
if ((err = jitter->correct(msg, ag)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "consume message");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Put message in queue, here we may enable pass_timestamp mode.
|
|
|
|
|
if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) {
|
|
|
|
|
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "enqueue message");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_PERF_QUEUE_COND_WAIT
|
|
|
|
|
// fire the mw when msgs is enough.
|
|
|
|
|
if (mw_waiting) {
|
|
|
|
|
// For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue,
|
|
|
|
|
// so we only check the messages in queue.
|
|
|
|
|
if (pass_timestamp) {
|
|
|
|
|
if (queue->size() > mw_min_msgs) {
|
|
|
|
|
srs_cond_signal(mw_wait);
|
|
|
|
|
mw_waiting = false;
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// For RTMP, we wait for messages and duration.
|
|
|
|
|
srs_utime_t duration = queue->duration();
|
|
|
|
|
bool match_min_msgs = queue->size() > mw_min_msgs;
|
|
|
|
@ -564,7 +537,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pump msgs from queue.
|
|
|
|
|
if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) {
|
|
|
|
|
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "dump packets");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|