diff --git a/README.md b/README.md index f59c2ec2f..0ccdb71c8 100755 --- a/README.md +++ b/README.md @@ -566,6 +566,7 @@ Supported operating systems and hardware: ### SRS 2.0 history +* v2.0, 2015-03-31, support on_hls for http hooks. 2.0.152. * v2.0, 2015-03-31, enhanced hls, support deviation for duration. 2.0.151. * v2.0, 2015-03-30, for [#351](https://github.com/winlinvip/simple-rtmp-server/issues/351), support config the m3u8/ts path for hls. 2.0.149. * v2.0, 2015-03-17, for [#155](https://github.com/winlinvip/simple-rtmp-server/issues/155), osx(darwin) support demo with nginx and ffmpeg. 2.0.143. diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index dddecc20a..b06472438 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -563,6 +563,7 @@ vhost with-hls.srs.com { # whether use floor for the hls_ts_file path generation. # if on, use floor(timestamp/hls_fragment) as the variable [timestamp], # and use enahanced algorithm to calc deviation for segment. + # @remark when floor on, recommend the hls_segment>=2*gop. # default: off hls_ts_floor off; # the hls entry prefix, which is base url of ts url. @@ -740,7 +741,8 @@ vhost hooks.callback.srs.com { # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "stream": "livestream", # "cwd": "/usr/local/srs", - # "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts" + # "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts", + # "seq_no": 100 # } # if valid, the hook must return HTTP code 200(Stauts OK) and response # an int value specifies the error code(0 corresponding to success): diff --git a/trunk/configure b/trunk/configure index 14108d178..3169f954e 100755 --- a/trunk/configure +++ b/trunk/configure @@ -174,7 +174,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" - "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener") + "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call") DEFINES="" # add each modules for app for SRS_MODULE in $SRS_MODULES; do diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index c52bd3391..de7824012 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -64,6 +64,8 @@ file ../../src/protocol/srs_rtmp_utility.hpp, ../../src/protocol/srs_rtmp_utility.cpp, app readonly separator, + ../../src/app/srs_app_async_call.hpp, + ../../src/app/srs_app_async_call.cpp, ../../src/app/srs_app_bandwidth.hpp, ../../src/app/srs_app_bandwidth.cpp, ../../src/app/srs_app_conn.hpp, diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index 4e7f97e82..aff92c0da 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -97,6 +97,7 @@ 3C689F9F1AB6AAC800C9CEEE /* sched.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C689F9B1AB6AAC800C9CEEE /* sched.c */; }; 3C689FA01AB6AAC800C9CEEE /* stk.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C689F9C1AB6AAC800C9CEEE /* stk.c */; }; 3C689FA11AB6AAC800C9CEEE /* sync.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C689F9D1AB6AAC800C9CEEE /* sync.c */; }; + 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; }; /* End PBXBuildFile section */ /* Begin PBXCopyFilesBuildPhase section */ @@ -337,6 +338,8 @@ 3C689F9B1AB6AAC800C9CEEE /* sched.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = sched.c; path = "../../objs/st-1.9/sched.c"; sourceTree = ""; }; 3C689F9C1AB6AAC800C9CEEE /* stk.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = stk.c; path = "../../objs/st-1.9/stk.c"; sourceTree = ""; }; 3C689F9D1AB6AAC800C9CEEE /* sync.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = sync.c; path = "../../objs/st-1.9/sync.c"; sourceTree = ""; }; + 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = ""; }; + 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -483,6 +486,8 @@ 3C12324B1AAE81CE00CE8F6C /* app */ = { isa = PBXGroup; children = ( + 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */, + 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */, 3C12324C1AAE81D900CE8F6C /* srs_app_bandwidth.cpp */, 3C12324D1AAE81D900CE8F6C /* srs_app_bandwidth.hpp */, 3C12324E1AAE81D900CE8F6C /* srs_app_config.cpp */, @@ -789,6 +794,7 @@ 3C1232951AAE81D900CE8F6C /* srs_app_config.cpp in Sources */, 3C663F0F1AB0155100286D8B /* srs_aac_raw_publish.c in Sources */, 3C689FA01AB6AAC800C9CEEE /* stk.c in Sources */, + 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */, 3C1232961AAE81D900CE8F6C /* srs_app_conn.cpp in Sources */, 3C12322A1AAE814D00CE8F6C /* srs_kernel_ts.cpp in Sources */, 3C12329E1AAE81D900CE8F6C /* srs_app_hls.cpp in Sources */, diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 5fcd0ea63..0c7d900f9 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -253,7 +253,7 @@ class RESTDvrs(object): return json.dumps(dvrs) ''' - for SRS hook: on_dvr, on_dvr_reap_segment + for SRS hook: on_dvr on_dvr: when srs reap a dvr file, call the hook, the request in the POST data string is a object encode by json: @@ -265,17 +265,6 @@ class RESTDvrs(object): "cwd": "/usr/local/srs", "file": "./objs/nginx/html/live/livestream.1420254068776.flv" } - on_dvr_reap_segment: - when api dvr specifes the callback when reap flv segment, call the hook, - the request in the POST data string is a object encode by json: - { - "action": "on_dvr_reap_segment", - "client_id": 1985, - "vhost": "video.test.com", "app": "live", - "stream": "livestream", - "cwd": "/usr/local/srs", - "file": "./objs/nginx/html/live/livestream.1420254068776.flv" - } if valid, the hook must return HTTP code 200(Stauts OK) and response an int value specifies the error code(0 corresponding to success): 0 @@ -298,8 +287,6 @@ class RESTDvrs(object): action = json_req["action"] if action == "on_dvr": code = self.__on_dvr(json_req) - if action == "on_dvr_reap_segment": - code = self.__on_dvr_reap_segment(json_req) else: trace("invalid request action: %s"%(json_req["action"])) code = Error.request_invalid_action @@ -321,15 +308,72 @@ class RESTDvrs(object): return code - def __on_dvr_reap_segment(self, req): +''' +handle the hls requests: hls stream. +''' +class RESTHls(object): + exposed = True + + def GET(self): + enable_crossdomain() + + hls = {} + return json.dumps(hls) + + ''' + for SRS hook: on_hls + on_hls: + when srs reap a dvr file, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_dvr", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream", + "cwd": "/usr/local/srs", + "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts", + "seq_no": 100 + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str code = Error.success - trace("srs %s: client id=%s, vhost=%s, app=%s, stream=%s, cwd=%s, file=%s"%( - req["action"], req["client_id"], req["vhost"], req["app"], req["stream"], - req["cwd"], req["file"] + req = cherrypy.request.body.read() + trace("post to hls, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return str(code) + + action = json_req["action"] + if action == "on_hls": + code = self.__on_hls(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return str(code) + + def OPTIONS(self, *args, **kwargs): + enable_crossdomain() + + def __on_hls(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, cwd=%s, file=%s, seq_no=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], + req["cwd"], req["file"], req["seq_no"] )) - # TODO: process the on_dvr event + # TODO: process the on_hls event return code @@ -1133,6 +1177,7 @@ class V1(object): self.streams = RESTStreams() self.sessions = RESTSessions() self.dvrs = RESTDvrs() + self.hls = RESTHls() self.chats = RESTChats() self.servers = RESTServers() self.nodes = RESTNodes() diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp new file mode 100644 index 000000000..3bfac6391 --- /dev/null +++ b/trunk/src/app/srs_app_async_call.cpp @@ -0,0 +1,98 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +using namespace std; + +#include +#include + +// the sleep interval for http async callback. +#define SRS_AUTO_ASYNC_CALLBACL_SLEEP_US 300000 + +ISrsDvrAsyncCall::ISrsDvrAsyncCall() +{ +} + +ISrsDvrAsyncCall::~ISrsDvrAsyncCall() +{ +} + +SrsDvrAsyncCallThread::SrsDvrAsyncCallThread() +{ + pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true); +} + +SrsDvrAsyncCallThread::~SrsDvrAsyncCallThread() +{ + stop(); + srs_freep(pthread); + + std::vector::iterator it; + for (it = callbacks.begin(); it != callbacks.end(); ++it) { + ISrsDvrAsyncCall* call = *it; + srs_freep(call); + } + callbacks.clear(); +} + +int SrsDvrAsyncCallThread::call(ISrsDvrAsyncCall* c) +{ + int ret = ERROR_SUCCESS; + + callbacks.push_back(c); + + return ret; +} + +int SrsDvrAsyncCallThread::start() +{ + return pthread->start(); +} + +void SrsDvrAsyncCallThread::stop() +{ + pthread->stop(); +} + +int SrsDvrAsyncCallThread::cycle() +{ + int ret = ERROR_SUCCESS; + + std::vector copies = callbacks; + callbacks.clear(); + + std::vector::iterator it; + for (it = copies.begin(); it != copies.end(); ++it) { + ISrsDvrAsyncCall* call = *it; + if ((ret = call->call()) != ERROR_SUCCESS) { + srs_warn("dvr: ignore callback %s, ret=%d", call->to_string().c_str(), ret); + } + srs_freep(call); + } + + return ret; +} + + diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp new file mode 100644 index 000000000..3379d284e --- /dev/null +++ b/trunk/src/app/srs_app_async_call.hpp @@ -0,0 +1,75 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_ASYNC_CALL_HPP +#define SRS_APP_ASYNC_CALL_HPP + +/* +#include +*/ +#include + +#include +#include + +#include + +/** + * the async call for http hooks, + * for the http hooks will switch st-thread, + * so we must use isolate thread to avoid the thread corrupt, + * for example, when dvr call http hooks, the video receive thread got + * a video and pass it to the dvr again. + * futhurmore, the aync call never block the main worker thread. + */ +class ISrsDvrAsyncCall +{ +public: + ISrsDvrAsyncCall(); + virtual ~ISrsDvrAsyncCall(); +public: + virtual int call() = 0; + virtual std::string to_string() = 0; +}; + +/** +* the async callback for dvr. +*/ +class SrsDvrAsyncCallThread : public ISrsThreadHandler +{ +private: + SrsThread* pthread; + std::vector callbacks; +public: + SrsDvrAsyncCallThread(); + virtual ~SrsDvrAsyncCallThread(); +public: + virtual int call(ISrsDvrAsyncCall* c); +public: + virtual int start(); + virtual void stop(); + virtual int cycle(); +}; + +#endif + diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 34ca326e2..3c7b8c276 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1494,7 +1494,7 @@ int SrsConfig::check_config() string m = conf->at(j)->name.c_str(); if (m != "enabled" && m != "on_connect" && m != "on_close" && m != "on_publish" && m != "on_unpublish" && m != "on_play" && m != "on_stop" - && m != "on_dvr" + && m != "on_dvr" && m != "on_hls" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost http_hooks directive %s, ret=%d", m.c_str(), ret); @@ -2403,14 +2403,25 @@ SrsConfDirective* SrsConfig::get_vhost_on_stop(string vhost) SrsConfDirective* SrsConfig::get_vhost_on_dvr(string vhost) { SrsConfDirective* conf = get_vhost_http_hooks(vhost); - - if (!conf) { + + if (!conf) { return NULL; } return conf->get("on_dvr"); } +SrsConfDirective* SrsConfig::get_vhost_on_hls(string vhost) +{ + SrsConfDirective* conf = get_vhost_http_hooks(vhost); + + if (!conf) { + return NULL; + } + + return conf->get("on_hls"); +} + bool SrsConfig::get_bw_check_enabled(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 0ab5717e0..a1b11a848 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -627,10 +627,15 @@ public: */ virtual SrsConfDirective* get_vhost_on_stop(std::string vhost); /** - * get the on_dvr callbacks of vhost. - * @return the on_dvr callback directive, the args is the url to callback. - */ + * get the on_dvr callbacks of vhost. + * @return the on_dvr callback directive, the args is the url to callback. + */ virtual SrsConfDirective* get_vhost_on_dvr(std::string vhost); + /** + * get the on_hls callbacks of vhost. + * @return the on_hls callback directive, the args is the url to callback. + */ + virtual SrsConfDirective* get_vhost_on_hls(std::string vhost); // bwct(bandwidth check tool) section public: /** diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index ee0411f80..576ea4dff 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -46,12 +46,6 @@ using namespace std; // update the flv duration and filesize every this interval in ms. #define SRS_DVR_UPDATE_DURATION_INTERVAL 60000 -// the sleep interval for http async callback. -#define SRS_AUTO_ASYNC_CALLBACL_SLEEP_US 300000 - -// the use raction for dvr rpc. -#define SRS_DVR_USER_ACTION_REAP_SEGMENT "reap_segment" - SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p) { req = NULL; @@ -502,14 +496,6 @@ int SrsFlvSegment::on_reload_vhost_dvr(std::string /*vhost*/) return ret; } -ISrsDvrAsyncCall::ISrsDvrAsyncCall() -{ -} - -ISrsDvrAsyncCall::~ISrsDvrAsyncCall() -{ -} - SrsDvrAsyncCallOnDvr::SrsDvrAsyncCallOnDvr(SrsRequest* r, string p) { req = r; @@ -534,13 +520,10 @@ int SrsDvrAsyncCallOnDvr::call() return ret; } - int connection_id = _srs_context->get_id(); - std::string ip = req->ip; - std::string cwd = _srs_config->cwd(); std::string file = path; for (int i = 0; i < (int)on_dvr->args.size(); i++) { std::string url = on_dvr->args.at(i); - if ((ret = SrsHttpHooks::on_dvr(url, connection_id, ip, req, cwd, file)) != ERROR_SUCCESS) { + if ((ret = SrsHttpHooks::on_dvr(url, req, file)) != ERROR_SUCCESS) { srs_error("hook client on_dvr failed. url=%s, ret=%d", url.c_str(), ret); return ret; } @@ -558,62 +541,6 @@ string SrsDvrAsyncCallOnDvr::to_string() return ss.str(); } -SrsDvrAsyncCallThread::SrsDvrAsyncCallThread() -{ - pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true); -} - -SrsDvrAsyncCallThread::~SrsDvrAsyncCallThread() -{ - stop(); - srs_freep(pthread); - - std::vector::iterator it; - for (it = callbacks.begin(); it != callbacks.end(); ++it) { - ISrsDvrAsyncCall* call = *it; - srs_freep(call); - } - callbacks.clear(); -} - -int SrsDvrAsyncCallThread::call(ISrsDvrAsyncCall* c) -{ - int ret = ERROR_SUCCESS; - - callbacks.push_back(c); - - return ret; -} - -int SrsDvrAsyncCallThread::start() -{ - return pthread->start(); -} - -void SrsDvrAsyncCallThread::stop() -{ - pthread->stop(); -} - -int SrsDvrAsyncCallThread::cycle() -{ - int ret = ERROR_SUCCESS; - - std::vector copies = callbacks; - callbacks.clear(); - - std::vector::iterator it; - for (it = copies.begin(); it != copies.end(); ++it) { - ISrsDvrAsyncCall* call = *it; - if ((ret = call->call()) != ERROR_SUCCESS) { - srs_warn("dvr: ignore callback %s, ret=%d", call->to_string().c_str(), ret); - } - srs_freep(call); - } - - return ret; -} - SrsDvrPlan::SrsDvrPlan() { req = NULL; diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index a2065c8ed..0c471199c 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -49,7 +49,7 @@ class SrsThread; #include #include -#include +#include /** * a piece of flv segment. @@ -178,15 +178,6 @@ public: /** * the dvr async call. */ -class ISrsDvrAsyncCall -{ -public: - ISrsDvrAsyncCall(); - virtual ~ISrsDvrAsyncCall(); -public: - virtual int call() = 0; - virtual std::string to_string() = 0; -}; class SrsDvrAsyncCallOnDvr : public ISrsDvrAsyncCall { private: @@ -200,25 +191,6 @@ public: virtual std::string to_string(); }; -/** -* the async callback for dvr. -*/ -class SrsDvrAsyncCallThread : public ISrsThreadHandler -{ -private: - SrsThread* pthread; - std::vector callbacks; -public: - SrsDvrAsyncCallThread(); - virtual ~SrsDvrAsyncCallThread(); -public: - virtual int call(ISrsDvrAsyncCall* c); -public: - virtual int start(); - virtual void stop(); - virtual int cycle(); -}; - /** * the plan for dvr. * use to control the following dvr params: diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 6f3a76c71..3570cd9b0 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -54,6 +54,7 @@ using namespace std; #include #include #include +#include // drop the segment when duration of ts too small. #define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100 @@ -169,6 +170,53 @@ void SrsHlsSegment::update_duration(int64_t current_frame_dts) return; } +SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(SrsRequest* r, string p, int s) +{ + req = r; + path = p; + seq_no = s; +} + +SrsDvrAsyncCallOnHls::~SrsDvrAsyncCallOnHls() +{ +} + +int SrsDvrAsyncCallOnHls::call() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_CALLBACK + // http callback for on_hls in config. + if (_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + // HTTP: on_hls + SrsConfDirective* on_hls = _srs_config->get_vhost_on_hls(req->vhost); + if (!on_hls) { + srs_info("ignore the empty http callback: on_hls"); + return ret; + } + + std::string file = path; + int sn = seq_no; + for (int i = 0; i < (int)on_hls->args.size(); i++) { + std::string url = on_hls->args.at(i); + if ((ret = SrsHttpHooks::on_hls(url, req, file, sn)) != ERROR_SUCCESS) { + srs_error("hook client on_hls failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } + } +#endif + + return ret; +} + +string SrsDvrAsyncCallOnHls::to_string() +{ + std::stringstream ss; + ss << "vhost=" << req->vhost << ", file=" << path; + return ss.str(); +} + SrsHlsMuxer::SrsHlsMuxer() { req = NULL; @@ -177,6 +225,7 @@ SrsHlsMuxer::SrsHlsMuxer() hls_aof_ratio = 1.0; hls_fragment_deviation = 0; previous_floor_ts = 0; + accept_floor_ts = 0; hls_ts_floor = false; target_duration = 0; _sequence_no = 0; @@ -184,6 +233,7 @@ SrsHlsMuxer::SrsHlsMuxer() acodec = SrsCodecAudioReserved1; should_write_cache = false; should_write_file = true; + async = new SrsDvrAsyncCallThread(); } SrsHlsMuxer::~SrsHlsMuxer() @@ -197,6 +247,7 @@ SrsHlsMuxer::~SrsHlsMuxer() srs_freep(current); srs_freep(req); + srs_freep(async); } int SrsHlsMuxer::initialize(ISrsHlsHandler* h) @@ -204,6 +255,10 @@ int SrsHlsMuxer::initialize(ISrsHlsHandler* h) int ret = ERROR_SUCCESS; handler = h; + + if ((ret = async->start()) != ERROR_SUCCESS) { + return ret; + } return ret; } @@ -244,6 +299,7 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, hls_aof_ratio = aof_ratio; hls_ts_floor = ts_floor; previous_floor_ts = 0; + accept_floor_ts = 0; hls_window = window; // for the first time, we set to -N% of fragment, // that is, the first piece always smaller. @@ -331,9 +387,17 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts) std::string ts_file = hls_ts_file; ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream); if (hls_ts_floor) { + // accept the floor ts for the first piece. int64_t floor_ts = (int64_t)(srs_get_system_time_ms() / (1000 * hls_fragment)); + if (!accept_floor_ts) { + accept_floor_ts = floor_ts - 1; + } else { + accept_floor_ts++; + } + + // we always ensure the piece is increase one by one. std::stringstream ts_floor; - ts_floor << floor_ts; + ts_floor << accept_floor_ts; ts_file = srs_string_replace(ts_file, "[timestamp]", ts_floor.str()); // dup/jmp detect for ts in floor mode. @@ -513,6 +577,11 @@ int SrsHlsMuxer::segment_close(string log_desc) if (hls_ts_floor) { hls_fragment_deviation += (double)(hls_fragment - current->duration); } + + // use async to call the http hooks, for it will cause thread switch. + if ((ret = async->call(new SrsDvrAsyncCallOnHls(req, current->full_path, current->sequence_no))) != ERROR_SUCCESS) { + return ret; + } srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64", deviation=%.2f", log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration, diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 05f8ce3ee..c28b4e0bd 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include class SrsSharedPtrMessage; class SrsCodecSample; @@ -154,6 +155,23 @@ public: virtual void update_duration(int64_t current_frame_dts); }; +/** + * the dvr async call. + */ +class SrsDvrAsyncCallOnHls : public ISrsDvrAsyncCall +{ +private: + std::string path; + int seq_no; + SrsRequest* req; +public: + SrsDvrAsyncCallOnHls(SrsRequest* r, std::string p, int s); + virtual ~SrsDvrAsyncCallOnHls(); +public: + virtual int call(); + virtual std::string to_string(); +}; + /** * muxer the HLS stream(m3u8 and ts files). * generally, the m3u8 muxer only provides methods to open/close segments, @@ -174,6 +192,7 @@ private: double hls_aof_ratio; double hls_fragment; double hls_window; + SrsDvrAsyncCallThread* async; private: // whether use floor algorithm for timestamp. bool hls_ts_floor; @@ -182,6 +201,7 @@ private: double hls_fragment_deviation; // the previous reap floor timestamp, // used to detect the dup or jmp or ts. + int64_t accept_floor_ts; int64_t previous_floor_ts; private: int _sequence_no; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index dab2d6a86..618de8ebb 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -36,6 +36,7 @@ using namespace std; #include #include #include +#include #define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS) @@ -50,15 +51,17 @@ SrsHttpHooks::~SrsHttpHooks() { } -int SrsHttpHooks::on_connect(string url, int client_id, string ip, SrsRequest* req) +int SrsHttpHooks::on_connect(string url, SrsRequest* req) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_connect") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("tcUrl", req->tcUrl) << SRS_JFIELD_CONT @@ -82,15 +85,17 @@ int SrsHttpHooks::on_connect(string url, int client_id, string ip, SrsRequest* r return ret; } -void SrsHttpHooks::on_close(string url, int client_id, string ip, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes) +void SrsHttpHooks::on_close(string url, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_close") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("send_bytes", send_bytes) << SRS_JFIELD_CONT << SRS_JFIELD_ORG("recv_bytes", recv_bytes) << SRS_JFIELD_CONT @@ -114,15 +119,17 @@ void SrsHttpHooks::on_close(string url, int client_id, string ip, SrsRequest* re return; } -int SrsHttpHooks::on_publish(string url, int client_id, string ip, SrsRequest* req) +int SrsHttpHooks::on_publish(string url, SrsRequest* req) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_publish") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) @@ -145,15 +152,17 @@ int SrsHttpHooks::on_publish(string url, int client_id, string ip, SrsRequest* r return ret; } -void SrsHttpHooks::on_unpublish(string url, int client_id, string ip, SrsRequest* req) +void SrsHttpHooks::on_unpublish(string url, SrsRequest* req) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_unpublish") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) @@ -176,15 +185,17 @@ void SrsHttpHooks::on_unpublish(string url, int client_id, string ip, SrsRequest return; } -int SrsHttpHooks::on_play(string url, int client_id, string ip, SrsRequest* req) +int SrsHttpHooks::on_play(string url, SrsRequest* req) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_play") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) @@ -207,15 +218,17 @@ int SrsHttpHooks::on_play(string url, int client_id, string ip, SrsRequest* req) return ret; } -void SrsHttpHooks::on_stop(string url, int client_id, string ip, SrsRequest* req) +void SrsHttpHooks::on_stop(string url, SrsRequest* req) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_stop") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) @@ -238,15 +251,18 @@ void SrsHttpHooks::on_stop(string url, int client_id, string ip, SrsRequest* req return; } -int SrsHttpHooks::on_dvr(string url, int client_id, string ip, SrsRequest* req, string cwd, string file) +int SrsHttpHooks::on_dvr(string url, SrsRequest* req, string file) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::string cwd = _srs_config->cwd(); + std::stringstream ss; ss << SRS_JOBJECT_START << SRS_JFIELD_STR("action", "on_dvr") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("ip", ip) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) << SRS_JFIELD_CONT @@ -271,32 +287,37 @@ int SrsHttpHooks::on_dvr(string url, int client_id, string ip, SrsRequest* req, return ret; } -int SrsHttpHooks::on_dvr_reap_segment(string url, int client_id, SrsRequest* req, string cwd, string file) +int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn) { int ret = ERROR_SUCCESS; + int client_id = _srs_context->get_id(); + std::string cwd = _srs_config->cwd(); + std::stringstream ss; ss << SRS_JOBJECT_START - << SRS_JFIELD_STR("action", "on_dvr_reap_segment") << SRS_JFIELD_CONT + << SRS_JFIELD_STR("action", "on_hls") << SRS_JFIELD_CONT << SRS_JFIELD_ORG("client_id", client_id) << SRS_JFIELD_CONT + << SRS_JFIELD_STR("ip", req->ip) << SRS_JFIELD_CONT << SRS_JFIELD_STR("vhost", req->vhost) << SRS_JFIELD_CONT << SRS_JFIELD_STR("app", req->app) << SRS_JFIELD_CONT << SRS_JFIELD_STR("stream", req->stream) << SRS_JFIELD_CONT << SRS_JFIELD_STR("cwd", cwd) << SRS_JFIELD_CONT - << SRS_JFIELD_STR("file", file) + << SRS_JFIELD_STR("file", file) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("seq_no", sn) << SRS_JOBJECT_END; std::string data = ss.str(); std::string res; int status_code; if ((ret = do_post(url, data, status_code, res)) != ERROR_SUCCESS) { - srs_error("http post on_dvr_reap_segment uri failed, ignored. " + srs_error("http post on_hls uri failed, ignored. " "client_id=%d, url=%s, request=%s, response=%s, code=%d, ret=%d", client_id, url.c_str(), data.c_str(), res.c_str(), status_code, ret); return ret; } - srs_trace("http hook on_dvr_reap_segment success. " + srs_trace("http hook on_hls success. " "client_id=%d, url=%s, request=%s, response=%s, ret=%d", client_id, url.c_str(), data.c_str(), res.c_str(), ret); diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 1d34dc3fb..de64bef29 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -55,64 +55,55 @@ public: public: /** * on_connect hook, when client connect to srs. - * @param client_id the id of client on server. * @param url the api server url, to valid the client. * ignore if empty. */ - static int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req); + static int on_connect(std::string url, SrsRequest* req); /** * on_close hook, when client disconnect to srs, where client is valid by on_connect. - * @param client_id the id of client on server. * @param url the api server url, to process the event. * ignore if empty. */ - static void on_close(std::string url, int client_id, std::string ip, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes); + static void on_close(std::string url, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes); /** * on_publish hook, when client(encoder) start to publish stream - * @param client_id the id of client on server. * @param url the api server url, to valid the client. * ignore if empty. */ - static int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req); + static int on_publish(std::string url, SrsRequest* req); /** * on_unpublish hook, when client(encoder) stop publish stream. - * @param client_id the id of client on server. * @param url the api server url, to process the event. * ignore if empty. */ - static void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req); + static void on_unpublish(std::string url, SrsRequest* req); /** * on_play hook, when client start to play stream. - * @param client_id the id of client on server. * @param url the api server url, to valid the client. * ignore if empty. */ - static int on_play(std::string url, int client_id, std::string ip, SrsRequest* req); + static int on_play(std::string url, SrsRequest* req); /** * on_stop hook, when client stop to play the stream. - * @param client_id the id of client on server. * @param url the api server url, to process the event. * ignore if empty. */ - static void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req); + static void on_stop(std::string url, SrsRequest* req); /** * on_dvr hook, when reap a dvr file. - * @param client_id the id of client on server. * @param url the api server url, to process the event. * ignore if empty. - * @param cwd the current work directory, used to resolve the reltive file path. * @param file the file path, can be relative or absolute path. */ - static int on_dvr(std::string url, int client_id, std::string ip, SrsRequest* req, std::string cwd, std::string file); + static int on_dvr(std::string url, SrsRequest* req, std::string file); /** - * when dvr reap segment, callback. - * @param client_id the id of client on server. + * when hls reap segment, callback. * @param url the api server url, to process the event. * ignore if empty. - * @param cwd the current work directory, used to resolve the reltive file path. - * @param file the file path, can be relative or absolute path. + * @param file the ts file path, can be relative or absolute path. + * @param sn the seq_no, the sequence number of ts in hls/m3u8. */ - static int on_dvr_reap_segment(std::string url, int client_id, SrsRequest* req, std::string cwd, std::string file); + static int on_hls(std::string url, SrsRequest* req, std::string file, int sn); private: static int do_post(std::string url, std::string req, int& code, std::string& res); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 7d4949b36..62feb76cd 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1265,10 +1265,9 @@ int SrsRtmpConn::http_hooks_on_connect() return ret; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_connect->args.size(); i++) { std::string url = on_connect->args.at(i); - if ((ret = SrsHttpHooks::on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) { + if ((ret = SrsHttpHooks::on_connect(url, req)) != ERROR_SUCCESS) { srs_error("hook client on_connect failed. url=%s, ret=%d", url.c_str(), ret); return ret; } @@ -1291,10 +1290,9 @@ void SrsRtmpConn::http_hooks_on_close() return; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_close->args.size(); i++) { std::string url = on_close->args.at(i); - SrsHttpHooks::on_close(url, connection_id, ip, req, kbps->get_send_bytes(), kbps->get_recv_bytes()); + SrsHttpHooks::on_close(url, req, kbps->get_send_bytes(), kbps->get_recv_bytes()); } } #endif @@ -1313,10 +1311,9 @@ int SrsRtmpConn::http_hooks_on_publish() return ret; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_publish->args.size(); i++) { std::string url = on_publish->args.at(i); - if ((ret = SrsHttpHooks::on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) { + if ((ret = SrsHttpHooks::on_publish(url, req)) != ERROR_SUCCESS) { srs_error("hook client on_publish failed. url=%s, ret=%d", url.c_str(), ret); return ret; } @@ -1339,10 +1336,9 @@ void SrsRtmpConn::http_hooks_on_unpublish() return; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_unpublish->args.size(); i++) { std::string url = on_unpublish->args.at(i); - SrsHttpHooks::on_unpublish(url, connection_id, ip, req); + SrsHttpHooks::on_unpublish(url, req); } } #endif @@ -1361,10 +1357,9 @@ int SrsRtmpConn::http_hooks_on_play() return ret; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_play->args.size(); i++) { std::string url = on_play->args.at(i); - if ((ret = SrsHttpHooks::on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) { + if ((ret = SrsHttpHooks::on_play(url, req)) != ERROR_SUCCESS) { srs_error("hook client on_play failed. url=%s, ret=%d", url.c_str(), ret); return ret; } @@ -1387,10 +1382,9 @@ void SrsRtmpConn::http_hooks_on_stop() return; } - int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_stop->args.size(); i++) { std::string url = on_stop->args.at(i); - SrsHttpHooks::on_stop(url, connection_id, ip, req); + SrsHttpHooks::on_stop(url, req); } } #endif