From ab9a2ddd6cc7a2d9f714cdd5813975f915009912 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 17 Apr 2014 18:13:59 +0800 Subject: [PATCH] dvr support segment plan --- trunk/src/app/srs_app_dvr.cpp | 174 +++++++++++++++++++--------------- trunk/src/app/srs_app_dvr.hpp | 19 +++- trunk/src/app/srs_app_hls.cpp | 6 +- 3 files changed, 113 insertions(+), 86 deletions(-) diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 45c745531..caec7ae2d 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -90,6 +90,11 @@ int SrsFileStream::close() return ret; } +bool SrsFileStream::is_open() +{ + return fd > 0; +} + int SrsFileStream::read(void* buf, size_t count, ssize_t* pnread) { int ret = ERROR_SUCCESS; @@ -293,6 +298,8 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s SrsDvrPlan::SrsDvrPlan() { _source = NULL; + _req = NULL; + jitter = NULL; dvr_enabled = false; fs = new SrsFileStream(); enc = new SrsFlvEncoder(); @@ -300,19 +307,60 @@ SrsDvrPlan::SrsDvrPlan() SrsDvrPlan::~SrsDvrPlan() { + srs_freep(jitter); srs_freep(fs); srs_freep(enc); } -int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* /*req*/) +int SrsDvrPlan::initialize(SrsSource* source, SrsRequest* req) { int ret = ERROR_SUCCESS; _source = source; + _req = req; return ret; } +int SrsDvrPlan::on_publish() +{ + int ret = ERROR_SUCCESS; + + // support multiple publish. + if (dvr_enabled) { + return ret; + } + + SrsRequest* req = _req; + + if (!_srs_config->get_dvr_enabled(req->vhost)) { + return ret; + } + + // jitter. + srs_freep(jitter); + jitter = new SrsRtmpJitter(); + + // new flv file + std::stringstream path; + + path << _srs_config->get_dvr_path(req->vhost) + << "/" << req->app << "/" + << req->stream << "." << srs_get_system_time_ms() << ".flv"; + + if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) { + return ret; + } + dvr_enabled = true; + + // the dvr is enabled, notice the source to push the data. + if ((ret = _source->on_dvr_start()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + int SrsDvrPlan::flv_open(string stream, string path) { int ret = ERROR_SUCCESS; @@ -332,10 +380,6 @@ int SrsDvrPlan::flv_open(string stream, string path) return ret; } - if ((ret = _source->on_dvr_start()) != ERROR_SUCCESS) { - return ret; - } - srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str()); return ret; } @@ -375,6 +419,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) return ret; } + if ((jitter->correct(audio, 0, 0)) != ERROR_SUCCESS) { + return ret; + } + int32_t timestamp = audio->header.timestamp; char* payload = (char*)audio->payload; int size = (int)audio->size; @@ -382,6 +430,10 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) return ret; } + if ((ret = on_audio_msg(audio)) != ERROR_SUCCESS) { + return ret; + } + return ret; } @@ -393,6 +445,10 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) return ret; } + if ((jitter->correct(video, 0, 0)) != ERROR_SUCCESS) { + return ret; + } + int32_t timestamp = video->header.timestamp; char* payload = (char*)video->payload; int size = (int)video->size; @@ -400,6 +456,22 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) return ret; } + if ((ret = on_video_msg(video)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvrPlan::on_audio_msg(SrsSharedPtrMessage* /*audio*/) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsDvrPlan::on_video_msg(SrsSharedPtrMessage* /*video*/) +{ + int ret = ERROR_SUCCESS; return ret; } @@ -423,33 +495,6 @@ SrsDvrSessionPlan::~SrsDvrSessionPlan() { } -int SrsDvrSessionPlan::on_publish(SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - // support multiple publish. - if (dvr_enabled) { - return ret; - } - - if (!_srs_config->get_dvr_enabled(req->vhost)) { - return ret; - } - - std::stringstream path; - - path << _srs_config->get_dvr_path(req->vhost) - << "/" << req->app << "/" - << req->stream << "." << srs_get_system_time_ms() << ".flv"; - - if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) { - return ret; - } - dvr_enabled = true; - - return ret; -} - void SrsDvrSessionPlan::on_unpublish() { // support multiple publish. @@ -492,31 +537,17 @@ int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req) return ret; } -int SrsDvrSegmentPlan::on_publish(SrsRequest* req) +int SrsDvrSegmentPlan::on_publish() { int ret = ERROR_SUCCESS; - // support multiple publish. - if (dvr_enabled) { - return ret; - } - - if (!_srs_config->get_dvr_enabled(req->vhost)) { + // if already opened, continue to dvr. + if (fs->is_open()) { + dvr_enabled = true; return ret; } - std::stringstream path; - - path << _srs_config->get_dvr_path(req->vhost) - << "/" << req->app << "/" - << req->stream << "." << srs_get_system_time_ms() << ".flv"; - - if ((ret = flv_open(req->get_stream_url(), path.str())) != ERROR_SUCCESS) { - return ret; - } - dvr_enabled = true; - - return ret; + return SrsDvrPlan::on_publish(); } void SrsDvrSegmentPlan::on_unpublish() @@ -525,51 +556,28 @@ void SrsDvrSegmentPlan::on_unpublish() if (!dvr_enabled) { return; } - - // ignore error. - int ret = flv_close(); - if (ret != ERROR_SUCCESS) { - srs_warn("ignore flv close error. ret=%d", ret); - } - dvr_enabled = false; } -int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrSegmentPlan::on_audio_msg(SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; - if (!dvr_enabled) { - return ret; - } - if ((ret = update_duration(audio)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) { - return ret; - } - return ret; } -int SrsDvrSegmentPlan::on_video(SrsSharedPtrMessage* video) +int SrsDvrSegmentPlan::on_video_msg(SrsSharedPtrMessage* video) { int ret = ERROR_SUCCESS; - if (!dvr_enabled) { - return ret; - } - if ((ret = update_duration(video)) != ERROR_SUCCESS) { return ret; } - if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) { - return ret; - } - return ret; } @@ -586,7 +594,17 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) // reap if exceed duration. if (duration > 0 && segment_duration > 0 && duration > segment_duration) { + duration = 0; + starttime = -1; + + if ((ret = flv_close()) != ERROR_SUCCESS) { + return ret; + } on_unpublish(); + + if ((ret = on_publish()) != ERROR_SUCCESS) { + return ret; + } } return ret; @@ -617,11 +635,11 @@ int SrsDvr::initialize(SrsRequest* req) return ret; } -int SrsDvr::on_publish(SrsRequest* req) +int SrsDvr::on_publish(SrsRequest* /*req*/) { int ret = ERROR_SUCCESS; - if ((ret = plan->on_publish(req)) != ERROR_SUCCESS) { + if ((ret = plan->on_publish()) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index 6de317e36..4e022eaa9 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsSource; class SrsRequest; class SrsStream; +class SrsRtmpJitter; class SrsOnMetaDataPacket; class SrsSharedPtrMessage; @@ -51,6 +52,7 @@ public: public: virtual int open(std::string file); virtual int close(); + virtual bool is_open(); public: /** * @param pnread, return the read size. NULL to ignore. @@ -123,18 +125,26 @@ protected: SrsFlvEncoder* enc; bool dvr_enabled; SrsSource* _source; + SrsRequest* _req; + SrsRtmpJitter* jitter; public: SrsDvrPlan(); virtual ~SrsDvrPlan(); public: virtual int initialize(SrsSource* source, SrsRequest* req); - virtual int on_publish(SrsRequest* req) = 0; + virtual int on_publish(); virtual void on_unpublish() = 0; virtual int on_meta_data(SrsOnMetaDataPacket* metadata); virtual int on_audio(SrsSharedPtrMessage* audio); virtual int on_video(SrsSharedPtrMessage* video); protected: virtual int flv_open(std::string stream, std::string path); + /** + * user should override this method. + * for the audio/video is corrected by jitter. + */ + virtual int on_audio_msg(SrsSharedPtrMessage* audio); + virtual int on_video_msg(SrsSharedPtrMessage* video); virtual int flv_close(); public: static SrsDvrPlan* create_plan(std::string vhost); @@ -149,7 +159,6 @@ public: SrsDvrSessionPlan(); virtual ~SrsDvrSessionPlan(); public: - virtual int on_publish(SrsRequest* req); virtual void on_unpublish(); }; @@ -168,10 +177,10 @@ public: virtual ~SrsDvrSegmentPlan(); public: virtual int initialize(SrsSource* source, SrsRequest* req); - virtual int on_publish(SrsRequest* req); + virtual int on_publish(); virtual void on_unpublish(); - virtual int on_audio(SrsSharedPtrMessage* audio); - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_audio_msg(SrsSharedPtrMessage* audio); + virtual int on_video_msg(SrsSharedPtrMessage* video); private: virtual int update_duration(SrsSharedPtrMessage* msg); }; diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index dfe1390f5..822b99ba7 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -1297,6 +1297,9 @@ int SrsHls::on_publish(SrsRequest* req) return ret; } + // if enabled, open the muxer. + hls_enabled = true; + // notice the source to get the cached sequence header. // when reload to start hls, hls will never get the sequence header in stream, // use the SrsSource.on_hls_start to push the sequence header to HLS. @@ -1304,9 +1307,6 @@ int SrsHls::on_publish(SrsRequest* req) srs_error("callback source hls start failed. ret=%d", ret); return ret; } - - // if enabled, open the muxer. - hls_enabled = true; return ret; }