From 2239e9f1fd7bcc5b0d128c8f61d912e78d75477b Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 20 Apr 2015 16:25:48 +0800 Subject: [PATCH] support ingest hls live stream to RTMP. --- README.md | 1 + trunk/auto/summary.sh | 10 +- trunk/configure | 21 +- trunk/ide/srs_upp/srs_upp.upp | 1 + .../srs_xcode.xcodeproj/project.pbxproj | 4 + trunk/src/app/srs_app_log.cpp | 2 +- trunk/src/main/srs_main_ingest_hls.cpp | 1074 +++++++++++++++++ 7 files changed, 1103 insertions(+), 10 deletions(-) create mode 100644 trunk/src/main/srs_main_ingest_hls.cpp diff --git a/README.md b/README.md index e1a59cff4..013c57988 100755 --- a/README.md +++ b/README.md @@ -562,6 +562,7 @@ Supported operating systems and hardware: ### SRS 2.0 history +* v2.0, 2015-04-20, support ingest hls live stream to RTMP. * v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161. * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160. * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts. diff --git a/trunk/auto/summary.sh b/trunk/auto/summary.sh index 0b4852c80..f1134e750 100755 --- a/trunk/auto/summary.sh +++ b/trunk/auto/summary.sh @@ -57,8 +57,6 @@ echo -e " | ${SrsGprofSummaryColor}rm -f gmon.out; ./objs/srs -c conf/co echo -e " | ${SrsGprofSummaryColor}killall -2 srs # or CTRL+C to stop gprof\${BLACK}" echo -e " | ${SrsGprofSummaryColor}gprof -b ./objs/srs gmon.out > gprof.srs.log && rm -f gmon.out # gprof report to gprof.srs.log\${BLACK}" echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" -echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}" -echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" echo -e " |${SrsUtestSummaryColor}utest: ./objs/srs_utest, the utest for srs\${BLACK}" echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" echo -e " |${SrsLibrtmpSummaryColor}librtmp @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_SrsLibrtmp\${BLACK}" @@ -71,6 +69,12 @@ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/ echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_detect_rtmp\${BLACK}" echo -e " | ${SrsLibrtmpSummaryColor}librtmp-sample: ./research/librtmp/objs/srs_bandwidth_check\${BLACK}" echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" +echo -e " |${SrsResearchSummaryColor}research: ./objs/research, api server, players, ts info, librtmp.\${BLACK}" +echo -e " | ${SrsResearchSummaryColor} @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}" +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" +echo -e " |\${GREEN}tools: important tool, others @see https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}" +echo -e " | \${GREEN}./objs/srs_ingest_hls -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/livestream\${BLACK}" +echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" echo -e " |\${GREEN}server: ./objs/srs -c conf/srs.conf, start the srs server\${BLACK}" echo -e " | ${SrsHlsSummaryColor}hls @see: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_DeliveryHLS\${BLACK}" echo -e " | ${SrsHlsSummaryColor}hls: generate m3u8 and ts from rtmp stream\${BLACK}" @@ -121,4 +125,4 @@ echo -e "\${BLACK}Examples for srs-librtmp at:\${BLACK}" echo -e "\${GREEN} objs/research/librtmp\${BLACK}" echo -e "\${GREEN} Examples: https://github.com/winlinvip/simple-rtmp-server/wiki/v2_CN_SrsLibrtmp#srs-librtmp-examples\${BLACK}" END -fi \ No newline at end of file +fi diff --git a/trunk/configure b/trunk/configure index 273fe6a53..56441f410 100755 --- a/trunk/configure +++ b/trunk/configure @@ -100,7 +100,7 @@ AR = ar LINK = g++ CXXFLAGS = ${CXXFLAGS} -.PHONY: default srs librtmp +.PHONY: default srs srs_ingest_hls librtmp default: @@ -200,7 +200,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="MAIN" MODULE_DEPENDS=("CORE" "KERNEL" "RTMP" "APP") ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibHttpParserRoot}) - MODULE_FILES=("srs_main_server") + MODULE_FILES=("srs_main_server" "srs_main_ingest_hls") # add each modules for main for SRS_MODULE in ${SRS_MODULES[*]}; do . $SRS_MODULE/config @@ -217,7 +217,7 @@ fi # disable all app when export librtmp if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then # all main entrances - MAIN_ENTRANCES=("srs_main_server") + MAIN_ENTRANCES=("srs_main_server" "srs_main_ingest_hls") # add each modules for main for SRS_MODULE in ${SRS_MODULES[*]}; do . $SRS_MODULE/config @@ -232,6 +232,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then # # srs: srs(simple rtmp server) over st(state-threads) BUILD_KEY="srs" APP_MAIN="srs_main_server" APP_NAME="srs" . auto/apps.sh + # + # srs_ingest_hls: to ingest hls stream to srs. + BUILD_KEY="srs_ingest_hls" APP_MAIN="srs_main_ingest_hls" APP_NAME="srs_ingest_hls" . auto/apps.sh # add each modules for application for SRS_MODULE in ${SRS_MODULES[*]}; do . $SRS_MODULE/config @@ -272,7 +275,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk # generate phony header cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE} -.PHONY: default _default install install-api help clean server librtmp utest _prepare_dir $__mphonys +.PHONY: default _default install install-api help clean server srs_ingest_hls librtmp utest _prepare_dir $__mphonys # install prefix. SRS_PREFIX=${SRS_PREFIX} @@ -300,14 +303,15 @@ fi # the server, librtmp and utest # where the bellow will check and disable some entry by only echo. cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} -_default: server librtmp utest $__mdefaults +_default: server srs_ingest_hls librtmp utest $__mdefaults @bash objs/_srs_build_summary.sh help: - @echo "Usage: make |||||||" + @echo "Usage: make ||||||||" @echo " help display this help menu" @echo " clean cleanup project" @echo " server build the srs(simple rtmp server) over st(state-threads)" + @echo " srs_ingest_hls build the hls ingest tool of srs." @echo " librtmp build the client publish/play library, and samples" @echo " utest build the utest for srs" @echo " install install srs to the prefix path" @@ -332,6 +336,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT != NO ]; then cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} server: _prepare_dir @echo "donot build the srs(simple rtmp server) for srs-librtmp" +srs_ingest_hls: _prepare_dir + @echo "donot build the srs_ingest_hls for srs-librtmp" END else @@ -339,6 +345,9 @@ else server: _prepare_dir @echo "build the srs(simple rtmp server) over st(state-threads)" \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs +srs_ingest_hls: _prepare_dir + @echo "build the srs_ingest_hls for srs" + \$(MAKE) -f ${SRS_OBJS_DIR}/${SRS_MAKEFILE} srs_ingest_hls END fi diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index de7824012..424a3bbd3 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -1,6 +1,7 @@ file main readonly separator, ../../src/main/srs_main_server.cpp, + ../../src/main/srs_main_ingest_hls.cpp, auto readonly separator, ../../objs/srs_auto_headers.hpp, libs readonly separator, diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index 904377fa6..15522b837 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -105,6 +105,7 @@ 3CC52DDD1ACE4023006FEB01 /* srs_utest_reload.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD41ACE4023006FEB01 /* srs_utest_reload.cpp */; }; 3CC52DDE1ACE4023006FEB01 /* srs_utest.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CC52DD61ACE4023006FEB01 /* srs_utest.cpp */; }; 3CD88B3F1ACA9C58000359E0 /* srs_app_async_call.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */; }; + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */; }; /* End PBXBuildFile section */ /* Begin PBXCopyFilesBuildPhase section */ @@ -361,6 +362,7 @@ 3CC52DD71ACE4023006FEB01 /* srs_utest.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_utest.hpp; path = ../../src/utest/srs_utest.hpp; sourceTree = ""; }; 3CD88B3D1ACA9C58000359E0 /* srs_app_async_call.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_async_call.cpp; path = ../../../src/app/srs_app_async_call.cpp; sourceTree = ""; }; 3CD88B3E1ACA9C58000359E0 /* srs_app_async_call.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_async_call.hpp; path = ../../../src/app/srs_app_async_call.hpp; sourceTree = ""; }; + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_main_ingest_hls.cpp; path = ../../../src/main/srs_main_ingest_hls.cpp; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -442,6 +444,7 @@ 3C1232041AAE80CB00CE8F6C /* main */ = { isa = PBXGroup; children = ( + 3CE6CD301AE4AFB800706E07 /* srs_main_ingest_hls.cpp */, 3C1232051AAE812C00CE8F6C /* srs_main_server.cpp */, ); name = main; @@ -904,6 +907,7 @@ 3C1232A71AAE81D900CE8F6C /* srs_app_listener.cpp in Sources */, 3C1232261AAE814D00CE8F6C /* srs_kernel_flv.cpp in Sources */, 3C663F1A1AB0155100286D8B /* srs_rtmp_dump.c in Sources */, + 3CE6CD311AE4AFB800706E07 /* srs_main_ingest_hls.cpp in Sources */, 3C1232241AAE814D00CE8F6C /* srs_kernel_error.cpp in Sources */, 3C1232441AAE81A400CE8F6C /* srs_rtmp_handshake.cpp in Sources */, 3C1232291AAE814D00CE8F6C /* srs_kernel_stream.cpp in Sources */, diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 798450213..e7d47585a 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -274,7 +274,7 @@ bool SrsFastLog::generate_header(bool error, const char* tag, int context_id, co // to calendar time struct tm* tm; - if (_srs_config->get_utc_time()) { + if (_srs_config && _srs_config->get_utc_time()) { if ((tm = gmtime(&tv.tv_sec)) == NULL) { return false; } diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp new file mode 100644 index 000000000..9a1876db1 --- /dev/null +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -0,0 +1,1074 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// the retry timeout in ms. +#define SRS_INGEST_HLS_ERROR_RETRY_MS 3000 + +// pre-declare +int proxy_hls2rtmp(std::string hls, std::string rtmp); + +// for the main objects(server, config, log, context), +// never subscribe handler in constructor, +// instead, subscribe handler in initialize method. +// kernel module. +ISrsLog* _srs_log = new SrsFastLog(); +ISrsThreadContext* _srs_context = new ISrsThreadContext(); +// app module. +SrsConfig* _srs_config = NULL; +SrsServer* _srs_server = NULL; + +/** +* main entrance. +*/ +int main(int argc, char** argv) +{ + // TODO: support both little and big endian. + srs_assert(srs_is_little_endian()); + + // directly failed when compile limited. +#if !defined(SRS_AUTO_HTTP_PARSER) + srs_error("depends on http-parser."); + exit(-1); +#endif + +#if defined(SRS_AUTO_GPERF_MP) || defined(SRS_AUTO_GPERF_MP) \ +|| defined(SRS_AUTO_GPERF_MC) || defined(SRS_AUTO_GPERF_MP) + srs_error("donot support gmc/gmp/gcp/gprof"); + exit(-1); +#endif + + srs_trace("srs_ingest_hls base on %s, to ingest hls live to srs", RTMP_SIG_SRS_SERVER); + + // parse user options. + std::string in_hls_url, out_rtmp_url; + for (int opt = 0; opt < argc; opt++) { + srs_trace("argv[%d]=%s", opt, argv[opt]); + } + + // fill the options for mac + for (int opt = 0; opt < argc - 1; opt++) { + // ignore all options except -i and -y. + char* p = argv[opt]; + + // only accept -x + if (p[0] != '-' || p[1] == 0 || p[2] != 0) { + continue; + } + + // parse according the option name. + switch (p[1]) { + case 'i': in_hls_url = argv[opt + 1]; break; + case 'y': out_rtmp_url = argv[opt + 1]; break; + default: break; + } + } + + if (in_hls_url.empty() || out_rtmp_url.empty()) { + printf("ingest hls live stream and publish to RTMP server\n" + "Usage: %s <-i in_hls_url> <-y out_rtmp_url>\n" + " in_hls_url input hls url, ingest from this m3u8.\n" + " out_rtmp_url output rtmp url, publish to this url.\n" + "For example:\n" + " %s -i http://127.0.0.1:8080/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n" + " %s -i http://ossrs.net/live/livestream.m3u8 -y rtmp://127.0.0.1/live/ingest_hls\n", + argv[0], argv[0], argv[0]); + exit(-1); + } + + srs_trace("input: %s", in_hls_url.c_str()); + srs_trace("output: %s", out_rtmp_url.c_str()); + + return proxy_hls2rtmp(in_hls_url, out_rtmp_url); +} + +// the context to ingest hls stream. +class SrsIngestSrsInput +{ +private: + struct SrsTsPiece { + double duration; + std::string url; + std::string body; + + // should skip this ts? + bool skip; + // already sent to rtmp server? + bool sent; + // whether ts piece is dirty, remove if not update. + bool dirty; + + SrsTsPiece() { + skip = false; + sent = false; + dirty = false; + } + + int fetch(std::string m3u8, SrsHttpClient* client); + }; +private: + SrsHttpUri* in_hls; + std::vector pieces; + int64_t next_connect_time; +private: + SrsStream* stream; + SrsTsContext* context; +public: + SrsIngestSrsInput(SrsHttpUri* hls) { + in_hls = hls; + next_connect_time = 0; + + stream = new SrsStream(); + context = new SrsTsContext(); + } + virtual ~SrsIngestSrsInput() { + srs_freep(stream); + srs_freep(context); + + std::vector::iterator it; + for (it = pieces.begin(); it != pieces.end(); ++it) { + SrsTsPiece* tp = *it; + srs_freep(tp); + } + pieces.clear(); + } + /** + * parse the input hls live m3u8 index. + */ + virtual int connect(); + /** + * parse the ts and use hanler to process the message. + */ + virtual int parse(ISrsTsHandler* handler); +private: + /** + * find the ts piece by its url. + */ + virtual SrsTsPiece* find_ts(string url); + /** + * set all ts to dirty. + */ + virtual void dirty_all_ts(); + /** + * fetch all ts body. + */ + virtual void fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client); + /** + * remove all ts which is dirty. + */ + virtual void remove_dirty(); +}; + +int SrsIngestSrsInput::connect() +{ + int ret = ERROR_SUCCESS; + + int64_t now = srs_update_system_time_ms(); + if (now < next_connect_time) { + st_usleep((next_connect_time - now) * 1000); + } + + SrsHttpClient client; + srs_trace("parse input hls %s", in_hls->get_url()); + + if ((ret = client.initialize(in_hls->get_host(), in_hls->get_port())) != ERROR_SUCCESS) { + srs_error("connect to server failed. ret=%d", ret); + return ret; + } + + SrsHttpMessage* msg = NULL; + if ((ret = client.get(in_hls->get_path(), "", &msg)) != ERROR_SUCCESS) { + srs_error("HTTP GET %s failed. ret=%d", in_hls->get_url(), ret); + return ret; + } + + srs_assert(msg); + SrsAutoFree(SrsHttpMessage, msg); + + std::string body; + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) { + srs_error("read m3u8 failed. ret=%d", ret); + return ret; + } + + if (body.empty()) { + srs_warn("ignore empty m3u8"); + return ret; + } + + // set all ts to dirty. + dirty_all_ts(); + + std::string ptl; + double td = 0.0; + double duration = 0.0; + bool fresh_m3u8 = pieces.empty(); + while (!body.empty()) { + size_t pos = string::npos; + + std::string line; + if ((pos = body.find("\n")) != string::npos) { + line = body.substr(0, pos); + body = body.substr(pos + 1); + } else { + line = body; + body = ""; + } + + line = srs_string_replace(line, "\r", ""); + line = srs_string_replace(line, " ", ""); + + // #EXT-X-VERSION:3 + // the version must be 3.0 + if (srs_string_starts_with(line, "#EXT-X-VERSION:")) { + if (!srs_string_ends_with(line, ":3")) { + srs_warn("m3u8 3.0 required, actual is %s", line.c_str()); + } + continue; + } + + // #EXT-X-PLAYLIST-TYPE:VOD + // the playlist type, vod or nothing. + if (srs_string_starts_with(line, "#EXT-X-PLAYLIST-TYPE:")) { + ptl = line; + continue; + } + + // #EXT-X-TARGETDURATION:12 + // the target duration is required. + if (srs_string_starts_with(line, "#EXT-X-TARGETDURATION:")) { + td = ::atof(line.substr(string("#EXT-X-TARGETDURATION:").length()).c_str()); + } + + // #EXT-X-ENDLIST + // parse completed. + if (line == "#EXT-X-ENDLIST") { + break; + } + + // #EXTINF:11.401, + // livestream-5.ts + // parse each ts entry, expect current line is inf. + if (!srs_string_starts_with(line, "#EXTINF:")) { + continue; + } + + // expect next line is url. + std::string ts_url; + if ((pos = body.find("\n")) != string::npos) { + ts_url = body.substr(0, pos); + body = body.substr(pos + 1); + } else { + srs_warn("ts entry unexpected eof, inf=%s", line.c_str()); + break; + } + + // parse the ts duration. + line = line.substr(string("#EXTINF:").length()); + if ((pos = line.find(",")) != string::npos) { + line = line.substr(0, pos); + } + + double ts_duration = ::atof(line.c_str()); + duration += ts_duration; + + SrsTsPiece* tp = find_ts(ts_url); + if (!tp) { + tp = new SrsTsPiece(); + tp->url = ts_url; + tp->duration = ts_duration; + pieces.push_back(tp); + } else { + tp->dirty = false; + } + } + + // fetch all ts. + fetch_all_ts(fresh_m3u8, &client); + + // remove all dirty ts. + remove_dirty(); + + srs_trace("fetch m3u8 ok, td=%.2f, duration=%.2f, pieces=%d", td, duration, pieces.size()); + + return ret; +} + +int SrsIngestSrsInput::parse(ISrsTsHandler* handler) +{ + int ret = ERROR_SUCCESS; + + for (int i = 0; i < (int)pieces.size(); i++) { + SrsTsPiece* tp = pieces.at(i); + tp->sent = true; + + if (tp->body.empty()) { + continue; + } + + // use stream to parse ts packet. + int nb_packet = (int)tp->body.length() / SRS_TS_PACKET_SIZE; + for (int i = 0; i < nb_packet; i++) { + char* p = (char*)tp->body.data() + (i * SRS_TS_PACKET_SIZE); + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { + return ret; + } + + // process each ts packet + if ((ret = context->decode(stream, handler)) != ERROR_SUCCESS) { + srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); + continue; + } + srs_info("mpegts: parse ts packet completed"); + } + srs_info("mpegts: parse udp packet completed"); + } + + return ret; +} + +SrsIngestSrsInput::SrsTsPiece* SrsIngestSrsInput::find_ts(string url) +{ + std::vector::iterator it; + for (it = pieces.begin(); it != pieces.end(); ++it) { + SrsTsPiece* tp = *it; + if (tp->url == url) { + return tp; + } + } + return NULL; +} + +void SrsIngestSrsInput::dirty_all_ts() +{ + std::vector::iterator it; + for (it = pieces.begin(); it != pieces.end(); ++it) { + SrsTsPiece* tp = *it; + tp->dirty = true; + } +} + +void SrsIngestSrsInput::fetch_all_ts(bool fresh_m3u8, SrsHttpClient* client) +{ + int ret = ERROR_SUCCESS; + + for (int i = 0; i < (int)pieces.size(); i++) { + SrsTsPiece* tp = pieces.at(i); + + // when skipped, ignore. + if (tp->skip) { + continue; + } + + // for the fresh m3u8, skip except the last one. + if (fresh_m3u8 && i != (int)pieces.size() - 1) { + tp->skip = true; + continue; + } + + if ((ret = tp->fetch(in_hls->get_url(), client)) != ERROR_SUCCESS) { + srs_warn("ignore ts %s for error. ret=%d", tp->url.c_str(), ret); + tp->skip = true; + continue; + } + + // set the next connect time. + if (next_connect_time <= 0) { + next_connect_time = srs_update_system_time_ms(); + } + next_connect_time += (int)tp->duration * 1000; + } +} + + +void SrsIngestSrsInput::remove_dirty() +{ + std::vector::iterator it; + for (it = pieces.begin(); it != pieces.end();) { + SrsTsPiece* tp = *it; + + if (tp->dirty) { + srs_freep(tp); + it = pieces.erase(it); + } else { + ++it; + } + } +} + +int SrsIngestSrsInput::SrsTsPiece::fetch(string m3u8, SrsHttpClient* client) +{ + int ret = ERROR_SUCCESS; + + if (skip || sent || !body.empty()) { + return ret; + } + + size_t pos = string::npos; + + bool use_abs_client = false; + SrsHttpClient abs_client; + + std::string ts_url = url; + if (!srs_string_starts_with(ts_url, "http://")) { + std::string baseurl = m3u8; + if ((pos = m3u8.rfind("/")) != string::npos) { + baseurl = m3u8.substr(0, pos); + } + ts_url = baseurl + "/" + url; + + // use fresh client for absolute url. + client = &abs_client; + use_abs_client = true; + } + + SrsHttpUri uri; + if ((ret = uri.initialize(ts_url)) != ERROR_SUCCESS) { + return ret; + } + + // initialize the fresh http client. + if (use_abs_client && (ret = client->initialize(uri.get_host(), uri.get_port()) != ERROR_SUCCESS)) { + return ret; + } + + SrsHttpMessage* msg = NULL; + if ((ret = client->get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) { + srs_error("HTTP GET %s failed. ret=%d", uri.get_url(), ret); + return ret; + } + + srs_assert(msg); + SrsAutoFree(SrsHttpMessage, msg); + + if ((ret = msg->body_read_all(body)) != ERROR_SUCCESS) { + srs_error("read ts failed. ret=%d", ret); + return ret; + } + + srs_trace("fetch ts ok, duration=%.2f, url=%s, body=%dB", duration, url.c_str(), body.length()); + + return ret; +} + +// the context to output to rtmp server +class SrsIngestSrsOutput : public ISrsTsHandler +{ +private: + SrsHttpUri* out_rtmp; +private: + SrsRequest* req; + st_netfd_t stfd; + SrsStSocket* io; + SrsRtmpClient* client; + int stream_id; +private: + SrsRawH264Stream* avc; + std::string h264_sps; + bool h264_sps_changed; + std::string h264_pps; + bool h264_pps_changed; + bool h264_sps_pps_sent; +private: + SrsRawAacStream* aac; + std::string aac_specific_config; +public: + SrsIngestSrsOutput(SrsHttpUri* rtmp) { + out_rtmp = rtmp; + + req = NULL; + io = NULL; + client = NULL; + stfd = NULL; + stream_id = 0; + + avc = new SrsRawH264Stream(); + aac = new SrsRawAacStream(); + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = false; + } + virtual ~SrsIngestSrsOutput() { + close(); + + srs_freep(avc); + srs_freep(aac); + } +// interface ISrsTsHandler +public: + virtual int on_ts_message(SrsTsMessage* msg); +private: + virtual int on_ts_video(SrsTsMessage* msg, SrsStream* avs); + virtual int write_h264_sps_pps(u_int32_t dts, u_int32_t pts); + virtual int write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts); + virtual int on_ts_audio(SrsTsMessage* msg, SrsStream* avs); + virtual int write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts); +private: + virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); +public: + /** + * connect to output rtmp server. + */ + virtual int connect(); +private: + virtual int connect_app(std::string ep_server, std::string ep_port); + // close the connected io and rtmp to ready to be re-connect. + virtual void close(); +}; + +int SrsIngestSrsOutput::on_ts_message(SrsTsMessage* msg) +{ + int ret = ERROR_SUCCESS; + + // about the bytes of msg, specified by elementary stream which indicates by PES_packet_data_byte and stream_id + // for example, when SrsTsStream of SrsTsChannel indicates stream_type is SrsTsStreamVideoMpeg4 and SrsTsStreamAudioMpeg4, + // the elementary stream can be mux in "2.11 Carriage of ISO/IEC 14496 data" in hls-mpeg-ts-iso13818-1.pdf, page 103 + // @remark, the most popular stream_id is 0xe0 for h.264 over mpegts, which indicates the stream_id is video and + // stream_number is 0, where I guess the elementary is specified in annexb format(H.264-AVC-ISO_IEC_14496-10.pdf, page 211). + // because when audio stream_number is 0, the elementary is ADTS(aac-mp4a-format-ISO_IEC_14496-3+2001.pdf, page 75, 1.A.2.2 ADTS). + + // about the bytes of PES_packet_data_byte, defined in hls-mpeg-ts-iso13818-1.pdf, page 58 + // PES_packet_data_byte ¨C PES_packet_data_bytes shall be contiguous bytes of data from the elementary stream + // indicated by the packet¡¯s stream_id or PID. When the elementary stream data conforms to ITU-T + // Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 13818-3, the PES_packet_data_bytes shall be byte aligned to the bytes of this + // Recommendation | International Standard. The byte-order of the elementary stream shall be preserved. The number of + // PES_packet_data_bytes, N, is specified by the PES_packet_length field. N shall be equal to the value indicated in the + // PES_packet_length minus the number of bytes between the last byte of the PES_packet_length field and the first + // PES_packet_data_byte. + // + // In the case of a private_stream_1, private_stream_2, ECM_stream, or EMM_stream, the contents of the + // PES_packet_data_byte field are user definable and will not be specified by ITU-T | ISO/IEC in the future. + + // about the bytes of stream_id, define in hls-mpeg-ts-iso13818-1.pdf, page 49 + // stream_id ¨C In Program Streams, the stream_id specifies the type and number of the elementary stream as defined by the + // stream_id Table 2-18. In Transport Streams, the stream_id may be set to any valid value which correctly describes the + // elementary stream type as defined in Table 2-18. In Transport Streams, the elementary stream type is specified in the + // Program Specific Information as specified in 2.4.4. + + // about the stream_id table, define in Table 2-18 ¨C Stream_id assignments, hls-mpeg-ts-iso13818-1.pdf, page 52. + // + // 110x xxxx + // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC + // 14496-3 audio stream number x xxxx + // ((sid >> 5) & 0x07) == SrsTsPESStreamIdAudio + // + // 1110 xxxx + // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC + // 14496-2 video stream number xxxx + // ((stream_id >> 4) & 0x0f) == SrsTsPESStreamIdVideo + + srs_trace("<- "SRS_CONSTS_LOG_STREAM_CASTER" mpegts: got %s stream=%s, dts=%"PRId64", pts=%"PRId64", size=%d, us=%d, cc=%d, sid=%#x(%s-%d)", + (msg->channel->apply == SrsTsPidApplyVideo)? "Video":"Audio", srs_ts_stream2string(msg->channel->stream).c_str(), + msg->dts, msg->pts, msg->payload->length(), msg->packet->payload_unit_start_indicator, msg->continuity_counter, msg->sid, + msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); + + // when not audio/video, or not adts/annexb format, donot support. + if (msg->stream_number() != 0) { + ret = ERROR_STREAM_CASTER_TS_ES; + srs_error("mpegts: unsupported stream format, sid=%#x(%s-%d). ret=%d", + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number(), ret); + return ret; + } + + // check supported codec + if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) { + ret = ERROR_STREAM_CASTER_TS_CODEC; + srs_error("mpegts: unsupported stream codec=%d. ret=%d", msg->channel->stream, ret); + return ret; + } + + // parse the stream. + SrsStream avs; + if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { + srs_error("mpegts: initialize av stream failed. ret=%d", ret); + return ret; + } + + // publish audio or video. + if (msg->channel->stream == SrsTsStreamVideoH264) { + return on_ts_video(msg, &avs); + } + if (msg->channel->stream == SrsTsStreamAudioAAC) { + return on_ts_audio(msg, &avs); + } + + // TODO: FIXME: implements it. + return ret; +} + +int SrsIngestSrsOutput::on_ts_video(SrsTsMessage* msg, SrsStream* avs) +{ + int ret = ERROR_SUCCESS; + + // ensure rtmp connected. + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + + // ts tbn to flv tbn. + u_int32_t dts = (u_int32_t)(msg->dts / 90); + u_int32_t pts = (u_int32_t)(msg->dts / 90); + + // the whole ts pes video packet must be a flv frame packet. + char* ibpframe = avs->data() + avs->pos(); + int ibpframe_size = avs->size() - avs->pos(); + + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + if ((ret = avc->annexb_demux(avs, &frame, &frame_size)) != ERROR_SUCCESS) { + return ret; + } + + // ignore invalid frame, + // * atleast 1bytes for SPS to decode the type + // * ignore the auth bytes '09f0' + if (frame_size <= 2) { + continue; + } + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps; + if ((ret = avc->sps_demux(frame, frame_size, sps)) != ERROR_SUCCESS) { + return ret; + } + + if (h264_sps == sps) { + continue; + } + h264_sps_changed = true; + h264_sps = sps; + + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { + return ret; + } + continue; + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps; + if ((ret = avc->pps_demux(frame, frame_size, pps)) != ERROR_SUCCESS) { + return ret; + } + + if (h264_pps == pps) { + continue; + } + h264_pps_changed = true; + h264_pps = pps; + + if ((ret = write_h264_sps_pps(dts, pts)) != ERROR_SUCCESS) { + return ret; + } + continue; + } + + break; + } + + // ibp frame. + srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", ibpframe_size, dts); + return write_h264_ipb_frame(ibpframe, ibpframe_size, dts, pts); +} + +int SrsIngestSrsOutput::write_h264_sps_pps(u_int32_t dts, u_int32_t pts) +{ + int ret = ERROR_SUCCESS; + + // only send when both sps and pps changed. + if (!h264_sps_changed || !h264_pps_changed) { + return ret; + } + + // h264 raw to h264 packet. + std::string sh; + if ((ret = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != ERROR_SUCCESS) { + return ret; + } + + // h264 packet to flv packet. + int8_t frame_type = SrsCodecVideoAVCFrameKeyFrame; + int8_t avc_packet_type = SrsCodecVideoAVCTypeSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((ret = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { + return ret; + } + + // the timestamp in rtmp message header is dts. + u_int32_t timestamp = dts; + if ((ret = rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv)) != ERROR_SUCCESS) { + return ret; + } + + // reset sps and pps. + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = true; + + return ret; +} + +int SrsIngestSrsOutput::write_h264_ipb_frame(char* frame, int frame_size, u_int32_t dts, u_int32_t pts) +{ + int ret = ERROR_SUCCESS; + + // when sps or pps not sent, ignore the packet. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/203 + if (!h264_sps_pps_sent) { + return ERROR_H264_DROP_BEFORE_SPS_PPS; + } + + // 5bits, 7.3.1 NAL unit syntax, + // H.264-AVC-ISO_IEC_14496-10.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsCodecVideoAVCFrame frame_type = SrsCodecVideoAVCFrameInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsCodecVideoAVCFrameKeyFrame; + } + + std::string ibp; + if ((ret = avc->mux_ipb_frame(frame, frame_size, ibp)) != ERROR_SUCCESS) { + return ret; + } + + int8_t avc_packet_type = SrsCodecVideoAVCTypeNALU; + char* flv = NULL; + int nb_flv = 0; + if ((ret = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != ERROR_SUCCESS) { + return ret; + } + + // the timestamp in rtmp message header is dts. + u_int32_t timestamp = dts; + return rtmp_write_packet(SrsCodecFlvTagVideo, timestamp, flv, nb_flv); +} + +int SrsIngestSrsOutput::on_ts_audio(SrsTsMessage* msg, SrsStream* avs) +{ + int ret = ERROR_SUCCESS; + + // ensure rtmp connected. + if ((ret = connect()) != ERROR_SUCCESS) { + return ret; + } + + // ts tbn to flv tbn. + u_int32_t dts = (u_int32_t)(msg->dts / 90); + + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + SrsRawAacStreamCodec codec; + if ((ret = aac->adts_demux(avs, &frame, &frame_size, codec)) != ERROR_SUCCESS) { + return ret; + } + + // ignore invalid frame, + // * atleast 1bytes for aac to decode the data. + if (frame_size <= 0) { + continue; + } + srs_info("mpegts: demux aac frame size=%d, dts=%d", frame_size, dts); + + // generate sh. + if (aac_specific_config.empty()) { + std::string sh; + if ((ret = aac->mux_sequence_header(&codec, sh)) != ERROR_SUCCESS) { + return ret; + } + aac_specific_config = sh; + + codec.aac_packet_type = 0; + + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != ERROR_SUCCESS) { + return ret; + } + } + + // audio raw data. + codec.aac_packet_type = 1; + if ((ret = write_audio_raw_frame(frame, frame_size, &codec, dts)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + +int SrsIngestSrsOutput::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, u_int32_t dts) +{ + int ret = ERROR_SUCCESS; + + char* data = NULL; + int size = 0; + if ((ret = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != ERROR_SUCCESS) { + return ret; + } + + return rtmp_write_packet(SrsCodecFlvTagAudio, dts, data, size); +} + +int SrsIngestSrsOutput::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = NULL; + + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { + srs_error("mpegts: create shared ptr msg failed. ret=%d", ret); + return ret; + } + srs_assert(msg); + + // send out encoded msg. + if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsIngestSrsOutput::connect() +{ + int ret = ERROR_SUCCESS; + + // when ok, ignore. + // TODO: FIXME: should reconnect when disconnected. + if (io || client) { + return ret; + } + + // parse uri + if (!req) { + req = new SrsRequest(); + + size_t pos = string::npos; + string uri = req->tcUrl = out_rtmp->get_url(); + + // tcUrl, stream + if ((pos = uri.rfind("/")) != string::npos) { + req->stream = uri.substr(pos + 1); + req->tcUrl = uri = uri.substr(0, pos); + } + + srs_discovery_tc_url(req->tcUrl, + req->schema, req->host, req->vhost, req->app, req->port, + req->param); + } + + // connect host. + if ((ret = srs_socket_connect(req->host, ::atoi(req->port.c_str()), ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + srs_error("mpegts: connect server %s:%s failed. ret=%d", req->host.c_str(), req->port.c_str(), ret); + return ret; + } + io = new SrsStSocket(stfd); + client = new SrsRtmpClient(io); + + client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); + + // connect to vhost/app + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("mpegts: handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + return ret; + } + + // publish. + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { + srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d", + req->stream.c_str(), stream_id, ret); + return ret; + } + + return ret; +} + +// TODO: FIXME: refine the connect_app. +int SrsIngestSrsOutput::connect_app(string ep_server, string ep_port) +{ + int ret = ERROR_SUCCESS; + + // args of request takes the srs info. + if (req->args == NULL) { + req->args = SrsAmf0Any::object(); + } + + // notify server the edge identity, + // @see https://github.com/winlinvip/simple-rtmp-server/issues/147 + SrsAmf0Object* data = req->args; + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); + data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); + data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); + // for edge to directly get the id of client. + data->set("srs_pid", SrsAmf0Any::number(getpid())); + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); + + // local ip of edge + std::vector ips = srs_get_local_ipv4_ips(); + assert(0 < (int)ips.size()); + std::string local_ip = ips[0]; + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); + + // generate the tcUrl + std::string param = ""; + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); + + // upnode server identity will show in the connect_app of client. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/160 + // the debug_srs_upnode is config in vhost and default to true. + bool debug_srs_upnode = true; + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { + srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d", + tc_url.c_str(), debug_srs_upnode, ret); + return ret; + } + + return ret; +} + +void SrsIngestSrsOutput::close() +{ + srs_freep(client); + srs_freep(io); + srs_freep(req); + srs_close_stfd(stfd); +} + +// the context for ingest hls stream. +class SrsIngestSrsContext +{ +private: + SrsIngestSrsInput* ic; + SrsIngestSrsOutput* oc; +public: + SrsIngestSrsContext(SrsHttpUri* hls, SrsHttpUri* rtmp) { + ic = new SrsIngestSrsInput(hls); + oc = new SrsIngestSrsOutput(rtmp); + } + virtual ~SrsIngestSrsContext() { + srs_freep(ic); + srs_freep(oc); + } + virtual int proxy() { + int ret = ERROR_SUCCESS; + + if ((ret = ic->connect()) != ERROR_SUCCESS) { + srs_warn("connect oc failed. ret=%d", ret); + return ret; + } + + if ((ret = oc->connect()) != ERROR_SUCCESS) { + srs_warn("connect ic failed. ret=%d", ret); + return ret; + } + + if ((ret = ic->parse(oc)) != ERROR_SUCCESS) { + srs_warn("proxy ts to rtmp failed. ret=%d", ret); + return ret; + } + + return ret; + } +}; + +int proxy_hls2rtmp(string hls, string rtmp) +{ + int ret = ERROR_SUCCESS; + + // init st. + if ((ret = srs_init_st()) != ERROR_SUCCESS) { + srs_error("init st failed. ret=%d", ret); + return ret; + } + + SrsHttpUri hls_uri, rtmp_uri; + if ((ret = hls_uri.initialize(hls)) != ERROR_SUCCESS) { + srs_error("hls uri invalid. ret=%d", ret); + return ret; + } + if ((ret = rtmp_uri.initialize(rtmp)) != ERROR_SUCCESS) { + srs_error("rtmp uri invalid. ret=%d", ret); + return ret; + } + + SrsIngestSrsContext context(&hls_uri, &rtmp_uri); + for (;;) { + if ((ret = context.proxy()) == ERROR_SUCCESS) { + continue; + } + + srs_warn("proxy hls to rtmp failed. ret=%d", ret); + st_usleep(SRS_INGEST_HLS_ERROR_RETRY_MS * 1000); + } + + return ret; +} +