diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 9e8d7fd39..2cb77fab5 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -288,39 +288,32 @@ int SrsFlvEncoder::write_tag(char* header, int header_size, char* tag, int tag_s return ret; } -SrsDvr::SrsDvr(SrsSource* source) +SrsDvrPlan::SrsDvrPlan() { - _source = source; + _source = NULL; dvr_enabled = false; fs = new SrsFileStream(); enc = new SrsFlvEncoder(); } -SrsDvr::~SrsDvr() +SrsDvrPlan::~SrsDvrPlan() { srs_freep(fs); srs_freep(enc); } -int SrsDvr::on_publish(SrsRequest* req) +int SrsDvrPlan::initialize(SrsSource* source) { int ret = ERROR_SUCCESS; - // support multiple publish. - if (dvr_enabled) { - return ret; - } + _source = source; - if (!_srs_config->get_dvr_enabled(req->vhost)) { - return ret; - } - - std::string path = _srs_config->get_dvr_path(req->vhost); - path += "/"; - path += req->app; - path += "/"; - path += req->stream; - path += ".flv"; + return ret; +} + +int SrsDvrPlan::flv_open(string stream, string path) +{ + int ret = ERROR_SUCCESS; if ((ret = fs->open(path)) != ERROR_SUCCESS) { srs_error("open file stream for file %s failed. ret=%d", path.c_str(), ret); @@ -341,26 +334,16 @@ int SrsDvr::on_publish(SrsRequest* req) return ret; } - srs_trace("dvr stream %s to file %s", req->get_stream_url().c_str(), path.c_str()); - dvr_enabled = true; - + srs_trace("dvr stream %s to file %s", stream.c_str(), path.c_str()); return ret; } -void SrsDvr::on_unpublish() +int SrsDvrPlan::flv_close() { - // support multiple publish. - if (!dvr_enabled) { - return; - } - - // ignore error. - fs->close(); - - dvr_enabled = false; + return fs->close(); } -int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) +int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) { int ret = ERROR_SUCCESS; @@ -382,12 +365,10 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } -int SrsDvr::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, audio, false); - if (!dvr_enabled) { return ret; } @@ -402,12 +383,10 @@ int SrsDvr::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsDvr::on_video(SrsSharedPtrMessage* video) +int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, video, false); - if (!dvr_enabled) { return ret; } @@ -422,5 +401,140 @@ int SrsDvr::on_video(SrsSharedPtrMessage* video) return ret; } +SrsDvrPlan* SrsDvrPlan::create_plan() +{ + return new SrsDvrSessionPlan(); +} + +SrsDvrSessionPlan::SrsDvrSessionPlan() +{ +} + +SrsDvrSessionPlan::~SrsDvrSessionPlan() +{ +} + +int SrsDvrSessionPlan::on_publish(SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + // support multiple publish. + if (dvr_enabled) { + return ret; + } + + if (!_srs_config->get_dvr_enabled(req->vhost)) { + return ret; + } + + std::string path = _srs_config->get_dvr_path(req->vhost); + path += "/"; + path += req->app; + path += "/"; + path += req->stream; + path += ".flv"; + + if ((ret = flv_open(req->get_stream_url(), path)) != ERROR_SUCCESS) { + return ret; + } + dvr_enabled = true; + + return ret; +} + +void SrsDvrSessionPlan::on_unpublish() +{ + // support multiple publish. + if (!dvr_enabled) { + return; + } + + // ignore error. + int ret = flv_close(); + if (ret != ERROR_SUCCESS) { + srs_warn("ignore flv close error. ret=%d", ret); + } + + dvr_enabled = false; +} + +SrsDvr::SrsDvr(SrsSource* source) +{ + _source = source; + plan = NULL; +} + +SrsDvr::~SrsDvr() +{ + srs_freep(plan); +} + +int SrsDvr::initialize() +{ + int ret = ERROR_SUCCESS; + + srs_freep(plan); + plan = SrsDvrPlan::create_plan(); + + if ((ret = plan->initialize(_source)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvr::on_publish(SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + if ((ret = plan->on_publish(req)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +void SrsDvr::on_unpublish() +{ + plan->on_unpublish(); +} + +int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) +{ + int ret = ERROR_SUCCESS; + + if ((ret = plan->on_meta_data(metadata)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvr::on_audio(SrsSharedPtrMessage* audio) +{ + int ret = ERROR_SUCCESS; + + SrsAutoFree(SrsSharedPtrMessage, audio, false); + + if ((ret = plan->on_audio(audio)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvr::on_video(SrsSharedPtrMessage* video) +{ + int ret = ERROR_SUCCESS; + + SrsAutoFree(SrsSharedPtrMessage, video, false); + + if ((ret = plan->on_video(video)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index 62028000b..f4506a8a1 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -105,6 +105,57 @@ private: virtual int write_tag(char* header, int header_size, char* tag, int tag_size); }; +/** +* the plan for dvr. +* use to control the following dvr params: +* 1. filename: the filename for record file. +* 2. reap flv: when to reap the flv and start new piece. +*/ +class SrsDvrPlan +{ +protected: + /** + * the underlayer dvr stream. + * if close, the flv is reap and closed. + * if open, new flv file is crote. + */ + SrsFileStream* fs; + SrsFlvEncoder* enc; + bool dvr_enabled; + SrsSource* _source; +public: + SrsDvrPlan(); + virtual ~SrsDvrPlan(); +public: + virtual int initialize(SrsSource* source); + virtual int on_publish(SrsRequest* req) = 0; + virtual void on_unpublish() = 0; + virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_audio(SrsSharedPtrMessage* audio); + virtual int on_video(SrsSharedPtrMessage* video); +protected: + virtual int flv_open(std::string stream, std::string path); + virtual int flv_close(); +public: + static SrsDvrPlan* create_plan(); +}; + +/** +* default session plan: +* 1. start dvr when session start(publish). +* 2. stop dvr when session stop(unpublish). +* 3. always dvr to file: dvr_path/app/stream.flv +*/ +class SrsDvrSessionPlan : public SrsDvrPlan +{ +public: + SrsDvrSessionPlan(); + virtual ~SrsDvrSessionPlan(); +public: + virtual int on_publish(SrsRequest* req); + virtual void on_unpublish(); +}; + /** * dvr(digital video recorder) to record RTMP stream to flv file. * TODO: FIXME: add utest for it. @@ -114,13 +165,17 @@ class SrsDvr private: SrsSource* _source; private: - bool dvr_enabled; - SrsFileStream* fs; - SrsFlvEncoder* enc; + SrsDvrPlan* plan; public: SrsDvr(SrsSource* source); virtual ~SrsDvr(); public: + /** + * initialize dvr, create dvr plan. + * when system initialize(encoder publish at first time, or reload), + * initialize the dvr will reinitialize the plan, the whole dvr framework. + */ + virtual int initialize(); /** * publish stream event, * when encoder start to publish RTMP stream. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index fe75df413..7e3dd2862 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -276,7 +276,10 @@ int SrsRtmpConn::stream_service_cycle() srs_trace("set chunk_size=%d success", chunk_size); // find a source to serve. - SrsSource* source = SrsSource::find(req); + SrsSource* source = NULL; + if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) { + return ret; + } srs_assert(source != NULL); // check publish available. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index c16452bf7..bb7910867 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -413,17 +413,27 @@ int64_t SrsGopCache::get_start_time() std::map SrsSource::pool; -SrsSource* SrsSource::find(SrsRequest* req) +int SrsSource::find(SrsRequest* req, SrsSource** ppsource) { + int ret = ERROR_SUCCESS; + string stream_url = req->get_stream_url(); string vhost = req->vhost; if (pool.find(stream_url) == pool.end()) { - pool[stream_url] = new SrsSource(req); + SrsSource* source = new SrsSource(req); + if ((ret = source->initialize()) != ERROR_SUCCESS) { + srs_freep(source); + return ret; + } + + pool[stream_url] = source; srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); } - return pool[stream_url]; + *ppsource = pool[stream_url]; + + return ret; } SrsSource::SrsSource(SrsRequest* _req) @@ -492,6 +502,19 @@ SrsSource::~SrsSource() srs_freep(req); } +int SrsSource::initialize() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_DVR + if ((ret = dvr->initialize()) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + int SrsSource::on_reload_vhost_atc(string vhost) { int ret = ERROR_SUCCESS; @@ -614,11 +637,20 @@ int SrsSource::on_reload_vhost_dvr(string vhost) } #ifdef SRS_AUTO_DVR + // cleanup dvr dvr->on_unpublish(); + + // reinitialize the dvr, update plan. + if ((ret = dvr->initialize()) != ERROR_SUCCESS) { + return ret; + } + + // start to publish by new plan. if ((ret = dvr->on_publish(req)) != ERROR_SUCCESS) { srs_error("dvr publish failed. ret=%d", ret); return ret; } + srs_trace("vhost %s dvr reload success", vhost.c_str()); #endif diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 0cf8daf46..ebbd77942 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -213,10 +213,10 @@ public: /** * find stream by vhost/app/stream. * @param req the client request. - * @return the matched source, never be NULL. + * @param ppsource the matched source, if success never be NULL. * @remark stream_url should without port and schema. */ - static SrsSource* find(SrsRequest* req); + static int find(SrsRequest* req, SrsSource** ppsource); private: // deep copy of client request. SrsRequest* req; @@ -271,6 +271,8 @@ public: */ SrsSource(SrsRequest* _req); virtual ~SrsSource(); +public: + virtual int initialize(); // interface ISrsReloadHandler public: virtual int on_reload_vhost_atc(std::string vhost);