solve rtmp client timeout bugs in srt2rtmp

pull/1587/head
runner365 5 years ago
parent 06e7a20b5f
commit dece458a90

@ -61,7 +61,7 @@ bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_
return false; return false;
} }
for (int index = 0; index < info_vec.size(); index++) { for (size_t index = 0; index < info_vec.size(); index++) {
std::string key; std::string key;
std::string value; std::string value;

@ -1,11 +1,13 @@
#include "srt_to_rtmp.hpp" #include "srt_to_rtmp.hpp"
#include "stringex.hpp"
#include "time_help.h"
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_buffer.hpp> #include <srs_kernel_buffer.hpp>
#include <srs_app_rtmp_conn.hpp> #include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
#include <srs_kernel_stream.hpp> #include <srs_kernel_stream.hpp>
#include "stringex.hpp" #include <list>
std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr; std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
@ -16,7 +18,7 @@ std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
return s_srt2rtmp_ptr; return s_srt2rtmp_ptr;
} }
srt2rtmp::srt2rtmp() { srt2rtmp::srt2rtmp():_lastcheck_ts(0) {
} }
@ -36,6 +38,7 @@ srs_error_t srt2rtmp::init() {
if ((err = _trd_ptr->start()) != srs_success) { if ((err = _trd_ptr->start()) != srs_success) {
return srs_error_wrap(err, "start thread"); return srs_error_wrap(err, "start thread");
} }
srs_trace("srt2rtmp start coroutine..."); srs_trace("srt2rtmp start coroutine...");
return err; return err;
@ -75,9 +78,40 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
return msg_ptr; return msg_ptr;
} }
void srt2rtmp::check_rtmp_alive() {
const int64_t CHECK_INTERVAL = 15*1000;
const int64_t ALIVE_TIMEOUT_MAX = 20*1000;
if (_lastcheck_ts == 0) {
_lastcheck_ts = now_ms();
return;
}
int64_t timenow_ms = now_ms();
if ((timenow_ms - _lastcheck_ts) > CHECK_INTERVAL) {
_lastcheck_ts = timenow_ms;
for (auto iter = _rtmp_client_map.begin();
iter != _rtmp_client_map.end();) {
RTMP_CLIENT_PTR rtmp_ptr = iter->second;
if ((timenow_ms - rtmp_ptr->get_last_live_ts()) >= ALIVE_TIMEOUT_MAX) {
srs_warn("srt2rtmp client is timeout, url:%s",
rtmp_ptr->get_url().c_str());
_rtmp_client_map.erase(iter++);
rtmp_ptr->close();
} else {
iter++;
}
}
}
return;
}
//the cycle is running in srs coroutine //the cycle is running in srs coroutine
srs_error_t srt2rtmp::cycle() { srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success; srs_error_t err = srs_success;
_lastcheck_ts = 0;
while(true) { while(true) {
SRT_DATA_MSG_PTR msg_ptr = get_data_message(); SRT_DATA_MSG_PTR msg_ptr = get_data_message();
@ -87,7 +121,7 @@ srs_error_t srt2rtmp::cycle() {
} else { } else {
handle_ts_data(msg_ptr); handle_ts_data(msg_ptr);
} }
check_rtmp_alive();
if ((err = _trd_ptr->pull()) != srs_success) { if ((err = _trd_ptr->pull()) != srs_success) {
return srs_error_wrap(err, "forwarder"); return srs_error_wrap(err, "forwarder");
} }
@ -112,6 +146,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
, _connect_flag(false) { , _connect_flag(false) {
const std::string DEF_VHOST = "DEFAULT_VHOST";
_ts_demux_ptr = std::make_shared<ts_demux>(); _ts_demux_ptr = std::make_shared<ts_demux>();
_avc_ptr = std::make_shared<SrsRawH264Stream>(); _avc_ptr = std::make_shared<SrsRawH264Stream>();
_aac_ptr = std::make_shared<SrsRawAacStream>(); _aac_ptr = std::make_shared<SrsRawAacStream>();
@ -124,13 +159,20 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
_appname = ret_vec[1]; _appname = ret_vec[1];
_streamname = ret_vec[2]; _streamname = ret_vec[2];
} else { } else {
_vhost = "DEFAULT_VHOST"; _vhost = DEF_VHOST;
_appname = ret_vec[0]; _appname = ret_vec[0];
_streamname = ret_vec[1]; _streamname = ret_vec[1];
} }
char url_sz[128]; char url_sz[128];
sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s",
_appname.c_str(), _vhost.c_str(), _streamname.c_str()); if (_vhost == DEF_VHOST) {
sprintf(url_sz, "rtmp://127.0.0.1/%s/%s",
_appname.c_str(), _streamname.c_str());
} else {
sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s",
_appname.c_str(), _vhost.c_str(), _streamname.c_str());
}
_url = url_sz; _url = url_sz;
_h264_sps_changed = false; _h264_sps_changed = false;
@ -148,11 +190,20 @@ void rtmp_client::close() {
if (!_rtmp_conn_ptr) { if (!_rtmp_conn_ptr) {
return; return;
} }
srs_trace("rtmp client close url:%s", _url.c_str());
_rtmp_conn_ptr->close(); _rtmp_conn_ptr->close();
_rtmp_conn_ptr = nullptr; _rtmp_conn_ptr = nullptr;
} }
int64_t rtmp_client::get_last_live_ts() {
return _last_live_ts;
}
std::string rtmp_client::get_url() {
return _url;
}
srs_error_t rtmp_client::connect() { srs_error_t rtmp_client::connect() {
srs_error_t err = srs_success; srs_error_t err = srs_success;
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
@ -366,6 +417,7 @@ srs_error_t rtmp_client::on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_
if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) {
return srs_error_wrap(err, "write frame"); return srs_error_wrap(err, "write frame");
} }
_last_live_ts = now_ms();
} }
return err; return err;
@ -417,6 +469,7 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_
if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) {
return srs_error_wrap(err, "write audio raw frame"); return srs_error_wrap(err, "write audio raw frame");
} }
_last_live_ts = now_ms();
} }
return err; return err;

@ -30,13 +30,15 @@ public:
~rtmp_client(); ~rtmp_client();
void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); void receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int64_t get_last_live_ts();
private: std::string get_url();
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
srs_error_t connect(); srs_error_t connect();
void close(); void close();
private:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
private: private:
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts); srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
srs_error_t on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts); srs_error_t on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
@ -68,6 +70,7 @@ private:
private: private:
RTMP_CONN_PTR _rtmp_conn_ptr; RTMP_CONN_PTR _rtmp_conn_ptr;
bool _connect_flag; bool _connect_flag;
int64_t _last_live_ts;
}; };
typedef std::shared_ptr<rtmp_client> RTMP_CLIENT_PTR; typedef std::shared_ptr<rtmp_client> RTMP_CLIENT_PTR;
@ -87,6 +90,7 @@ private:
SRT_DATA_MSG_PTR get_data_message(); SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle(); virtual srs_error_t cycle();
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void check_rtmp_alive();
private: private:
static std::shared_ptr<srt2rtmp> s_srt2rtmp_ptr; static std::shared_ptr<srt2rtmp> s_srt2rtmp_ptr;
@ -96,6 +100,7 @@ private:
std::queue<SRT_DATA_MSG_PTR> _msg_queue; std::queue<SRT_DATA_MSG_PTR> _msg_queue;
std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map; std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
int64_t _lastcheck_ts;
}; };
#endif #endif
Loading…
Cancel
Save