From e8fca60eceb98302d301d77c5432babe681681cb Mon Sep 17 00:00:00 2001 From: loveforever <31402829+zhouxiaojun2008@users.noreply.github.com> Date: Tue, 15 Feb 2022 18:49:17 +0800 Subject: [PATCH] SRT: Close connection if RTMP failed. (#2917) * SRT: using global variables to pass errors (#2897) * SRT: using global variables to pass errors (#2897) --- trunk/src/srt/srt_handle.cpp | 13 +++++++++++++ trunk/src/srt/srt_to_rtmp.cpp | 33 ++++++++++++++++++++------------- trunk/src/srt/srt_to_rtmp.hpp | 9 ++++++--- trunk/src/srt/ts_demux.cpp | 15 ++++++++------- trunk/src/srt/ts_demux.hpp | 4 ++-- 5 files changed, 49 insertions(+), 25 deletions(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 20547d673..4d4ccd34e 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -206,6 +206,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); + { + std::unique_lock locker(srt2rtmp::_srt_error_mutex); + if (srt2rtmp::_srt_error_map.count(subpath) == 1) { + int err_code = srt2rtmp::_srt_error_map[subpath]; + if (err_code != ERROR_SUCCESS) { + close_push_conn(conn_fd); + srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd); + //todo: reset to next use, maybe update by srt2rtmp::cycle again + srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS; + return; + } + } + } //send data to subscriber(players) //streamid, play map diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index 268a1cbf4..ea3922417 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -16,6 +16,8 @@ #include std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; +std::mutex srt2rtmp::_srt_error_mutex; +std::map srt2rtmp::_srt_error_map; std::shared_ptr srt2rtmp::get_instance() { if (!s_srt2rtmp_ptr) { @@ -153,6 +155,7 @@ void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) { srs_error_t srt2rtmp::cycle() { srs_error_t err = srs_success; _lastcheck_ts = 0; + int err_code = -1; while(true) { SRT_DATA_MSG_PTR msg_ptr = get_data_message(); @@ -163,7 +166,11 @@ srs_error_t srt2rtmp::cycle() { switch (msg_ptr->msg_type()) { case SRT_MSG_DATA_TYPE: { - handle_ts_data(msg_ptr); + err_code = handle_ts_data(msg_ptr); + if (err_code != ERROR_SUCCESS) { + std::unique_lock locker(_srt_error_mutex); + _srt_error_map[msg_ptr->get_path()] = err_code; + } break; } case SRT_MSG_CLOSE_TYPE: @@ -192,7 +199,7 @@ srs_error_t srt2rtmp::cycle() { } } -void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { +int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { RTMP_CLIENT_PTR rtmp_ptr; auto iter = _rtmp_client_map.find(data_ptr->get_path()); if (iter == _rtmp_client_map.end()) { @@ -203,9 +210,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { rtmp_ptr = iter->second; } - rtmp_ptr->receive_ts_data(data_ptr); - - return; + return rtmp_ptr->receive_ts_data(data_ptr); } void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) { @@ -343,9 +348,8 @@ srs_error_t rtmp_client::connect() { return err; } -void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { - _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback - return; +int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { + return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback } srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { @@ -668,13 +672,13 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ return err; } -void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, +int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) { assert(0); - return; + return 0; } auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len()); @@ -685,13 +689,16 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media err = on_ts_audio(avs_ptr, dts, pts); } else { srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type); - return; + return 0; } if (err != srs_success) { - srs_error("send media data error:", srs_error_code(err)); + srs_error("send media data error:%s", srs_error_desc(err).c_str()); + int err_code = srs_error_code(err); + srs_freep(err); + return err_code; } - return; + return 0; } rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT) diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index 3bf98e103..ce50e6149 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -71,7 +71,7 @@ public: rtmp_client(std::string key_path); virtual ~rtmp_client(); - void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); + int receive_ts_data(SRT_DATA_MSG_PTR data_ptr); int64_t get_last_live_ts(); std::string get_url(); @@ -79,7 +79,7 @@ public: void close(); private: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); private: srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); @@ -140,7 +140,7 @@ public: private: SRT_DATA_MSG_PTR get_data_message(); virtual srs_error_t cycle(); - void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + int handle_ts_data(SRT_DATA_MSG_PTR data_ptr); void handle_close_rtmpsession(const std::string& key_path); void handle_log_data(SRT_DATA_MSG_PTR data_ptr); void check_rtmp_alive(); @@ -154,6 +154,9 @@ private: std::unordered_map _rtmp_client_map; int64_t _lastcheck_ts; +public: + static std::mutex _srt_error_mutex; + static std::map _srt_error_map; }; #endif diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 92d055e44..2c25f5e0a 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -276,7 +276,9 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C uint64_t pts = 0; //callback last media data in data buffer - on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); + int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); + if (err_code != 0) + return err_code; int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts); if (ret > 188) { @@ -320,7 +322,7 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) continue; } ret = decode_unit(data, path, callback); - if (ret < 0) + if (ret != 0) // srs_error_code is positive { break; } @@ -335,15 +337,15 @@ void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std: return; } -void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, +int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, uint64_t dts, uint64_t pts) { if ((_data_total <=0 ) || (_data_buffer_vec.empty())) { - return; + return 0; } auto iter = _pmt._pid2steamtype.find(pid); if (iter == _pmt._pid2steamtype.end()) { - return; + return 0; } unsigned char stream_type = iter->second; auto total_data_ptr = std::make_shared(_data_total, key_path); @@ -358,8 +360,7 @@ void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, st _data_buffer_vec.clear(); _data_total = 0; - callback->on_data_callback(total_data_ptr, stream_type, dts, pts); - return; + return callback->on_data_callback(total_data_ptr, stream_type, dts, pts); } bool ts_demux::is_pmt(unsigned short pid) { diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp index bdd7e9221..5a824567f 100644 --- a/trunk/src/srt/ts_demux.hpp +++ b/trunk/src/srt/ts_demux.hpp @@ -74,7 +74,7 @@ Value Description class ts_media_data_callback_I { public: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; + virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; }; typedef std::shared_ptr TS_DATA_CALLBACK_PTR; @@ -227,7 +227,7 @@ private: int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size, uint64_t& dts, uint64_t& pts); void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid); - void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, + int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, uint64_t dts, uint64_t pts); private: