From bb9129707176b7746974ceb0e0f2c883c456a8c4 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 16 Apr 2022 08:10:42 +0800 Subject: [PATCH] SRT: Eliminate unused files for SRT. --- trunk/src/srt/srt_conn.cpp | 263 ----------- trunk/src/srt/srt_conn.hpp | 63 --- trunk/src/srt/srt_data.cpp | 71 --- trunk/src/srt/srt_data.hpp | 48 -- trunk/src/srt/srt_handle.cpp | 406 ----------------- trunk/src/srt/srt_handle.hpp | 70 --- trunk/src/srt/srt_log.cpp | 36 -- trunk/src/srt/srt_log.hpp | 52 --- trunk/src/srt/srt_server.cpp | 359 --------------- trunk/src/srt/srt_server.hpp | 67 --- trunk/src/srt/srt_to_rtmp.cpp | 779 -------------------------------- trunk/src/srt/srt_to_rtmp.hpp | 162 ------- trunk/src/srt/stringex.hpp | 48 -- trunk/src/srt/time_help.hpp | 19 - trunk/src/srt/ts_demux.cpp | 603 ------------------------ trunk/src/srt/ts_demux.hpp | 247 ---------- trunk/src/srt/ts_demux_test.cpp | 61 --- 17 files changed, 3354 deletions(-) delete mode 100644 trunk/src/srt/srt_conn.cpp delete mode 100644 trunk/src/srt/srt_conn.hpp delete mode 100644 trunk/src/srt/srt_data.cpp delete mode 100644 trunk/src/srt/srt_data.hpp delete mode 100644 trunk/src/srt/srt_handle.cpp delete mode 100644 trunk/src/srt/srt_handle.hpp delete mode 100644 trunk/src/srt/srt_log.cpp delete mode 100644 trunk/src/srt/srt_log.hpp delete mode 100644 trunk/src/srt/srt_server.cpp delete mode 100644 trunk/src/srt/srt_server.hpp delete mode 100644 trunk/src/srt/srt_to_rtmp.cpp delete mode 100644 trunk/src/srt/srt_to_rtmp.hpp delete mode 100644 trunk/src/srt/stringex.hpp delete mode 100644 trunk/src/srt/time_help.hpp delete mode 100644 trunk/src/srt/ts_demux.cpp delete mode 100644 trunk/src/srt/ts_demux.hpp delete mode 100644 trunk/src/srt/ts_demux_test.cpp diff --git a/trunk/src/srt/srt_conn.cpp b/trunk/src/srt/srt_conn.cpp deleted file mode 100644 index 662670324..000000000 --- a/trunk/src/srt/srt_conn.cpp +++ /dev/null @@ -1,263 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "srt_conn.hpp" -#include "time_help.hpp" -#include "stringex.hpp" -#include "srt_log.hpp" -#include - -#include -#include -#include - -bool is_streamid_valid(const std::string& streamid) { - if (streamid.empty()) { - return false; - } - - size_t pos = streamid.find(" "); - if (pos != streamid.npos) { - return false; - } - - int mode; - std::string subpath; - std::string vhost; - - // Parse the stream info from streamid, see https://github.com/ossrs/srs/issues/2893 - bool ret = get_streamid_info(streamid, mode, vhost, subpath); - if (!ret) { - return false; - } - - std::vector info_vec; - string_split(subpath, "/", info_vec); - - // TODO: FIXME: Should fail at parsing the original SRT URL. - if (info_vec.size() != 2) { - srt_log_warn("path format must be appname/stream?key=value..."); - return false; - } - - for (auto item : info_vec) { - if (item.empty()) { - return false; - } - pos = item.find(" "); - if (pos != item.npos) { - return false; - } - } - return true; -} - -bool get_key_value(const std::string& info, std::string& key, std::string& value) { - size_t pos = info.find("="); - - if (pos == info.npos) { - return false; - } - - key = info.substr(0, pos); - value = info.substr(pos+1); - - if (key.empty() || value.empty()) { - return false; - } - return true; -} - -// See streamid of https://github.com/ossrs/srs/issues/2893 -// TODO: FIMXE: We should parse SRT streamid to URL object, rather than a HTTP url subpath. -bool get_streamid_info(const std::string& streamid, int& mode, std::string& vhost, std::string& url_subpath) -{ - mode = PULL_SRT_MODE; - - size_t pos = streamid.find("#!::"); - if (pos != 0) { - pos = streamid.find("/"); - if (pos == streamid.npos) { - url_subpath = _srs_config->get_default_app_name() + "/" + streamid; - return true; - } - url_subpath = streamid; - return true; - } - - //SRT url supports multiple QueryStrings, which are passed to RTMP to realize authentication and other capabilities - //@see https://github.com/ossrs/srs/issues/2893 - std::string params; - std::string real_streamid; - real_streamid = streamid.substr(4); - - // Compatible with previous auth querystring, like this one: - // srt://127.0.0.1:10080?streamid=#!::h=live/livestream?secret=xxx,m=publish - real_streamid = srs_string_replace(real_streamid, "?", ","); - - std::map query; - srs_parse_query_string(real_streamid, query); - for (std::map::iterator it = query.begin(); it != query.end(); ++it) { - if (it->first == "h") { - std::string host = it->second; - - size_t r0 = host.find("/"); - size_t r1 = host.rfind("/"); - if (r0 != std::string::npos && r0 != std::string::npos) { - // Compatible with previous style, see https://github.com/ossrs/srs/issues/2893#compatible - // srt://127.0.0.1:10080?streamid=#!::h=live/livestream,m=publish - // srt://127.0.0.1:10080?streamid=#!::h=live/livestream,m=request - // srt://127.0.0.1:10080?streamid=#!::h=srs.srt.com.cn/live/livestream,m=publish - if (r0 != r1) { - // We got vhost in host. - url_subpath = host.substr(r0 + 1); - host = host.substr(0, r0); - - params.append("vhost="); - params.append(host); - params.append("&"); - vhost = host; - } else { - // Only stream in host. - url_subpath = host; - } - } else { - // New URL style, see https://github.com/ossrs/srs/issues/2893#solution - // srt://host.com:10080?streamid=#!::h=host.com,r=app/stream,key1=value1,key2=value2 - // srt://1.2.3.4:10080?streamid=#!::h=host.com,r=app/stream,key1=value1,key2=value2 - // srt://1.2.3.4:10080?streamid=#!::r=app/stream,key1=value1,key2=value2 - params.append("vhost="); - params.append(host); - params.append("&"); - vhost = host; - } - } else if (it->first == "r") { - url_subpath = it->second; - } else if (it->first == "m") { - std::string mode_str = it->second; // support m=publish or m=request - std::transform(it->second.begin(), it->second.end(), mode_str.begin(), ::tolower); - if (mode_str == "publish") { - mode = PUSH_SRT_MODE; - } else if (mode_str == "request") { - mode = PULL_SRT_MODE; - } else { - srt_log_warn("unknown mode_str:%s", mode_str.c_str()); - return false; - } - } else { - params.append(it->first); - params.append("="); - params.append(it->second); - params.append("&"); - } - } - - if (url_subpath.empty()) { - return false; - } - - if (!params.empty()) { - url_subpath.append("?"); - url_subpath.append(params); - url_subpath.pop_back(); // remove last '&' - } - - return true; -} - -srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd), - _streamid(streamid), - write_fail_cnt_(0) -{ - get_streamid_info(streamid, _mode, _vhost, _url_subpath); - - _update_timestamp = now_ms(); - - if (_vhost.empty()) { - _vhost = "__default_host__"; - } - - srt_log_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s", - streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str()); -} - -srt_conn::~srt_conn() { - close(); -} - -std::string srt_conn::get_vhost() { - return _vhost; -} - -void srt_conn::update_timestamp(long long now_ts) { - _update_timestamp = now_ts; -} - -long long srt_conn::get_last_ts() { - return _update_timestamp; -} - -void srt_conn::close() { - if (_conn_fd == SRT_INVALID_SOCK) { - return; - } - srt_close(_conn_fd); - _conn_fd = SRT_INVALID_SOCK; -} - -SRTSOCKET srt_conn::get_conn() { - return _conn_fd; -} -int srt_conn::get_mode() { - return _mode; -} - -std::string srt_conn::get_streamid() { - return _streamid; -} - -std::string srt_conn::get_path() { - if (!_url_path.empty()) { - return _url_path; - } - - size_t pos = _url_subpath.find("?"); - _url_path = (pos != std::string::npos) ? _url_subpath.substr(0, pos) : _url_subpath; - - return _url_path; -} - -std::string srt_conn::get_subpath() { - return _url_subpath; -} - -int srt_conn::read(unsigned char* data, int len) { - int ret = 0; - - ret = srt_recv(_conn_fd, (char*)data, len); - if (ret <= 0) { - srt_log_error("srt read error:%d, socket fd:%d", ret, _conn_fd); - return ret; - } - return ret; -} - -int srt_conn::write(unsigned char* data, int len) { - int ret = 0; - - ret = srt_send(_conn_fd, (char*)data, len); - if (ret <= 0) { - srt_log_error("srt write error:%d, socket fd:%d", ret, _conn_fd); - write_fail_cnt_++; - return ret; - } - write_fail_cnt_ = 0; - return ret; -} - -int srt_conn::get_write_fail_count() { - return write_fail_cnt_; -} \ No newline at end of file diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp deleted file mode 100644 index 0608e2305..000000000 --- a/trunk/src/srt/srt_conn.hpp +++ /dev/null @@ -1,63 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef SRT_CONN_H -#define SRT_CONN_H - -#include - -#include "stringex.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define ERR_SRT_MODE 0x00 -#define PULL_SRT_MODE 0x01 -#define PUSH_SRT_MODE 0x02 - -bool is_streamid_valid(const std::string& streamid); -bool get_key_value(const std::string& info, std::string& key, std::string& value); -bool get_streamid_info(const std::string& streamid, int& mode, std::string& vhost, std::string& url_subpash); - -class srt_conn { -public: - srt_conn(SRTSOCKET conn_fd, const std::string& streamid); - ~srt_conn(); - - void close(); - SRTSOCKET get_conn(); - int get_mode(); - std::string get_streamid(); - std::string get_path(); - std::string get_subpath(); - std::string get_vhost(); - int read(unsigned char* data, int len); - int write(unsigned char* data, int len); - - void update_timestamp(long long now_ts); - long long get_last_ts(); - int get_write_fail_count(); - -private: - SRTSOCKET _conn_fd; - std::string _streamid; - std::string _url_path; - std::string _url_subpath; - std::string _vhost; - int _mode; - long long _update_timestamp; - int write_fail_cnt_; -}; - -typedef std::shared_ptr SRT_CONN_PTR; - -#endif //SRT_CONN_H diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp deleted file mode 100644 index 2087b70d5..000000000 --- a/trunk/src/srt/srt_data.cpp +++ /dev/null @@ -1,71 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "srt_data.hpp" -#include - -SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type) - ,_len(0) - ,_data_p(nullptr) - ,_key_path(path) { - -} - -SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) - ,_len(len) - ,_key_path(path) { - _data_p = new unsigned char[len]; - memset(_data_p, 0, len); -} - -SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) - ,_len(len) - ,_key_path(path) -{ - _data_p = new unsigned char[len]; - memcpy(_data_p, data_p, len); -} - -SRT_DATA_MSG::SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content): _msg_type(SRT_MSG_LOG_TYPE) - ,_log_content(log_content) - ,_log_level(log_level) -{ - -} - -SRT_DATA_MSG::~SRT_DATA_MSG() { - if (_data_p && (_len > 0)) { - delete[] _data_p; - } -} - -unsigned int SRT_DATA_MSG::msg_type() { - return _msg_type; -} - -void SRT_DATA_MSG::set_msg_type(unsigned int msg_type) { - _msg_type = msg_type; -} - -std::string SRT_DATA_MSG::get_path() { - return _key_path; -} - -unsigned int SRT_DATA_MSG::data_len() { - return _len; -} - -unsigned char* SRT_DATA_MSG::get_data() { - return _data_p; -} - -LOGGER_LEVEL SRT_DATA_MSG::get_log_level() { - return _log_level; -} - -const char* SRT_DATA_MSG::get_log_string() { - return _log_content.c_str(); -} diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp deleted file mode 100644 index e7241fc90..000000000 --- a/trunk/src/srt/srt_data.hpp +++ /dev/null @@ -1,48 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef SRT_DATA_H -#define SRT_DATA_H - -#include "srt_log.hpp" -#include - -#include -#include - -#define SRT_MSG_DATA_TYPE 0x01 -#define SRT_MSG_CLOSE_TYPE 0x02 -#define SRT_MSG_LOG_TYPE 0x03 - -class SRT_DATA_MSG { -public: - SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); - SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); - SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); - SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content); - ~SRT_DATA_MSG(); - - unsigned int msg_type(); - unsigned int data_len(); - unsigned char* get_data(); - std::string get_path(); - LOGGER_LEVEL get_log_level(); - const char* get_log_string(); - - void set_msg_type(unsigned int msg_type); - -private: - unsigned int _msg_type; - unsigned int _len = 0; - unsigned char* _data_p = nullptr; - std::string _key_path; - std::string _log_content; - LOGGER_LEVEL _log_level = SRT_LOGGER_TRACE_LEVEL; -}; - -typedef std::shared_ptr SRT_DATA_MSG_PTR; - -#endif diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp deleted file mode 100644 index fd1f951bb..000000000 --- a/trunk/src/srt/srt_handle.cpp +++ /dev/null @@ -1,406 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "srt_handle.hpp" -#include "time_help.hpp" -#include "srt_log.hpp" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -static bool MONITOR_STATICS_ENABLE = false; -static long long MONITOR_TIMEOUT = 5000; -const unsigned int DEF_DATA_SIZE = 188*7; -const long long CHECK_ALIVE_INTERVAL = 5*1000; -const long long CHECK_ALIVE_TIMEOUT = 5*1000; -static const int SRT_WRTIE_FAIL_MAX = 10; - -long long srt_now_ms = 0; - -srt_handle::srt_handle(int pollid):_handle_pollid(pollid) - ,_last_timestamp(0) - ,_last_check_alive_ts(0) { -} - -srt_handle::~srt_handle() { - -} - -void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { - SRT_TRACEBSTATS mon; - srt_bstats(srtsocket, &mon, 1); - std::ostringstream output; - long long now_ul = now_ms(); - - if (!MONITOR_STATICS_ENABLE) { - return; - } - if (_last_timestamp == 0) { - _last_timestamp = now_ul; - return; - } - - if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) { - return; - } - _last_timestamp = now_ul; - output << "======= SRT STATS: sid=" << streamid << std::endl; - output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl; - output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl; - output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl; - output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl; - output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl; - output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl; - output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl; - output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl; - output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl; - output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl; - - srt_log_trace("\r\n%s", output.str().c_str()); - return; -} - -void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) { - _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); - - auto iter = _streamid_map.find(stream_id); - if (iter == _streamid_map.end()) { - std::unordered_map srtsocket_map; - srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); - - _streamid_map.insert(std::make_pair(stream_id, srtsocket_map)); - srt_log_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str()); - } else { - iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); - srt_log_trace("add new puller fd:%d, streamid:%s, size:%d", - conn_ptr->get_conn(), stream_id.c_str(), iter->second.size()); - } - - return; -} - -void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) { - srt_log_trace("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str()); - srt_epoll_remove_usock(_handle_pollid, srtsocket); - - auto streamid_iter = _streamid_map.find(stream_id); - if (streamid_iter != _streamid_map.end()) { - if (streamid_iter->second.size() == 0) { - _streamid_map.erase(stream_id); - } else if (streamid_iter->second.size() == 1) { - streamid_iter->second.erase(srtsocket); - _streamid_map.erase(stream_id); - } else { - streamid_iter->second.erase(srtsocket); - } - } else { - assert(0); - } - - auto conn_iter = _conn_map.find(srtsocket); - if (conn_iter != _conn_map.end()) { - _conn_map.erase(conn_iter); - return; - } else { - assert(0); - } - - return; -} - -SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) { - SRT_CONN_PTR ret_conn; - - auto iter = _conn_map.find(conn_srt_socket); - if (iter == _conn_map.end()) { - return ret_conn; - } - - ret_conn = iter->second; - - return ret_conn; -} - -void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { - int val_i; - int opt_len = sizeof(int); - - int64_t val_i64; - int opt64_len = sizeof(int64_t); - - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len); - srt_log_trace("srto SRTO_LATENCY=%d", val_i); - - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len); - srt_log_trace("srto SRTO_PEERLATENCY=%d", val_i); - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len); - srt_log_trace("srto SRTO_RCVLATENCY=%d", val_i); - - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len); - srt_log_trace("srto SRTO_SNDBUF=%d", val_i); - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len); - srt_log_trace("srto SRTO_RCVBUF=%d", val_i); - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i64, &opt64_len); - srt_log_trace("srto SRTO_MAXBW=%d", val_i64); - srt_log_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable"); - srt_log_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable"); - - if (conn_ptr->get_mode() == PULL_SRT_MODE) { - add_new_puller(conn_ptr, conn_ptr->get_path()); - } else { - if(add_new_pusher(conn_ptr) == false) { - srt_log_trace("push connection is repeated and rejected, fd:%d, streamid:%s", - conn_ptr->get_conn(), conn_ptr->get_streamid().c_str()); - conn_ptr->close(); - return; - } - } - srt_log_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events); - int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events); - if (ret < 0) { - srt_log_error("srt handle run add epoll error:%d", ret); - return; - } - - return; -} - -void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& path, const std::string& subpath, SRTSOCKET conn_fd) { - SRT_CONN_PTR srt_conn_ptr; - unsigned char data[DEF_DATA_SIZE]; - int ret; - srt_conn_ptr = get_srt_conn(conn_fd); - - if (!srt_conn_ptr) { - srt_log_error("handle_push_data fd:%d fail to find srt connection.", conn_fd); - return; - } - - if (status != SRTS_CONNECTED) { - srt_log_error("handle_push_data error status:%d fd:%d", status, conn_fd); - close_push_conn(conn_fd); - return; - } - - ret = srt_conn_ptr->read(data, DEF_DATA_SIZE); - if (ret <= 0) { - srt_log_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd); - close_push_conn(conn_fd); - return; - } - - srt_conn_ptr->update_timestamp(srt_now_ms); - - srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); - { - std::unique_lock locker(srt2rtmp::_srt_error_mutex); - if (srt2rtmp::_srt_error_map.count(subpath) == 1) { - int err_code = srt2rtmp::_srt_error_map[subpath]; - if (err_code != ERROR_SUCCESS) { - close_push_conn(conn_fd); - srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd); - //todo: reset to next use, maybe update by srt2rtmp::cycle again - srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS; - return; - } - } - } - - //send data to subscriber(players) - //streamid, play map - auto streamid_iter = _streamid_map.find(path); - if (streamid_iter == _streamid_map.end()) {//no puler - srt_log_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd); - return; - } - srt_log_info("receive data size(%d) from pusher(%d) to pullers, count:%d", - ret, conn_fd, streamid_iter->second.size()); - - std::vector remove_vec; - for (auto puller_iter = streamid_iter->second.begin(); - puller_iter != streamid_iter->second.end(); - puller_iter++) { - auto player_conn = puller_iter->second; - if (!player_conn) { - srt_log_error("handle_push_data get srt connect error from fd:%d", puller_iter->first); - continue; - } - int write_ret = player_conn->write(data, ret); - srt_log_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first); - if (write_ret > 0) { - puller_iter->second->update_timestamp(srt_now_ms); - } else { - if (player_conn->get_write_fail_count() > SRT_WRTIE_FAIL_MAX) { - remove_vec.push_back(puller_iter->first); - } - } - } - - for (auto item : remove_vec) { - streamid_iter->second.erase(item); - if (streamid_iter->second.empty()) { - _streamid_map.erase(streamid_iter); - } - } - - return; -} - -void srt_handle::check_alive() { - long long diff_t; - std::list conn_list; - - if (_last_check_alive_ts == 0) { - _last_check_alive_ts = srt_now_ms; - return; - } - diff_t = srt_now_ms - _last_check_alive_ts; - if (diff_t < CHECK_ALIVE_INTERVAL) { - return; - } - - for (auto conn_iter = _conn_map.begin(); - conn_iter != _conn_map.end(); - conn_iter++) - { - long long timeout = srt_now_ms - conn_iter->second->get_last_ts(); - if (timeout > CHECK_ALIVE_TIMEOUT) { - conn_list.push_back(conn_iter->second); - } - } - - for (auto del_iter = conn_list.begin(); - del_iter != conn_list.end(); - del_iter++) - { - SRT_CONN_PTR conn_ptr = *del_iter; - if (conn_ptr->get_mode() == PUSH_SRT_MODE) { - srt_log_warn("check alive close pull connection fd:%d, streamid:%s", - conn_ptr->get_conn(), conn_ptr->get_subpath().c_str()); - close_push_conn(conn_ptr->get_conn()); - } else if (conn_ptr->get_mode() == PULL_SRT_MODE) { - srt_log_warn("check alive close pull connection fd:%d, streamid:%s", - conn_ptr->get_conn(), conn_ptr->get_path().c_str()); - close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_path()); - } else { - srt_log_error("check_alive get unkown srt mode:%d, fd:%d", - conn_ptr->get_mode(), conn_ptr->get_conn()); - assert(0); - } - } -} - -void srt_handle::close_push_conn(SRTSOCKET srtsocket) { - auto iter = _conn_map.find(srtsocket); - - if (iter != _conn_map.end()) { - SRT_CONN_PTR conn_ptr = iter->second; - auto push_iter = _push_conn_map.find(conn_ptr->get_path()); - if (push_iter != _push_conn_map.end()) { - _push_conn_map.erase(push_iter); - } - _conn_map.erase(iter); - srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath()); - conn_ptr->close(); - } - - srt_epoll_remove_usock(_handle_pollid, srtsocket); - - return; -} - -bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) { - auto push_iter = _push_conn_map.find(conn_ptr->get_path()); - if (push_iter != _push_conn_map.end()) { - return false; - } - _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); - _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); - srt_log_trace("srt_handle add new pusher streamid:%s, subpath:%s, sid:%s", - conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str(), conn_ptr->get_path().c_str()); - return true; -} - -void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { - srt_log_info("handle_pull_data status:%d, subpath:%s, fd:%d", - status, subpath.c_str(), conn_fd); - auto conn_ptr = get_srt_conn(conn_fd); - if (!conn_ptr) { - srt_log_error("handle_pull_data fail to find fd(%d)", conn_fd); - assert(0); - return; - } - conn_ptr->update_timestamp(srt_now_ms); -} - -void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) -{ - auto conn_ptr = get_srt_conn(conn_fd); - if (!conn_ptr) { - if (status != SRTS_CLOSED) { - srt_log_error("handle_srt_socket find srt connection error, fd:%d, status:%d", - conn_fd, status); - } - return; - } - - std::string path = conn_ptr->get_path(); - std::string subpath = conn_ptr->get_subpath(); - - int mode = conn_ptr->get_mode(); - if (mode == PUSH_SRT_MODE) { - switch (status) - { - case SRTS_CONNECTED: - { - handle_push_data(status, path, subpath, conn_fd); - break; - } - case SRTS_BROKEN: - { - srt_log_warn("srt push disconnected event fd:%d, streamid:%s", - conn_fd, conn_ptr->get_streamid().c_str()); - close_push_conn(conn_fd); - break; - } - default: - srt_log_error("push mode unkown status:%d, fd:%d", status, conn_fd); - break; - } - } else if (mode == PULL_SRT_MODE) { - switch (status) - { - case SRTS_CONNECTED: - { - handle_pull_data(status, subpath, conn_fd); - break; - } - case SRTS_BROKEN: - { - srt_log_warn("srt pull disconnected fd:%d, streamid:%s", - conn_fd, conn_ptr->get_streamid().c_str()); - close_pull_conn(conn_fd, path); - break; - } - default: - srt_log_error("pull mode unkown status:%d, fd:%d", status, conn_fd); - break; - } - } else { - assert(0); - } - return; -} diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp deleted file mode 100644 index 052ac0f19..000000000 --- a/trunk/src/srt/srt_handle.hpp +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef SRT_HANDLE_H -#define SRT_HANDLE_H - -#include - -#include - -#include -#include -#include -#include -#include -#include - -#include "srt_conn.hpp" -#include "srt_to_rtmp.hpp" - -class srt_handle { -public: - srt_handle(int pollid); - ~srt_handle(); - - //add new srt connection into epoll event - void add_newconn(SRT_CONN_PTR conn_ptr, int events); - //handle recv/send srt socket - void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); - //check srt connection whether it's still alive. - void check_alive(); - -private: - //get srt conn object by srt socket - SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); - - void handle_push_data(SRT_SOCKSTATUS status, const std::string& path, const std::string& subpath, SRTSOCKET conn_fd); - void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); - - //add new puller into puller list and conn_map - void add_new_puller(SRT_CONN_PTR, std::string stream_id); - //remove pull srt from play list - void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id); - - //add new pusher into pusher map: - bool add_new_pusher(SRT_CONN_PTR conn_ptr); - //remove push connection and remove epoll - void close_push_conn(SRTSOCKET srtsocket); - - //debug statics - void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); - -private: - int _handle_pollid; - - std::unordered_map _conn_map;//save all srt connection: pull or push - - //save push srt connection for prevent from repeat push connection - std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR - //streamid, play map - std::unordered_map> _streamid_map; - - long long _last_timestamp; - long long _last_check_alive_ts; -}; - -#endif //SRT_HANDLE_H diff --git a/trunk/src/srt/srt_log.cpp b/trunk/src/srt/srt_log.cpp deleted file mode 100644 index f850b7203..000000000 --- a/trunk/src/srt/srt_log.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "srt_log.hpp" -#include "srt_to_rtmp.hpp" -#include -#include -#include - -LOGGER_LEVEL s_log_level = SRT_LOGGER_TRACE_LEVEL; -static char* srt_log_buffer = new char[LOGGER_BUFFER_SIZE]; - -void snprintbuffer(char* buffer, size_t size, const char* fmt, ...) { - va_list ap; - - va_start(ap, fmt); - vsnprintf(buffer, size, fmt, ap); - va_end(ap); - - return; -} - -void set_srt_log_level(LOGGER_LEVEL level) { - s_log_level = level; -} - -LOGGER_LEVEL get_srt_log_level() { - return s_log_level; -} - -char* get_srt_log_buffer() { - return srt_log_buffer; -} - -void srt_log_output(LOGGER_LEVEL level, const char* buffer) { - std::string log_content(buffer); - srt2rtmp::get_instance()->insert_log_message(level, log_content); - return; -} diff --git a/trunk/src/srt/srt_log.hpp b/trunk/src/srt/srt_log.hpp deleted file mode 100644 index a7b2f5d34..000000000 --- a/trunk/src/srt/srt_log.hpp +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef SRT_LOG_HPP -#define SRT_LOG_HPP -#include -#include - -#define LOGGER_BUFFER_SIZE (10*1024) - -typedef enum { - SRT_LOGGER_INFO_LEVEL, - SRT_LOGGER_TRACE_LEVEL, - SRT_LOGGER_WARN_LEVEL, - SRT_LOGGER_ERROR_LEVEL -} LOGGER_LEVEL; - -void set_srt_log_level(LOGGER_LEVEL level); -LOGGER_LEVEL get_srt_log_level(); -char* get_srt_log_buffer(); -void srt_log_output(LOGGER_LEVEL level, const char* buffer); -void snprintbuffer(char* buffer, size_t size, const char* fmt, ...); - -#define srt_log_error(...) \ - if (get_srt_log_level() <= SRT_LOGGER_ERROR_LEVEL) \ - { \ - char* buffer = get_srt_log_buffer(); \ - snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \ - srt_log_output(SRT_LOGGER_ERROR_LEVEL, buffer); \ - } - -#define srt_log_warn(...) \ - if (get_srt_log_level() <= SRT_LOGGER_WARN_LEVEL) \ - { \ - char* buffer = get_srt_log_buffer(); \ - snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \ - srt_log_output(SRT_LOGGER_WARN_LEVEL, buffer); \ - } - -#define srt_log_trace(...) \ - if (get_srt_log_level() <= SRT_LOGGER_TRACE_LEVEL) \ - { \ - char* buffer = get_srt_log_buffer(); \ - snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \ - srt_log_output(SRT_LOGGER_TRACE_LEVEL, buffer); \ - } - -#define srt_log_info(...) \ - if (get_srt_log_level() <= SRT_LOGGER_INFO_LEVEL) \ - { \ - char* buffer = get_srt_log_buffer(); \ - snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \ - srt_log_output(SRT_LOGGER_INFO_LEVEL, buffer); \ - } -#endif //SRT_LOG_HPP \ No newline at end of file diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp deleted file mode 100644 index 10630efa3..000000000 --- a/trunk/src/srt/srt_server.cpp +++ /dev/null @@ -1,359 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "srt_server.hpp" -#include "srt_handle.hpp" -#include "srt_log.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -srt_server::srt_server(unsigned short port):_listen_port(port) - ,_server_socket(-1) -{ -} - -srt_server::~srt_server() -{ - -} - -int srt_server::init_srt_parameter() { - const int DEF_RECV_LATENCY = 120; - const int DEF_PEER_LATENCY = 0; - - int opt_len = sizeof(int); - - if (_server_socket == -1) { - return -1; - } - int maxbw = _srs_config->get_srto_maxbw(); - srt_setsockopt(_server_socket, 0, SRTO_MAXBW, &maxbw, opt_len); - int mss = _srs_config->get_srto_mss(); - srt_setsockopt(_server_socket, 0, SRTO_MSS, &mss, opt_len); - - bool tlpkdrop = _srs_config->get_srto_tlpkdrop(); - int tlpkdrop_i = tlpkdrop ? 1 : 0; - srt_setsockopt(_server_socket, 0, SRTO_TLPKTDROP, &tlpkdrop_i, opt_len); - - int connection_timeout = _srs_config->get_srto_conntimeout(); - srt_setsockopt(_server_socket, 0, SRTO_CONNTIMEO, &connection_timeout, opt_len); - - int send_buff = _srs_config->get_srto_sendbuf(); - srt_setsockopt(_server_socket, 0, SRTO_SNDBUF, &send_buff, opt_len); - int recv_buff = _srs_config->get_srto_recvbuf(); - srt_setsockopt(_server_socket, 0, SRTO_RCVBUF, &recv_buff, opt_len); - int payload_size = _srs_config->get_srto_payloadsize(); - srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len); - - int latency = _srs_config->get_srto_latency(); - if (DEF_RECV_LATENCY != latency) { - srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len); - } - - int recv_latency = _srs_config->get_srto_recv_latency(); - if (DEF_RECV_LATENCY != recv_latency) { - srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len); - } - - int peer_latency = _srs_config->get_srto_peer_latency(); - if (DEF_PEER_LATENCY != peer_latency) { - srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len); - } - - srt_log_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \ -send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d", - maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size, - latency, recv_latency, peer_latency); - return 0; -} - -void srt_server::init_srt_log() { - SrsLogLevel level = srs_get_log_level(_srs_config->get_log_level()); - switch (level) { - case SrsLogLevelInfo: - { - set_srt_log_level(SRT_LOGGER_INFO_LEVEL); - break; - } - case SrsLogLevelTrace: - { - set_srt_log_level(SRT_LOGGER_TRACE_LEVEL); - break; - } - case SrsLogLevelWarn: - { - set_srt_log_level(SRT_LOGGER_WARN_LEVEL); - break; - } - case SrsLogLevelError: - { - set_srt_log_level(SRT_LOGGER_ERROR_LEVEL); - break; - } - default: - { - set_srt_log_level(SRT_LOGGER_TRACE_LEVEL); - } - } - return; -} - -int srt_server::init_srt() { - if (_server_socket != -1) { - return -1; - } - - init_srt_log(); - - _server_socket = srt_create_socket(); - sockaddr_in sa; - memset(&sa, 0, sizeof sa); - sa.sin_family = AF_INET; - sa.sin_port = htons(_listen_port); - - sockaddr* psa = (sockaddr*)&sa; - - int ret = srt_bind(_server_socket, psa, sizeof(sa)); - if ( ret == SRT_ERROR ) - { - srt_close(_server_socket); - srt_log_error("srt bind error: %d", ret); - return -1; - } - - ret = srt_listen(_server_socket, 5); - if (ret == SRT_ERROR) - { - srt_close(_server_socket); - srt_log_error("srt listen error: %d", ret); - return -2; - } - - init_srt_parameter(); - - _pollid = srt_epoll_create(); - if (_pollid < -1) { - srt_log_error("srt server srt_epoll_create error, port=%d", _listen_port); - return -1; - } - _handle_ptr = std::make_shared(_pollid); - - int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; - ret = srt_epoll_add_usock(_pollid, _server_socket, &events); - if (ret < 0) { - srt_log_error("srt server run add epoll error:%d", ret); - return ret; - } - - srt_log_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket); - - return 0; -} - -int srt_server::start() -{ - int ret; - - if ((ret = init_srt()) < 0) { - return ret; - } - - run_flag = true; - srt_log_trace("srt server is starting... port(%d)", _listen_port); - thread_run_ptr = std::make_shared(&srt_server::on_work, this); - return 0; -} - -void srt_server::stop() -{ - run_flag = false; - if (!thread_run_ptr) { - return; - } - thread_run_ptr->join(); - - return; -} - -void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { - SRTSOCKET conn_fd = -1; - sockaddr_in scl; - int sclen = sizeof(scl); - int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR; - - switch(status) { - case SRTS_LISTENING: - { - conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); - if (conn_fd == -1) { - return; - } - //add new srt connect into srt handle - std::string streamid = UDT::getstreamid(conn_fd); - if (!is_streamid_valid(streamid)) { - srt_log_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd); - srt_close(conn_fd); - return; - } - SRT_CONN_PTR srt_conn_ptr = std::make_shared(conn_fd, streamid); - - std::string vhost_str = srt_conn_ptr->get_vhost(); - srt_log_trace("new srt connection streamid:%s, fd:%d, vhost:%s", - streamid.c_str(), conn_fd, vhost_str.c_str()); - SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true); - if (!vhost_p) { - srt_log_trace("srt streamid(%s): no vhost %s, fd:%d", - streamid.c_str(), vhost_str.c_str(), conn_fd); - srt_conn_ptr->close(); - return; - } - if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) { - //add SRT_EPOLL_IN for information notify - conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu - } else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) { - conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR; - } else { - srt_log_trace("stream mode error, it should be m=push or m=pull, streamid:%s", - srt_conn_ptr->get_streamid().c_str()); - srt_conn_ptr->close(); - return; - } - - _handle_ptr->add_newconn(srt_conn_ptr, conn_event); - break; - } - case SRTS_CONNECTED: - { - srt_log_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str()); - break; - } - case SRTS_BROKEN: - { - srt_epoll_remove_usock(_pollid, input_fd); - srt_close(input_fd); - srt_log_warn("srt close: socket=%d", input_fd); - break; - } - default: - { - srt_log_error("srt server unkown status:%d", status); - } - } -} - -void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { - _handle_ptr->handle_srt_socket(status, input_fd); - return; -} - -void srt_server::on_work() -{ - const unsigned int SRT_FD_MAX = 100; - srt_log_trace("srt server is working port(%d)", _listen_port); - while (run_flag) - { - SRTSOCKET read_fds[SRT_FD_MAX]; - SRTSOCKET write_fds[SRT_FD_MAX]; - int rfd_num = SRT_FD_MAX; - int wfd_num = SRT_FD_MAX; - - int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1, - nullptr, nullptr, nullptr, nullptr); - if (ret < 0) { - continue; - } - _handle_ptr->check_alive(); - - for (int index = 0; index < rfd_num; index++) { - SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); - if (_server_socket == read_fds[index]) { - srt_handle_connection(status, read_fds[index], "read fd"); - } else { - srt_handle_data(status, read_fds[index], "read fd"); - } - } - - for (int index = 0; index < wfd_num; index++) { - SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); - if (_server_socket == write_fds[index]) { - srt_handle_connection(status, write_fds[index], "write fd"); - } else { - srt_handle_data(status, write_fds[index], "write fd"); - } - } - } - - // New API at 2020-01-28, >1.4.1 - // @see https://github.com/Haivision/srt/commit/b8c70ec801a56bea151ecce9c09c4ebb720c2f68#diff-fb66028e8746fea578788532533a296bR786 -#if (SRT_VERSION_MAJOR<<24 | SRT_VERSION_MINOR<<16 | SRT_VERSION_PATCH<<8) > 0x01040100 - srt_epoll_clear_usocks(_pollid); -#endif -} - -SrtServerAdapter::SrtServerAdapter() -{ -} - -SrtServerAdapter::~SrtServerAdapter() -{ -} - -srs_error_t SrtServerAdapter::initialize() -{ - srs_error_t err = srs_success; - - // TODO: FIXME: We could fork processes here, because here only ST is initialized. - - return err; -} - -srs_error_t SrtServerAdapter::run(SrsWaitGroup* wg) -{ - srs_error_t err = srs_success; - - // TODO: FIXME: We could start a coroutine to dispatch SRT task to processes. - - if(_srs_config->get_srt_enabled()) { - srt_log_trace("srt server is enabled..."); - unsigned short srt_port = _srs_config->get_srt_listen_port(); - srt_log_trace("srt server listen port:%d", srt_port); - err = srt2rtmp::get_instance()->init(); - if (err != srs_success) { - return srs_error_wrap(err, "srt start srt2rtmp error"); - } - - srt_ptr = std::make_shared(srt_port); - if (!srt_ptr) { - return srs_error_wrap(err, "srt listen %d", srt_port); - } - } else { - srt_log_trace("srt server is disabled..."); - } - - if(_srs_config->get_srt_enabled()) { - srt_ptr->start(); - } - - return err; -} - -void SrtServerAdapter::stop() -{ - // TODO: FIXME: If forked processes, we should do cleanup. -} diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp deleted file mode 100644 index 6b86ee4c7..000000000 --- a/trunk/src/srt/srt_server.hpp +++ /dev/null @@ -1,67 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef SRT_SERVER_H -#define SRT_SERVER_H - -#include - -#include - -#include -#include - -#include - -class srt_handle; -class SrsWaitGroup; - -class srt_server { -public: - srt_server(unsigned short port); - ~srt_server(); - - int start();//init srt handl and create srt main thread loop - void stop();//stop srt main thread loop - -private: - //init srt socket and srt epoll - int init_srt(); - int init_srt_parameter(); - void init_srt_log(); - - //srt main epoll loop - void on_work(); - //accept new srt connection - void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); - //get srt data read/write - void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); - -private: - unsigned short _listen_port; - SRTSOCKET _server_socket; - int _pollid; - bool run_flag; - std::shared_ptr thread_run_ptr; - std::shared_ptr _handle_ptr; -}; - -typedef std::shared_ptr SRT_SERVER_PTR; - -class SrtServerAdapter : public ISrsHybridServer -{ -private: - SRT_SERVER_PTR srt_ptr; -public: - SrtServerAdapter(); - virtual ~SrtServerAdapter(); -public: - virtual srs_error_t initialize(); - virtual srs_error_t run(SrsWaitGroup* wg); - virtual void stop(); -}; - -#endif//SRT_SERVER_H diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp deleted file mode 100644 index f367201a4..000000000 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ /dev/null @@ -1,779 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "srt_to_rtmp.hpp" -#include "stringex.hpp" -#include "time_help.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; -std::mutex srt2rtmp::_srt_error_mutex; -std::map srt2rtmp::_srt_error_map; - -std::shared_ptr srt2rtmp::get_instance() { - if (!s_srt2rtmp_ptr) { - s_srt2rtmp_ptr = std::make_shared(); - } - return s_srt2rtmp_ptr; -} - -srt2rtmp::srt2rtmp():_lastcheck_ts(0) { - -} - -srt2rtmp::~srt2rtmp() { - release(); -} - -srs_error_t srt2rtmp::init() { - srs_error_t err = srs_success; - - if (_trd_ptr.get() != nullptr) { - return srs_error_wrap(err, "don't start thread again"); - } - - _trd_ptr = std::make_shared("srt2rtmp", this); - - if ((err = _trd_ptr->start()) != srs_success) { - return srs_error_wrap(err, "start thread"); - } - - srs_trace("srt2rtmp start coroutine..."); - - return err; -} - -void srt2rtmp::release() { - if (!_trd_ptr) { - return; - } - _trd_ptr->stop(); - _trd_ptr = nullptr; -} - -void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) { - std::unique_lock locker(_mutex); - - SRT_DATA_MSG_PTR msg_ptr = std::make_shared(data_p, len, key_path); - _msg_queue.push(msg_ptr); - //_notify_cond.notify_one(); - return; -} - -void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) { - std::unique_lock locker(_mutex); - - SRT_DATA_MSG_PTR msg_ptr = std::make_shared(key_path, msg_type); - _msg_queue.push(msg_ptr); - //_notify_cond.notify_one(); - return; -} - -void srt2rtmp::insert_log_message(LOGGER_LEVEL level, const std::string& log_content) { - std::unique_lock locker(_mutex); - - SRT_DATA_MSG_PTR msg_ptr = std::make_shared(level, log_content); - msg_ptr->set_msg_type(SRT_MSG_LOG_TYPE); - _msg_queue.push(msg_ptr); - - return; -} - -SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { - std::unique_lock locker(_mutex); - SRT_DATA_MSG_PTR msg_ptr; - - if (_msg_queue.empty()) - { - return msg_ptr; - } - //while (_msg_queue.empty()) { - // _notify_cond.wait(locker); - //} - - msg_ptr = _msg_queue.front(); - _msg_queue.pop(); - return msg_ptr; -} - -void srt2rtmp::check_rtmp_alive() { - const int64_t CHECK_INTERVAL = 5*1000; - const int64_t ALIVE_TIMEOUT_MAX = 5*1000; - - if (_lastcheck_ts == 0) { - _lastcheck_ts = now_ms(); - return; - } - int64_t timenow_ms = now_ms(); - - if ((timenow_ms - _lastcheck_ts) > CHECK_INTERVAL) { - _lastcheck_ts = timenow_ms; - - for (auto iter = _rtmp_client_map.begin(); - iter != _rtmp_client_map.end();) { - RTMP_CLIENT_PTR rtmp_ptr = iter->second; - - if ((timenow_ms - rtmp_ptr->get_last_live_ts()) >= ALIVE_TIMEOUT_MAX) { - srs_warn("srt2rtmp client is timeout, url:%s", - rtmp_ptr->get_url().c_str()); - _rtmp_client_map.erase(iter++); - rtmp_ptr->close(); - } else { - iter++; - } - } - } - return; -} - -void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) { - RTMP_CLIENT_PTR rtmp_ptr; - auto iter = _rtmp_client_map.find(key_path); - if (iter == _rtmp_client_map.end()) { - srs_error("fail to close rtmp session fail, can't find session by key_path:%s", - key_path.c_str()); - return; - } - rtmp_ptr = iter->second; - _rtmp_client_map.erase(iter); - srs_trace("close rtmp session which key_path is %s", key_path.c_str()); - rtmp_ptr->close(); - - return; -} - -//the cycle is running in srs coroutine -srs_error_t srt2rtmp::cycle() { - srs_error_t err = srs_success; - _lastcheck_ts = 0; - int err_code = -1; - - while(true) { - SRT_DATA_MSG_PTR msg_ptr = get_data_message(); - - if (!msg_ptr) { - srs_usleep((30 * SRS_UTIME_MILLISECONDS)); - } else { - switch (msg_ptr->msg_type()) { - case SRT_MSG_DATA_TYPE: - { - err_code = handle_ts_data(msg_ptr); - if (err_code != ERROR_SUCCESS) { - std::unique_lock locker(_srt_error_mutex); - _srt_error_map[msg_ptr->get_path()] = err_code; - } - break; - } - case SRT_MSG_CLOSE_TYPE: - { - handle_close_rtmpsession(msg_ptr->get_path()); - break; - } - case SRT_MSG_LOG_TYPE: - { - handle_log_data(msg_ptr); - break; - } - default: - { - srs_error("srt to rtmp get wrong message type(%u), path:%s", - msg_ptr->msg_type(), msg_ptr->get_path().c_str()); - assert(0); - } - } - - } - check_rtmp_alive(); - if ((err = _trd_ptr->pull()) != srs_success) { - return srs_error_wrap(err, "forwarder"); - } - } -} - -int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { - RTMP_CLIENT_PTR rtmp_ptr; - auto iter = _rtmp_client_map.find(data_ptr->get_path()); - if (iter == _rtmp_client_map.end()) { - srs_trace("new rtmp client for srt upstream, key_path:%s", data_ptr->get_path().c_str()); - rtmp_ptr = std::make_shared(data_ptr->get_path()); - _rtmp_client_map.insert(std::make_pair(data_ptr->get_path(), rtmp_ptr)); - } else { - rtmp_ptr = iter->second; - } - - return rtmp_ptr->receive_ts_data(data_ptr); -} - -void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) { - switch (data_ptr->get_log_level()) { - case SRT_LOGGER_INFO_LEVEL: - { - srs_info(data_ptr->get_log_string()); - break; - } - case SRT_LOGGER_TRACE_LEVEL: - { - srs_trace(data_ptr->get_log_string()); - break; - } - case SRT_LOGGER_WARN_LEVEL: - { - srs_warn(data_ptr->get_log_string()); - break; - } - case SRT_LOGGER_ERROR_LEVEL: - { - srs_error(data_ptr->get_log_string()); - break; - } - default: - { - srs_trace(data_ptr->get_log_string()); - } - } - return; -} - -rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) - , _connect_flag(false) { - const std::string DEF_VHOST = "DEFAULT_VHOST"; - _ts_demux_ptr = std::make_shared(); - _avc_ptr = std::make_shared(); - _aac_ptr = std::make_shared(); - std::vector ret_vec; - - string_split(key_path, "/", ret_vec); - - if (ret_vec.size() >= 3) { - _vhost = ret_vec[0]; - _appname = ret_vec[1]; - _streamname = ret_vec[2]; - } else { - _vhost = DEF_VHOST; - _appname = ret_vec[0]; - _streamname = ret_vec[1]; - } - std::stringstream url_ss; - - std::vector ip_ports = _srs_config->get_listens(); - int port = 0; - std::string ip; - - for (auto item : ip_ports) { - srs_parse_endpoint(item, ip, port); - if (port != 0) { - break; - } - } - - port = (port == 0) ? SRS_CONSTS_RTMP_DEFAULT_PORT : port; - - std::stringstream ss; - ss << "rtmp://" << SRS_CONSTS_LOCALHOST; - ss << ":" << port; - ss << "/" << _appname; - ss << "/" << _streamname; - ss << (_streamname.find("?") != std::string::npos ? "&" : "?") << "upstream=srt"; - if (_vhost != DEF_VHOST) { - ss << "&vhost=" << _vhost; - } - - _url = ss.str(); - - _h264_sps_changed = false; - _h264_pps_changed = false; - _h264_sps_pps_sent = false; - - _last_live_ts = now_ms(); - srs_trace("rtmp client construct url:%s", url_ss.str().c_str()); -} - -rtmp_client::~rtmp_client() { - -} - -void rtmp_client::close() { - _connect_flag = false; - if (!_rtmp_conn_ptr) { - return; - } - srs_trace("rtmp client close url:%s", _url.c_str()); - _rtmp_conn_ptr->close(); - _rtmp_conn_ptr = nullptr; - -} - -int64_t rtmp_client::get_last_live_ts() { - return _last_live_ts; -} - -std::string rtmp_client::get_url() { - return _url; -} - -srs_error_t rtmp_client::connect() { - srs_error_t err = srs_success; - srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; - srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; - - _last_live_ts = now_ms(); - if (_connect_flag) { - return srs_success; - } - - if (_rtmp_conn_ptr.get() != nullptr) { - return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.", - _url.c_str(), srsu2msi(cto), srsu2msi(sto)); - } - - _rtmp_conn_ptr = std::make_shared(_url, cto, sto); - - if ((err = _rtmp_conn_ptr->connect()) != srs_success) { - close(); - return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", - _url.c_str(), srsu2msi(cto), srsu2msi(sto)); - } - - if ((err = _rtmp_conn_ptr->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { - close(); - return srs_error_wrap(err, "rtmp client in srt2rtmp publish fail url:%s", _url.c_str()); - } - _connect_flag = true; - return err; -} - -int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { - return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback -} - -srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { - srs_error_t err = srs_success; - - // TODO: FIMXE: there exists bug, see following comments. - // when sps or pps changed, update the sequence header, - // for the pps maybe not changed while sps changed. - // so, we must check when each video ts message frame parsed. - if (!_h264_sps_changed || !_h264_pps_changed) { - return err; - } - - // h264 raw to h264 packet. - std::string sh; - if ((err = _avc_ptr->mux_sequence_header(_h264_sps, _h264_pps, dts, pts, sh)) != srs_success) { - return srs_error_wrap(err, "mux sequence header"); - } - - // h264 packet to flv packet. - int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; - int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; - char* flv = NULL; - int nb_flv = 0; - if ((err = _avc_ptr->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { - return srs_error_wrap(err, "avc to flv"); - } - - if (_srs_config->get_srt_mix_correct()) { - _rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo); - err = rtmp_write_work(); - } else { - err = rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv); - } - - // reset sps and pps. - _h264_sps_changed = false; - _h264_pps_changed = false; - _h264_sps_pps_sent = true; - - return err; -} - -srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) { - srs_error_t err = srs_success; - - // when sps or pps not sent, ignore the packet. - // @see https://github.com/ossrs/srs/issues/203 - if (!_h264_sps_pps_sent) { - return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps"); - } - - // 5bits, 7.3.1 NAL unit syntax, - // ISO_IEC_14496-10-AVC-2003.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // for IDR frame, the frame is keyframe. - SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; - if (nal_unit_type == SrsAvcNaluTypeIDR) { - frame_type = SrsVideoAvcFrameTypeKeyFrame; - } - - std::string ibp; - if ((err = _avc_ptr->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { - return srs_error_wrap(err, "mux frame"); - } - - int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; - char* flv = NULL; - int nb_flv = 0; - if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { - return srs_error_wrap(err, "mux avc to flv"); - } - if (_srs_config->get_srt_mix_correct()) { - _rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo); - err = rtmp_write_work(); - } else { - err = rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv); - } - - return err; -} - -srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { - srs_error_t err = srs_success; - - char* data = NULL; - int size = 0; - if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { - return srs_error_wrap(err, "mux aac to flv"); - } - if (_srs_config->get_srt_mix_correct()) { - _rtmp_queue.insert_rtmp_data((unsigned char*)data, size, (int64_t)dts, SrsFrameTypeAudio); - err = rtmp_write_work(); - } else { - err = rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); - } - - return err; -} - -srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { - srs_error_t err = srs_success; - SrsSharedPtrMessage* msg = NULL; - - if (!_rtmp_conn_ptr) { - //when rtmp connection is closed, it's not error and just return; - srs_freepa(data); - return err; - } - - if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) { - return srs_error_wrap(err, "create message fail, url:%s", _url.c_str()); - } - srs_assert(msg); - - // send out encoded msg. - if ((err = _rtmp_conn_ptr->send_and_free_message(msg)) != srs_success) { - close(); - return srs_error_wrap(err, "rtmp client in srt2rtmp send message fail, url:%s", _url.c_str()); - } - - return err; -} - -srs_error_t rtmp_client::rtmp_write_work() { - srs_error_t err = srs_success; - rtmp_packet_info_s packet_info; - bool ret = false; - - do { - ret = _rtmp_queue.get_rtmp_data(packet_info); - if (ret) { - err = rtmp_write_packet(packet_info._type, packet_info._dts, (char*)packet_info._data, packet_info._len); - if (err != srs_success) { - break; - } - } - } while(ret); - - return err; -} - -srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { - srs_error_t err = srs_success; - - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - return err; - } - dts = dts / 90; - pts = pts / 90; - - if (dts == 0) { - dts = pts; - } - - // send each frame. - while (!avs_ptr->empty()) { - char* frame = NULL; - int frame_size = 0; - if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) { - return srs_error_wrap(err, "demux annexb"); - } - - // 5bits, 7.3.1 NAL unit syntax, - // ISO_IEC_14496-10-AVC-2003.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // ignore the nalu type aud(9), pad(12) - if ((nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) - || (nal_unit_type == SrsAvcNaluTypeFilterData)) { - continue; - } - - // TODO: FIXME: Should cache this config, it's better not to get it for each video frame. - if (_srs_config->get_srt_sei_filter()) { - if (nal_unit_type == SrsAvcNaluTypeSEI) { - continue; - } - } - - // for sps - if (_avc_ptr->is_sps(frame, frame_size)) { - std::string sps; - if ((err = _avc_ptr->sps_demux(frame, frame_size, sps)) != srs_success) { - return srs_error_wrap(err, "demux sps"); - } - - if (_h264_sps == sps) { - continue; - } - _h264_sps_changed = true; - _h264_sps = sps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; - } - - // for pps - if (_avc_ptr->is_pps(frame, frame_size)) { - std::string pps; - if ((err = _avc_ptr->pps_demux(frame, frame_size, pps)) != srs_success) { - return srs_error_wrap(err, "demux pps"); - } - - if (_h264_pps == pps) { - continue; - } - _h264_pps_changed = true; - _h264_pps = pps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; - } - - // ibp frame. - // for Issue: https://github.com/ossrs/srs/issues/2390 - // we only skip pps/sps frame and send left nalus. - srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", avs_ptr->left() + frame_size, dts); - if ((err = write_h264_ipb_frame(avs_ptr->head() - frame_size, avs_ptr->left() + frame_size, dts, pts)) != srs_success) { - return srs_error_wrap(err, "write frame"); - } - _last_live_ts = now_ms(); - break; - } - - return err; -} - -int rtmp_client::get_sample_rate(char sample_index) { - int sample_rate = 44100; - - if ((sample_index >= 0) && (sample_index < SrsAAcSampleRateNumbers)) { - sample_rate = srs_aac_srates[(uint8_t)sample_index]; - } - - return sample_rate; -} - -srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { - srs_error_t err = srs_success; - uint64_t base_dts; - uint64_t real_dts; - uint64_t first_dts; - int index = 0; - int sample_size = 1024; - - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - return srs_error_wrap(err, "connect"); - } - - base_dts = dts/90; - if (base_dts == 0) { - base_dts = pts/90; - } - - // send each frame. - while (!avs_ptr->empty()) { - char* frame = NULL; - int frame_size = 0; - SrsRawAacStreamCodec codec; - if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) { - return srs_error_wrap(err, "demux adts"); - } - - if (frame_size <= 0) { - continue; - } - int sample_rate = get_sample_rate(codec.sound_rate); - - if (codec.aac_packet_type > SrsAudioOpusFrameTraitRaw) { - sample_size = 2048; - } else { - sample_size = 1024; - } - - real_dts = base_dts + index * 1000.0 * sample_size / sample_rate; - if (index == 0) { - first_dts = real_dts; - } - index++; - - // generate sh. - if (_aac_specific_config.empty()) { - std::string sh; - if ((err = _aac_ptr->mux_sequence_header(&codec, sh)) != srs_success) { - return srs_error_wrap(err, "mux sequence header"); - } - _aac_specific_config = sh; - - codec.aac_packet_type = 0; - - if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, real_dts)) != srs_success) { - return srs_error_wrap(err, "write raw audio frame"); - } - } - - // audio raw data. - codec.aac_packet_type = 1; - if ((err = write_audio_raw_frame(frame, frame_size, &codec, real_dts)) != srs_success) { - return srs_error_wrap(err, "write audio raw frame"); - } - _last_live_ts = now_ms(); - } - - uint64_t diff_t = real_dts - first_dts; - diff_t += 100; - if ((diff_t > 200) && (diff_t < 600)) { - srs_info("set_queue_timeout timeout:%lu", diff_t); - _rtmp_queue.set_queue_timeout(diff_t); - } - return err; -} - -int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, - uint64_t dts, uint64_t pts) -{ - srs_error_t err = srs_success; - if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) { - assert(0); - return 0; - } - - auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len()); - - if (media_type == STREAM_TYPE_VIDEO_H264) { - err = on_ts_video(avs_ptr, dts, pts); - } else if (media_type == STREAM_TYPE_AUDIO_AAC) { - err = on_ts_audio(avs_ptr, dts, pts); - } else { - srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type); - return 0; - } - - if (err != srs_success) { - srs_error("send media data error:%s", srs_error_desc(err).c_str()); - int err_code = srs_error_code(err); - srs_freep(err); - return err_code; - } - return 0; -} - -rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT) - ,_queue_maxlen(QUEUE_LEN_MAX) - ,_first_packet_t(-1) - ,_first_local_t(-1) { - -} - -rtmp_packet_queue::~rtmp_packet_queue() { - for (auto item : _send_map) { - rtmp_packet_info_s info = item.second; - if (info._data) { - delete info._data; - } - } - _send_map.clear(); -} - -void rtmp_packet_queue::set_queue_timeout(int64_t queue_timeout) { - _queue_timeout = queue_timeout; -} - -void rtmp_packet_queue::insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type) { - rtmp_packet_info_s packet_info; - - packet_info._data = data; - packet_info._len = len; - packet_info._dts = dts; - packet_info._type = media_type; - - if (_first_packet_t == -1) { - _first_packet_t = dts; - _first_local_t = (int64_t)now_ms(); - } - - _send_map.insert(std::make_pair(dts, packet_info)); - return; -} - -bool rtmp_packet_queue::is_ready() { - if (!_srs_config->get_srt_mix_correct() && !_send_map.empty()) { - return true; - } - if (_send_map.size() < 2) { - return false; - } - - if (_send_map.size() >= (size_t)_queue_maxlen) { - return true; - } - - auto first_item = _send_map.begin(); - int64_t now_t = (int64_t)now_ms(); - - int64_t diff_t = (now_t - _first_local_t) - (first_item->first - _first_packet_t); - - if (diff_t >= _queue_timeout) { - return true; - } - return false; -} - -bool rtmp_packet_queue::get_rtmp_data(rtmp_packet_info_s& packet_info) { - if (!is_ready()) { - return false; - } - auto iter = _send_map.begin(); - packet_info = iter->second; - _send_map.erase(iter); - - return true; -} diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp deleted file mode 100644 index cd54ff63c..000000000 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ /dev/null @@ -1,162 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef SRT_TO_RTMP_H -#define SRT_TO_RTMP_H - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "srt_data.hpp" -#include "ts_demux.hpp" -#include "srt_log.hpp" - -#define SRT_VIDEO_MSG_TYPE 0x01 -#define SRT_AUDIO_MSG_TYPE 0x02 - -typedef std::shared_ptr RTMP_CONN_PTR; -typedef std::shared_ptr AVC_PTR; -typedef std::shared_ptr AAC_PTR; - -#define DEFAULT_VHOST "__default_host__" - -#define QUEUE_DEF_TIMEOUT 500 -#define QUEUE_LEN_MAX 100 - -typedef struct { - unsigned char* _data; - int _len; - int64_t _dts; - char _type; - char reserve[3]; -} rtmp_packet_info_s; - -class rtmp_packet_queue { -public: - rtmp_packet_queue(); - ~rtmp_packet_queue(); - - void set_queue_timeout(int64_t queue_timeout); - void insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type); - bool get_rtmp_data(rtmp_packet_info_s& packet_info); - -private: - bool is_ready(); - -private: - int64_t _queue_timeout; - int64_t _queue_maxlen; - int64_t _first_packet_t; - int64_t _first_local_t; - std::multimap _send_map;//key:dts, value:rtmp_packet_info -}; - -class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this { -public: - rtmp_client(std::string key_path); - virtual ~rtmp_client(); - - int receive_ts_data(SRT_DATA_MSG_PTR data_ptr); - int64_t get_last_live_ts(); - std::string get_url(); - - srs_error_t connect(); - void close(); - -private: - virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); - -private: - srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); - srs_error_t on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); - virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); - virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); - virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); - - int get_sample_rate(char sound_rate); - - srs_error_t rtmp_write_work(); - -private: - virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); - -private: - std::string _key_path; - std::string _url; - std::string _vhost; - std::string _appname; - std::string _streamname; - TS_DEMUX_PTR _ts_demux_ptr; - -private: - AVC_PTR _avc_ptr; - std::string _h264_sps; - bool _h264_sps_changed; - std::string _h264_pps; - bool _h264_pps_changed; - bool _h264_sps_pps_sent; -private: - std::string _aac_specific_config; - AAC_PTR _aac_ptr; -private: - RTMP_CONN_PTR _rtmp_conn_ptr; - bool _connect_flag; - int64_t _last_live_ts; - -private: - rtmp_packet_queue _rtmp_queue; -}; - -typedef std::shared_ptr RTMP_CLIENT_PTR; - -class srt2rtmp : public ISrsCoroutineHandler { -public: - static std::shared_ptr get_instance(); - srt2rtmp(); - virtual ~srt2rtmp(); - - srs_error_t init(); - void release(); - - void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); - void insert_ctrl_message(unsigned int msg_type, const std::string& key_path); - void insert_log_message(LOGGER_LEVEL level, const std::string& log_content); - -private: - SRT_DATA_MSG_PTR get_data_message(); - virtual srs_error_t cycle(); - int handle_ts_data(SRT_DATA_MSG_PTR data_ptr); - void handle_close_rtmpsession(const std::string& key_path); - void handle_log_data(SRT_DATA_MSG_PTR data_ptr); - void check_rtmp_alive(); - -private: - static std::shared_ptr s_srt2rtmp_ptr; - std::shared_ptr _trd_ptr; - std::mutex _mutex; - //std::condition_variable_any _notify_cond; - std::queue _msg_queue; - - std::unordered_map _rtmp_client_map; - int64_t _lastcheck_ts; -public: - static std::mutex _srt_error_mutex; - static std::map _srt_error_map; -}; - -#endif diff --git a/trunk/src/srt/stringex.hpp b/trunk/src/srt/stringex.hpp deleted file mode 100644 index 633e58ecf..000000000 --- a/trunk/src/srt/stringex.hpp +++ /dev/null @@ -1,48 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef STRING_EX_H -#define STRING_EX_H - -#include - -#include -#include -#include -#include -#include -#include - -inline int string_split(const std::string& input_str, const std::string& split_str, std::vector& output_vec) { - if (input_str.length() == 0) { - return 0; - } - - std::string tempString(input_str); - do { - - size_t pos = tempString.find(split_str); - if (pos == tempString.npos) { - output_vec.push_back(tempString); - break; - } - std::string seg_str = tempString.substr(0, pos); - tempString = tempString.substr(pos+split_str.size()); - output_vec.push_back(seg_str); - } while(tempString.size() > 0); - - return output_vec.size(); -} - -inline std::string string_lower(const std::string input_str) { - std::string output_str(input_str); - - std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower); - - return output_str; -} - -#endif//STRING_EX_H diff --git a/trunk/src/srt/time_help.hpp b/trunk/src/srt/time_help.hpp deleted file mode 100644 index e58e5c0fd..000000000 --- a/trunk/src/srt/time_help.hpp +++ /dev/null @@ -1,19 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef TIME_HELP_H -#define TIME_HELP_H - -#include - -#include - -inline long long now_ms() { - return std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); -} - -#endif //TIME_HELP_H \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp deleted file mode 100644 index 6b919014e..000000000 --- a/trunk/src/srt/ts_demux.cpp +++ /dev/null @@ -1,603 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "ts_demux.hpp" -#include "srt_log.hpp" -#include - -ts_demux::ts_demux():_data_total(0) - ,_last_pid(0) - ,_last_dts(0) - ,_last_pts(0) -{ - -} - -ts_demux::~ts_demux() { - -} - -int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback) -{ - int pos = 0; - int npos = 0; - ts_header ts_header_info; - - ts_header_info._sync_byte = data_p[pos]; - pos++; - - ts_header_info._transport_error_indicator = (data_p[pos]&0x80)>>7; - ts_header_info._payload_unit_start_indicator = (data_p[pos]&0x40)>>6; - ts_header_info._transport_priority = (data_p[pos]&0x20)>>5; - ts_header_info._PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; - pos += 2; - - ts_header_info._transport_scrambling_control = (data_p[pos]&0xC0)>>6; - ts_header_info._adaptation_field_control = (data_p[pos]&0x30)>>4; - ts_header_info._continuity_counter = (data_p[pos]&0x0F); - pos++; - npos = pos; - - adaptation_field* field_p = &(ts_header_info._adaptation_field_info); - // adaptation field - // 0x01 No adaptation_field, payload only - // 0x02 Adaptation_field only, no payload - // 0x03 Adaptation_field followed by payload - if( ts_header_info._adaptation_field_control == 2 - || ts_header_info._adaptation_field_control == 3 ){ - // adaptation_field() - field_p->_adaptation_field_length = data_p[pos]; - pos++; - - if( field_p->_adaptation_field_length > 0 ){ - field_p->_discontinuity_indicator = (data_p[pos]&0x80)>>7; - field_p->_random_access_indicator = (data_p[pos]&0x40)>>6; - field_p->_elementary_stream_priority_indicator = (data_p[pos]&0x20)>>5; - field_p->_PCR_flag = (data_p[pos]&0x10)>>4; - field_p->_OPCR_flag = (data_p[pos]&0x08)>>3; - field_p->_splicing_point_flag = (data_p[pos]&0x04)>>2; - field_p->_transport_private_data_flag = (data_p[pos]&0x02)>>1; - field_p->_adaptation_field_extension_flag = (data_p[pos]&0x01); - pos++; - - if( field_p->_PCR_flag == 1 ) { // PCR info - //program_clock_reference_base 33 uimsbf - //reserved 6 bslbf - //program_clock_reference_extension 9 uimsbf - pos += 6; - } - if( field_p->_OPCR_flag == 1 ) { - //original_program_clock_reference_base 33 uimsbf - //reserved 6 bslbf - //original_program_clock_reference_extension 9 uimsbf - pos += 6; - } - if( field_p->_splicing_point_flag == 1 ) { - //splice_countdown 8 tcimsbf - pos++; - } - if( field_p->_transport_private_data_flag == 1 ) { - //transport_private_data_length 8 uimsbf - field_p->_transport_private_data_length = data_p[pos]; - pos++; - memcpy(field_p->_private_data_byte, data_p + pos, field_p->_transport_private_data_length); - pos += field_p->_transport_private_data_length; - } - if( field_p->_adaptation_field_extension_flag == 1 ) { - //adaptation_field_extension_length 8 uimsbf - field_p->_adaptation_field_extension_length = data_p[pos]; - pos++; - //ltw_flag 1 bslbf - field_p->_ltw_flag = (data_p[pos]&0x80)>>7; - //piecewise_rate_flag 1 bslbf - field_p->_piecewise_rate_flag = (data_p[pos]&0x40)>>6; - //seamless_splice_flag 1 bslbf - field_p->_seamless_splice_flag = (data_p[pos]&0x20)>>5; - //reserved 5 bslbf - pos++; - if (field_p->_ltw_flag == 1) { - //ltw_valid_flag 1 bslbf - //ltw_offset 15 uimsbf - pos += 2; - } - if (field_p->_piecewise_rate_flag == 1) { - //reserved 2 bslbf - //piecewise_rate 22 uimsbf - pos += 3; - } - if (field_p->_seamless_splice_flag == 1) { - //splice_type 4 bslbf - //DTS_next_AU[32..30] 3 bslbf - //marker_bit 1 bslbf - //DTS_next_AU[29..15] 15 bslbf - //marker_bit 1 bslbf - //DTS_next_AU[14..0] 15 bslbf - //marker_bit 1 bslbf - pos += 5; - } - } - } - npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length; - pos = npos;//must consider the 'stuffing_byte' in adaptation field - } - - if(ts_header_info._adaptation_field_control == 1 - || ts_header_info._adaptation_field_control == 3 ) { - // data_byte with placeholder - // payload parser - if(ts_header_info._PID == 0x00){ - // PAT // program association table - if(ts_header_info._payload_unit_start_indicator) { - pos++; - } - _pat._table_id = data_p[pos]; - pos++; - _pat._section_syntax_indicator = (data_p[pos]>>7)&0x01; - // skip 3 bits of 1 zero and 2 reserved - _pat._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; - pos += 2; - _pat._transport_stream_id = (data_p[pos]<<8)|data_p[pos+1]; - pos += 2; - // reserved 2 bits - _pat._version_number = (data_p[pos]&0x3E)>>1; - _pat._current_next_indicator = data_p[pos]&0x01; - pos++; - _pat._section_number = data_p[pos]; - pos++; - _pat._last_section_number = data_p[pos]; - - if (_pat._table_id != 0x00) { - srt_log_error("pat table id(0x%02x) error, it must be 0x00", _pat._table_id); - return -1; - } - // PAT = section_length + 3 - if((188 - npos) <= (_pat._section_length + 3)) { - srt_log_error("pat _section_length(%d) error, the left len:%d", _pat._section_length, (188 - npos)); - return -1; - } - pos++; - _pat._pid_vec.clear(); - for (;pos+4 <= _pat._section_length-5-4+9 + npos;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte ) - PID_INFO pid_info; - //program_number 16 uimsbf - pid_info._program_number = data_p[pos]<<8|data_p[pos+1]; - pos += 2; -// reserved 3 bslbf - - if (pid_info._program_number == 0) { -// // network_PID 13 uimsbf - pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; - pos += 2; - } - else { -// // program_map_PID 13 uimsbf - pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; - pos += 2; - } - _pat._pid_vec.push_back(pid_info); - // network_PID and program_map_PID save to list - } -// CRC_32 use pat to calc crc32, eq - pos += 4; - }else if(ts_header_info._PID == 0x01){ - // CAT // conditional access table - }else if(ts_header_info._PID == 0x02){ - //TSDT // transport stream description table - }else if(ts_header_info._PID == 0x03){ - //IPMP // IPMP control information table - // 0x0004-0x000F Reserved - // 0x0010-0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes - }else if(ts_header_info._PID == 0x11){ - // SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream - }else if(is_pmt(ts_header_info._PID)) { - if(ts_header_info._payload_unit_start_indicator) - pos++; - _pmt._table_id = data_p[pos]; - pos++; - _pmt._section_syntax_indicator = (data_p[pos]>>7)&0x01; - // skip 3 bits of 1 zero and 2 reserved - _pmt._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; - pos += 2; - _pmt._program_number = (data_p[pos]<<8)|data_p[pos+1]; - pos += 2; - // reserved 2 bits - _pmt._version_number = (data_p[pos]&0x3E)>>1; - _pmt._current_next_indicator = data_p[pos]&0x01; - pos++; - _pmt._section_number = data_p[pos]; - pos++; - _pmt._last_section_number = data_p[pos]; - pos++; - // skip 3 bits for reserved 3 bslbf - _pmt._PCR_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //PCR_PID 13 uimsbf - pos += 2; - - //reserved 4 bslbf - _pmt._program_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;//program_info_length 12 uimsbf - pos += 2; - - //0x02, // TS_program_map_section - if (_pmt._table_id != 0x02) { - srt_log_error("pmt tableid(0x%02x) error, it must be 0x02", _pmt._table_id) - return -1; - } - memcpy(_pmt._dscr, data_p+pos, _pmt._program_info_length); -// for (i = 0; i < N; i++) { -// descriptor() -// } - pos += _pmt._program_info_length; - _pmt._stream_pid_vec.clear(); - _pmt._pid2steamtype.clear(); - - for (; pos + 5 <= _pmt._section_length + 4 - 4 + npos; ) { // pos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32) - STREAM_PID_INFO pid_info; - pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video - pos++; - //reserved 3 bslbf - pid_info._elementary_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //elementary_PID 13 uimsbf - pos += 2; - //reserved 4 bslbf - pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf - pos += 2; - if( pos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 + npos ) - break; - int absES_info_length = pos + pid_info._ES_info_length; - for (; pos< absES_info_length; ) { - //descriptor() - int descriptor_tag = data_p[pos]; - (void)descriptor_tag; - pos++; - int descriptor_length = data_p[pos]; - pos++; - memcpy(pid_info._dscr, data_p + pos, descriptor_length); - pos += descriptor_length; - } - // save program_number(stream num) elementary_PID(PES PID) stream_type(stream codec) - _pmt._stream_pid_vec.push_back(pid_info); - _pmt._pid2steamtype.insert(std::make_pair((unsigned short)pid_info._elementary_PID, pid_info._stream_type)); - } - pos += 4;//CRC_32 - }else if(ts_header_info._PID == 0x0042){ - // USER - }else if(ts_header_info._PID == 0x1FFF){ - // Null packet - }else{//pes packet or pure data packet - //bool isFound = false; - for (size_t i = 0; i < _pmt._stream_pid_vec.size(); i++) { - if(ts_header_info._PID == _pmt._stream_pid_vec[i]._elementary_PID){ - //isFound = true; - if(ts_header_info._payload_unit_start_indicator){ - unsigned char* ret_data_p = nullptr; - size_t ret_size = 0; - uint64_t dts = 0; - uint64_t pts = 0; - - //callback last media data in data buffer - int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); - if (err_code != 0) - return err_code; - - int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts); - if (ret > 188) { - srt_log_error("pes length(%d) error", ret); - return -1; - } - - _last_pts = pts; - _last_dts = (dts == 0) ? pts : dts; - - if ((ret_data_p != nullptr) && (ret_size > 0)) { - insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID); - } - }else{ - //fwrite(p, 1, 188-(npos+pos), pes_info[i].fd); - insert_into_databuf(data_p + npos, 188-npos, key_path, ts_header_info._PID); - } - } - } - } - } - - return 0; -} -int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) -{ - int ret = -1; - std::string path; - - if (!data_ptr || (data_ptr->data_len() < 188) || (data_ptr->data_len()%188 != 0)) - { - return -1; - } - - unsigned int count = data_ptr->data_len()/188; - path = data_ptr->get_path(); - for (unsigned int index = 0; index < count; index++) - { - unsigned char* data = data_ptr->get_data() + 188*index; - if (data[0] != 0x47) { - continue; - } - ret = decode_unit(data, path, callback); - if (ret != 0) // srs_error_code is positive - { - break; - } - } - return ret; -} - -void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid) { - _last_pid = pid; - _data_total += data_size; - _data_buffer_vec.push_back(std::make_shared(data_p, data_size, key_path)); - return; -} - -int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, - uint64_t dts, uint64_t pts) { - if ((_data_total <=0 ) || (_data_buffer_vec.empty())) { - return 0; - } - - auto iter = _pmt._pid2steamtype.find(pid); - if (iter == _pmt._pid2steamtype.end()) { - return 0; - } - unsigned char stream_type = iter->second; - auto total_data_ptr = std::make_shared(_data_total, key_path); - size_t pos = 0; - - for (size_t index = 0; index < _data_buffer_vec.size(); index++) { - memcpy(total_data_ptr->get_data() + pos, - _data_buffer_vec[index]->get_data(), - _data_buffer_vec[index]->data_len()); - pos += _data_buffer_vec[index]->data_len(); - } - _data_buffer_vec.clear(); - _data_total = 0; - - return callback->on_data_callback(total_data_ptr, stream_type, dts, pts); -} - -bool ts_demux::is_pmt(unsigned short pid) { - for (size_t index = 0; index < _pat._pid_vec.size(); index++) { - if (_pat._pid_vec[index]._program_number != 0) { - if (_pat._pid_vec[index]._pid == pid) { - return true; - } - } - } - return false; -} - - -int ts_demux::pes_parse(unsigned char* p, size_t npos, - unsigned char** ret_pp, size_t& ret_size, - uint64_t& dts, uint64_t& pts) { - int pos = 0; - int packet_start_code_prefix = (p[pos]<<16)|(p[pos+1]<<8)|p[pos+2]; //packet_start_code_prefix 24 bslbf - pos += 3; - int stream_id = p[pos]; //stream_id 8 uimsbf - pos++; - - int PES_packet_length = ((unsigned int)p[pos]<<8)|p[pos+1]; //PES_packet_length 16 uimsbf - (void)PES_packet_length; - pos += 2; - - if (0x00000001 != packet_start_code_prefix) { - srt_log_error("pes packet start code prefix(%06x) error, it must be 0x00 00 01", packet_start_code_prefix); - return 255; - } - if (stream_id != 188//program_stream_map 1011 1100 - && stream_id != 190//padding_stream 1011 1110 - && stream_id != 191//private_stream_2 1011 1111 - && stream_id != 240//ECM 1111 0000 - && stream_id != 241//EMM 1111 0001 - && stream_id != 255//program_stream_directory 1111 1111 - && stream_id != 242//DSMCC_stream 1111 0010 - && stream_id != 248//ITU-T Rec. H.222.1 type E stream 1111 1000 - ) - { - if (0x80 != (p[pos] & 0xc0)) { - srt_log_error("the first 2 bits:0x%02x error, it must be 0x80.", (p[pos] & 0xc0)); - return 255; - } - //skip 2bits//'10' 2 bslbf - int PES_scrambling_control = (p[pos]&30)>>4; //PES_scrambling_control 2 bslbf - (void)PES_scrambling_control; - int PES_priority = (p[pos]&0x08)>>3; //PES_priority 1 bslbf - (void)PES_priority; - int data_alignment_indicator = (p[pos]&0x04)>>2;//data_alignment_indicator 1 bslbf - (void)data_alignment_indicator; - int copyright = (p[pos]&0x02)>>1; //copyright 1 bslbf - (void)copyright; - int original_or_copy = (p[pos]&0x01);//original_or_copy 1 bslbf - (void)original_or_copy; - pos++; - int PTS_DTS_flags = (p[pos]&0xC0)>>6; //PTS_DTS_flags 2 bslbf - int ESCR_flag = (p[pos]&0x20)>>5; // ESCR_flag 1 bslbf - int ES_rate_flag = (p[pos]&0x10)>>4;//ES_rate_flag 1 bslbf - int DSM_trick_mode_flag = (p[pos]&0x08)>>3;//DSM_trick_mode_flag 1 bslbf - int additional_copy_info_flag = (p[pos]&0x04)>>2; //additional_copy_info_flag 1 bslbf - int PES_CRC_flag = (p[pos]&0x02)>>1; //PES_CRC_flag 1 bslbf - int PES_extension_flag = (p[pos]&0x01);//PES_extension_flag 1 bslbf - pos++; - int PES_header_data_length = p[pos]; //PES_header_data_length 8 uimsbf - (void)PES_header_data_length; - pos++; - - if (PTS_DTS_flags == 2) { - // skip 4 bits '0010' 4 bslbf - // PTS [32..30] 3 bslbf - // marker_bit 1 bslbf - // PTS [29..15] 15 bslbf - // marker_bit 1 bslbf - // PTS [14..0] 15 bslbf - // marker_bit 1 bslbf - pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); - pos += 5; - } - if (PTS_DTS_flags == 3) { - // '0011' 4 bslbf - // PTS [32..30] 3 bslbf - // marker_bit 1 bslbf - //PTS [29..15] 15 bslbf - //marker_bit 1 bslbf - // PTS [14..0] 15 bslbf - // marker_bit 1 bslbf - pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); - pos += 5; - // '0001' 4 bslbf - // DTS [32..30] 3 bslbf - // marker_bit 1 bslbf - // DTS [29..15] 15 bslbf - // marker_bit 1 bslbf - // DTS [14..0] 15 bslbf - // marker_bit 1 bslbf - dts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); - pos += 5; - } - if (ESCR_flag == 1) { - // reserved 2 bslbf - // ESCR_base[32..30] 3 bslbf - // marker_bit 1 bslbf - // ESCR_base[29..15] 15 bslbf - // marker_bit 1 bslbf - // ESCR_base[14..0] 15 bslbf - // marker_bit 1 bslbf - // ESCR_extension 9 uimsbf - // marker_bit 1 bslbf - uint64_t ESCR_base = ((((uint64_t)p[pos] >> 3) & 0x07) << 30) | (((uint64_t)p[pos] & 0x03) << 28) | ((uint64_t)p[pos + 1] << 20) | ((((uint64_t)p[pos + 2] >> 3) & 0x1F) << 15) | (((uint64_t)p[pos + 2] & 0x3) << 13) | ((uint64_t)p[pos + 3] << 5) | ((p[pos + 4] >> 3) & 0x1F); - int ESCR_extension = ((p[pos + 4] & 0x03) << 7) | ((p[pos + 5] >> 1) & 0x7F); - (void)ESCR_base; - (void)ESCR_extension; - pos += 6; - } - if (ES_rate_flag == 1) { - // marker_bit 1 bslbf - // ES_rate 22 uimsbf - // marker_bit 1 bslbf - int ES_rate = (p[pos]&0x7F)<<15 | (p[pos+1])<<7 | (p[pos+2]&0x7F)>>1; - (void)ES_rate; - pos += 3; - } - if (DSM_trick_mode_flag == 1) { // ignore - int trick_mode_control = (p[pos]&0xE0)>>5;//trick_mode_control 3 uimsbf - if ( trick_mode_control == 0/*fast_forward*/ ) { - // field_id 2 bslbf - // intra_slice_refresh 1 bslbf - // frequency_truncation 2 bslbf - } - else if ( trick_mode_control == 1/*slow_motion*/ ) { - //rep_cntrl 5 uimsbf - } - else if ( trick_mode_control == 2/*freeze_frame*/ ) { - // field_id 2 uimsbf - // reserved 3 bslbf - } - else if ( trick_mode_control == 3/*fast_reverse*/ ) { - // field_id 2 bslbf - // intra_slice_refresh 1 bslbf - // frequency_truncation 2 bslbf - }else if ( trick_mode_control == 4/*slow_reverse*/ ) { - // rep_cntrl 5 uimsbf - } - else{ - //reserved 5 bslbf - } - pos++; - } - if ( additional_copy_info_flag == 1) { // ignore - // marker_bit 1 bslbf - // additional_copy_info 7 bslbf - pos++; - } - if ( PES_CRC_flag == 1) { // ignore - // previous_PES_packet_CRC 16 bslbf - pos += 2; - } - if ( PES_extension_flag == 1) { // ignore - int PES_private_data_flag = (p[pos]&0x80)>>7;// PES_private_data_flag 1 bslbf - int pack_header_field_flag = (p[pos]&0x40)>>6;// pack_header_field_flag 1 bslbf - int program_packet_sequence_counter_flag = (p[pos]&0x20)>>5;// program_packet_sequence_counter_flag 1 bslbf - int P_STD_buffer_flag = (p[pos]&0x10)>>4; // P-STD_buffer_flag 1 bslbf - // reserved 3 bslbf - int PES_extension_flag_2 = (p[pos]&0x01);// PES_extension_flag_2 1 bslbf - pos++; - - if ( PES_private_data_flag == 1) { - // PES_private_data 128 bslbf - pos += 16; - } - if (pack_header_field_flag == 1) { - // pack_field_length 8 uimsbf - // pack_header() - } - if (program_packet_sequence_counter_flag == 1) { - // marker_bit 1 bslbf - // program_packet_sequence_counter 7 uimsbf - // marker_bit 1 bslbf - // MPEG1_MPEG2_identifier 1 bslbf - // original_stuff_length 6 uimsbf - pos += 2; - } - if ( P_STD_buffer_flag == 1) { - // '01' 2 bslbf - // P-STD_buffer_scale 1 bslbf - // P-STD_buffer_size 13 uimsbf - pos += 2; - } - if ( PES_extension_flag_2 == 1) { - // marker_bit 1 bslbf - int PES_extension_field_length = (p[pos]&0x7F);// PES_extension_field_length 7 uimsbf - pos++; - for (int i = 0; i < PES_extension_field_length; i++) { - // reserved 8 bslbf - pos++; - } - } - } - -// for (int i = 0; i < N1; i++) { - //stuffing_byte 8 bslbf -// rpos++; -// } -// for (int i = 0; i < N2; i++) { - //PES_packet_data_byte 8 bslbf -// rpos++; -// } - *ret_pp = p+pos; - ret_size = 188-(npos+pos); - } - else if ( stream_id == 188//program_stream_map 1011 1100 BC - || stream_id == 191//private_stream_2 1011 1111 BF - || stream_id == 240//ECM 1111 0000 F0 - || stream_id == 241//EMM 1111 0001 F1 - || stream_id == 255//program_stream_directory 1111 1111 FF - || stream_id == 242//DSMCC_stream 1111 0010 F2 - || stream_id == 248//ITU-T Rec. H.222.1 type E stream 1111 1000 F8 - ) { -// for (i = 0; i < PES_packet_length; i++) { - //PES_packet_data_byte 8 bslbf -// rpos++; -// } - *ret_pp = p+pos; - ret_size = 188-(npos+pos); - //fwrite(p, 1, 188-(npos+rpos), fd); - } - else if ( stream_id == 190//padding_stream 1011 1110 - ) { -// for (i = 0; i < PES_packet_length; i++) { - // padding_byte 8 bslbf -// rpos++; - *ret_pp = p+pos; - ret_size = 188-(npos+pos); -// } - } - - return pos; -} diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp deleted file mode 100644 index 06a888c6c..000000000 --- a/trunk/src/srt/ts_demux.hpp +++ /dev/null @@ -1,247 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#ifndef TS_DEMUX_H -#define TS_DEMUX_H - -#include - -#include "srt_data.hpp" -#include -#include -#include -#include - -/* mpegts stream type in ts pmt -Value Description -0x00 ITU-T | ISO/IEC Reserved -0x01 ISO/IEC 11172-2 Video (mpeg video v1) -0x02 ITU-T Rec. H.262 | ISO/IEC 13818-2 Video(mpeg video v2)or ISO/IEC 11172-2 constrained parameter video stream -0x03 ISO/IEC 11172-3 Audio (MPEG 1 Audio codec Layer I, Layer II and Layer III audio specifications) -0x04 ISO/IEC 13818-3 Audio (BC Audio Codec) -0x05 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 private_sections -0x06 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 PES packets containing private data -0x07 ISO/IEC 13522 MHEG -0x08 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Annex A DSM-CC -0x09 ITU-T Rec. H.222.1 -0x0A ISO/IEC 13818-6 type A -0x0B ISO/IEC 13818-6 type B -0x0C ISO/IEC 13818-6 type C -0x0D ISO/IEC 13818-6 type D -0x0E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 auxiliary -0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax -0x10 ISO/IEC 14496-2 Visual -0x11 ISO/IEC 14496-3 Audio with the LATM transport syntax as defined in ISO/IEC 14496-3/Amd.1 -0x12 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in PES packets -0x13 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in ISO/IEC 14496_sections -0x14 ISO/IEC 13818-6 Synchronized Download Protocol -0x15 Metadata carried in PES packets -0x16 Metadata carried in metadata_sections -0x17 Metadata carried in ISO/IEC 13818-6 Data Carousel -0x18 Metadata carried in ISO/IEC 13818-6 Object Carousel -0x19 Metadata carried in ISO/IEC 13818-6 Synchronized Download Protocol -0x1A IPMP stream (defined in ISO/IEC 13818-11, MPEG-2 IPMP) -0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video (h.264) -0x1C ISO/IEC 14496-3 Audio, without using any additional transport syntax, such as DST, ALS and SLS -0x1D ISO/IEC 14496-17 Text -0x1E Auxiliary video stream as defined in ISO/IEC 23002-3 (AVS) -0x1F-0x7E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Reserved -0x7F IPMP stream 0x80-0xFF User Private -*/ -#define STREAM_TYPE_VIDEO_MPEG1 0x01 -#define STREAM_TYPE_VIDEO_MPEG2 0x02 -#define STREAM_TYPE_AUDIO_MPEG1 0x03 -#define STREAM_TYPE_AUDIO_MPEG2 0x04 -#define STREAM_TYPE_PRIVATE_SECTION 0x05 -#define STREAM_TYPE_PRIVATE_DATA 0x06 -#define STREAM_TYPE_AUDIO_AAC 0x0f -#define STREAM_TYPE_AUDIO_AAC_LATM 0x11 -#define STREAM_TYPE_VIDEO_MPEG4 0x10 -#define STREAM_TYPE_METADATA 0x15 -#define STREAM_TYPE_VIDEO_H264 0x1b -#define STREAM_TYPE_VIDEO_HEVC 0x24 -#define STREAM_TYPE_VIDEO_CAVS 0x42 -#define STREAM_TYPE_VIDEO_VC1 0xea -#define STREAM_TYPE_VIDEO_DIRAC 0xd1 - -#define STREAM_TYPE_AUDIO_AC3 0x81 -#define STREAM_TYPE_AUDIO_DTS 0x82 -#define STREAM_TYPE_AUDIO_TRUEHD 0x83 -#define STREAM_TYPE_AUDIO_EAC3 0x87 - -class ts_media_data_callback_I { -public: - virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; -}; - -typedef std::shared_ptr TS_DATA_CALLBACK_PTR; - -class adaptation_field { -public: - adaptation_field(){}; - ~adaptation_field(){}; - -public: - unsigned char _adaptation_field_length; - - unsigned char _discontinuity_indicator:1; - unsigned char _random_access_indicator:1; - unsigned char _elementary_stream_priority_indicator:1; - unsigned char _PCR_flag:1; - unsigned char _OPCR_flag:1; - unsigned char _splicing_point_flag:1; - unsigned char _transport_private_data_flag:1; - unsigned char _adaptation_field_extension_flag:1; - - //if(PCR_flag == '1') - unsigned long _program_clock_reference_base;//33 bits - unsigned short _program_clock_reference_extension;//9bits - //if (OPCR_flag == '1') - unsigned long _original_program_clock_reference_base;//33 bits - unsigned short _original_program_clock_reference_extension;//9bits - //if (splicing_point_flag == '1') - unsigned char _splice_countdown; - //if (transport_private_data_flag == '1') - unsigned char _transport_private_data_length; - unsigned char _private_data_byte[256]; - //if (adaptation_field_extension_flag == '1') - unsigned char _adaptation_field_extension_length; - unsigned char _ltw_flag; - unsigned char _piecewise_rate_flag; - unsigned char _seamless_splice_flag; - unsigned char _reserved0; - //if (ltw_flag == '1') - unsigned short _ltw_valid_flag:1; - unsigned short _ltw_offset:15; - //if (piecewise_rate_flag == '1') - unsigned int _piecewise_rate;//22bits - //if (seamless_splice_flag == '1') - unsigned char _splice_type;//4bits - unsigned char _DTS_next_AU1;//3bits - unsigned char _marker_bit1;//1bit - unsigned short _DTS_next_AU2;//15bit - unsigned char _marker_bit2;//1bit - unsigned short _DTS_next_AU3;//15bit - unsigned char _marker_bit3;//1bit -}; - -class ts_header { -public: - ts_header(){} - ~ts_header(){} - -public: - unsigned char _sync_byte; - - unsigned short _transport_error_indicator:1; - unsigned short _payload_unit_start_indicator:1; - unsigned short _transport_priority:1; - unsigned short _PID:13; - - unsigned char _transport_scrambling_control:2; - unsigned char _adaptation_field_control:2; - unsigned char _continuity_counter:4; - - adaptation_field _adaptation_field_info; -}; - -typedef struct { - unsigned short _program_number; - unsigned short _pid; - unsigned short _network_id; -} PID_INFO; - -class pat_info { -public: - pat_info(){}; - ~pat_info(){}; - -public: - unsigned char _table_id; - - unsigned short _section_syntax_indicator:1; - unsigned short _reserved0:1; - unsigned short _reserved1:2; - unsigned short _section_length:12; - - unsigned short _transport_stream_id; - - unsigned char _reserved3:2; - unsigned char _version_number:5; - unsigned char _current_next_indicator:1; - - unsigned char _section_number; - unsigned char _last_section_number; - std::vector _pid_vec; -}; - -typedef struct { - unsigned char _stream_type; - unsigned short _reserved1:3; - unsigned short _elementary_PID:13; - unsigned short _reserved:4; - unsigned short _ES_info_length; - unsigned char _dscr[4096]; - unsigned int _crc_32; -} STREAM_PID_INFO; - -class pmt_info { -public: - pmt_info(){}; - ~pmt_info(){}; -public: - unsigned char _table_id; - unsigned short _section_syntax_indicator:1; - unsigned short _reserved1:1; - unsigned short _reserved2:2; - unsigned short _section_length:12; - unsigned short _program_number:16; - unsigned char _reserved:2; - unsigned char _version_number:5; - unsigned char _current_next_indicator:5; - unsigned char _section_number; - unsigned char _last_section_number; - unsigned short _reserved3:3; - unsigned short _PCR_PID:13; - unsigned short _reserved4:4; - unsigned short _program_info_length:12; - unsigned char _dscr[4096]; - - std::unordered_map _pid2steamtype; - std::vector _stream_pid_vec; -}; - -class ts_demux { -public: - ts_demux(); - ~ts_demux(); - - int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback); - -private: - int decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback); - bool is_pmt(unsigned short pmt_id); - int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size, - uint64_t& dts, uint64_t& pts); - void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid); - int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, - std::string key_path, uint64_t dts, uint64_t pts); - -private: - std::string _key_path;//only for srt - - pat_info _pat; - pmt_info _pmt; - std::vector _data_buffer_vec; - size_t _data_total; - unsigned short _last_pid; - uint64_t _last_dts; - uint64_t _last_pts; -}; - -typedef std::shared_ptr TS_DEMUX_PTR; - -#endif diff --git a/trunk/src/srt/ts_demux_test.cpp b/trunk/src/srt/ts_demux_test.cpp deleted file mode 100644 index e38679640..000000000 --- a/trunk/src/srt/ts_demux_test.cpp +++ /dev/null @@ -1,61 +0,0 @@ -// -// Copyright (c) 2013-2021 The SRS Authors -// -// SPDX-License-Identifier: MIT or MulanPSL-2.0 -// - -#include "ts_demux.hpp" -#include -#include - -#define TS_MAX 188 - -class media_data_get : public ts_media_data_callback_I { -public: - media_data_get() {}; - virtual ~media_data_get() {}; - -public: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type - , uint64_t dts, uint64_t pts) { - printf("media type:%d, data len:%d, key_path:%s, dts:%lu(%lu), pts:%lu(%lu)\r\n", - media_type, data_ptr->data_len(), data_ptr->get_path().c_str(), dts, dts/90, pts, pts/90); - FILE* file_p; - char filename[80]; - - sprintf(filename, "%u.media", media_type); - file_p = fopen(filename, "ab+"); - if (file_p) { - fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p); - fclose(file_p); - } - return; - } -}; - -int main(int argn, char** argv) { - unsigned char data[TS_MAX]; - ts_demux demux_obj; - auto callback_ptr = std::make_shared(); - FILE* file_p; - if (argn < 2) { - printf("please input ts name.\r\n"); - return 0; - } - - const char* file_name = argv[1]; - printf("input ts name:%s.\r\n", file_name); - - file_p = fopen(file_name, "r"); - fseek(file_p, 0L, SEEK_END); /* 定位到文件末尾 */ - size_t flen = ftell(file_p); /* 得到文件大小 */ - fseek(file_p, 0L, SEEK_SET); /* 定位到文件开头 */ - - do { - fread(data, TS_MAX, 1, file_p); - auto input_ptr = std::make_shared((unsigned char*)data, (unsigned int)TS_MAX, std::string("live/shiwei")); - demux_obj.decode(input_ptr, callback_ptr); - flen -= TS_MAX; - } while(flen > 0); - return 1; -}