From f8f6e438ccf0df891d67f1dd889fdd474b943212 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 14 Jun 2015 14:06:39 +0800 Subject: [PATCH 1/2] extract http static and http stream from http conn. --- trunk/configure | 4 +- trunk/ide/srs_upp/srs_upp.upp | 4 + trunk/ide/srs_vs2010/srs.vcxproj | 4 + trunk/ide/srs_vs2010/srs.vcxproj.filters | 6 + .../srs_xcode.xcodeproj/project.pbxproj | 12 + trunk/src/app/srs_app_http_conn.cpp | 1474 +---------------- trunk/src/app/srs_app_http_conn.hpp | 385 +---- trunk/src/app/srs_app_http_static.cpp | 283 ++++ trunk/src/app/srs_app_http_static.hpp | 76 + trunk/src/app/srs_app_http_stream.cpp | 1220 ++++++++++++++ trunk/src/app/srs_app_http_stream.hpp | 365 ++++ trunk/src/app/srs_app_server.cpp | 31 +- trunk/src/app/srs_app_server.hpp | 3 +- trunk/src/protocol/srs_http_stack.cpp | 35 +- trunk/src/protocol/srs_http_stack.hpp | 21 +- 15 files changed, 2156 insertions(+), 1767 deletions(-) create mode 100644 trunk/src/app/srs_app_http_static.cpp create mode 100644 trunk/src/app/srs_app_http_static.hpp create mode 100644 trunk/src/app/srs_app_http_stream.cpp create mode 100644 trunk/src/app/srs_app_http_stream.hpp diff --git a/trunk/configure b/trunk/configure index 53fa3152c..a8c6559f8 100755 --- a/trunk/configure +++ b/trunk/configure @@ -169,11 +169,11 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL") ModuleLibIncs=(${LibSTRoot} ${LibHttpParserRoot} ${SRS_OBJS_DIR}) MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source" - "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" + "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream" "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" - "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" + "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv") diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index e06c05bb9..68d0a115b 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -103,6 +103,10 @@ file ../../src/app/srs_app_http_conn.cpp, ../../src/app/srs_app_http_hooks.hpp, ../../src/app/srs_app_http_hooks.cpp, + ../../src/app/srs_app_http_stream.hpp, + ../../src/app/srs_app_http_stream.cpp, + ../../src/app/srs_app_http_static.hpp, + ../../src/app/srs_app_http_static.cpp, ../../src/app/srs_app_ingest.hpp, ../../src/app/srs_app_ingest.cpp, ../../src/app/srs_app_json.hpp, diff --git a/trunk/ide/srs_vs2010/srs.vcxproj b/trunk/ide/srs_vs2010/srs.vcxproj index 98b514de1..0959cdcc2 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj +++ b/trunk/ide/srs_vs2010/srs.vcxproj @@ -77,6 +77,8 @@ + + @@ -157,6 +159,8 @@ + + diff --git a/trunk/ide/srs_vs2010/srs.vcxproj.filters b/trunk/ide/srs_vs2010/srs.vcxproj.filters index 8cff84e3f..02de8979f 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj.filters +++ b/trunk/ide/srs_vs2010/srs.vcxproj.filters @@ -64,6 +64,12 @@ srs + + srs + + + srs + srs diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index 1307b19e5..bba0489a7 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -7,6 +7,8 @@ objects = { /* Begin PBXBuildFile section */ + 3C036B551B2D0AC10078E2E0 /* srs_app_http_static.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C036B511B2D0AC10078E2E0 /* srs_app_http_static.cpp */; }; + 3C036B561B2D0AC10078E2E0 /* srs_app_http_stream.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C036B531B2D0AC10078E2E0 /* srs_app_http_stream.cpp */; }; 3C068D6A1B10149F00AA722C /* srs_protocol_kbps.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C068D681B10149F00AA722C /* srs_protocol_kbps.cpp */; }; 3C068D6D1B10175500AA722C /* srs_protocol_buffer.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C068D6B1B10175500AA722C /* srs_protocol_buffer.cpp */; }; 3C0E1B8D1B0F5ADF003ADEF7 /* srs_http_stack.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C0E1B8B1B0F5ADF003ADEF7 /* srs_http_stack.cpp */; }; @@ -121,6 +123,10 @@ /* End PBXCopyFilesBuildPhase section */ /* Begin PBXFileReference section */ + 3C036B511B2D0AC10078E2E0 /* srs_app_http_static.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_http_static.cpp; path = ../../../src/app/srs_app_http_static.cpp; sourceTree = ""; }; + 3C036B521B2D0AC10078E2E0 /* srs_app_http_static.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_http_static.hpp; path = ../../../src/app/srs_app_http_static.hpp; sourceTree = ""; }; + 3C036B531B2D0AC10078E2E0 /* srs_app_http_stream.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_http_stream.cpp; path = ../../../src/app/srs_app_http_stream.cpp; sourceTree = ""; }; + 3C036B541B2D0AC10078E2E0 /* srs_app_http_stream.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_http_stream.hpp; path = ../../../src/app/srs_app_http_stream.hpp; sourceTree = ""; }; 3C068D681B10149F00AA722C /* srs_protocol_kbps.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_protocol_kbps.cpp; path = ../../../src/protocol/srs_protocol_kbps.cpp; sourceTree = ""; }; 3C068D691B10149F00AA722C /* srs_protocol_kbps.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_protocol_kbps.hpp; path = ../../../src/protocol/srs_protocol_kbps.hpp; sourceTree = ""; }; 3C068D6B1B10175500AA722C /* srs_protocol_buffer.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_protocol_buffer.cpp; path = ../../../src/protocol/srs_protocol_buffer.cpp; sourceTree = ""; }; @@ -551,6 +557,10 @@ 3C1232671AAE81D900CE8F6C /* srs_app_http_conn.hpp */, 3C1232681AAE81D900CE8F6C /* srs_app_http_hooks.cpp */, 3C1232691AAE81D900CE8F6C /* srs_app_http_hooks.hpp */, + 3C036B511B2D0AC10078E2E0 /* srs_app_http_static.cpp */, + 3C036B521B2D0AC10078E2E0 /* srs_app_http_static.hpp */, + 3C036B531B2D0AC10078E2E0 /* srs_app_http_stream.cpp */, + 3C036B541B2D0AC10078E2E0 /* srs_app_http_stream.hpp */, 3C12326C1AAE81D900CE8F6C /* srs_app_ingest.cpp */, 3C12326D1AAE81D900CE8F6C /* srs_app_ingest.hpp */, 3C12326E1AAE81D900CE8F6C /* srs_app_json.cpp */, @@ -907,10 +917,12 @@ 3C1232461AAE81A400CE8F6C /* srs_rtmp_msg_array.cpp in Sources */, 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */, 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */, + 3C036B551B2D0AC10078E2E0 /* srs_app_http_static.cpp in Sources */, 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */, 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */, 3C28EDDF1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp in Sources */, 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */, + 3C036B561B2D0AC10078E2E0 /* srs_app_http_stream.cpp in Sources */, 3C068D6D1B10175500AA722C /* srs_protocol_buffer.cpp in Sources */, 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */, 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */, diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 7e38341e0..c992c5002 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -23,7 +23,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -#if defined(SRS_AUTO_HTTP_CORE) || defined(SRS_AUTO_HTTP_SERVER) +#if defined(SRS_AUTO_HTTP_CORE) #include #include @@ -52,6 +52,9 @@ using namespace std; #include #include #include +#include +#include +#include #endif @@ -1094,1392 +1097,7 @@ string SrsHttpUri::get_uri_field(string uri, http_parser_url* hp_u, http_parser_ return uri.substr(offset, len); } -#endif - -#ifdef SRS_AUTO_HTTP_SERVER - -SrsVodStream::SrsVodStream(string root_dir) - : SrsHttpFileServer(root_dir) -{ -} - -SrsVodStream::~SrsVodStream() -{ -} - -int SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset) -{ - int ret = ERROR_SUCCESS; - - SrsFileReader fs; - - // open flv file - if ((ret = fs.open(fullpath)) != ERROR_SUCCESS) { - return ret; - } - - if (offset > fs.filesize()) { - ret = ERROR_HTTP_REMUX_OFFSET_OVERFLOW; - srs_warn("http flv streaming %s overflow. size=%"PRId64", offset=%d, ret=%d", - fullpath.c_str(), fs.filesize(), offset, ret); - return ret; - } - - SrsFlvVodStreamDecoder ffd; - - // open fast decoder - if ((ret = ffd.initialize(&fs)) != ERROR_SUCCESS) { - return ret; - } - - // save header, send later. - char flv_header[13]; - - // send flv header - if ((ret = ffd.read_header_ext(flv_header)) != ERROR_SUCCESS) { - return ret; - } - - // save sequence header, send later - char* sh_data = NULL; - int sh_size = 0; - - if (true) { - // send sequence header - int64_t start = 0; - if ((ret = ffd.read_sequence_header_summary(&start, &sh_size)) != ERROR_SUCCESS) { - return ret; - } - if (sh_size <= 0) { - ret = ERROR_HTTP_REMUX_SEQUENCE_HEADER; - srs_warn("http flv streaming no sequence header. size=%d, ret=%d", sh_size, ret); - return ret; - } - } - sh_data = new char[sh_size]; - SrsAutoFree(char, sh_data); - if ((ret = fs.read(sh_data, sh_size, NULL)) != ERROR_SUCCESS) { - return ret; - } - - // seek to data offset - int64_t left = fs.filesize() - offset; - - // write http header for ts. - w->header()->set_content_length((int)(sizeof(flv_header) + sh_size + left)); - w->header()->set_content_type("video/x-flv"); - - // write flv header and sequence header. - if ((ret = w->write(flv_header, sizeof(flv_header))) != ERROR_SUCCESS) { - return ret; - } - if (sh_size > 0 && (ret = w->write(sh_data, sh_size)) != ERROR_SUCCESS) { - return ret; - } - - // write body. - if ((ret = ffd.lseek(offset)) != ERROR_SUCCESS) { - return ret; - } - - // send data - if ((ret = copy(w, &fs, r, (int)left)) != ERROR_SUCCESS) { - srs_warn("read flv=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); - return ret; - } - - return ret; -} - -int SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int start, int end) -{ - int ret = ERROR_SUCCESS; - - srs_assert(start >= 0); - srs_assert(end == -1 || end >= 0); - - SrsFileReader fs; - - // open flv file - if ((ret = fs.open(fullpath)) != ERROR_SUCCESS) { - return ret; - } - - // parse -1 to whole file. - if (end == -1) { - end = (int)fs.filesize(); - } - - if (end > fs.filesize() || start > end) { - ret = ERROR_HTTP_REMUX_OFFSET_OVERFLOW; - srs_warn("http mp4 streaming %s overflow. size=%"PRId64", offset=%d, ret=%d", - fullpath.c_str(), fs.filesize(), start, ret); - return ret; - } - - // seek to data offset, [start, end] for range. - int64_t left = end - start + 1; - - // write http header for ts. - w->header()->set_content_length(left); - w->header()->set_content_type("video/mp4"); - - // status code 206 to make dash.as happy. - w->write_header(SRS_CONSTS_HTTP_PartialContent); - - // response the content range header. - std::stringstream content_range; - content_range << "bytes " << start << "-" << end << "/" << fs.filesize(); - w->header()->set("Content-Range", content_range.str()); - - // write body. - fs.lseek(start); - - // send data - if ((ret = copy(w, &fs, r, (int)left)) != ERROR_SUCCESS) { - srs_warn("read mp4=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); - return ret; - } - - return ret; -} - -SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) -{ - req = r->copy(); - source = s; - queue = new SrsMessageQueue(true); - pthread = new SrsEndlessThread("http-stream", this); -} - -SrsStreamCache::~SrsStreamCache() -{ - srs_freep(pthread); - - srs_freep(queue); - srs_freep(req); -} - -int SrsStreamCache::start() -{ - return pthread->start(); -} - -int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) -{ - int ret = ERROR_SUCCESS; - - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - - if (fast_cache <= 0) { - srs_info("http: ignore dump fast cache."); - return ret; - } - - // the jitter is get from SrsSource, which means the time_jitter of vhost. - if ((ret = queue->dump_packets(consumer, false, jitter)) != ERROR_SUCCESS) { - return ret; - } - - srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", - queue->size(), queue->duration(), fast_cache); - - return ret; -} - -int SrsStreamCache::cycle() -{ - int ret = ERROR_SUCCESS; - - SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { - srs_error("http: create consumer failed. ret=%d", ret); - return ret; - } - SrsAutoFree(SrsConsumer, consumer); - - SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream_cache(); - SrsAutoFree(SrsPithyPrint, pprint); - - SrsMessageArray msgs(SRS_PERF_MW_MSGS); - - // TODO: FIXME: support reload. - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - if (fast_cache > 0) { - queue->set_queue_size(fast_cache); - } - - while (true) { - pprint->elapse(); - - // get messages from consumer. - // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. - int count = 0; - if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { - srs_error("http: get messages from consumer failed. ret=%d", ret); - return ret; - } - - if (count <= 0) { - srs_info("http: mw sleep %dms for no msg", mw_sleep); - // directly use sleep, donot use consumer wait. - st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); - - // ignore when nothing got. - continue; - } - - if (pprint->can_print()) { - srs_trace("-> "SRS_CONSTS_LOG_HTTP_STREAM_CACHE" http: got %d msgs, age=%d, min=%d, mw=%d", - count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); - } - - // free the messages. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - if (fast_cache > 0) { - queue->enqueue(msg); - } else { - srs_freep(msg); - } - } - } - - return ret; -} - -ISrsStreamEncoder::ISrsStreamEncoder() -{ -} - -ISrsStreamEncoder::~ISrsStreamEncoder() -{ -} - -SrsTsStreamEncoder::SrsTsStreamEncoder() -{ - enc = new SrsTsEncoder(); -} - -SrsTsStreamEncoder::~SrsTsStreamEncoder() -{ - srs_freep(enc); -} - -int SrsTsStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/) -{ - int ret = ERROR_SUCCESS; - - if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsTsStreamEncoder::write_audio(int64_t timestamp, char* data, int size) -{ - return enc->write_audio(timestamp, data, size); -} - -int SrsTsStreamEncoder::write_video(int64_t timestamp, char* data, int size) -{ - return enc->write_video(timestamp, data, size); -} - -int SrsTsStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) -{ - return ERROR_SUCCESS; -} - -bool SrsTsStreamEncoder::has_cache() -{ - // for ts stream, use gop cache of SrsSource is ok. - return false; -} - -int SrsTsStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) -{ - // for ts stream, ignore cache. - return ERROR_SUCCESS; -} - -SrsFlvStreamEncoder::SrsFlvStreamEncoder() -{ - enc = new SrsFlvEncoder(); -} - -SrsFlvStreamEncoder::~SrsFlvStreamEncoder() -{ - srs_freep(enc); -} - -int SrsFlvStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/) -{ - int ret = ERROR_SUCCESS; - - if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { - return ret; - } - - // write flv header. - if ((ret = enc->write_header()) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsFlvStreamEncoder::write_audio(int64_t timestamp, char* data, int size) -{ - return enc->write_audio(timestamp, data, size); -} - -int SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int size) -{ - return enc->write_video(timestamp, data, size); -} - -int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size) -{ - return enc->write_metadata(SrsCodecFlvTagScript, data, size); -} - -bool SrsFlvStreamEncoder::has_cache() -{ - // for flv stream, use gop cache of SrsSource is ok. - return false; -} - -int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) -{ - // for flv stream, ignore cache. - return ERROR_SUCCESS; -} - -#ifdef SRS_PERF_FAST_FLV_ENCODER -SrsFastFlvStreamEncoder::SrsFastFlvStreamEncoder() -{ -} - -SrsFastFlvStreamEncoder::~SrsFastFlvStreamEncoder() -{ -} - -int SrsFastFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) -{ - return enc->write_tags(msgs, count); -} -#endif - -SrsAacStreamEncoder::SrsAacStreamEncoder() -{ - enc = new SrsAacEncoder(); - cache = NULL; -} - -SrsAacStreamEncoder::~SrsAacStreamEncoder() -{ - srs_freep(enc); -} - -int SrsAacStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) -{ - int ret = ERROR_SUCCESS; - - cache = c; - - if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsAacStreamEncoder::write_audio(int64_t timestamp, char* data, int size) -{ - return enc->write_audio(timestamp, data, size); -} - -int SrsAacStreamEncoder::write_video(int64_t /*timestamp*/, char* /*data*/, int /*size*/) -{ - // aac ignore any flv video. - return ERROR_SUCCESS; -} - -int SrsAacStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) -{ - // aac ignore any flv metadata. - return ERROR_SUCCESS; -} - -bool SrsAacStreamEncoder::has_cache() -{ - return true; -} - -int SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) -{ - srs_assert(cache); - return cache->dump_cache(consumer, jitter); -} - -SrsMp3StreamEncoder::SrsMp3StreamEncoder() -{ - enc = new SrsMp3Encoder(); - cache = NULL; -} - -SrsMp3StreamEncoder::~SrsMp3StreamEncoder() -{ - srs_freep(enc); -} - -int SrsMp3StreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) -{ - int ret = ERROR_SUCCESS; - - cache = c; - - if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = enc->write_header()) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -int SrsMp3StreamEncoder::write_audio(int64_t timestamp, char* data, int size) -{ - return enc->write_audio(timestamp, data, size); -} - -int SrsMp3StreamEncoder::write_video(int64_t /*timestamp*/, char* /*data*/, int /*size*/) -{ - // mp3 ignore any flv video. - return ERROR_SUCCESS; -} - -int SrsMp3StreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) -{ - // mp3 ignore any flv metadata. - return ERROR_SUCCESS; -} - -bool SrsMp3StreamEncoder::has_cache() -{ - return true; -} - -int SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) -{ - srs_assert(cache); - return cache->dump_cache(consumer, jitter); -} - -SrsStreamWriter::SrsStreamWriter(ISrsHttpResponseWriter* w) -{ - writer = w; -} - -SrsStreamWriter::~SrsStreamWriter() -{ -} - -int SrsStreamWriter::open(std::string /*file*/) -{ - return ERROR_SUCCESS; -} - -void SrsStreamWriter::close() -{ -} - -bool SrsStreamWriter::is_open() -{ - return true; -} - -int64_t SrsStreamWriter::tellg() -{ - return 0; -} - -int SrsStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite) -{ - if (pnwrite) { - *pnwrite = count; - } - return writer->write((char*)buf, (int)count); -} - -int SrsStreamWriter::writev(iovec* iov, int iovcnt, ssize_t* pnwrite) -{ - return writer->writev(iov, iovcnt, pnwrite); -} - -SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c) -{ - source = s; - cache = c; - req = r->copy(); -} - -SrsLiveStream::~SrsLiveStream() -{ - srs_freep(req); -} - -int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) -{ - int ret = ERROR_SUCCESS; - - ISrsStreamEncoder* enc = NULL; - - srs_assert(entry); - if (srs_string_ends_with(entry->pattern, ".flv")) { - w->header()->set_content_type("video/x-flv"); -#ifdef SRS_PERF_FAST_FLV_ENCODER - enc = new SrsFastFlvStreamEncoder(); -#else - enc = new SrsFlvStreamEncoder(); -#endif - } else if (srs_string_ends_with(entry->pattern, ".aac")) { - w->header()->set_content_type("audio/x-aac"); - enc = new SrsAacStreamEncoder(); - } else if (srs_string_ends_with(entry->pattern, ".mp3")) { - w->header()->set_content_type("audio/mpeg"); - enc = new SrsMp3StreamEncoder(); - } else if (srs_string_ends_with(entry->pattern, ".ts")) { - w->header()->set_content_type("video/MP2T"); - enc = new SrsTsStreamEncoder(); - } else { - ret = ERROR_HTTP_LIVE_STREAM_EXT; - srs_error("http: unsupported pattern %s", entry->pattern.c_str()); - return ret; - } - SrsAutoFree(ISrsStreamEncoder, enc); - - // create consumer of souce, ignore gop cache, use the audio gop cache. - SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { - srs_error("http: create consumer failed. ret=%d", ret); - return ret; - } - SrsAutoFree(SrsConsumer, consumer); - srs_verbose("http: consumer created success."); - - SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream(); - SrsAutoFree(SrsPithyPrint, pprint); - - SrsMessageArray msgs(SRS_PERF_MW_MSGS); - - // the memory writer. - SrsStreamWriter writer(w); - if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) { - srs_error("http: initialize stream encoder failed. ret=%d", ret); - return ret; - } - - // if gop cache enabled for encoder, dump to consumer. - if (enc->has_cache()) { - if ((ret = enc->dump_cache(consumer, source->jitter())) != ERROR_SUCCESS) { - srs_error("http: dump cache to consumer failed. ret=%d", ret); - return ret; - } - } - -#ifdef SRS_PERF_FAST_FLV_ENCODER - SrsFastFlvStreamEncoder* ffe = dynamic_cast(enc); -#endif - - while (true) { - pprint->elapse(); - - // get messages from consumer. - // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. - int count = 0; - if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { - srs_error("http: get messages from consumer failed. ret=%d", ret); - return ret; - } - - if (count <= 0) { - srs_info("http: mw sleep %dms for no msg", mw_sleep); - // directly use sleep, donot use consumer wait. - st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); - - // ignore when nothing got. - continue; - } - - if (pprint->can_print()) { - srs_info("-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d", - count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); - } - - // sendout all messages. -#ifdef SRS_PERF_FAST_FLV_ENCODER - if (ffe) { - ret = ffe->write_tags(msgs.msgs, count); - } else { - ret = streaming_send_messages(enc, msgs.msgs, count); - } -#else - ret = streaming_send_messages(enc, msgs.msgs, count); -#endif - - // free the messages. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - srs_freep(msg); - } - - // check send error code. - if (ret != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("http: send messages to client failed. ret=%d", ret); - } - return ret; - } - } - - return ret; -} - -int SrsLiveStream::streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs) -{ - int ret = ERROR_SUCCESS; - - for (int i = 0; i < nb_msgs; i++) { - SrsSharedPtrMessage* msg = msgs[i]; - - if (msg->is_audio()) { - ret = enc->write_audio(msg->timestamp, msg->payload, msg->size); - } else if (msg->is_video()) { - ret = enc->write_video(msg->timestamp, msg->payload, msg->size); - } else { - ret = enc->write_metadata(msg->timestamp, msg->payload, msg->size); - } - - if (ret != ERROR_SUCCESS) { - return ret; - } - } - - return ret; -} - -SrsLiveEntry::SrsLiveEntry(std::string m, bool h) -{ - mount = m; - hstrs = h; - - stream = NULL; - cache = NULL; - - std::string ext; - size_t pos = string::npos; - if ((pos = m.rfind(".")) != string::npos) { - ext = m.substr(pos); - } - _is_flv = (ext == ".flv"); - _is_ts = (ext == ".ts"); - _is_mp3 = (ext == ".mp3"); - _is_aac = (ext == ".aac"); -} - -bool SrsLiveEntry::is_flv() -{ - return _is_flv; -} - -bool SrsLiveEntry::is_ts() -{ - return _is_ts; -} - -bool SrsLiveEntry::is_aac() -{ - return _is_aac; -} - -bool SrsLiveEntry::is_mp3() -{ - return _is_mp3; -} - -SrsHlsM3u8Stream::SrsHlsM3u8Stream() -{ -} - -SrsHlsM3u8Stream::~SrsHlsM3u8Stream() -{ -} - -void SrsHlsM3u8Stream::set_m3u8(std::string v) -{ - m3u8 = v; -} - -int SrsHlsM3u8Stream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) -{ - int ret = ERROR_SUCCESS; - - std::string data = m3u8; - - w->header()->set_content_length((int)data.length()); - w->header()->set_content_type("application/x-mpegURL;charset=utf-8"); - - if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("send m3u8 failed. ret=%d", ret); - } - return ret; - } - - return ret; -} - -SrsHlsTsStream::SrsHlsTsStream() -{ -} - -SrsHlsTsStream::~SrsHlsTsStream() -{ -} - -void SrsHlsTsStream::set_ts(std::string v) -{ - ts = v; -} - -int SrsHlsTsStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) -{ - int ret = ERROR_SUCCESS; - - std::string data = ts; - - w->header()->set_content_length((int)data.length()); - w->header()->set_content_type("video/MP2T"); - - if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("send ts failed. ret=%d", ret); - } - return ret; - } - - return ret; -} - -SrsHlsEntry::SrsHlsEntry() -{ - tmpl = NULL; -} - -SrsHttpServer::SrsHttpServer(SrsServer* svr) -{ - server = svr; - - mux.hijack(this); -} - -SrsHttpServer::~SrsHttpServer() -{ - mux.unhijack(this); - - if (true) { - std::map::iterator it; - for (it = tflvs.begin(); it != tflvs.end(); ++it) { - SrsLiveEntry* entry = it->second; - srs_freep(entry); - } - tflvs.clear(); - } - if (true) { - std::map::iterator it; - for (it = sflvs.begin(); it != sflvs.end(); ++it) { - SrsLiveEntry* entry = it->second; - srs_freep(entry); - } - sflvs.clear(); - } - if (true) { - std::map::iterator it; - for (it = thls.begin(); it != thls.end(); ++it) { - SrsHlsEntry* entry = it->second; - srs_freep(entry); - } - thls.clear(); - } - if (true) { - std::map::iterator it; - for (it = shls.begin(); it != shls.end(); ++it) { - SrsHlsEntry* entry = it->second; - srs_freep(entry); - } - shls.clear(); - } -} - -int SrsHttpServer::initialize() -{ - int ret = ERROR_SUCCESS; - - // static file - // flv vod streaming. - if ((ret = initialize_static_file()) != ERROR_SUCCESS) { - return ret; - } - - // remux rtmp to flv live streaming - if ((ret = initialize_flv_streaming()) != ERROR_SUCCESS) { - return ret; - } - - // remux rtmp to hls live streaming - if ((ret = initialize_hls_streaming()) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - -// TODO: FIXME: rename for HTTP FLV mount. -int SrsHttpServer::http_mount(SrsSource* s, SrsRequest* r) -{ - int ret = ERROR_SUCCESS; - - // the id to identify stream. - std::string sid = r->get_stream_url(); - SrsLiveEntry* entry = NULL; - - // create stream from template when not found. - if (sflvs.find(sid) == sflvs.end()) { - if (tflvs.find(r->vhost) == tflvs.end()) { - srs_info("ignore mount flv stream for disabled"); - return ret; - } - - SrsLiveEntry* tmpl = tflvs[r->vhost]; - - std::string mount = tmpl->mount; - - // replace the vhost variable - mount = srs_string_replace(mount, "[vhost]", r->vhost); - mount = srs_string_replace(mount, "[app]", r->app); - mount = srs_string_replace(mount, "[stream]", r->stream); - - // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); - - entry = new SrsLiveEntry(mount, tmpl->hstrs); - - entry->cache = new SrsStreamCache(s, r); - entry->stream = new SrsLiveStream(s, r, entry->cache); - - sflvs[sid] = entry; - - // mount the http flv stream. - // we must register the handler, then start the thread, - // for the thread will cause thread switch context. - // @see https://github.com/simple-rtmp-server/srs/issues/404 - if ((ret = mux.handle(mount, entry->stream)) != ERROR_SUCCESS) { - srs_error("http: mount flv stream for vhost=%s failed. ret=%d", sid.c_str(), ret); - return ret; - } - - // start http stream cache thread - if ((ret = entry->cache->start()) != ERROR_SUCCESS) { - srs_error("http: start stream cache failed. ret=%d", ret); - return ret; - } - srs_trace("http: mount flv stream for vhost=%s, mount=%s", sid.c_str(), mount.c_str()); - } else { - entry = sflvs[sid]; - } - - // TODO: FIXME: supports reload. - if (entry->stream) { - entry->stream->entry->enabled = true; - return ret; - } - - return ret; -} - -void SrsHttpServer::http_unmount(SrsSource* s, SrsRequest* r) -{ - std::string sid = r->get_stream_url(); - - if (sflvs.find(sid) == sflvs.end()) { - srs_info("ignore unmount flv stream for disabled"); - return; - } - - SrsLiveEntry* entry = sflvs[sid]; - entry->stream->entry->enabled = false; -} - -int SrsHttpServer::mount_hls(SrsRequest* r) -{ - int ret = ERROR_SUCCESS; - - std::string sid = r->get_stream_url(); - - if (shls.find(sid) == shls.end()) { - srs_info("ignore mount hls stream for disabled"); - return ret; - } - - SrsHlsEntry* entry = shls[sid]; - - // TODO: FIXME: supports reload. - std::map::iterator it; - for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { - ISrsHttpHandler* stream = it->second; - stream->entry->enabled = true; - } - - return ret; -} - -int SrsHttpServer::hls_update_m3u8(SrsRequest* r, string m3u8) -{ - int ret = ERROR_SUCCESS; - - std::string mount; - - std::string sid = r->get_stream_url(); - SrsHlsEntry* entry = NULL; - - // create stream from template when not found. - if (shls.find(sid) == shls.end()) { - if (thls.find(r->vhost) == thls.end()) { - srs_info("ignore mount hls stream for disabled"); - return ret; - } - - SrsHlsEntry* tmpl = thls[r->vhost]; - srs_assert(tmpl); - - entry = new SrsHlsEntry(); - mount = tmpl->mount; - - // replace the vhost variable - mount = srs_string_replace(mount, "[vhost]", r->vhost); - mount = srs_string_replace(mount, "[app]", r->app); - mount = srs_string_replace(mount, "[stream]", r->stream); - - // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); - - entry->tmpl = tmpl; - entry->mount = mount; - shls[sid] = entry; - - if (entry->streams.find(mount) == entry->streams.end()) { - ISrsHttpHandler* he = new SrsHlsM3u8Stream(); - entry->streams[mount] = he; - - if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { - srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); - return ret; - } - } - } else { - entry = shls[sid]; - } - - mount = entry->mount; - - // update the m3u8 stream. - SrsHlsM3u8Stream* hms = dynamic_cast(entry->streams[mount]); - if (hms) { - hms->set_m3u8(m3u8); - } - srs_trace("hls update m3u8 ok, mount=%s", mount.c_str()); - - return ret; -} - -int SrsHttpServer::hls_update_ts(SrsRequest* r, string uri, string ts) -{ - int ret = ERROR_SUCCESS; - - std::string sid = r->get_stream_url(); - - // when no hls mounted, init with empty m3u8. - if (shls.find(sid) == shls.end()) { - if ((ret = hls_update_m3u8(r, "")) != ERROR_SUCCESS) { - return ret; - } - } - - // find again, ignore if not exits. - if (shls.find(sid) == shls.end()) { - return ret; - } - - SrsHlsEntry* entry = shls[sid]; - srs_assert(entry); - srs_assert(entry->tmpl); - - std::string mount = hls_mount_generate(r, uri, entry->tmpl->mount); - - if (entry->streams.find(mount) == entry->streams.end()) { - ISrsHttpHandler* he = new SrsHlsTsStream(); - entry->streams[mount] = he; - - if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { - srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); - return ret; - } - } - - // update the ts stream. - SrsHlsTsStream* hts = dynamic_cast(entry->streams[mount]); - if (hts) { - hts->set_ts(ts); - } - srs_trace("hls update ts ok, mount=%s", mount.c_str()); - - return ret; -} - - -int SrsHttpServer::hls_remove_ts(SrsRequest* r, string uri) -{ - int ret = ERROR_SUCCESS; - - std::string sid = r->get_stream_url(); - - // when no hls mounted, ignore. - if (shls.find(sid) == shls.end()) { - return ret; - } - - SrsHlsEntry* entry = shls[sid]; - srs_assert(entry); - srs_assert(entry->tmpl); - - std::string mount = hls_mount_generate(r, uri, entry->tmpl->mount); - - // ignore when no ts mounted. - if (entry->streams.find(mount) == entry->streams.end()) { - return ret; - } - - // update the ts stream. - SrsHlsTsStream* hts = dynamic_cast(entry->streams[mount]); - if (hts) { - hts->set_ts(""); - // TODO: FIXME: unmount and remove the http handler. - } - srs_trace("hls remove ts ok, mount=%s", mount.c_str()); - - return ret; -} - -void SrsHttpServer::unmount_hls(SrsRequest* r) -{ - std::string sid = r->get_stream_url(); - - if (shls.find(sid) == shls.end()) { - srs_info("ignore unmount hls stream for disabled"); - return; - } - - SrsHlsEntry* entry = shls[sid]; - - std::map::iterator it; - for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { - ISrsHttpHandler* stream = it->second; - stream->entry->enabled = false; - } -} - -int SrsHttpServer::on_reload_vhost_http_updated() -{ - int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. - return ret; -} - -int SrsHttpServer::on_reload_vhost_http_remux_updated() -{ - int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. - return ret; -} - -int SrsHttpServer::on_reload_vhost_hls(string vhost) -{ - int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. - return ret; -} - -int SrsHttpServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) -{ - int ret = ERROR_SUCCESS; - - // when handler not the root, we think the handler is ok. - ISrsHttpHandler* h = *ph? *ph : NULL; - if (h && h->entry && h->entry->pattern != "/") { - return ret; - } - - // only hijack for http streaming, http-flv/ts/mp3/aac. - std::string ext = request->ext(); - if (ext.empty()) { - return ret; - } - - // find the actually request vhost. - SrsConfDirective* vhost = _srs_config->get_vhost(request->host()); - if (!vhost || !_srs_config->get_vhost_enabled(vhost)) { - return ret; - } - - // find the entry template for the stream. - SrsLiveEntry* entry = NULL; - if (true) { - // no http streaming on vhost, ignore. - std::map::iterator it = tflvs.find(vhost->arg0()); - if (it == tflvs.end()) { - return ret; - } - - // hstrs not enabled, ignore. - entry = it->second; - if (!entry->hstrs) { - return ret; - } - - // check entry and request extension. - if (entry->is_flv()) { - if (ext != ".flv") { - return ret; - } - } else if (entry->is_ts()) { - if (ext != ".ts") { - return ret; - } - } else if (entry->is_mp3()) { - if (ext != ".mp3") { - return ret; - } - } else if (entry->is_aac()) { - if (ext != ".aac") { - return ret; - } - } else { - return ret; - } - } - - // convert to concreate class. - SrsHttpMessage* hreq = dynamic_cast(request); - srs_assert(hreq); - - // hijack for entry. - SrsRequest* r = hreq->to_request(vhost->arg0()); - SrsAutoFree(SrsRequest, r); - SrsSource* s = SrsSource::fetch(r); - if (!s) { - if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) { - return ret; - } - } - srs_assert(s != NULL); - - // create http streaming handler. - if ((ret = http_mount(s, r)) != ERROR_SUCCESS) { - return ret; - } - - // use the handler if exists. - if (ph) { - std::string sid = r->get_stream_url(); - if (sflvs.find(sid) != sflvs.end()) { - entry = sflvs[sid]; - *ph = entry->stream; - } - } - - // trigger edge to fetch from origin. - bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost); - srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]", - r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id()); - - // TODO: FIXME: disconnect when all connection closed. - if (vhost_is_edge) { - // notice edge to start for the first client. - if ((ret = s->on_edge_start_play()) != ERROR_SUCCESS) { - srs_error("notice edge start play stream failed. ret=%d", ret); - return ret; - } - } - - return ret; -} - -int SrsHttpServer::initialize_static_file() -{ - int ret = ERROR_SUCCESS; - - bool default_root_exists = false; - - // http static file and flv vod stream mount for each vhost. - SrsConfDirective* root = _srs_config->get_root(); - for (int i = 0; i < (int)root->directives.size(); i++) { - SrsConfDirective* conf = root->at(i); - - if (!conf->is_vhost()) { - continue; - } - - std::string vhost = conf->arg0(); - if (!_srs_config->get_vhost_http_enabled(vhost)) { - continue; - } - - std::string mount = _srs_config->get_vhost_http_mount(vhost); - std::string dir = _srs_config->get_vhost_http_dir(vhost); - - // replace the vhost variable - mount = srs_string_replace(mount, "[vhost]", vhost); - - // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); - - // the dir mount must always ends with "/" - if (mount != "/" && mount.rfind("/") != mount.length() - 1) { - mount += "/"; - } - - // mount the http of vhost. - if ((ret = mux.handle(mount, new SrsVodStream(dir))) != ERROR_SUCCESS) { - srs_error("http: mount dir=%s for vhost=%s failed. ret=%d", dir.c_str(), vhost.c_str(), ret); - return ret; - } - - if (mount == "/") { - default_root_exists = true; - srs_warn("http: root mount to %s", dir.c_str()); - } - srs_trace("http: vhost=%s mount to %s", vhost.c_str(), mount.c_str()); - } - - if (!default_root_exists) { - // add root - std::string dir = _srs_config->get_http_stream_dir(); - if ((ret = mux.handle("/", new SrsVodStream(dir))) != ERROR_SUCCESS) { - srs_error("http: mount root dir=%s failed. ret=%d", dir.c_str(), ret); - return ret; - } - srs_trace("http: root mount to %s", dir.c_str()); - } - - return ret; -} - -int SrsHttpServer::initialize_flv_streaming() -{ - int ret = ERROR_SUCCESS; - - // http flv live stream mount for each vhost. - SrsConfDirective* root = _srs_config->get_root(); - for (int i = 0; i < (int)root->directives.size(); i++) { - SrsConfDirective* conf = root->at(i); - - if (!conf->is_vhost()) { - continue; - } - - std::string vhost = conf->arg0(); - if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { - continue; - } - - SrsLiveEntry* entry = new SrsLiveEntry( - _srs_config->get_vhost_http_remux_mount(vhost), - _srs_config->get_vhost_http_remux_hstrs(vhost) - ); - tflvs[vhost] = entry; - srs_trace("http flv live stream, vhost=%s, mount=%s", - vhost.c_str(), entry->mount.c_str()); - } - - return ret; -} - -int SrsHttpServer::initialize_hls_streaming() -{ - int ret = ERROR_SUCCESS; - - // http hls live stream mount for each vhost. - SrsConfDirective* root = _srs_config->get_root(); - for (int i = 0; i < (int)root->directives.size(); i++) { - SrsConfDirective* conf = root->at(i); - - if (!conf->is_vhost()) { - continue; - } - - std::string vhost = conf->arg0(); - if (!_srs_config->get_hls_enabled(vhost)) { - continue; - } - - std::string storage = _srs_config->get_hls_storage(vhost); - if (storage != "ram" && storage != "both") { - continue; - } - - SrsHlsEntry* entry = new SrsHlsEntry(); - entry->mount = _srs_config->get_hls_mount(vhost); - thls[vhost] = entry; - srs_trace("http hls live stream, vhost=%s, mount=%s", - vhost.c_str(), entry->mount.c_str()); - } - - return ret; -} - -string SrsHttpServer::hls_mount_generate(SrsRequest* r, string uri, string tmpl) -{ - std::string mount = tmpl; - - // the ts is relative from the m3u8, the same start dir. - size_t pos = string::npos; - if ((pos = mount.rfind("/")) != string::npos) { - mount = mount.substr(0, pos); - } - - // replace the vhost variable - mount = srs_string_replace(mount, "[vhost]", r->vhost); - mount = srs_string_replace(mount, "[app]", r->app); - - // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); - - // mount with ts. - mount += "/"; - mount += uri; - - return mount; -} - -#endif - -#ifdef SRS_AUTO_HTTP_CORE -SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) +SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) : SrsConnection(cm, fd) { parser = new SrsHttpParser(); @@ -2586,7 +1204,7 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) return ret; } -SrsStaticHttpConn::SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) +SrsStaticHttpConn::SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) : SrsHttpConn(cm, fd, m) { } @@ -2610,5 +1228,85 @@ int SrsStaticHttpConn::on_got_http_message(ISrsHttpMessage* msg) return ret; } +SrsHttpServer::SrsHttpServer(SrsServer* svr) +{ + server = svr; + http_stream = new SrsHttpStreamServer(svr); + http_static = new SrsHttpStaticServer(svr); +} + +SrsHttpServer::~SrsHttpServer() +{ + srs_freep(http_stream); + srs_freep(http_static); +} + +int SrsHttpServer::initialize() +{ + int ret = ERROR_SUCCESS; + +#if defined(SRS_AUTO_HTTP_SERVER) && defined(SRS_AUTO_HTTP_API) + // for SRS go-sharp to detect the status of HTTP server of SRS HTTP FLV Cluster. + if ((ret = http_static->mux.handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) { + return ret; + } +#endif + + if ((ret = http_stream->initialize()) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = http_static->initialize()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + // try http stream first. + if (http_stream->mux.can_serve(r)) { + return http_stream->mux.serve_http(w, r); + } + + return http_static->mux.serve_http(w, r); +} + +int SrsHttpServer::http_mount(SrsSource* s, SrsRequest* r) +{ + return http_stream->http_mount(s, r); +} + +void SrsHttpServer::http_unmount(SrsSource* s, SrsRequest* r) +{ + http_stream->http_unmount(s, r); +} + +int SrsHttpServer::mount_hls(SrsRequest* r) +{ + return http_stream->mount_hls(r); +} + +int SrsHttpServer::hls_update_m3u8(SrsRequest* r, std::string m3u8) +{ + return http_stream->hls_update_m3u8(r, m3u8); +} + +int SrsHttpServer::hls_update_ts(SrsRequest* r, std::string uri, std::string ts) +{ + return http_stream->hls_update_ts(r, uri, ts); +} + +int SrsHttpServer::hls_remove_ts(SrsRequest* r, std::string uri) +{ + return http_stream->hls_remove_ts(r, uri); +} + +void SrsHttpServer::unmount_hls(SrsRequest* r) +{ + http_stream->unmount_hls(r); +} + #endif diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 577af0733..d27c30597 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -67,10 +67,8 @@ class SrsFastBuffer; class SrsHttpUri; class SrsConnection; class SrsHttpMessage; - -#endif - -#ifdef SRS_AUTO_HTTP_CORE +class SrsHttpStreamServer; +class SrsHttpStaticServer; // the http chunked header size, // for writev, there always one chunk to send it. @@ -371,327 +369,58 @@ private: virtual std::string get_uri_field(std::string uri, http_parser_url* hp_u, http_parser_url_fields field); }; -#endif - -#ifdef SRS_AUTO_HTTP_SERVER - -/** -* the flv vod stream supports flv?start=offset-bytes. -* for example, http://server/file.flv?start=10240 -* server will write flv header and sequence header, -* then seek(10240) and response flv tag data. -*/ -class SrsVodStream : public SrsHttpFileServer -{ -public: - SrsVodStream(std::string root_dir); - virtual ~SrsVodStream(); -protected: - virtual int serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset); - virtual int serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end); -}; - -/** -* for the srs http stream cache, -* for example, the audio stream cache to make android(weixin) happy. -* we start a thread to shrink the queue. -*/ -class SrsStreamCache : public ISrsEndlessThreadHandler +class SrsHttpConn : public SrsConnection { private: - SrsMessageQueue* queue; - SrsSource* source; - SrsRequest* req; - SrsEndlessThread* pthread; -public: - SrsStreamCache(SrsSource* s, SrsRequest* r); - virtual ~SrsStreamCache(); -public: - virtual int start(); - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); -// interface ISrsEndlessThreadHandler. -public: - virtual int cycle(); -}; - -/** -* the stream encoder in some codec, for example, flv or aac. -*/ -class ISrsStreamEncoder -{ -public: - ISrsStreamEncoder(); - virtual ~ISrsStreamEncoder(); + SrsHttpParser* parser; + ISrsHttpServeMux* http_mux; public: - /** - * initialize the encoder with file writer(to http response) and stream cache. - * @param w the writer to write to http response. - * @param c the stream cache for audio stream fast startup. - */ - virtual int initialize(SrsFileWriter* w, SrsStreamCache* c) = 0; - /** - * write rtmp video/audio/metadata. - */ - virtual int write_audio(int64_t timestamp, char* data, int size) = 0; - virtual int write_video(int64_t timestamp, char* data, int size) = 0; - virtual int write_metadata(int64_t timestamp, char* data, int size) = 0; + SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); + virtual ~SrsHttpConn(); +// interface IKbpsDelta public: - /** - * for some stream, for example, mp3 and aac, the audio stream, - * we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio. - * @return true to use gop cache of encoder; otherwise, use SrsSource. - */ - virtual bool has_cache() = 0; - /** - * dumps the cache of encoder to consumer. - */ - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0; -}; - -/** -* the flv stream encoder, remux rtmp stream to flv stream. -*/ -class SrsFlvStreamEncoder : public ISrsStreamEncoder -{ + virtual void resample(); + virtual int64_t get_send_bytes_delta(); + virtual int64_t get_recv_bytes_delta(); + virtual void cleanup(); protected: - SrsFlvEncoder* enc; -public: - SrsFlvStreamEncoder(); - virtual ~SrsFlvStreamEncoder(); -public: - virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); - virtual int write_audio(int64_t timestamp, char* data, int size); - virtual int write_video(int64_t timestamp, char* data, int size); - virtual int write_metadata(int64_t timestamp, char* data, int size); -public: - virtual bool has_cache(); - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); -}; - -#ifdef SRS_PERF_FAST_FLV_ENCODER -/** - * the fast flv stream encoder. - * @see https://github.com/simple-rtmp-server/srs/issues/405 - */ -class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder -{ -public: - SrsFastFlvStreamEncoder(); - virtual ~SrsFastFlvStreamEncoder(); -public: - /** - * write the tags in a time. - */ - virtual int write_tags(SrsSharedPtrMessage** msgs, int count); -}; -#endif - -/** -* the ts stream encoder, remux rtmp stream to ts stream. -*/ -class SrsTsStreamEncoder : public ISrsStreamEncoder -{ -private: - SrsTsEncoder* enc; -public: - SrsTsStreamEncoder(); - virtual ~SrsTsStreamEncoder(); -public: - virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); - virtual int write_audio(int64_t timestamp, char* data, int size); - virtual int write_video(int64_t timestamp, char* data, int size); - virtual int write_metadata(int64_t timestamp, char* data, int size); -public: - virtual bool has_cache(); - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); -}; - -/** -* the aac stream encoder, remux rtmp stream to aac stream. -*/ -class SrsAacStreamEncoder : public ISrsStreamEncoder -{ -private: - SrsAacEncoder* enc; - SrsStreamCache* cache; -public: - SrsAacStreamEncoder(); - virtual ~SrsAacStreamEncoder(); -public: - virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); - virtual int write_audio(int64_t timestamp, char* data, int size); - virtual int write_video(int64_t timestamp, char* data, int size); - virtual int write_metadata(int64_t timestamp, char* data, int size); -public: - virtual bool has_cache(); - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); -}; - -/** -* the mp3 stream encoder, remux rtmp stream to mp3 stream. -*/ -class SrsMp3StreamEncoder : public ISrsStreamEncoder -{ -private: - SrsMp3Encoder* enc; - SrsStreamCache* cache; -public: - SrsMp3StreamEncoder(); - virtual ~SrsMp3StreamEncoder(); -public: - virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); - virtual int write_audio(int64_t timestamp, char* data, int size); - virtual int write_video(int64_t timestamp, char* data, int size); - virtual int write_metadata(int64_t timestamp, char* data, int size); -public: - virtual bool has_cache(); - virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); -}; - -/** -* write stream to http response direclty. -*/ -class SrsStreamWriter : public SrsFileWriter -{ -private: - ISrsHttpResponseWriter* writer; -public: - SrsStreamWriter(ISrsHttpResponseWriter* w); - virtual ~SrsStreamWriter(); -public: - virtual int open(std::string file); - virtual void close(); -public: - virtual bool is_open(); - virtual int64_t tellg(); -public: - virtual int write(void* buf, size_t count, ssize_t* pnwrite); - virtual int writev(iovec* iov, int iovcnt, ssize_t* pnwrite); -}; - -/** -* the flv live stream supports access rtmp in flv over http. -* srs will remux rtmp to flv streaming. -*/ -class SrsLiveStream : public ISrsHttpHandler -{ -private: - SrsRequest* req; - SrsSource* source; - SrsStreamCache* cache; -public: - SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c); - virtual ~SrsLiveStream(); -public: - virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); -private: - virtual int streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); -}; - -/** -* the srs live entry -*/ -struct SrsLiveEntry -{ -private: - bool _is_flv; - bool _is_ts; - bool _is_aac; - bool _is_mp3; -public: - // for template, the mount contains variables. - // for concrete stream, the mount is url to access. - std::string mount; - // whether hstrs(http stream trigger rtmp source) - bool hstrs; - - SrsLiveStream* stream; - SrsStreamCache* cache; - - SrsLiveEntry(std::string m, bool h); - - bool is_flv(); - bool is_ts(); - bool is_mp3(); - bool is_aac(); -}; - -/** -* the m3u8 stream handler. -*/ -class SrsHlsM3u8Stream : public ISrsHttpHandler -{ + virtual int do_cycle(); +protected: + // when got http message, + // for the static service or api, discard any body. + // for the stream caster, for instance, http flv streaming, may discard the flv header or not. + virtual int on_got_http_message(ISrsHttpMessage* msg) = 0; private: - std::string m3u8; -public: - SrsHlsM3u8Stream(); - virtual ~SrsHlsM3u8Stream(); -public: - virtual void set_m3u8(std::string v); -public: - virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); + virtual int process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; -/** -* the ts stream handler. -*/ -class SrsHlsTsStream : public ISrsHttpHandler +// TODO: FIXME: rename to SrsResponseOnlyHttpConn. +class SrsStaticHttpConn : public SrsHttpConn { -private: - std::string ts; -public: - SrsHlsTsStream(); - virtual ~SrsHlsTsStream(); public: - virtual void set_ts(std::string v); + SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); + virtual ~SrsStaticHttpConn(); public: - virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); -}; - -/** -* the srs hls entry. -*/ -// TODO: FIXME: use hte hls template and entry. -struct SrsHlsEntry -{ - // for template, the mount contains variables. - // for concrete stream, the mount is url to access. - std::string mount; - - // the template to create the entry - SrsHlsEntry* tmpl; - - // key: the m3u8/ts file path. - // value: the http handler. - std::map streams; - - SrsHlsEntry(); + virtual int on_got_http_message(ISrsHttpMessage* msg); }; /** -* the http server instance, -* serve http static file, flv vod stream and flv live stream. -*/ -class SrsHttpServer : virtual public ISrsReloadHandler - , virtual public ISrsHttpMatchHijacker + * the http server, use http stream or static server to serve requests. + */ +class SrsHttpServer : public ISrsHttpServeMux { private: SrsServer* server; -public: - SrsHttpServeMux mux; - // the http live streaming template, to create streams. - std::map tflvs; - // the http live streaming streams, crote by template. - std::map sflvs; - // the hls live streaming template, to create streams. - std::map thls; - // the hls live streaming streams, crote by template. - std::map shls; + SrsHttpStaticServer* http_static; + SrsHttpStreamServer* http_stream; public: SrsHttpServer(SrsServer* svr); virtual ~SrsHttpServer(); public: virtual int initialize(); +// ISrsHttpServeMux +public: + virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); // http flv/ts/mp3/aac stream public: virtual int http_mount(SrsSource* s, SrsRequest* r); @@ -703,56 +432,6 @@ public: virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts); virtual int hls_remove_ts(SrsRequest* r, std::string uri); virtual void unmount_hls(SrsRequest* r); -// interface ISrsReloadHandler. -public: - virtual int on_reload_vhost_http_updated(); - virtual int on_reload_vhost_http_remux_updated(); - virtual int on_reload_vhost_hls(std::string vhost); -// interface ISrsHttpMatchHijacker -public: - virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph); -private: - virtual int initialize_static_file(); - virtual int initialize_flv_streaming(); - virtual int initialize_hls_streaming(); - virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl); -}; - -#endif - -#ifdef SRS_AUTO_HTTP_CORE -class SrsHttpConn : public SrsConnection -{ -private: - SrsHttpParser* parser; - SrsHttpServeMux* http_mux; -public: - SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); - virtual ~SrsHttpConn(); -// interface IKbpsDelta -public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); -protected: - virtual int do_cycle(); -protected: - // when got http message, - // for the static service or api, discard any body. - // for the stream caster, for instance, http flv streaming, may discard the flv header or not. - virtual int on_got_http_message(ISrsHttpMessage* msg) = 0; -private: - virtual int process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); -}; - -class SrsStaticHttpConn : public SrsHttpConn -{ -public: - SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); - virtual ~SrsStaticHttpConn(); -public: - virtual int on_got_http_message(ISrsHttpMessage* msg); }; #endif diff --git a/trunk/src/app/srs_app_http_static.cpp b/trunk/src/app/srs_app_http_static.cpp new file mode 100644 index 000000000..01c94405f --- /dev/null +++ b/trunk/src/app/srs_app_http_static.cpp @@ -0,0 +1,283 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 SRS(simple-rtmp-server) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#if defined(SRS_AUTO_HTTP_CORE) + +#include +#include +#include +#include + +#include +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#endif + +#ifdef SRS_AUTO_HTTP_SERVER + +SrsVodStream::SrsVodStream(string root_dir) + : SrsHttpFileServer(root_dir) +{ +} + +SrsVodStream::~SrsVodStream() +{ +} + +int SrsVodStream::serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int offset) +{ + int ret = ERROR_SUCCESS; + + SrsFileReader fs; + + // open flv file + if ((ret = fs.open(fullpath)) != ERROR_SUCCESS) { + return ret; + } + + if (offset > fs.filesize()) { + ret = ERROR_HTTP_REMUX_OFFSET_OVERFLOW; + srs_warn("http flv streaming %s overflow. size=%"PRId64", offset=%d, ret=%d", + fullpath.c_str(), fs.filesize(), offset, ret); + return ret; + } + + SrsFlvVodStreamDecoder ffd; + + // open fast decoder + if ((ret = ffd.initialize(&fs)) != ERROR_SUCCESS) { + return ret; + } + + // save header, send later. + char flv_header[13]; + + // send flv header + if ((ret = ffd.read_header_ext(flv_header)) != ERROR_SUCCESS) { + return ret; + } + + // save sequence header, send later + char* sh_data = NULL; + int sh_size = 0; + + if (true) { + // send sequence header + int64_t start = 0; + if ((ret = ffd.read_sequence_header_summary(&start, &sh_size)) != ERROR_SUCCESS) { + return ret; + } + if (sh_size <= 0) { + ret = ERROR_HTTP_REMUX_SEQUENCE_HEADER; + srs_warn("http flv streaming no sequence header. size=%d, ret=%d", sh_size, ret); + return ret; + } + } + sh_data = new char[sh_size]; + SrsAutoFree(char, sh_data); + if ((ret = fs.read(sh_data, sh_size, NULL)) != ERROR_SUCCESS) { + return ret; + } + + // seek to data offset + int64_t left = fs.filesize() - offset; + + // write http header for ts. + w->header()->set_content_length((int)(sizeof(flv_header) + sh_size + left)); + w->header()->set_content_type("video/x-flv"); + + // write flv header and sequence header. + if ((ret = w->write(flv_header, sizeof(flv_header))) != ERROR_SUCCESS) { + return ret; + } + if (sh_size > 0 && (ret = w->write(sh_data, sh_size)) != ERROR_SUCCESS) { + return ret; + } + + // write body. + if ((ret = ffd.lseek(offset)) != ERROR_SUCCESS) { + return ret; + } + + // send data + if ((ret = copy(w, &fs, r, (int)left)) != ERROR_SUCCESS) { + srs_warn("read flv=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); + return ret; + } + + return ret; +} + +int SrsVodStream::serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, string fullpath, int start, int end) +{ + int ret = ERROR_SUCCESS; + + srs_assert(start >= 0); + srs_assert(end == -1 || end >= 0); + + SrsFileReader fs; + + // open flv file + if ((ret = fs.open(fullpath)) != ERROR_SUCCESS) { + return ret; + } + + // parse -1 to whole file. + if (end == -1) { + end = (int)fs.filesize(); + } + + if (end > fs.filesize() || start > end) { + ret = ERROR_HTTP_REMUX_OFFSET_OVERFLOW; + srs_warn("http mp4 streaming %s overflow. size=%"PRId64", offset=%d, ret=%d", + fullpath.c_str(), fs.filesize(), start, ret); + return ret; + } + + // seek to data offset, [start, end] for range. + int64_t left = end - start + 1; + + // write http header for ts. + w->header()->set_content_length(left); + w->header()->set_content_type("video/mp4"); + + // status code 206 to make dash.as happy. + w->write_header(SRS_CONSTS_HTTP_PartialContent); + + // response the content range header. + std::stringstream content_range; + content_range << "bytes " << start << "-" << end << "/" << fs.filesize(); + w->header()->set("Content-Range", content_range.str()); + + // write body. + fs.lseek(start); + + // send data + if ((ret = copy(w, &fs, r, (int)left)) != ERROR_SUCCESS) { + srs_warn("read mp4=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); + return ret; + } + + return ret; +} + +SrsHttpStaticServer::SrsHttpStaticServer(SrsServer* svr) +{ + server = svr; +} + +SrsHttpStaticServer::~SrsHttpStaticServer() +{ +} + +int SrsHttpStaticServer::initialize() +{ + int ret = ERROR_SUCCESS; + + bool default_root_exists = false; + + // http static file and flv vod stream mount for each vhost. + SrsConfDirective* root = _srs_config->get_root(); + for (int i = 0; i < (int)root->directives.size(); i++) { + SrsConfDirective* conf = root->at(i); + + if (!conf->is_vhost()) { + continue; + } + + std::string vhost = conf->arg0(); + if (!_srs_config->get_vhost_http_enabled(vhost)) { + continue; + } + + std::string mount = _srs_config->get_vhost_http_mount(vhost); + std::string dir = _srs_config->get_vhost_http_dir(vhost); + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", vhost); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + // the dir mount must always ends with "/" + if (mount != "/" && mount.rfind("/") != mount.length() - 1) { + mount += "/"; + } + + // mount the http of vhost. + if ((ret = mux.handle(mount, new SrsVodStream(dir))) != ERROR_SUCCESS) { + srs_error("http: mount dir=%s for vhost=%s failed. ret=%d", dir.c_str(), vhost.c_str(), ret); + return ret; + } + + if (mount == "/") { + default_root_exists = true; + srs_warn("http: root mount to %s", dir.c_str()); + } + srs_trace("http: vhost=%s mount to %s", vhost.c_str(), mount.c_str()); + } + + if (!default_root_exists) { + // add root + std::string dir = _srs_config->get_http_stream_dir(); + if ((ret = mux.handle("/", new SrsVodStream(dir))) != ERROR_SUCCESS) { + srs_error("http: mount root dir=%s failed. ret=%d", dir.c_str(), ret); + return ret; + } + srs_trace("http: root mount to %s", dir.c_str()); + } + + return ret; +} + +int SrsHttpStaticServer::on_reload_vhost_http_updated() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +#endif + diff --git a/trunk/src/app/srs_app_http_static.hpp b/trunk/src/app/srs_app_http_static.hpp new file mode 100644 index 000000000..1a0a70f83 --- /dev/null +++ b/trunk/src/app/srs_app_http_static.hpp @@ -0,0 +1,76 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 SRS(simple-rtmp-server) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_HTTP_STATIC_HPP +#define SRS_APP_HTTP_STATIC_HPP + +/* +#include +*/ + +#include + +#include + +#ifdef SRS_AUTO_HTTP_SERVER + +/** + * the flv vod stream supports flv?start=offset-bytes. + * for example, http://server/file.flv?start=10240 + * server will write flv header and sequence header, + * then seek(10240) and response flv tag data. + */ +class SrsVodStream : public SrsHttpFileServer +{ +public: + SrsVodStream(std::string root_dir); + virtual ~SrsVodStream(); +protected: + virtual int serve_flv_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int offset); + virtual int serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end); +}; + +/** +* the http static server instance, +* serve http static file and flv/mp4 vod stream. +*/ +class SrsHttpStaticServer : virtual public ISrsReloadHandler +{ +private: + SrsServer* server; +public: + SrsHttpServeMux mux; +public: + SrsHttpStaticServer(SrsServer* svr); + virtual ~SrsHttpStaticServer(); +public: + virtual int initialize(); +// interface ISrsReloadHandler. +public: + virtual int on_reload_vhost_http_updated(); +}; + +#endif + +#endif + diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp new file mode 100644 index 000000000..d2a2af7d2 --- /dev/null +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -0,0 +1,1220 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 SRS(simple-rtmp-server) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#if defined(SRS_AUTO_HTTP_CORE) + +#include +#include +#include +#include + +#include +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#endif + +#ifdef SRS_AUTO_HTTP_SERVER + +SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) +{ + req = r->copy(); + source = s; + queue = new SrsMessageQueue(true); + pthread = new SrsEndlessThread("http-stream", this); +} + +SrsStreamCache::~SrsStreamCache() +{ + srs_freep(pthread); + + srs_freep(queue); + srs_freep(req); +} + +int SrsStreamCache::start() +{ + return pthread->start(); +} + +int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) +{ + int ret = ERROR_SUCCESS; + + double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); + + if (fast_cache <= 0) { + srs_info("http: ignore dump fast cache."); + return ret; + } + + // the jitter is get from SrsSource, which means the time_jitter of vhost. + if ((ret = queue->dump_packets(consumer, false, jitter)) != ERROR_SUCCESS) { + return ret; + } + + srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", + queue->size(), queue->duration(), fast_cache); + + return ret; +} + +int SrsStreamCache::cycle() +{ + int ret = ERROR_SUCCESS; + + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { + srs_error("http: create consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsConsumer, consumer); + + SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream_cache(); + SrsAutoFree(SrsPithyPrint, pprint); + + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + + // TODO: FIXME: support reload. + double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); + if (fast_cache > 0) { + queue->set_queue_size(fast_cache); + } + + while (true) { + pprint->elapse(); + + // get messages from consumer. + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. + int count = 0; + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { + srs_error("http: get messages from consumer failed. ret=%d", ret); + return ret; + } + + if (count <= 0) { + srs_info("http: mw sleep %dms for no msg", mw_sleep); + // directly use sleep, donot use consumer wait. + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + + // ignore when nothing got. + continue; + } + + if (pprint->can_print()) { + srs_trace("-> "SRS_CONSTS_LOG_HTTP_STREAM_CACHE" http: got %d msgs, age=%d, min=%d, mw=%d", + count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); + } + + // free the messages. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + if (fast_cache > 0) { + queue->enqueue(msg); + } else { + srs_freep(msg); + } + } + } + + return ret; +} + +ISrsStreamEncoder::ISrsStreamEncoder() +{ +} + +ISrsStreamEncoder::~ISrsStreamEncoder() +{ +} + +SrsTsStreamEncoder::SrsTsStreamEncoder() +{ + enc = new SrsTsEncoder(); +} + +SrsTsStreamEncoder::~SrsTsStreamEncoder() +{ + srs_freep(enc); +} + +int SrsTsStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/) +{ + int ret = ERROR_SUCCESS; + + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsTsStreamEncoder::write_audio(int64_t timestamp, char* data, int size) +{ + return enc->write_audio(timestamp, data, size); +} + +int SrsTsStreamEncoder::write_video(int64_t timestamp, char* data, int size) +{ + return enc->write_video(timestamp, data, size); +} + +int SrsTsStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) +{ + return ERROR_SUCCESS; +} + +bool SrsTsStreamEncoder::has_cache() +{ + // for ts stream, use gop cache of SrsSource is ok. + return false; +} + +int SrsTsStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) +{ + // for ts stream, ignore cache. + return ERROR_SUCCESS; +} + +SrsFlvStreamEncoder::SrsFlvStreamEncoder() +{ + enc = new SrsFlvEncoder(); +} + +SrsFlvStreamEncoder::~SrsFlvStreamEncoder() +{ + srs_freep(enc); +} + +int SrsFlvStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/) +{ + int ret = ERROR_SUCCESS; + + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { + return ret; + } + + // write flv header. + if ((ret = enc->write_header()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsFlvStreamEncoder::write_audio(int64_t timestamp, char* data, int size) +{ + return enc->write_audio(timestamp, data, size); +} + +int SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int size) +{ + return enc->write_video(timestamp, data, size); +} + +int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size) +{ + return enc->write_metadata(SrsCodecFlvTagScript, data, size); +} + +bool SrsFlvStreamEncoder::has_cache() +{ + // for flv stream, use gop cache of SrsSource is ok. + return false; +} + +int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/, SrsRtmpJitterAlgorithm /*jitter*/) +{ + // for flv stream, ignore cache. + return ERROR_SUCCESS; +} + +#ifdef SRS_PERF_FAST_FLV_ENCODER +SrsFastFlvStreamEncoder::SrsFastFlvStreamEncoder() +{ +} + +SrsFastFlvStreamEncoder::~SrsFastFlvStreamEncoder() +{ +} + +int SrsFastFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) +{ + return enc->write_tags(msgs, count); +} +#endif + +SrsAacStreamEncoder::SrsAacStreamEncoder() +{ + enc = new SrsAacEncoder(); + cache = NULL; +} + +SrsAacStreamEncoder::~SrsAacStreamEncoder() +{ + srs_freep(enc); +} + +int SrsAacStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) +{ + int ret = ERROR_SUCCESS; + + cache = c; + + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsAacStreamEncoder::write_audio(int64_t timestamp, char* data, int size) +{ + return enc->write_audio(timestamp, data, size); +} + +int SrsAacStreamEncoder::write_video(int64_t /*timestamp*/, char* /*data*/, int /*size*/) +{ + // aac ignore any flv video. + return ERROR_SUCCESS; +} + +int SrsAacStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) +{ + // aac ignore any flv metadata. + return ERROR_SUCCESS; +} + +bool SrsAacStreamEncoder::has_cache() +{ + return true; +} + +int SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) +{ + srs_assert(cache); + return cache->dump_cache(consumer, jitter); +} + +SrsMp3StreamEncoder::SrsMp3StreamEncoder() +{ + enc = new SrsMp3Encoder(); + cache = NULL; +} + +SrsMp3StreamEncoder::~SrsMp3StreamEncoder() +{ + srs_freep(enc); +} + +int SrsMp3StreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) +{ + int ret = ERROR_SUCCESS; + + cache = c; + + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = enc->write_header()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsMp3StreamEncoder::write_audio(int64_t timestamp, char* data, int size) +{ + return enc->write_audio(timestamp, data, size); +} + +int SrsMp3StreamEncoder::write_video(int64_t /*timestamp*/, char* /*data*/, int /*size*/) +{ + // mp3 ignore any flv video. + return ERROR_SUCCESS; +} + +int SrsMp3StreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) +{ + // mp3 ignore any flv metadata. + return ERROR_SUCCESS; +} + +bool SrsMp3StreamEncoder::has_cache() +{ + return true; +} + +int SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) +{ + srs_assert(cache); + return cache->dump_cache(consumer, jitter); +} + +SrsStreamWriter::SrsStreamWriter(ISrsHttpResponseWriter* w) +{ + writer = w; +} + +SrsStreamWriter::~SrsStreamWriter() +{ +} + +int SrsStreamWriter::open(std::string /*file*/) +{ + return ERROR_SUCCESS; +} + +void SrsStreamWriter::close() +{ +} + +bool SrsStreamWriter::is_open() +{ + return true; +} + +int64_t SrsStreamWriter::tellg() +{ + return 0; +} + +int SrsStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite) +{ + if (pnwrite) { + *pnwrite = count; + } + return writer->write((char*)buf, (int)count); +} + +int SrsStreamWriter::writev(iovec* iov, int iovcnt, ssize_t* pnwrite) +{ + return writer->writev(iov, iovcnt, pnwrite); +} + +SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c) +{ + source = s; + cache = c; + req = r->copy(); +} + +SrsLiveStream::~SrsLiveStream() +{ + srs_freep(req); +} + +int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + ISrsStreamEncoder* enc = NULL; + + srs_assert(entry); + if (srs_string_ends_with(entry->pattern, ".flv")) { + w->header()->set_content_type("video/x-flv"); +#ifdef SRS_PERF_FAST_FLV_ENCODER + enc = new SrsFastFlvStreamEncoder(); +#else + enc = new SrsFlvStreamEncoder(); +#endif + } else if (srs_string_ends_with(entry->pattern, ".aac")) { + w->header()->set_content_type("audio/x-aac"); + enc = new SrsAacStreamEncoder(); + } else if (srs_string_ends_with(entry->pattern, ".mp3")) { + w->header()->set_content_type("audio/mpeg"); + enc = new SrsMp3StreamEncoder(); + } else if (srs_string_ends_with(entry->pattern, ".ts")) { + w->header()->set_content_type("video/MP2T"); + enc = new SrsTsStreamEncoder(); + } else { + ret = ERROR_HTTP_LIVE_STREAM_EXT; + srs_error("http: unsupported pattern %s", entry->pattern.c_str()); + return ret; + } + SrsAutoFree(ISrsStreamEncoder, enc); + + // create consumer of souce, ignore gop cache, use the audio gop cache. + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { + srs_error("http: create consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsConsumer, consumer); + srs_verbose("http: consumer created success."); + + SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream(); + SrsAutoFree(SrsPithyPrint, pprint); + + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + + // the memory writer. + SrsStreamWriter writer(w); + if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) { + srs_error("http: initialize stream encoder failed. ret=%d", ret); + return ret; + } + + // if gop cache enabled for encoder, dump to consumer. + if (enc->has_cache()) { + if ((ret = enc->dump_cache(consumer, source->jitter())) != ERROR_SUCCESS) { + srs_error("http: dump cache to consumer failed. ret=%d", ret); + return ret; + } + } + +#ifdef SRS_PERF_FAST_FLV_ENCODER + SrsFastFlvStreamEncoder* ffe = dynamic_cast(enc); +#endif + + while (true) { + pprint->elapse(); + + // get messages from consumer. + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. + int count = 0; + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { + srs_error("http: get messages from consumer failed. ret=%d", ret); + return ret; + } + + if (count <= 0) { + srs_info("http: mw sleep %dms for no msg", mw_sleep); + // directly use sleep, donot use consumer wait. + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + + // ignore when nothing got. + continue; + } + + if (pprint->can_print()) { + srs_info("-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d", + count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); + } + + // sendout all messages. +#ifdef SRS_PERF_FAST_FLV_ENCODER + if (ffe) { + ret = ffe->write_tags(msgs.msgs, count); + } else { + ret = streaming_send_messages(enc, msgs.msgs, count); + } +#else + ret = streaming_send_messages(enc, msgs.msgs, count); +#endif + + // free the messages. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + srs_freep(msg); + } + + // check send error code. + if (ret != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("http: send messages to client failed. ret=%d", ret); + } + return ret; + } + } + + return ret; +} + +int SrsLiveStream::streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs) +{ + int ret = ERROR_SUCCESS; + + for (int i = 0; i < nb_msgs; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + + if (msg->is_audio()) { + ret = enc->write_audio(msg->timestamp, msg->payload, msg->size); + } else if (msg->is_video()) { + ret = enc->write_video(msg->timestamp, msg->payload, msg->size); + } else { + ret = enc->write_metadata(msg->timestamp, msg->payload, msg->size); + } + + if (ret != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + +SrsLiveEntry::SrsLiveEntry(std::string m, bool h) +{ + mount = m; + hstrs = h; + + stream = NULL; + cache = NULL; + + std::string ext; + size_t pos = string::npos; + if ((pos = m.rfind(".")) != string::npos) { + ext = m.substr(pos); + } + _is_flv = (ext == ".flv"); + _is_ts = (ext == ".ts"); + _is_mp3 = (ext == ".mp3"); + _is_aac = (ext == ".aac"); +} + +bool SrsLiveEntry::is_flv() +{ + return _is_flv; +} + +bool SrsLiveEntry::is_ts() +{ + return _is_ts; +} + +bool SrsLiveEntry::is_aac() +{ + return _is_aac; +} + +bool SrsLiveEntry::is_mp3() +{ + return _is_mp3; +} + +SrsHlsM3u8Stream::SrsHlsM3u8Stream() +{ +} + +SrsHlsM3u8Stream::~SrsHlsM3u8Stream() +{ +} + +void SrsHlsM3u8Stream::set_m3u8(std::string v) +{ + m3u8 = v; +} + +int SrsHlsM3u8Stream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + std::string data = m3u8; + + w->header()->set_content_length((int)data.length()); + w->header()->set_content_type("application/x-mpegURL;charset=utf-8"); + + if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send m3u8 failed. ret=%d", ret); + } + return ret; + } + + return ret; +} + +SrsHlsTsStream::SrsHlsTsStream() +{ +} + +SrsHlsTsStream::~SrsHlsTsStream() +{ +} + +void SrsHlsTsStream::set_ts(std::string v) +{ + ts = v; +} + +int SrsHlsTsStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + std::string data = ts; + + w->header()->set_content_length((int)data.length()); + w->header()->set_content_type("video/MP2T"); + + if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send ts failed. ret=%d", ret); + } + return ret; + } + + return ret; +} + +SrsHlsEntry::SrsHlsEntry() +{ + tmpl = NULL; +} + +SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr) +{ + server = svr; + + mux.hijack(this); +} + +SrsHttpStreamServer::~SrsHttpStreamServer() +{ + mux.unhijack(this); + + if (true) { + std::map::iterator it; + for (it = tflvs.begin(); it != tflvs.end(); ++it) { + SrsLiveEntry* entry = it->second; + srs_freep(entry); + } + tflvs.clear(); + } + if (true) { + std::map::iterator it; + for (it = sflvs.begin(); it != sflvs.end(); ++it) { + SrsLiveEntry* entry = it->second; + srs_freep(entry); + } + sflvs.clear(); + } + if (true) { + std::map::iterator it; + for (it = thls.begin(); it != thls.end(); ++it) { + SrsHlsEntry* entry = it->second; + srs_freep(entry); + } + thls.clear(); + } + if (true) { + std::map::iterator it; + for (it = shls.begin(); it != shls.end(); ++it) { + SrsHlsEntry* entry = it->second; + srs_freep(entry); + } + shls.clear(); + } +} + +int SrsHttpStreamServer::initialize() +{ + int ret = ERROR_SUCCESS; + + // remux rtmp to flv live streaming + if ((ret = initialize_flv_streaming()) != ERROR_SUCCESS) { + return ret; + } + + // remux rtmp to hls live streaming + if ((ret = initialize_hls_streaming()) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +// TODO: FIXME: rename for HTTP FLV mount. +int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + // the id to identify stream. + std::string sid = r->get_stream_url(); + SrsLiveEntry* entry = NULL; + + // create stream from template when not found. + if (sflvs.find(sid) == sflvs.end()) { + if (tflvs.find(r->vhost) == tflvs.end()) { + srs_info("ignore mount flv stream for disabled"); + return ret; + } + + SrsLiveEntry* tmpl = tflvs[r->vhost]; + + std::string mount = tmpl->mount; + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", r->vhost); + mount = srs_string_replace(mount, "[app]", r->app); + mount = srs_string_replace(mount, "[stream]", r->stream); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + entry = new SrsLiveEntry(mount, tmpl->hstrs); + + entry->cache = new SrsStreamCache(s, r); + entry->stream = new SrsLiveStream(s, r, entry->cache); + + sflvs[sid] = entry; + + // mount the http flv stream. + // we must register the handler, then start the thread, + // for the thread will cause thread switch context. + // @see https://github.com/simple-rtmp-server/srs/issues/404 + if ((ret = mux.handle(mount, entry->stream)) != ERROR_SUCCESS) { + srs_error("http: mount flv stream for vhost=%s failed. ret=%d", sid.c_str(), ret); + return ret; + } + + // start http stream cache thread + if ((ret = entry->cache->start()) != ERROR_SUCCESS) { + srs_error("http: start stream cache failed. ret=%d", ret); + return ret; + } + srs_trace("http: mount flv stream for vhost=%s, mount=%s", sid.c_str(), mount.c_str()); + } else { + entry = sflvs[sid]; + } + + // TODO: FIXME: supports reload. + if (entry->stream) { + entry->stream->entry->enabled = true; + return ret; + } + + return ret; +} + +void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r) +{ + std::string sid = r->get_stream_url(); + + if (sflvs.find(sid) == sflvs.end()) { + srs_info("ignore unmount flv stream for disabled"); + return; + } + + SrsLiveEntry* entry = sflvs[sid]; + entry->stream->entry->enabled = false; +} + +int SrsHttpStreamServer::mount_hls(SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + std::string sid = r->get_stream_url(); + + if (shls.find(sid) == shls.end()) { + srs_info("ignore mount hls stream for disabled"); + return ret; + } + + SrsHlsEntry* entry = shls[sid]; + + // TODO: FIXME: supports reload. + std::map::iterator it; + for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { + ISrsHttpHandler* stream = it->second; + stream->entry->enabled = true; + } + + return ret; +} + +int SrsHttpStreamServer::hls_update_m3u8(SrsRequest* r, string m3u8) +{ + int ret = ERROR_SUCCESS; + + std::string mount; + + std::string sid = r->get_stream_url(); + SrsHlsEntry* entry = NULL; + + // create stream from template when not found. + if (shls.find(sid) == shls.end()) { + if (thls.find(r->vhost) == thls.end()) { + srs_info("ignore mount hls stream for disabled"); + return ret; + } + + SrsHlsEntry* tmpl = thls[r->vhost]; + srs_assert(tmpl); + + entry = new SrsHlsEntry(); + mount = tmpl->mount; + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", r->vhost); + mount = srs_string_replace(mount, "[app]", r->app); + mount = srs_string_replace(mount, "[stream]", r->stream); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + entry->tmpl = tmpl; + entry->mount = mount; + shls[sid] = entry; + + if (entry->streams.find(mount) == entry->streams.end()) { + ISrsHttpHandler* he = new SrsHlsM3u8Stream(); + entry->streams[mount] = he; + + if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { + srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); + return ret; + } + } + } else { + entry = shls[sid]; + } + + mount = entry->mount; + + // update the m3u8 stream. + SrsHlsM3u8Stream* hms = dynamic_cast(entry->streams[mount]); + if (hms) { + hms->set_m3u8(m3u8); + } + srs_trace("hls update m3u8 ok, mount=%s", mount.c_str()); + + return ret; +} + +int SrsHttpStreamServer::hls_update_ts(SrsRequest* r, string uri, string ts) +{ + int ret = ERROR_SUCCESS; + + std::string sid = r->get_stream_url(); + + // when no hls mounted, init with empty m3u8. + if (shls.find(sid) == shls.end()) { + if ((ret = hls_update_m3u8(r, "")) != ERROR_SUCCESS) { + return ret; + } + } + + // find again, ignore if not exits. + if (shls.find(sid) == shls.end()) { + return ret; + } + + SrsHlsEntry* entry = shls[sid]; + srs_assert(entry); + srs_assert(entry->tmpl); + + std::string mount = hls_mount_generate(r, uri, entry->tmpl->mount); + + if (entry->streams.find(mount) == entry->streams.end()) { + ISrsHttpHandler* he = new SrsHlsTsStream(); + entry->streams[mount] = he; + + if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { + srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); + return ret; + } + } + + // update the ts stream. + SrsHlsTsStream* hts = dynamic_cast(entry->streams[mount]); + if (hts) { + hts->set_ts(ts); + } + srs_trace("hls update ts ok, mount=%s", mount.c_str()); + + return ret; +} + + +int SrsHttpStreamServer::hls_remove_ts(SrsRequest* r, string uri) +{ + int ret = ERROR_SUCCESS; + + std::string sid = r->get_stream_url(); + + // when no hls mounted, ignore. + if (shls.find(sid) == shls.end()) { + return ret; + } + + SrsHlsEntry* entry = shls[sid]; + srs_assert(entry); + srs_assert(entry->tmpl); + + std::string mount = hls_mount_generate(r, uri, entry->tmpl->mount); + + // ignore when no ts mounted. + if (entry->streams.find(mount) == entry->streams.end()) { + return ret; + } + + // update the ts stream. + SrsHlsTsStream* hts = dynamic_cast(entry->streams[mount]); + if (hts) { + hts->set_ts(""); + // TODO: FIXME: unmount and remove the http handler. + } + srs_trace("hls remove ts ok, mount=%s", mount.c_str()); + + return ret; +} + +void SrsHttpStreamServer::unmount_hls(SrsRequest* r) +{ + std::string sid = r->get_stream_url(); + + if (shls.find(sid) == shls.end()) { + srs_info("ignore unmount hls stream for disabled"); + return; + } + + SrsHlsEntry* entry = shls[sid]; + + std::map::iterator it; + for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { + ISrsHttpHandler* stream = it->second; + stream->entry->enabled = false; + } +} + +int SrsHttpStreamServer::on_reload_vhost_http_remux_updated() +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsHttpStreamServer::on_reload_vhost_hls(string vhost) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) +{ + int ret = ERROR_SUCCESS; + + // when handler not the root, we think the handler is ok. + ISrsHttpHandler* h = *ph? *ph : NULL; + if (h && h->entry && h->entry->pattern != "/") { + return ret; + } + + // only hijack for http streaming, http-flv/ts/mp3/aac. + std::string ext = request->ext(); + if (ext.empty()) { + return ret; + } + + // find the actually request vhost. + SrsConfDirective* vhost = _srs_config->get_vhost(request->host()); + if (!vhost || !_srs_config->get_vhost_enabled(vhost)) { + return ret; + } + + // find the entry template for the stream. + SrsLiveEntry* entry = NULL; + if (true) { + // no http streaming on vhost, ignore. + std::map::iterator it = tflvs.find(vhost->arg0()); + if (it == tflvs.end()) { + return ret; + } + + // hstrs not enabled, ignore. + entry = it->second; + if (!entry->hstrs) { + return ret; + } + + // check entry and request extension. + if (entry->is_flv()) { + if (ext != ".flv") { + return ret; + } + } else if (entry->is_ts()) { + if (ext != ".ts") { + return ret; + } + } else if (entry->is_mp3()) { + if (ext != ".mp3") { + return ret; + } + } else if (entry->is_aac()) { + if (ext != ".aac") { + return ret; + } + } else { + return ret; + } + } + + // convert to concreate class. + SrsHttpMessage* hreq = dynamic_cast(request); + srs_assert(hreq); + + // hijack for entry. + SrsRequest* r = hreq->to_request(vhost->arg0()); + SrsAutoFree(SrsRequest, r); + SrsSource* s = SrsSource::fetch(r); + if (!s) { + if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) { + return ret; + } + } + srs_assert(s != NULL); + + // create http streaming handler. + if ((ret = http_mount(s, r)) != ERROR_SUCCESS) { + return ret; + } + + // use the handler if exists. + if (ph) { + std::string sid = r->get_stream_url(); + if (sflvs.find(sid) != sflvs.end()) { + entry = sflvs[sid]; + *ph = entry->stream; + } + } + + // trigger edge to fetch from origin. + bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost); + srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]", + r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id()); + + // TODO: FIXME: disconnect when all connection closed. + if (vhost_is_edge) { + // notice edge to start for the first client. + if ((ret = s->on_edge_start_play()) != ERROR_SUCCESS) { + srs_error("notice edge start play stream failed. ret=%d", ret); + return ret; + } + } + + return ret; +} + +int SrsHttpStreamServer::initialize_flv_streaming() +{ + int ret = ERROR_SUCCESS; + + // http flv live stream mount for each vhost. + SrsConfDirective* root = _srs_config->get_root(); + for (int i = 0; i < (int)root->directives.size(); i++) { + SrsConfDirective* conf = root->at(i); + + if (!conf->is_vhost()) { + continue; + } + + std::string vhost = conf->arg0(); + if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { + continue; + } + + SrsLiveEntry* entry = new SrsLiveEntry( + _srs_config->get_vhost_http_remux_mount(vhost), + _srs_config->get_vhost_http_remux_hstrs(vhost) + ); + tflvs[vhost] = entry; + srs_trace("http flv live stream, vhost=%s, mount=%s", + vhost.c_str(), entry->mount.c_str()); + } + + return ret; +} + +int SrsHttpStreamServer::initialize_hls_streaming() +{ + int ret = ERROR_SUCCESS; + + // http hls live stream mount for each vhost. + SrsConfDirective* root = _srs_config->get_root(); + for (int i = 0; i < (int)root->directives.size(); i++) { + SrsConfDirective* conf = root->at(i); + + if (!conf->is_vhost()) { + continue; + } + + std::string vhost = conf->arg0(); + if (!_srs_config->get_hls_enabled(vhost)) { + continue; + } + + std::string storage = _srs_config->get_hls_storage(vhost); + if (storage != "ram" && storage != "both") { + continue; + } + + SrsHlsEntry* entry = new SrsHlsEntry(); + entry->mount = _srs_config->get_hls_mount(vhost); + thls[vhost] = entry; + srs_trace("http hls live stream, vhost=%s, mount=%s", + vhost.c_str(), entry->mount.c_str()); + } + + return ret; +} + +string SrsHttpStreamServer::hls_mount_generate(SrsRequest* r, string uri, string tmpl) +{ + std::string mount = tmpl; + + // the ts is relative from the m3u8, the same start dir. + size_t pos = string::npos; + if ((pos = mount.rfind("/")) != string::npos) { + mount = mount.substr(0, pos); + } + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", r->vhost); + mount = srs_string_replace(mount, "[app]", r->app); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + // mount with ts. + mount += "/"; + mount += uri; + + return mount; +} + +#endif + diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp new file mode 100644 index 000000000..ed107b3b9 --- /dev/null +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -0,0 +1,365 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 SRS(simple-rtmp-server) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_HTTP_STREAM_HPP +#define SRS_APP_HTTP_STREAM_HPP + +/* +#include +*/ + +#include + +#include + +#ifdef SRS_AUTO_HTTP_SERVER + +/** +* for the srs http stream cache, +* for example, the audio stream cache to make android(weixin) happy. +* we start a thread to shrink the queue. +*/ +class SrsStreamCache : public ISrsEndlessThreadHandler +{ +private: + SrsMessageQueue* queue; + SrsSource* source; + SrsRequest* req; + SrsEndlessThread* pthread; +public: + SrsStreamCache(SrsSource* s, SrsRequest* r); + virtual ~SrsStreamCache(); +public: + virtual int start(); + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); +// interface ISrsEndlessThreadHandler. +public: + virtual int cycle(); +}; + +/** +* the stream encoder in some codec, for example, flv or aac. +*/ +class ISrsStreamEncoder +{ +public: + ISrsStreamEncoder(); + virtual ~ISrsStreamEncoder(); +public: + /** + * initialize the encoder with file writer(to http response) and stream cache. + * @param w the writer to write to http response. + * @param c the stream cache for audio stream fast startup. + */ + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c) = 0; + /** + * write rtmp video/audio/metadata. + */ + virtual int write_audio(int64_t timestamp, char* data, int size) = 0; + virtual int write_video(int64_t timestamp, char* data, int size) = 0; + virtual int write_metadata(int64_t timestamp, char* data, int size) = 0; +public: + /** + * for some stream, for example, mp3 and aac, the audio stream, + * we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio. + * @return true to use gop cache of encoder; otherwise, use SrsSource. + */ + virtual bool has_cache() = 0; + /** + * dumps the cache of encoder to consumer. + */ + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0; +}; + +/** +* the flv stream encoder, remux rtmp stream to flv stream. +*/ +class SrsFlvStreamEncoder : public ISrsStreamEncoder +{ +protected: + SrsFlvEncoder* enc; +public: + SrsFlvStreamEncoder(); + virtual ~SrsFlvStreamEncoder(); +public: + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); + virtual int write_audio(int64_t timestamp, char* data, int size); + virtual int write_video(int64_t timestamp, char* data, int size); + virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); +}; + +#ifdef SRS_PERF_FAST_FLV_ENCODER +/** + * the fast flv stream encoder. + * @see https://github.com/simple-rtmp-server/srs/issues/405 + */ +class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder +{ +public: + SrsFastFlvStreamEncoder(); + virtual ~SrsFastFlvStreamEncoder(); +public: + /** + * write the tags in a time. + */ + virtual int write_tags(SrsSharedPtrMessage** msgs, int count); +}; +#endif + +/** +* the ts stream encoder, remux rtmp stream to ts stream. +*/ +class SrsTsStreamEncoder : public ISrsStreamEncoder +{ +private: + SrsTsEncoder* enc; +public: + SrsTsStreamEncoder(); + virtual ~SrsTsStreamEncoder(); +public: + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); + virtual int write_audio(int64_t timestamp, char* data, int size); + virtual int write_video(int64_t timestamp, char* data, int size); + virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); +}; + +/** +* the aac stream encoder, remux rtmp stream to aac stream. +*/ +class SrsAacStreamEncoder : public ISrsStreamEncoder +{ +private: + SrsAacEncoder* enc; + SrsStreamCache* cache; +public: + SrsAacStreamEncoder(); + virtual ~SrsAacStreamEncoder(); +public: + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); + virtual int write_audio(int64_t timestamp, char* data, int size); + virtual int write_video(int64_t timestamp, char* data, int size); + virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); +}; + +/** +* the mp3 stream encoder, remux rtmp stream to mp3 stream. +*/ +class SrsMp3StreamEncoder : public ISrsStreamEncoder +{ +private: + SrsMp3Encoder* enc; + SrsStreamCache* cache; +public: + SrsMp3StreamEncoder(); + virtual ~SrsMp3StreamEncoder(); +public: + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); + virtual int write_audio(int64_t timestamp, char* data, int size); + virtual int write_video(int64_t timestamp, char* data, int size); + virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); +}; + +/** +* write stream to http response direclty. +*/ +class SrsStreamWriter : public SrsFileWriter +{ +private: + ISrsHttpResponseWriter* writer; +public: + SrsStreamWriter(ISrsHttpResponseWriter* w); + virtual ~SrsStreamWriter(); +public: + virtual int open(std::string file); + virtual void close(); +public: + virtual bool is_open(); + virtual int64_t tellg(); +public: + virtual int write(void* buf, size_t count, ssize_t* pnwrite); + virtual int writev(iovec* iov, int iovcnt, ssize_t* pnwrite); +}; + +/** +* the flv live stream supports access rtmp in flv over http. +* srs will remux rtmp to flv streaming. +*/ +class SrsLiveStream : public ISrsHttpHandler +{ +private: + SrsRequest* req; + SrsSource* source; + SrsStreamCache* cache; +public: + SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c); + virtual ~SrsLiveStream(); +public: + virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +private: + virtual int streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); +}; + +/** +* the srs live entry +*/ +struct SrsLiveEntry +{ +private: + bool _is_flv; + bool _is_ts; + bool _is_aac; + bool _is_mp3; +public: + // for template, the mount contains variables. + // for concrete stream, the mount is url to access. + std::string mount; + // whether hstrs(http stream trigger rtmp source) + bool hstrs; + + SrsLiveStream* stream; + SrsStreamCache* cache; + + SrsLiveEntry(std::string m, bool h); + + bool is_flv(); + bool is_ts(); + bool is_mp3(); + bool is_aac(); +}; + +/** +* the m3u8 stream handler. +*/ +class SrsHlsM3u8Stream : public ISrsHttpHandler +{ +private: + std::string m3u8; +public: + SrsHlsM3u8Stream(); + virtual ~SrsHlsM3u8Stream(); +public: + virtual void set_m3u8(std::string v); +public: + virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +}; + +/** +* the ts stream handler. +*/ +class SrsHlsTsStream : public ISrsHttpHandler +{ +private: + std::string ts; +public: + SrsHlsTsStream(); + virtual ~SrsHlsTsStream(); +public: + virtual void set_ts(std::string v); +public: + virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +}; + +/** +* the srs hls entry. +*/ +// TODO: FIXME: use hte hls template and entry. +struct SrsHlsEntry +{ + // for template, the mount contains variables. + // for concrete stream, the mount is url to access. + std::string mount; + + // the template to create the entry + SrsHlsEntry* tmpl; + + // key: the m3u8/ts file path. + // value: the http handler. + std::map streams; + + SrsHlsEntry(); +}; + +/** +* the http stream server instance, +* serve http stream, for example, flv/ts/mp3/aac live stream. +*/ +class SrsHttpStreamServer : virtual public ISrsReloadHandler + , virtual public ISrsHttpMatchHijacker +{ +private: + SrsServer* server; +public: + SrsHttpServeMux mux; + // the http live streaming template, to create streams. + std::map tflvs; + // the http live streaming streams, crote by template. + std::map sflvs; + // the hls live streaming template, to create streams. + std::map thls; + // the hls live streaming streams, crote by template. + std::map shls; +public: + SrsHttpStreamServer(SrsServer* svr); + virtual ~SrsHttpStreamServer(); +public: + virtual int initialize(); +// http flv/ts/mp3/aac stream +public: + virtual int http_mount(SrsSource* s, SrsRequest* r); + virtual void http_unmount(SrsSource* s, SrsRequest* r); +// hls stream +public: + virtual int mount_hls(SrsRequest* r); + virtual int hls_update_m3u8(SrsRequest* r, std::string m3u8); + virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts); + virtual int hls_remove_ts(SrsRequest* r, std::string uri); + virtual void unmount_hls(SrsRequest* r); +// interface ISrsReloadHandler. +public: + virtual int on_reload_vhost_http_remux_updated(); + virtual int on_reload_vhost_hls(std::string vhost); +// interface ISrsHttpMatchHijacker +public: + virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph); +private: + virtual int initialize_flv_streaming(); + virtual int initialize_hls_streaming(); + virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl); +}; + +#endif + +#endif + diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 87c36b565..806dbe011 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -496,7 +496,7 @@ SrsServer::SrsServer() http_api_mux = new SrsHttpServeMux(); #endif #ifdef SRS_AUTO_HTTP_SERVER - http_stream_mux = new SrsHttpServer(this); + http_server = new SrsHttpServer(this); #endif #ifdef SRS_AUTO_HTTP_CORE http_heartbeat = NULL; @@ -522,7 +522,7 @@ void SrsServer::destroy() #endif #ifdef SRS_AUTO_HTTP_SERVER - srs_freep(http_stream_mux); + srs_freep(http_server); #endif #ifdef SRS_AUTO_HTTP_CORE @@ -602,8 +602,8 @@ int SrsServer::initialize(ISrsServerCycle* cycle_handler) #endif #ifdef SRS_AUTO_HTTP_SERVER - srs_assert(http_stream_mux); - if ((ret = http_stream_mux->initialize()) != ERROR_SUCCESS) { + srs_assert(http_server); + if ((ret = http_server->initialize()) != ERROR_SUCCESS) { return ret; } #endif @@ -810,13 +810,6 @@ int SrsServer::http_handle() return ret; } #endif - -#if defined(SRS_AUTO_HTTP_SERVER) && defined(SRS_AUTO_HTTP_API) - // for SRS go-sharp to detect the status of HTTP server of SRS HTTP FLV Cluster. - if ((ret = http_stream_mux->mux.handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) { - return ret; - } -#endif return ret; } @@ -1226,7 +1219,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) #endif } else if (type == SrsListenerHttpStream) { #ifdef SRS_AUTO_HTTP_SERVER - conn = new SrsStaticHttpConn(this, client_stfd, &http_stream_mux->mux); + conn = new SrsStaticHttpConn(this, client_stfd, http_server); #else srs_warn("close http client for server not support http-server"); srs_close_stfd(client_stfd); @@ -1364,7 +1357,7 @@ int SrsServer::on_publish(SrsSource* s, SrsRequest* r) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - if ((ret = http_stream_mux->http_mount(s, r)) != ERROR_SUCCESS) { + if ((ret = http_server->http_mount(s, r)) != ERROR_SUCCESS) { return ret; } #endif @@ -1375,7 +1368,7 @@ int SrsServer::on_publish(SrsSource* s, SrsRequest* r) void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r) { #ifdef SRS_AUTO_HTTP_SERVER - http_stream_mux->http_unmount(s, r); + http_server->http_unmount(s, r); #endif } @@ -1384,7 +1377,7 @@ int SrsServer::on_hls_publish(SrsRequest* r) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - if ((ret = http_stream_mux->mount_hls(r)) != ERROR_SUCCESS) { + if ((ret = http_server->mount_hls(r)) != ERROR_SUCCESS) { return ret; } #endif @@ -1397,7 +1390,7 @@ int SrsServer::on_update_m3u8(SrsRequest* r, string m3u8) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - if ((ret = http_stream_mux->hls_update_m3u8(r, m3u8)) != ERROR_SUCCESS) { + if ((ret = http_server->hls_update_m3u8(r, m3u8)) != ERROR_SUCCESS) { return ret; } #endif @@ -1410,7 +1403,7 @@ int SrsServer::on_update_ts(SrsRequest* r, string uri, string ts) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - if ((ret = http_stream_mux->hls_update_ts(r, uri, ts)) != ERROR_SUCCESS) { + if ((ret = http_server->hls_update_ts(r, uri, ts)) != ERROR_SUCCESS) { return ret; } #endif @@ -1424,7 +1417,7 @@ int SrsServer::on_remove_ts(SrsRequest* r, string uri) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - if ((ret = http_stream_mux->hls_remove_ts(r, uri)) != ERROR_SUCCESS) { + if ((ret = http_server->hls_remove_ts(r, uri)) != ERROR_SUCCESS) { return ret; } #endif @@ -1437,7 +1430,7 @@ int SrsServer::on_hls_unpublish(SrsRequest* r) int ret = ERROR_SUCCESS; #ifdef SRS_AUTO_HTTP_SERVER - http_stream_mux->unmount_hls(r); + http_server->unmount_hls(r); #endif return ret; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index ac1e51414..c5ba5031f 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -235,10 +235,11 @@ class SrsServer : virtual public ISrsReloadHandler { private: #ifdef SRS_AUTO_HTTP_API + // TODO: FIXME: rename to http_api SrsHttpServeMux* http_api_mux; #endif #ifdef SRS_AUTO_HTTP_SERVER - SrsHttpServer* http_stream_mux; + SrsHttpServer* http_server; #endif #ifdef SRS_AUTO_HTTP_CORE SrsHttpHeartbeat* http_heartbeat; diff --git a/trunk/src/protocol/srs_http_stack.cpp b/trunk/src/protocol/srs_http_stack.cpp index 35818d1ea..0bd11854a 100644 --- a/trunk/src/protocol/srs_http_stack.cpp +++ b/trunk/src/protocol/srs_http_stack.cpp @@ -228,6 +228,11 @@ ISrsHttpHandler::~ISrsHttpHandler() { } +bool ISrsHttpHandler::is_not_found() +{ + return false; +} + SrsHttpRedirectHandler::SrsHttpRedirectHandler(string u, int c) { url = u; @@ -253,6 +258,11 @@ SrsHttpNotFoundHandler::~SrsHttpNotFoundHandler() { } +bool SrsHttpNotFoundHandler::is_not_found() +{ + return true; +} + int SrsHttpNotFoundHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { return srs_go_http_error(w, SRS_CONSTS_HTTP_NotFound, SRS_CONSTS_HTTP_NotFound_str); @@ -487,6 +497,14 @@ ISrsHttpMatchHijacker::~ISrsHttpMatchHijacker() { } +ISrsHttpServeMux::ISrsHttpServeMux() +{ +} + +ISrsHttpServeMux::~ISrsHttpServeMux() +{ +} + SrsHttpServeMux::SrsHttpServeMux() { } @@ -604,6 +622,19 @@ int SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handler) return ret; } +bool SrsHttpServeMux::can_serve(ISrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + ISrsHttpHandler* h = NULL; + if ((ret = find_handler(r, &h)) != ERROR_SUCCESS) { + return false; + } + + srs_assert(h); + return !h->is_not_found(); +} + int SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { int ret = ERROR_SUCCESS; @@ -654,9 +685,9 @@ int SrsHttpServeMux::find_handler(ISrsHttpMessage* r, ISrsHttpHandler** ph) } } + static ISrsHttpHandler* h404 = new SrsHttpNotFoundHandler(); if (*ph == NULL) { - // TODO: FIXME: memory leak. - *ph = new SrsHttpNotFoundHandler(); + *ph = h404; } return ret; diff --git a/trunk/src/protocol/srs_http_stack.hpp b/trunk/src/protocol/srs_http_stack.hpp index a127156fa..bf12477ac 100644 --- a/trunk/src/protocol/srs_http_stack.hpp +++ b/trunk/src/protocol/srs_http_stack.hpp @@ -247,6 +247,7 @@ public: ISrsHttpHandler(); virtual ~ISrsHttpHandler(); public: + virtual bool is_not_found(); virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) = 0; }; @@ -270,6 +271,7 @@ public: SrsHttpNotFoundHandler(); virtual ~SrsHttpNotFoundHandler(); public: + virtual bool is_not_found(); virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; @@ -347,6 +349,18 @@ public: virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) = 0; }; +/** + * the server mux, all http server should implements it. + */ +class ISrsHttpServeMux +{ +public: + ISrsHttpServeMux(); + virtual ~ISrsHttpServeMux(); +public: + virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) = 0; +}; + // ServeMux is an HTTP request multiplexer. // It matches the URL of each incoming request against a list of registered // patterns and calls the handler for the pattern that @@ -374,7 +388,7 @@ public: // ServeMux also takes care of sanitizing the URL request path, // redirecting any request containing . or .. elements to an // equivalent .- and ..-free URL. -class SrsHttpServeMux +class SrsHttpServeMux : public ISrsHttpServeMux { private: // the pattern handler, to handle the http request. @@ -406,7 +420,10 @@ public: // Handle registers the handler for the given pattern. // If a handler already exists for pattern, Handle panics. virtual int handle(std::string pattern, ISrsHttpHandler* handler); - // interface ISrsHttpHandler + // whether the http muxer can serve the specified message, + // if not, user can try next muxer. + virtual bool can_serve(ISrsHttpMessage* r); +// interface ISrsHttpServeMux public: virtual int serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); private: From 3211282b0c588fc7e4d0b2ad5e526eb7ee027212 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 14 Jun 2015 19:42:43 +0800 Subject: [PATCH 2/2] refine the http code. --- trunk/src/app/srs_app_hls.cpp | 2 +- trunk/src/app/srs_app_http_conn.cpp | 18 ++++++++++-------- trunk/src/app/srs_app_http_conn.hpp | 10 ++++++---- trunk/src/app/srs_app_server.cpp | 2 +- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 8ced69e82..54eb18566 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -752,7 +752,7 @@ int SrsHlsMuxer::segment_close(string log_desc) if (should_write_file) { unlink(tmp_file.c_str()); if (unlink(tmp_file.c_str()) < 0) { - srs_warn("drop unlink path failed, file=%s.", tmp_file.c_str()); + srs_warn("ignore unlink path failed, file=%s.", tmp_file.c_str()); } } diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index c992c5002..64e095be0 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1204,25 +1204,27 @@ int SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) return ret; } -SrsStaticHttpConn::SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) : SrsHttpConn(cm, fd, m) { } -SrsStaticHttpConn::~SrsStaticHttpConn() +SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() { } -int SrsStaticHttpConn::on_got_http_message(ISrsHttpMessage* msg) +int SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) { int ret = ERROR_SUCCESS; - // TODO: FIXME: use the post body. - std::string res; + ISrsHttpResponseReader* br = msg->body_reader(); - // get response body. - if ((ret = msg->body_read_all(res)) != ERROR_SUCCESS) { - return ret; + // drop all request body. + while (!br->eof()) { + char body[4096]; + if ((ret = br->read(body, 4096, NULL)) != ERROR_SUCCESS) { + return ret; + } } return ret; diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index d27c30597..7bc06cd01 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -394,12 +394,14 @@ private: virtual int process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; -// TODO: FIXME: rename to SrsResponseOnlyHttpConn. -class SrsStaticHttpConn : public SrsHttpConn +/** + * drop body of request, only process the response. + */ +class SrsResponseOnlyHttpConn : public SrsHttpConn { public: - SrsStaticHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); - virtual ~SrsStaticHttpConn(); + SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); + virtual ~SrsResponseOnlyHttpConn(); public: virtual int on_got_http_message(ISrsHttpMessage* msg); }; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 806dbe011..0f2b3f960 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1219,7 +1219,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) #endif } else if (type == SrsListenerHttpStream) { #ifdef SRS_AUTO_HTTP_SERVER - conn = new SrsStaticHttpConn(this, client_stfd, http_server); + conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server); #else srs_warn("close http client for server not support http-server"); srs_close_stfd(client_stfd);