Rename SrsConsumer* to SrsLiveConsumer*. 4.0.114

pull/2357/head
winlin 4 years ago
parent dae6dc5395
commit 3dce568b9c

@ -176,6 +176,7 @@ The ports used by SRS:
## V4 changes ## V4 changes
* v4.0, 2021-05-15, Rename SrsConsumer* to SrsLiveConsumer*. 4.0.114
* v4.0, 2021-05-15, Rename SrsRtcStream* to SrsRtcSource*. 4.0.113 * v4.0, 2021-05-15, Rename SrsRtcStream* to SrsRtcSource*. 4.0.113
* v4.0, 2021-05-15, Rename SrsSource* to SrsLiveSource*. 4.0.112 * v4.0, 2021-05-15, Rename SrsSource* to SrsLiveSource*. 4.0.112
* v4.0, 2021-05-15, Rename SrsRtpPacket2 to SrsRtpPacket. 4.0.111 * v4.0, 2021-05-15, Rename SrsRtpPacket2 to SrsRtpPacket. 4.0.111

@ -40,7 +40,7 @@
class SrsServer; class SrsServer;
class SrsLiveSource; class SrsLiveSource;
class SrsRequest; class SrsRequest;
class SrsConsumer; class SrsLiveConsumer;
class SrsStSocket; class SrsStSocket;
class SrsHttpParser; class SrsHttpParser;
class ISrsHttpMessage; class ISrsHttpMessage;

@ -96,7 +96,7 @@ srs_error_t SrsBufferCache::start()
return err; return err;
} }
srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) srs_error_t SrsBufferCache::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -127,8 +127,8 @@ srs_error_t SrsBufferCache::cycle()
// the stream cache will create consumer to cache stream, // the stream cache will create consumer to cache stream,
// which will trigger to fetch stream from origin for edge. // which will trigger to fetch stream from origin for edge.
SrsConsumer* consumer = NULL; SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer); SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) { if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer"); return srs_error_wrap(err, "create consumer");
} }
@ -245,7 +245,7 @@ bool SrsTsStreamEncoder::has_cache()
return false; return false;
} }
srs_error_t SrsTsStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) srs_error_t SrsTsStreamEncoder::dump_cache(SrsLiveConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
{ {
// for ts stream, ignore cache. // for ts stream, ignore cache.
return srs_success; return srs_success;
@ -312,7 +312,7 @@ bool SrsFlvStreamEncoder::has_cache()
return false; return false;
} }
srs_error_t SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) srs_error_t SrsFlvStreamEncoder::dump_cache(SrsLiveConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/)
{ {
// for flv stream, ignore cache. // for flv stream, ignore cache.
return srs_success; return srs_success;
@ -412,7 +412,7 @@ bool SrsAacStreamEncoder::has_cache()
return true; return true;
} }
srs_error_t SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) srs_error_t SrsAacStreamEncoder::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{ {
srs_assert(cache); srs_assert(cache);
return cache->dump_cache(consumer, jitter); return cache->dump_cache(consumer, jitter);
@ -468,7 +468,7 @@ bool SrsMp3StreamEncoder::has_cache()
return true; return true;
} }
srs_error_t SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) srs_error_t SrsMp3StreamEncoder::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
{ {
srs_assert(cache); srs_assert(cache);
return cache->dump_cache(consumer, jitter); return cache->dump_cache(consumer, jitter);
@ -585,8 +585,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
w->write_header(SRS_CONSTS_HTTP_OK); w->write_header(SRS_CONSTS_HTTP_OK);
// create consumer of souce, ignore gop cache, use the audio gop cache. // create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL; SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer); SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) { if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "create consumer"); return srs_error_wrap(err, "create consumer");
} }

@ -49,7 +49,7 @@ public:
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r); virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
public: public:
virtual srs_error_t start(); virtual srs_error_t start();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
// Interface ISrsEndlessThreadHandler. // Interface ISrsEndlessThreadHandler.
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
@ -76,7 +76,7 @@ public:
// @return true to use gop cache of encoder; otherwise, use SrsLiveSource. // @return true to use gop cache of encoder; otherwise, use SrsLiveSource.
virtual bool has_cache() = 0; virtual bool has_cache() = 0;
// Dumps the cache of encoder to consumer. // Dumps the cache of encoder to consumer.
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0; virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0;
}; };
// Transmux RTMP to HTTP Live Streaming. // Transmux RTMP to HTTP Live Streaming.
@ -95,7 +95,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public: public:
virtual bool has_cache(); virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
public: public:
// Write the tags in a time. // Write the tags in a time.
virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count); virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count);
@ -118,7 +118,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public: public:
virtual bool has_cache(); virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
// Transmux RTMP with AAC stream to HTTP AAC Streaming. // Transmux RTMP with AAC stream to HTTP AAC Streaming.
@ -137,7 +137,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public: public:
virtual bool has_cache(); virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
// Transmux RTMP with MP3 stream to HTTP MP3 Streaming. // Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
@ -156,7 +156,7 @@ public:
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public: public:
virtual bool has_cache(); virtual bool has_cache();
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
// Write stream to http response direclty. // Write stream to http response direclty.

@ -164,7 +164,7 @@ srs_error_t SrsRecvThread::do_cycle()
return err; return err;
} }
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid) : trd(this, rtmp_sdk, tm, parent_cid)
{ {
_consumer = consumer; _consumer = consumer;

@ -39,7 +39,7 @@ class SrsCommonMessage;
class SrsRtmpConn; class SrsRtmpConn;
class SrsLiveSource; class SrsLiveSource;
class SrsRequest; class SrsRequest;
class SrsConsumer; class SrsLiveConsumer;
class SrsHttpConn; class SrsHttpConn;
class SrsResponseOnlyHttpConn; class SrsResponseOnlyHttpConn;
@ -114,10 +114,10 @@ private:
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
// The recv thread error code. // The recv thread error code.
srs_error_t recv_error; srs_error_t recv_error;
SrsConsumer* _consumer; SrsLiveConsumer* _consumer;
public: public:
// TODO: FIXME: Refine timeout in time unit. // TODO: FIXME: Refine timeout in time unit.
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid); SrsQueueRecvThread(SrsLiveConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid);
virtual ~SrsQueueRecvThread(); virtual ~SrsQueueRecvThread();
public: public:
virtual srs_error_t start(); virtual srs_error_t start();

@ -47,7 +47,7 @@
#include <sys/socket.h> #include <sys/socket.h>
class SrsUdpMuxSocket; class SrsUdpMuxSocket;
class SrsConsumer; class SrsLiveConsumer;
class SrsStunPacket; class SrsStunPacket;
class SrsRtcServer; class SrsRtcServer;
class SrsRtcConnection; class SrsRtcConnection;

@ -677,8 +677,8 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
set_sock_options(); set_sock_options();
// Create a consumer of source. // Create a consumer of source.
SrsConsumer* consumer = NULL; SrsLiveConsumer* consumer = NULL;
SrsAutoFree(SrsConsumer, consumer); SrsAutoFree(SrsLiveConsumer, consumer);
if ((err = source->create_consumer(consumer)) != srs_success) { if ((err = source->create_consumer(consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer"); return srs_error_wrap(err, "rtmp: create consumer");
} }
@ -709,7 +709,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
return err; return err;
} }
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd) srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1111,7 +1111,7 @@ srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommo
return err; return err;
} }
srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg) srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;

@ -40,7 +40,7 @@ class SrsRequest;
class SrsResponse; class SrsResponse;
class SrsLiveSource; class SrsLiveSource;
class SrsRefer; class SrsRefer;
class SrsConsumer; class SrsLiveConsumer;
class SrsCommonMessage; class SrsCommonMessage;
class SrsStSocket; class SrsStSocket;
class SrsHttpHooks; class SrsHttpHooks;
@ -162,14 +162,14 @@ private:
virtual srs_error_t stream_service_cycle(); virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost); virtual srs_error_t check_vhost(bool try_default_vhost);
virtual srs_error_t playing(SrsLiveSource* source); virtual srs_error_t playing(SrsLiveSource* source);
virtual srs_error_t do_playing(SrsLiveSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
virtual srs_error_t publishing(SrsLiveSource* source); virtual srs_error_t publishing(SrsLiveSource* source);
virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd); virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd);
virtual srs_error_t acquire_publish(SrsLiveSource* source); virtual srs_error_t acquire_publish(SrsLiveSource* source);
virtual void release_publish(SrsLiveSource* source); virtual void release_publish(SrsLiveSource* source);
virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg); virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg);
virtual void set_sock_options(); virtual void set_sock_options();
private: private:
virtual srs_error_t check_edge_token_traverse_auth(); virtual srs_error_t check_edge_token_traverse_auth();

@ -329,7 +329,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
return err; return err;
} }
srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag) srs_error_t SrsMessageQueue::dump_packets(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -418,7 +418,7 @@ ISrsWakable::~ISrsWakable()
{ {
} }
SrsConsumer::SrsConsumer(SrsLiveSource* s) SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s)
{ {
source = s; source = s;
paused = false; paused = false;
@ -434,7 +434,7 @@ SrsConsumer::SrsConsumer(SrsLiveSource* s)
#endif #endif
} }
SrsConsumer::~SrsConsumer() SrsLiveConsumer::~SrsLiveConsumer()
{ {
source->on_consumer_destroy(this); source->on_consumer_destroy(this);
srs_freep(jitter); srs_freep(jitter);
@ -445,22 +445,22 @@ SrsConsumer::~SrsConsumer()
#endif #endif
} }
void SrsConsumer::set_queue_size(srs_utime_t queue_size) void SrsLiveConsumer::set_queue_size(srs_utime_t queue_size)
{ {
queue->set_queue_size(queue_size); queue->set_queue_size(queue_size);
} }
void SrsConsumer::update_source_id() void SrsLiveConsumer::update_source_id()
{ {
should_update_source_id = true; should_update_source_id = true;
} }
int64_t SrsConsumer::get_time() int64_t SrsLiveConsumer::get_time()
{ {
return jitter->get_time(); return jitter->get_time();
} }
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -504,7 +504,7 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
return err; return err;
} }
srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -537,7 +537,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
} }
#ifdef SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) void SrsLiveConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
{ {
if (paused) { if (paused) {
srs_usleep(SRS_CONSTS_RTMP_PULSE); srs_usleep(SRS_CONSTS_RTMP_PULSE);
@ -563,7 +563,7 @@ void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
} }
#endif #endif
srs_error_t SrsConsumer::on_play_client_pause(bool is_pause) srs_error_t SrsLiveConsumer::on_play_client_pause(bool is_pause)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -573,7 +573,7 @@ srs_error_t SrsConsumer::on_play_client_pause(bool is_pause)
return err; return err;
} }
void SrsConsumer::wakeup() void SrsLiveConsumer::wakeup()
{ {
#ifdef SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT
if (mw_waiting) { if (mw_waiting) {
@ -681,7 +681,7 @@ void SrsGopCache::clear()
audio_after_last_video_count = 0; audio_after_last_video_count = 0;
} }
srs_error_t SrsGopCache::dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm) srs_error_t SrsGopCache::dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1561,7 +1561,7 @@ SrsFormat* SrsMetaCache::ash_format()
return aformat; return aformat;
} }
srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) srs_error_t SrsMetaCache::dumps(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2018,10 +2018,10 @@ srs_error_t SrsLiveSource::on_reload_vhost_play(string vhost)
srs_utime_t v = _srs_config->get_queue_length(req->vhost); srs_utime_t v = _srs_config->get_queue_length(req->vhost);
if (true) { if (true) {
std::vector<SrsConsumer*>::iterator it; std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) { for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it; SrsLiveConsumer* consumer = *it;
consumer->set_queue_size(v); consumer->set_queue_size(v);
} }
@ -2066,9 +2066,9 @@ srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id)
_source_id = id; _source_id = id;
// notice all consumer // notice all consumer
std::vector<SrsConsumer*>::iterator it; std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) { for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it; SrsLiveConsumer* consumer = *it;
consumer->update_source_id(); consumer->update_source_id();
} }
@ -2139,9 +2139,9 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack
// copy to all consumer // copy to all consumer
if (!drop_for_reduce) { if (!drop_for_reduce) {
std::vector<SrsConsumer*>::iterator it; std::vector<SrsLiveConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) { for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it; SrsLiveConsumer* consumer = *it;
if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) { if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume metadata"); return srs_error_wrap(err, "consume metadata");
} }
@ -2226,7 +2226,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
// copy to all consumer // copy to all consumer
if (!drop_for_reduce) { if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) { for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i); SrsLiveConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) { if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume message"); return srs_error_wrap(err, "consume message");
} }
@ -2356,7 +2356,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
// copy to all consumer // copy to all consumer
if (!drop_for_reduce) { if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) { for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i); SrsLiveConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) { if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume video"); return srs_error_wrap(err, "consume video");
} }
@ -2569,11 +2569,11 @@ void SrsLiveSource::on_unpublish()
} }
} }
srs_error_t SrsLiveSource::create_consumer(SrsConsumer*& consumer) srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
consumer = new SrsConsumer(this); consumer = new SrsLiveConsumer(this);
consumers.push_back(consumer); consumers.push_back(consumer);
// for edge, when play edge stream, check the state // for edge, when play edge stream, check the state
@ -2587,7 +2587,7 @@ srs_error_t SrsLiveSource::create_consumer(SrsConsumer*& consumer)
return err; return err;
} }
srs_error_t SrsLiveSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg) srs_error_t SrsLiveSource::consumer_dumps(SrsLiveConsumer* consumer, bool ds, bool dm, bool dg)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2630,9 +2630,9 @@ srs_error_t SrsLiveSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool d
return err; return err;
} }
void SrsLiveSource::on_consumer_destroy(SrsConsumer* consumer) void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
{ {
std::vector<SrsConsumer*>::iterator it; std::vector<SrsLiveConsumer*>::iterator it;
it = std::find(consumers.begin(), consumers.end(), consumer); it = std::find(consumers.begin(), consumers.end(), consumer);
if (it != consumers.end()) { if (it != consumers.end()) {
consumers.erase(it); consumers.erase(it);

@ -38,7 +38,7 @@
class SrsFormat; class SrsFormat;
class SrsRtmpFormat; class SrsRtmpFormat;
class SrsConsumer; class SrsLiveConsumer;
class SrsPlayEdge; class SrsPlayEdge;
class SrsPublishEdge; class SrsPublishEdge;
class SrsLiveSource; class SrsLiveSource;
@ -157,8 +157,8 @@ public:
// @max_count the max count to dequeue, must be positive. // @max_count the max count to dequeue, must be positive.
virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
// Dumps packets to consumer, use specified args. // Dumps packets to consumer, use specified args.
// @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue(). // @remark the atc/tba/tbv/ag are same to SrsLiveConsumer.enqueue().
virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag); virtual srs_error_t dump_packets(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag);
private: private:
// Remove a gop from the front. // Remove a gop from the front.
// if no iframe found, clear it. // if no iframe found, clear it.
@ -183,7 +183,7 @@ public:
}; };
// The consumer for SrsLiveSource, that is a play client. // The consumer for SrsLiveSource, that is a play client.
class SrsConsumer : public ISrsWakable class SrsLiveConsumer : public ISrsWakable
{ {
private: private:
SrsRtmpJitter* jitter; SrsRtmpJitter* jitter;
@ -201,8 +201,8 @@ private:
srs_utime_t mw_duration; srs_utime_t mw_duration;
#endif #endif
public: public:
SrsConsumer(SrsLiveSource* s); SrsLiveConsumer(SrsLiveSource* s);
virtual ~SrsConsumer(); virtual ~SrsLiveConsumer();
public: public:
// Set the size of queue. // Set the size of queue.
virtual void set_queue_size(srs_utime_t queue_size); virtual void set_queue_size(srs_utime_t queue_size);
@ -279,7 +279,7 @@ public:
// clear the gop cache. // clear the gop cache.
virtual void clear(); virtual void clear();
// dump the cached gop to consumer. // dump the cached gop to consumer.
virtual srs_error_t dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm); virtual srs_error_t dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm);
// used for atc to get the time of gop cache, // used for atc to get the time of gop cache,
// The atc will adjust the sequence header timestamp to gop cache. // The atc will adjust the sequence header timestamp to gop cache.
virtual bool empty(); virtual bool empty();
@ -433,7 +433,7 @@ public:
// Dumps cached metadata to consumer. // Dumps cached metadata to consumer.
// @param dm Whether dumps the metadata. // @param dm Whether dumps the metadata.
// @param ds Whether dumps the sequence header. // @param ds Whether dumps the sequence header.
virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); virtual srs_error_t dumps(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds);
public: public:
// Previous exists sequence header. // Previous exists sequence header.
virtual SrsSharedPtrMessage* previous_vsh(); virtual SrsSharedPtrMessage* previous_vsh();
@ -515,7 +515,7 @@ private:
// deep copy of client request. // deep copy of client request.
SrsRequest* req; SrsRequest* req;
// To delivery stream to clients. // To delivery stream to clients.
std::vector<SrsConsumer*> consumers; std::vector<SrsLiveConsumer*> consumers;
// The time jitter algorithm for vhost. // The time jitter algorithm for vhost.
SrsRtmpJitterAlgorithm jitter_algorithm; SrsRtmpJitterAlgorithm jitter_algorithm;
// For play, whether use interlaced/mixed algorithm to correct timestamp. // For play, whether use interlaced/mixed algorithm to correct timestamp.
@ -600,13 +600,13 @@ public:
public: public:
// Create consumer // Create consumer
// @param consumer, output the create consumer. // @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsConsumer*& consumer); virtual srs_error_t create_consumer(SrsLiveConsumer*& consumer);
// Dumps packets in cache to consumer. // Dumps packets in cache to consumer.
// @param ds, whether dumps the sequence header. // @param ds, whether dumps the sequence header.
// @param dm, whether dumps the metadata. // @param dm, whether dumps the metadata.
// @param dg, whether dumps the gop cache. // @param dg, whether dumps the gop cache.
virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true); virtual srs_error_t consumer_dumps(SrsLiveConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void on_consumer_destroy(SrsLiveConsumer* consumer);
virtual void set_cache(bool enabled); virtual void set_cache(bool enabled);
virtual SrsRtmpJitterAlgorithm jitter(); virtual SrsRtmpJitterAlgorithm jitter();
public: public:

@ -26,6 +26,6 @@
#define VERSION_MAJOR 4 #define VERSION_MAJOR 4
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 113 #define VERSION_REVISION 114
#endif #endif

Loading…
Cancel
Save