From 0858bd8b6f9d72568f50effd82e62071daad53c0 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 26 Mar 2014 16:25:02 +0800 Subject: [PATCH] Support RTMP ATC for HLS/HDS to support backup(failover). change to 0.9.35 --- README.md | 14 ++--- trunk/conf/full.conf | 17 ++++++ trunk/src/app/srs_app_config.cpp | 16 ++++++ trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_source.cpp | 64 +++++++++++++++++++++-- trunk/src/app/srs_app_source.hpp | 17 ++++++ trunk/src/core/srs_core.hpp | 2 +- trunk/src/rtmp/srs_protocol_handshake.cpp | 2 +- 8 files changed, 120 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index af90b108e..2e6f017de 100755 --- a/README.md +++ b/README.md @@ -112,13 +112,13 @@ Supported operating systems and hardware: 26. Support RTMP(play-publish) library: srs-librtmp
27. Support ARM([debian armhf, v7cpu](https://github.com/winlinvip/simple-rtmp-server/wiki/SrsLinuxArm)) with rtmp/ssl/hls/librtmp.
28. Support [init.d](https://github.com/winlinvip/simple-rtmp-server/wiki/LinuxService) and packge script, log to file.
-29. [plan] Support system utest
-30. [plan] Support embeded http server for api and hls(live/vod)
-31. [plan] Support vod(file to hls stream)
-32. [plan] Support stream ingester using ffmpeg.
-33. [plan] Support ingest RTSP(RTP, SDP) stream to RTMP.
-34. [plan] Support network based cli and json result.
-35. [plan] Support HLS cluster, use RTMP ATC to generate the TS
+29. Support RTMP ATC for HLS/HDS to support backup(failover)
+30. [plan] Support system utest
+31. [plan] Support embeded http server for api and hls(live/vod)
+32. [plan] Support vod(file to hls stream)
+33. [plan] Support stream ingester using ffmpeg.
+34. [plan] Support ingest RTSP(RTP, SDP) stream to RTMP.
+35. [plan] Support network based cli and json result.
36. [plan] Support RTMP edge server, push/pull stream from any RTMP server
37. [plan] Support multiple processes, for both origin and edge
38. [no-plan] Support adobe flash refer/token/swf verification.
diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index d7b060555..52db543c9 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -41,6 +41,23 @@ vhost __defaultVhost__ { gop_cache on; } +# vhost for atc. +vhost atc.srs.com { + # vhost for atc for hls/hds/rtmp backup. + # generally, atc default to off, server delivery rtmp stream to client(flash) timestamp from 0. + # when atc is on, server delivery rtmp stream by absolute time. + # atc is used, for instance, encoder will copy stream to master and slave server, + # server use atc to delivery stream to edge/client, where stream time from master/slave server + # is always the same, client/tools can slice RTMP stream to HLS according to the same time, + # if the time not the same, the HLS stream cannot slice to support system backup. + # + # @see http://www.adobe.com/cn/devnet/adobe-media-server/articles/varnish-sample-for-failover.html + # @see http://www.baidu.com/#wd=hds%20hls%20atc + # + # default: off + atc on; +} + # vhost for bandwidth check # generally, the bandcheck vhost must be: bandcheck.srs.com, # or need to modify the vhost of client. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index ec6db32de..80eca830c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1340,6 +1340,22 @@ bool SrsConfig::get_gop_cache(string vhost) return true; } +bool SrsConfig::get_atc(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return true; + } + + conf = conf->get("atc"); + if (conf && conf->arg0() == "on") { + return true; + } + + return false; +} + double SrsConfig::get_queue_length(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index e3d73bc3a..431da58b2 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -153,6 +153,7 @@ public: virtual bool get_deamon(); virtual int get_max_connections(); virtual bool get_gop_cache(std::string vhost); + virtual bool get_atc(std::string vhost); virtual double get_queue_length(std::string vhost); virtual SrsConfDirective* get_forward(std::string vhost); private: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 416d088e8..7df605e4d 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -268,10 +268,11 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) { int ret = ERROR_SUCCESS; - /* TODO: to support atc */ - if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { - srs_freep(msg); - return ret; + if (!source->is_atc()) { + if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } } if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { @@ -391,6 +392,23 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) return ret; } +bool SrsGopCache::empty() +{ + return gop_cache.empty(); +} + +int64_t SrsGopCache::get_start_time() +{ + if (empty()) { + return 0; + } + + SrsSharedPtrMessage* msg = gop_cache[0]; + srs_assert(msg); + + return msg->header.timestamp; +} + std::map SrsSource::pool; SrsSource* SrsSource::find(SrsRequest* req) @@ -425,6 +443,7 @@ SrsSource::SrsSource(SrsRequest* _req) gop_cache = new SrsGopCache(); _srs_config->subscribe(this); + atc = _srs_config->get_atc(req->vhost); } SrsSource::~SrsSource() @@ -774,6 +793,16 @@ int SrsSource::on_audio(SrsCommonMessage* audio) } srs_verbose("cache gop success."); + // if atc, update the sequence header to abs time. + if (atc) { + if (cache_sh_audio) { + cache_sh_audio->header.timestamp = msg->header.timestamp; + } + if (cache_metadata) { + cache_metadata->header.timestamp = msg->header.timestamp; + } + } + return ret; } @@ -841,6 +870,16 @@ int SrsSource::on_video(SrsCommonMessage* video) } srs_verbose("cache gop success."); + // if atc, update the sequence header to abs time. + if (atc) { + if (cache_sh_video) { + cache_sh_video->header.timestamp = msg->header.timestamp; + } + if (cache_metadata) { + cache_metadata->header.timestamp = msg->header.timestamp; + } + } + return ret; } @@ -921,6 +960,17 @@ void SrsSource::on_unpublish() } srs_info("dispatch metadata success"); + // if atc, update the sequence header to gop cache time. + if (atc && !gop_cache->empty()) { + if (cache_sh_video) { + cache_sh_video->header.timestamp = gop_cache->get_start_time(); + } + if (cache_sh_audio) { + cache_sh_audio->header.timestamp = gop_cache->get_start_time(); + } + } + + // copy sequence header if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch video sequence header failed. ret=%d", ret); return ret; @@ -933,6 +983,7 @@ void SrsSource::on_unpublish() } srs_info("dispatch audio sequence header success"); + // copy gop cache to client. if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) { return ret; } @@ -957,6 +1008,11 @@ void SrsSource::set_cache(bool enabled) gop_cache->set(enabled); } +bool SrsSource::is_atc() +{ + return atc; +} + int SrsSource::create_forwarders() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 9f0c554e5..0e3bdb4a1 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -191,6 +191,12 @@ public: virtual int cache(SrsSharedPtrMessage* msg); virtual void clear(); virtual int dump(SrsConsumer* consumer, int tba, int tbv); + /** + * used for atc to get the time of gop cache, + * the atc will adjust the sequence header timestamp to gop cache. + */ + virtual bool empty(); + virtual int64_t get_start_time(); }; /** @@ -238,6 +244,13 @@ private: * can publish, true when is not streaming */ bool _can_publish; + /** + * atc whether atc(use absolute time and donot adjust time), + * directly use msg time and donot adjust if atc is true, + * otherwise, adjust msg time to start from 0 to make flash happy. + */ + // TODO: FIXME: to support reload atc. + bool atc; private: SrsSharedPtrMessage* cache_metadata; // the cached video sequence header. @@ -279,6 +292,10 @@ public: virtual int create_consumer(SrsConsumer*& consumer); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); +// internal +public: + // for consumer, atc feature. + virtual bool is_atc(); private: virtual int create_forwarders(); virtual void destroy_forwarders(); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 8d4b2dee0..c152ef816 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "34" +#define VERSION_REVISION "35" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs" diff --git a/trunk/src/rtmp/srs_protocol_handshake.cpp b/trunk/src/rtmp/srs_protocol_handshake.cpp index 26ba08916..ae979da6d 100644 --- a/trunk/src/rtmp/srs_protocol_handshake.cpp +++ b/trunk/src/rtmp/srs_protocol_handshake.cpp @@ -1163,7 +1163,7 @@ int SrsComplexHandshake::handshake_with_client(SrsHandshakeBytes* hs_bytes, ISrs // never verify c2, for ffmpeg will failed. // it's ok for flash. - srs_trace("comple handshake with client success"); + srs_trace("complex handshake with client success"); return ret; }