Refine consumer dumps, extract dumps API

pull/1753/head
winlin 5 years ago
parent 9906d2e859
commit 20b4984af4

@ -1089,6 +1089,7 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
local_media_desc.rtcp_mux_ = true;
local_media_desc.rtcp_rsize_ = true;
// TODO: FIXME: Avoid SSRC collision.
if (!ssrc_num) {
ssrc_num = ::getpid() * 10000 + ::getpid() * 100 + ::getpid();
}

@ -128,10 +128,13 @@ srs_error_t SrsBufferCache::cycle()
// the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(NULL, consumer, false, false, true)) != srs_success) {
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->consumer_dumps(consumer, false, false, true)) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream_cache();
SrsAutoFree(SrsPithyPrint, pprint);
@ -583,12 +586,14 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != srs_success) {
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("http: consumer created success.");
if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
SrsAutoFree(SrsPithyPrint, pprint);

@ -721,7 +721,6 @@ srs_error_t SrsRtcSenderThread::cycle()
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str());
}
@ -733,6 +732,11 @@ srs_error_t SrsRtcSenderThread::cycle()
// messages and drop them if the shared sender queue is full.
consumer->enable_pass_timestamp();
// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "dumps consumer, source url=%s", req->get_stream_url().c_str());
}
realtime = _srs_config->get_realtime_enabled(req->vhost, true);
mw_sleep = _srs_config->get_mw_sleep(req->vhost, true);
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true);

@ -654,10 +654,13 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
// Create a consumer of source.
SrsConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->create_consumer(this, consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: dumps consumer");
}
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217

@ -2594,12 +2594,27 @@ void SrsSource::on_unpublish()
}
}
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer);
// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
if ((err = play_edge->on_client_play()) != srs_success) {
return srs_error_wrap(err, "play edge");
}
}
return err;
}
srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg)
{
srs_error_t err = srs_success;
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
consumer->set_queue_size(queue_size);
@ -2636,15 +2651,7 @@ srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consum
} else {
srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
}
// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
if ((err = play_edge->on_client_play()) != srs_success) {
return srs_error_wrap(err, "play edge");
}
}
return err;
}

@ -599,12 +599,14 @@ public:
virtual srs_error_t on_publish();
virtual void on_unpublish();
public:
// Create consumer and dumps packets in cache.
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer);
// Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata.
// @param dg, whether dumps the gop cache.
virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds = true, bool dm = true, bool dg = true);
virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual void set_cache(bool enabled);
virtual SrsRtmpJitterAlgorithm jitter();

Loading…
Cancel
Save