SRT: Eliminate unused files for SRT.

pull/3089/head
winlin 3 years ago
parent cf4fbb887b
commit bb91297071

@ -1,263 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "srt_conn.hpp"
#include "time_help.hpp"
#include "stringex.hpp"
#include "srt_log.hpp"
#include <vector>
#include <srs_protocol_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_config.hpp>
bool is_streamid_valid(const std::string& streamid) {
if (streamid.empty()) {
return false;
}
size_t pos = streamid.find(" ");
if (pos != streamid.npos) {
return false;
}
int mode;
std::string subpath;
std::string vhost;
// Parse the stream info from streamid, see https://github.com/ossrs/srs/issues/2893
bool ret = get_streamid_info(streamid, mode, vhost, subpath);
if (!ret) {
return false;
}
std::vector<std::string> info_vec;
string_split(subpath, "/", info_vec);
// TODO: FIXME: Should fail at parsing the original SRT URL.
if (info_vec.size() != 2) {
srt_log_warn("path format must be appname/stream?key=value...");
return false;
}
for (auto item : info_vec) {
if (item.empty()) {
return false;
}
pos = item.find(" ");
if (pos != item.npos) {
return false;
}
}
return true;
}
bool get_key_value(const std::string& info, std::string& key, std::string& value) {
size_t pos = info.find("=");
if (pos == info.npos) {
return false;
}
key = info.substr(0, pos);
value = info.substr(pos+1);
if (key.empty() || value.empty()) {
return false;
}
return true;
}
// See streamid of https://github.com/ossrs/srs/issues/2893
// TODO: FIMXE: We should parse SRT streamid to URL object, rather than a HTTP url subpath.
bool get_streamid_info(const std::string& streamid, int& mode, std::string& vhost, std::string& url_subpath)
{
mode = PULL_SRT_MODE;
size_t pos = streamid.find("#!::");
if (pos != 0) {
pos = streamid.find("/");
if (pos == streamid.npos) {
url_subpath = _srs_config->get_default_app_name() + "/" + streamid;
return true;
}
url_subpath = streamid;
return true;
}
//SRT url supports multiple QueryStrings, which are passed to RTMP to realize authentication and other capabilities
//@see https://github.com/ossrs/srs/issues/2893
std::string params;
std::string real_streamid;
real_streamid = streamid.substr(4);
// Compatible with previous auth querystring, like this one:
// srt://127.0.0.1:10080?streamid=#!::h=live/livestream?secret=xxx,m=publish
real_streamid = srs_string_replace(real_streamid, "?", ",");
std::map<std::string, std::string> query;
srs_parse_query_string(real_streamid, query);
for (std::map<std::string, std::string>::iterator it = query.begin(); it != query.end(); ++it) {
if (it->first == "h") {
std::string host = it->second;
size_t r0 = host.find("/");
size_t r1 = host.rfind("/");
if (r0 != std::string::npos && r0 != std::string::npos) {
// Compatible with previous style, see https://github.com/ossrs/srs/issues/2893#compatible
// srt://127.0.0.1:10080?streamid=#!::h=live/livestream,m=publish
// srt://127.0.0.1:10080?streamid=#!::h=live/livestream,m=request
// srt://127.0.0.1:10080?streamid=#!::h=srs.srt.com.cn/live/livestream,m=publish
if (r0 != r1) {
// We got vhost in host.
url_subpath = host.substr(r0 + 1);
host = host.substr(0, r0);
params.append("vhost=");
params.append(host);
params.append("&");
vhost = host;
} else {
// Only stream in host.
url_subpath = host;
}
} else {
// New URL style, see https://github.com/ossrs/srs/issues/2893#solution
// srt://host.com:10080?streamid=#!::h=host.com,r=app/stream,key1=value1,key2=value2
// srt://1.2.3.4:10080?streamid=#!::h=host.com,r=app/stream,key1=value1,key2=value2
// srt://1.2.3.4:10080?streamid=#!::r=app/stream,key1=value1,key2=value2
params.append("vhost=");
params.append(host);
params.append("&");
vhost = host;
}
} else if (it->first == "r") {
url_subpath = it->second;
} else if (it->first == "m") {
std::string mode_str = it->second; // support m=publish or m=request
std::transform(it->second.begin(), it->second.end(), mode_str.begin(), ::tolower);
if (mode_str == "publish") {
mode = PUSH_SRT_MODE;
} else if (mode_str == "request") {
mode = PULL_SRT_MODE;
} else {
srt_log_warn("unknown mode_str:%s", mode_str.c_str());
return false;
}
} else {
params.append(it->first);
params.append("=");
params.append(it->second);
params.append("&");
}
}
if (url_subpath.empty()) {
return false;
}
if (!params.empty()) {
url_subpath.append("?");
url_subpath.append(params);
url_subpath.pop_back(); // remove last '&'
}
return true;
}
srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd),
_streamid(streamid),
write_fail_cnt_(0)
{
get_streamid_info(streamid, _mode, _vhost, _url_subpath);
_update_timestamp = now_ms();
if (_vhost.empty()) {
_vhost = "__default_host__";
}
srt_log_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str());
}
srt_conn::~srt_conn() {
close();
}
std::string srt_conn::get_vhost() {
return _vhost;
}
void srt_conn::update_timestamp(long long now_ts) {
_update_timestamp = now_ts;
}
long long srt_conn::get_last_ts() {
return _update_timestamp;
}
void srt_conn::close() {
if (_conn_fd == SRT_INVALID_SOCK) {
return;
}
srt_close(_conn_fd);
_conn_fd = SRT_INVALID_SOCK;
}
SRTSOCKET srt_conn::get_conn() {
return _conn_fd;
}
int srt_conn::get_mode() {
return _mode;
}
std::string srt_conn::get_streamid() {
return _streamid;
}
std::string srt_conn::get_path() {
if (!_url_path.empty()) {
return _url_path;
}
size_t pos = _url_subpath.find("?");
_url_path = (pos != std::string::npos) ? _url_subpath.substr(0, pos) : _url_subpath;
return _url_path;
}
std::string srt_conn::get_subpath() {
return _url_subpath;
}
int srt_conn::read(unsigned char* data, int len) {
int ret = 0;
ret = srt_recv(_conn_fd, (char*)data, len);
if (ret <= 0) {
srt_log_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
}
int srt_conn::write(unsigned char* data, int len) {
int ret = 0;
ret = srt_send(_conn_fd, (char*)data, len);
if (ret <= 0) {
srt_log_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
write_fail_cnt_++;
return ret;
}
write_fail_cnt_ = 0;
return ret;
}
int srt_conn::get_write_fail_count() {
return write_fail_cnt_;
}

@ -1,63 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRT_CONN_H
#define SRT_CONN_H
#include <srs_core.hpp>
#include "stringex.hpp"
#include <srt/srt.h>
#include <thread>
#include <memory>
#include <string>
#include <vector>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#define ERR_SRT_MODE 0x00
#define PULL_SRT_MODE 0x01
#define PUSH_SRT_MODE 0x02
bool is_streamid_valid(const std::string& streamid);
bool get_key_value(const std::string& info, std::string& key, std::string& value);
bool get_streamid_info(const std::string& streamid, int& mode, std::string& vhost, std::string& url_subpash);
class srt_conn {
public:
srt_conn(SRTSOCKET conn_fd, const std::string& streamid);
~srt_conn();
void close();
SRTSOCKET get_conn();
int get_mode();
std::string get_streamid();
std::string get_path();
std::string get_subpath();
std::string get_vhost();
int read(unsigned char* data, int len);
int write(unsigned char* data, int len);
void update_timestamp(long long now_ts);
long long get_last_ts();
int get_write_fail_count();
private:
SRTSOCKET _conn_fd;
std::string _streamid;
std::string _url_path;
std::string _url_subpath;
std::string _vhost;
int _mode;
long long _update_timestamp;
int write_fail_cnt_;
};
typedef std::shared_ptr<srt_conn> SRT_CONN_PTR;
#endif //SRT_CONN_H

@ -1,71 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "srt_data.hpp"
#include <string.h>
SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(0)
,_data_p(nullptr)
,_key_path(path) {
}
SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(len)
,_key_path(path) {
_data_p = new unsigned char[len];
memset(_data_p, 0, len);
}
SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(len)
,_key_path(path)
{
_data_p = new unsigned char[len];
memcpy(_data_p, data_p, len);
}
SRT_DATA_MSG::SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content): _msg_type(SRT_MSG_LOG_TYPE)
,_log_content(log_content)
,_log_level(log_level)
{
}
SRT_DATA_MSG::~SRT_DATA_MSG() {
if (_data_p && (_len > 0)) {
delete[] _data_p;
}
}
unsigned int SRT_DATA_MSG::msg_type() {
return _msg_type;
}
void SRT_DATA_MSG::set_msg_type(unsigned int msg_type) {
_msg_type = msg_type;
}
std::string SRT_DATA_MSG::get_path() {
return _key_path;
}
unsigned int SRT_DATA_MSG::data_len() {
return _len;
}
unsigned char* SRT_DATA_MSG::get_data() {
return _data_p;
}
LOGGER_LEVEL SRT_DATA_MSG::get_log_level() {
return _log_level;
}
const char* SRT_DATA_MSG::get_log_string() {
return _log_content.c_str();
}

@ -1,48 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRT_DATA_H
#define SRT_DATA_H
#include "srt_log.hpp"
#include <srs_core.hpp>
#include <string>
#include <memory>
#define SRT_MSG_DATA_TYPE 0x01
#define SRT_MSG_CLOSE_TYPE 0x02
#define SRT_MSG_LOG_TYPE 0x03
class SRT_DATA_MSG {
public:
SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content);
~SRT_DATA_MSG();
unsigned int msg_type();
unsigned int data_len();
unsigned char* get_data();
std::string get_path();
LOGGER_LEVEL get_log_level();
const char* get_log_string();
void set_msg_type(unsigned int msg_type);
private:
unsigned int _msg_type;
unsigned int _len = 0;
unsigned char* _data_p = nullptr;
std::string _key_path;
std::string _log_content;
LOGGER_LEVEL _log_level = SRT_LOGGER_TRACE_LEVEL;
};
typedef std::shared_ptr<SRT_DATA_MSG> SRT_DATA_MSG_PTR;
#endif

@ -1,406 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "srt_handle.hpp"
#include "time_help.hpp"
#include "srt_log.hpp"
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srt/udt.h>
#include <stdio.h>
#include <vector>
#include <sstream>
#include <iomanip>
#include <assert.h>
#include <list>
static bool MONITOR_STATICS_ENABLE = false;
static long long MONITOR_TIMEOUT = 5000;
const unsigned int DEF_DATA_SIZE = 188*7;
const long long CHECK_ALIVE_INTERVAL = 5*1000;
const long long CHECK_ALIVE_TIMEOUT = 5*1000;
static const int SRT_WRTIE_FAIL_MAX = 10;
long long srt_now_ms = 0;
srt_handle::srt_handle(int pollid):_handle_pollid(pollid)
,_last_timestamp(0)
,_last_check_alive_ts(0) {
}
srt_handle::~srt_handle() {
}
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
SRT_TRACEBSTATS mon;
srt_bstats(srtsocket, &mon, 1);
std::ostringstream output;
long long now_ul = now_ms();
if (!MONITOR_STATICS_ENABLE) {
return;
}
if (_last_timestamp == 0) {
_last_timestamp = now_ul;
return;
}
if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) {
return;
}
_last_timestamp = now_ul;
output << "======= SRT STATS: sid=" << streamid << std::endl;
output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl;
output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl;
output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl;
output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl;
output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl;
output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl;
output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl;
output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl;
output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl;
output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl;
srt_log_trace("\r\n%s", output.str().c_str());
return;
}
void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
auto iter = _streamid_map.find(stream_id);
if (iter == _streamid_map.end()) {
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> srtsocket_map;
srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
_streamid_map.insert(std::make_pair(stream_id, srtsocket_map));
srt_log_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
} else {
iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srt_log_trace("add new puller fd:%d, streamid:%s, size:%d",
conn_ptr->get_conn(), stream_id.c_str(), iter->second.size());
}
return;
}
void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) {
srt_log_trace("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_epoll_remove_usock(_handle_pollid, srtsocket);
auto streamid_iter = _streamid_map.find(stream_id);
if (streamid_iter != _streamid_map.end()) {
if (streamid_iter->second.size() == 0) {
_streamid_map.erase(stream_id);
} else if (streamid_iter->second.size() == 1) {
streamid_iter->second.erase(srtsocket);
_streamid_map.erase(stream_id);
} else {
streamid_iter->second.erase(srtsocket);
}
} else {
assert(0);
}
auto conn_iter = _conn_map.find(srtsocket);
if (conn_iter != _conn_map.end()) {
_conn_map.erase(conn_iter);
return;
} else {
assert(0);
}
return;
}
SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) {
SRT_CONN_PTR ret_conn;
auto iter = _conn_map.find(conn_srt_socket);
if (iter == _conn_map.end()) {
return ret_conn;
}
ret_conn = iter->second;
return ret_conn;
}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
int val_i;
int opt_len = sizeof(int);
int64_t val_i64;
int opt64_len = sizeof(int64_t);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
srt_log_trace("srto SRTO_LATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len);
srt_log_trace("srto SRTO_PEERLATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len);
srt_log_trace("srto SRTO_RCVLATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
srt_log_trace("srto SRTO_SNDBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
srt_log_trace("srto SRTO_RCVBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i64, &opt64_len);
srt_log_trace("srto SRTO_MAXBW=%d", val_i64);
srt_log_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable");
srt_log_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable");
if (conn_ptr->get_mode() == PULL_SRT_MODE) {
add_new_puller(conn_ptr, conn_ptr->get_path());
} else {
if(add_new_pusher(conn_ptr) == false) {
srt_log_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_streamid().c_str());
conn_ptr->close();
return;
}
}
srt_log_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);
if (ret < 0) {
srt_log_error("srt handle run add epoll error:%d", ret);
return;
}
return;
}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& path, const std::string& subpath, SRTSOCKET conn_fd) {
SRT_CONN_PTR srt_conn_ptr;
unsigned char data[DEF_DATA_SIZE];
int ret;
srt_conn_ptr = get_srt_conn(conn_fd);
if (!srt_conn_ptr) {
srt_log_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
return;
}
if (status != SRTS_CONNECTED) {
srt_log_error("handle_push_data error status:%d fd:%d", status, conn_fd);
close_push_conn(conn_fd);
return;
}
ret = srt_conn_ptr->read(data, DEF_DATA_SIZE);
if (ret <= 0) {
srt_log_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
close_push_conn(conn_fd);
return;
}
srt_conn_ptr->update_timestamp(srt_now_ms);
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
{
std::unique_lock<std::mutex> locker(srt2rtmp::_srt_error_mutex);
if (srt2rtmp::_srt_error_map.count(subpath) == 1) {
int err_code = srt2rtmp::_srt_error_map[subpath];
if (err_code != ERROR_SUCCESS) {
close_push_conn(conn_fd);
srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd);
//todo: reset to next use, maybe update by srt2rtmp::cycle again
srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS;
return;
}
}
}
//send data to subscriber(players)
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
auto streamid_iter = _streamid_map.find(path);
if (streamid_iter == _streamid_map.end()) {//no puler
srt_log_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
return;
}
srt_log_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
ret, conn_fd, streamid_iter->second.size());
std::vector<SRTSOCKET> remove_vec;
for (auto puller_iter = streamid_iter->second.begin();
puller_iter != streamid_iter->second.end();
puller_iter++) {
auto player_conn = puller_iter->second;
if (!player_conn) {
srt_log_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
continue;
}
int write_ret = player_conn->write(data, ret);
srt_log_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
if (write_ret > 0) {
puller_iter->second->update_timestamp(srt_now_ms);
} else {
if (player_conn->get_write_fail_count() > SRT_WRTIE_FAIL_MAX) {
remove_vec.push_back(puller_iter->first);
}
}
}
for (auto item : remove_vec) {
streamid_iter->second.erase(item);
if (streamid_iter->second.empty()) {
_streamid_map.erase(streamid_iter);
}
}
return;
}
void srt_handle::check_alive() {
long long diff_t;
std::list<SRT_CONN_PTR> conn_list;
if (_last_check_alive_ts == 0) {
_last_check_alive_ts = srt_now_ms;
return;
}
diff_t = srt_now_ms - _last_check_alive_ts;
if (diff_t < CHECK_ALIVE_INTERVAL) {
return;
}
for (auto conn_iter = _conn_map.begin();
conn_iter != _conn_map.end();
conn_iter++)
{
long long timeout = srt_now_ms - conn_iter->second->get_last_ts();
if (timeout > CHECK_ALIVE_TIMEOUT) {
conn_list.push_back(conn_iter->second);
}
}
for (auto del_iter = conn_list.begin();
del_iter != conn_list.end();
del_iter++)
{
SRT_CONN_PTR conn_ptr = *del_iter;
if (conn_ptr->get_mode() == PUSH_SRT_MODE) {
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_push_conn(conn_ptr->get_conn());
} else if (conn_ptr->get_mode() == PULL_SRT_MODE) {
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_path().c_str());
close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_path());
} else {
srt_log_error("check_alive get unkown srt mode:%d, fd:%d",
conn_ptr->get_mode(), conn_ptr->get_conn());
assert(0);
}
}
}
void srt_handle::close_push_conn(SRTSOCKET srtsocket) {
auto iter = _conn_map.find(srtsocket);
if (iter != _conn_map.end()) {
SRT_CONN_PTR conn_ptr = iter->second;
auto push_iter = _push_conn_map.find(conn_ptr->get_path());
if (push_iter != _push_conn_map.end()) {
_push_conn_map.erase(push_iter);
}
_conn_map.erase(iter);
srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath());
conn_ptr->close();
}
srt_epoll_remove_usock(_handle_pollid, srtsocket);
return;
}
bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) {
auto push_iter = _push_conn_map.find(conn_ptr->get_path());
if (push_iter != _push_conn_map.end()) {
return false;
}
_push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr));
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srt_log_trace("srt_handle add new pusher streamid:%s, subpath:%s, sid:%s",
conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str(), conn_ptr->get_path().c_str());
return true;
}
void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
srt_log_info("handle_pull_data status:%d, subpath:%s, fd:%d",
status, subpath.c_str(), conn_fd);
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
srt_log_error("handle_pull_data fail to find fd(%d)", conn_fd);
assert(0);
return;
}
conn_ptr->update_timestamp(srt_now_ms);
}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
{
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
if (status != SRTS_CLOSED) {
srt_log_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
conn_fd, status);
}
return;
}
std::string path = conn_ptr->get_path();
std::string subpath = conn_ptr->get_subpath();
int mode = conn_ptr->get_mode();
if (mode == PUSH_SRT_MODE) {
switch (status)
{
case SRTS_CONNECTED:
{
handle_push_data(status, path, subpath, conn_fd);
break;
}
case SRTS_BROKEN:
{
srt_log_warn("srt push disconnected event fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_push_conn(conn_fd);
break;
}
default:
srt_log_error("push mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else if (mode == PULL_SRT_MODE) {
switch (status)
{
case SRTS_CONNECTED:
{
handle_pull_data(status, subpath, conn_fd);
break;
}
case SRTS_BROKEN:
{
srt_log_warn("srt pull disconnected fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_pull_conn(conn_fd, path);
break;
}
default:
srt_log_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else {
assert(0);
}
return;
}

@ -1,70 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRT_HANDLE_H
#define SRT_HANDLE_H
#include <srs_core.hpp>
#include <srt/srt.h>
#include <thread>
#include <memory>
#include <unordered_map>
#include <queue>
#include <string.h>
#include <mutex>
#include "srt_conn.hpp"
#include "srt_to_rtmp.hpp"
class srt_handle {
public:
srt_handle(int pollid);
~srt_handle();
//add new srt connection into epoll event
void add_newconn(SRT_CONN_PTR conn_ptr, int events);
//handle recv/send srt socket
void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
//check srt connection whether it's still alive.
void check_alive();
private:
//get srt conn object by srt socket
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
void handle_push_data(SRT_SOCKSTATUS status, const std::string& path, const std::string& subpath, SRTSOCKET conn_fd);
void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
//add new puller into puller list and conn_map
void add_new_puller(SRT_CONN_PTR, std::string stream_id);
//remove pull srt from play list
void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id);
//add new pusher into pusher map: <socket fd, pusher conn ptr>
bool add_new_pusher(SRT_CONN_PTR conn_ptr);
//remove push connection and remove epoll
void close_push_conn(SRTSOCKET srtsocket);
//debug statics
void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
private:
int _handle_pollid;
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;//save all srt connection: pull or push
//save push srt connection for prevent from repeat push connection
std::unordered_map<std::string, SRT_CONN_PTR> _push_conn_map;//key:streamid, value:SRT_CONN_PTR
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map;
long long _last_timestamp;
long long _last_check_alive_ts;
};
#endif //SRT_HANDLE_H

@ -1,36 +0,0 @@
#include "srt_log.hpp"
#include "srt_to_rtmp.hpp"
#include <string>
#include <stdint.h>
#include <stdarg.h>
LOGGER_LEVEL s_log_level = SRT_LOGGER_TRACE_LEVEL;
static char* srt_log_buffer = new char[LOGGER_BUFFER_SIZE];
void snprintbuffer(char* buffer, size_t size, const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
vsnprintf(buffer, size, fmt, ap);
va_end(ap);
return;
}
void set_srt_log_level(LOGGER_LEVEL level) {
s_log_level = level;
}
LOGGER_LEVEL get_srt_log_level() {
return s_log_level;
}
char* get_srt_log_buffer() {
return srt_log_buffer;
}
void srt_log_output(LOGGER_LEVEL level, const char* buffer) {
std::string log_content(buffer);
srt2rtmp::get_instance()->insert_log_message(level, log_content);
return;
}

@ -1,52 +0,0 @@
#ifndef SRT_LOG_HPP
#define SRT_LOG_HPP
#include <stdint.h>
#include <stddef.h>
#define LOGGER_BUFFER_SIZE (10*1024)
typedef enum {
SRT_LOGGER_INFO_LEVEL,
SRT_LOGGER_TRACE_LEVEL,
SRT_LOGGER_WARN_LEVEL,
SRT_LOGGER_ERROR_LEVEL
} LOGGER_LEVEL;
void set_srt_log_level(LOGGER_LEVEL level);
LOGGER_LEVEL get_srt_log_level();
char* get_srt_log_buffer();
void srt_log_output(LOGGER_LEVEL level, const char* buffer);
void snprintbuffer(char* buffer, size_t size, const char* fmt, ...);
#define srt_log_error(...) \
if (get_srt_log_level() <= SRT_LOGGER_ERROR_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_ERROR_LEVEL, buffer); \
}
#define srt_log_warn(...) \
if (get_srt_log_level() <= SRT_LOGGER_WARN_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_WARN_LEVEL, buffer); \
}
#define srt_log_trace(...) \
if (get_srt_log_level() <= SRT_LOGGER_TRACE_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_TRACE_LEVEL, buffer); \
}
#define srt_log_info(...) \
if (get_srt_log_level() <= SRT_LOGGER_INFO_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_INFO_LEVEL, buffer); \
}
#endif //SRT_LOG_HPP

@ -1,359 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "srt_server.hpp"
#include "srt_handle.hpp"
#include "srt_log.hpp"
#include <srt/udt.h>
#include <thread>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdexcept>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srs_app_utility.hpp>
srt_server::srt_server(unsigned short port):_listen_port(port)
,_server_socket(-1)
{
}
srt_server::~srt_server()
{
}
int srt_server::init_srt_parameter() {
const int DEF_RECV_LATENCY = 120;
const int DEF_PEER_LATENCY = 0;
int opt_len = sizeof(int);
if (_server_socket == -1) {
return -1;
}
int maxbw = _srs_config->get_srto_maxbw();
srt_setsockopt(_server_socket, 0, SRTO_MAXBW, &maxbw, opt_len);
int mss = _srs_config->get_srto_mss();
srt_setsockopt(_server_socket, 0, SRTO_MSS, &mss, opt_len);
bool tlpkdrop = _srs_config->get_srto_tlpkdrop();
int tlpkdrop_i = tlpkdrop ? 1 : 0;
srt_setsockopt(_server_socket, 0, SRTO_TLPKTDROP, &tlpkdrop_i, opt_len);
int connection_timeout = _srs_config->get_srto_conntimeout();
srt_setsockopt(_server_socket, 0, SRTO_CONNTIMEO, &connection_timeout, opt_len);
int send_buff = _srs_config->get_srto_sendbuf();
srt_setsockopt(_server_socket, 0, SRTO_SNDBUF, &send_buff, opt_len);
int recv_buff = _srs_config->get_srto_recvbuf();
srt_setsockopt(_server_socket, 0, SRTO_RCVBUF, &recv_buff, opt_len);
int payload_size = _srs_config->get_srto_payloadsize();
srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len);
int latency = _srs_config->get_srto_latency();
if (DEF_RECV_LATENCY != latency) {
srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len);
}
int recv_latency = _srs_config->get_srto_recv_latency();
if (DEF_RECV_LATENCY != recv_latency) {
srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len);
}
int peer_latency = _srs_config->get_srto_peer_latency();
if (DEF_PEER_LATENCY != peer_latency) {
srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len);
}
srt_log_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \
send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d",
maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size,
latency, recv_latency, peer_latency);
return 0;
}
void srt_server::init_srt_log() {
SrsLogLevel level = srs_get_log_level(_srs_config->get_log_level());
switch (level) {
case SrsLogLevelInfo:
{
set_srt_log_level(SRT_LOGGER_INFO_LEVEL);
break;
}
case SrsLogLevelTrace:
{
set_srt_log_level(SRT_LOGGER_TRACE_LEVEL);
break;
}
case SrsLogLevelWarn:
{
set_srt_log_level(SRT_LOGGER_WARN_LEVEL);
break;
}
case SrsLogLevelError:
{
set_srt_log_level(SRT_LOGGER_ERROR_LEVEL);
break;
}
default:
{
set_srt_log_level(SRT_LOGGER_TRACE_LEVEL);
}
}
return;
}
int srt_server::init_srt() {
if (_server_socket != -1) {
return -1;
}
init_srt_log();
_server_socket = srt_create_socket();
sockaddr_in sa;
memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET;
sa.sin_port = htons(_listen_port);
sockaddr* psa = (sockaddr*)&sa;
int ret = srt_bind(_server_socket, psa, sizeof(sa));
if ( ret == SRT_ERROR )
{
srt_close(_server_socket);
srt_log_error("srt bind error: %d", ret);
return -1;
}
ret = srt_listen(_server_socket, 5);
if (ret == SRT_ERROR)
{
srt_close(_server_socket);
srt_log_error("srt listen error: %d", ret);
return -2;
}
init_srt_parameter();
_pollid = srt_epoll_create();
if (_pollid < -1) {
srt_log_error("srt server srt_epoll_create error, port=%d", _listen_port);
return -1;
}
_handle_ptr = std::make_shared<srt_handle>(_pollid);
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
ret = srt_epoll_add_usock(_pollid, _server_socket, &events);
if (ret < 0) {
srt_log_error("srt server run add epoll error:%d", ret);
return ret;
}
srt_log_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket);
return 0;
}
int srt_server::start()
{
int ret;
if ((ret = init_srt()) < 0) {
return ret;
}
run_flag = true;
srt_log_trace("srt server is starting... port(%d)", _listen_port);
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
return 0;
}
void srt_server::stop()
{
run_flag = false;
if (!thread_run_ptr) {
return;
}
thread_run_ptr->join();
return;
}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
SRTSOCKET conn_fd = -1;
sockaddr_in scl;
int sclen = sizeof(scl);
int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR;
switch(status) {
case SRTS_LISTENING:
{
conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen);
if (conn_fd == -1) {
return;
}
//add new srt connect into srt handle
std::string streamid = UDT::getstreamid(conn_fd);
if (!is_streamid_valid(streamid)) {
srt_log_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd);
srt_close(conn_fd);
return;
}
SRT_CONN_PTR srt_conn_ptr = std::make_shared<srt_conn>(conn_fd, streamid);
std::string vhost_str = srt_conn_ptr->get_vhost();
srt_log_trace("new srt connection streamid:%s, fd:%d, vhost:%s",
streamid.c_str(), conn_fd, vhost_str.c_str());
SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true);
if (!vhost_p) {
srt_log_trace("srt streamid(%s): no vhost %s, fd:%d",
streamid.c_str(), vhost_str.c_str(), conn_fd);
srt_conn_ptr->close();
return;
}
if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) {
//add SRT_EPOLL_IN for information notify
conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu
} else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) {
conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
} else {
srt_log_trace("stream mode error, it should be m=push or m=pull, streamid:%s",
srt_conn_ptr->get_streamid().c_str());
srt_conn_ptr->close();
return;
}
_handle_ptr->add_newconn(srt_conn_ptr, conn_event);
break;
}
case SRTS_CONNECTED:
{
srt_log_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str());
break;
}
case SRTS_BROKEN:
{
srt_epoll_remove_usock(_pollid, input_fd);
srt_close(input_fd);
srt_log_warn("srt close: socket=%d", input_fd);
break;
}
default:
{
srt_log_error("srt server unkown status:%d", status);
}
}
}
void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
_handle_ptr->handle_srt_socket(status, input_fd);
return;
}
void srt_server::on_work()
{
const unsigned int SRT_FD_MAX = 100;
srt_log_trace("srt server is working port(%d)", _listen_port);
while (run_flag)
{
SRTSOCKET read_fds[SRT_FD_MAX];
SRTSOCKET write_fds[SRT_FD_MAX];
int rfd_num = SRT_FD_MAX;
int wfd_num = SRT_FD_MAX;
int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1,
nullptr, nullptr, nullptr, nullptr);
if (ret < 0) {
continue;
}
_handle_ptr->check_alive();
for (int index = 0; index < rfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
if (_server_socket == read_fds[index]) {
srt_handle_connection(status, read_fds[index], "read fd");
} else {
srt_handle_data(status, read_fds[index], "read fd");
}
}
for (int index = 0; index < wfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
if (_server_socket == write_fds[index]) {
srt_handle_connection(status, write_fds[index], "write fd");
} else {
srt_handle_data(status, write_fds[index], "write fd");
}
}
}
// New API at 2020-01-28, >1.4.1
// @see https://github.com/Haivision/srt/commit/b8c70ec801a56bea151ecce9c09c4ebb720c2f68#diff-fb66028e8746fea578788532533a296bR786
#if (SRT_VERSION_MAJOR<<24 | SRT_VERSION_MINOR<<16 | SRT_VERSION_PATCH<<8) > 0x01040100
srt_epoll_clear_usocks(_pollid);
#endif
}
SrtServerAdapter::SrtServerAdapter()
{
}
SrtServerAdapter::~SrtServerAdapter()
{
}
srs_error_t SrtServerAdapter::initialize()
{
srs_error_t err = srs_success;
// TODO: FIXME: We could fork processes here, because here only ST is initialized.
return err;
}
srs_error_t SrtServerAdapter::run(SrsWaitGroup* wg)
{
srs_error_t err = srs_success;
// TODO: FIXME: We could start a coroutine to dispatch SRT task to processes.
if(_srs_config->get_srt_enabled()) {
srt_log_trace("srt server is enabled...");
unsigned short srt_port = _srs_config->get_srt_listen_port();
srt_log_trace("srt server listen port:%d", srt_port);
err = srt2rtmp::get_instance()->init();
if (err != srs_success) {
return srs_error_wrap(err, "srt start srt2rtmp error");
}
srt_ptr = std::make_shared<srt_server>(srt_port);
if (!srt_ptr) {
return srs_error_wrap(err, "srt listen %d", srt_port);
}
} else {
srt_log_trace("srt server is disabled...");
}
if(_srs_config->get_srt_enabled()) {
srt_ptr->start();
}
return err;
}
void SrtServerAdapter::stop()
{
// TODO: FIXME: If forked processes, we should do cleanup.
}

@ -1,67 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRT_SERVER_H
#define SRT_SERVER_H
#include <srs_core.hpp>
#include <srt/srt.h>
#include <thread>
#include <memory>
#include <srs_app_hybrid.hpp>
class srt_handle;
class SrsWaitGroup;
class srt_server {
public:
srt_server(unsigned short port);
~srt_server();
int start();//init srt handl and create srt main thread loop
void stop();//stop srt main thread loop
private:
//init srt socket and srt epoll
int init_srt();
int init_srt_parameter();
void init_srt_log();
//srt main epoll loop
void on_work();
//accept new srt connection
void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
//get srt data read/write
void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
private:
unsigned short _listen_port;
SRTSOCKET _server_socket;
int _pollid;
bool run_flag;
std::shared_ptr<std::thread> thread_run_ptr;
std::shared_ptr<srt_handle> _handle_ptr;
};
typedef std::shared_ptr<srt_server> SRT_SERVER_PTR;
class SrtServerAdapter : public ISrsHybridServer
{
private:
SRT_SERVER_PTR srt_ptr;
public:
SrtServerAdapter();
virtual ~SrtServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run(SrsWaitGroup* wg);
virtual void stop();
};
#endif//SRT_SERVER_H

@ -1,779 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "srt_to_rtmp.hpp"
#include "stringex.hpp"
#include "time_help.hpp"
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_stream.hpp>
#include <list>
#include <sstream>
std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
std::mutex srt2rtmp::_srt_error_mutex;
std::map<std::string, int> srt2rtmp::_srt_error_map;
std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
if (!s_srt2rtmp_ptr) {
s_srt2rtmp_ptr = std::make_shared<srt2rtmp>();
}
return s_srt2rtmp_ptr;
}
srt2rtmp::srt2rtmp():_lastcheck_ts(0) {
}
srt2rtmp::~srt2rtmp() {
release();
}
srs_error_t srt2rtmp::init() {
srs_error_t err = srs_success;
if (_trd_ptr.get() != nullptr) {
return srs_error_wrap(err, "don't start thread again");
}
_trd_ptr = std::make_shared<SrsSTCoroutine>("srt2rtmp", this);
if ((err = _trd_ptr->start()) != srs_success) {
return srs_error_wrap(err, "start thread");
}
srs_trace("srt2rtmp start coroutine...");
return err;
}
void srt2rtmp::release() {
if (!_trd_ptr) {
return;
}
_trd_ptr->stop();
_trd_ptr = nullptr;
}
void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(data_p, len, key_path);
_msg_queue.push(msg_ptr);
//_notify_cond.notify_one();
return;
}
void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(key_path, msg_type);
_msg_queue.push(msg_ptr);
//_notify_cond.notify_one();
return;
}
void srt2rtmp::insert_log_message(LOGGER_LEVEL level, const std::string& log_content) {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(level, log_content);
msg_ptr->set_msg_type(SRT_MSG_LOG_TYPE);
_msg_queue.push(msg_ptr);
return;
}
SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr;
if (_msg_queue.empty())
{
return msg_ptr;
}
//while (_msg_queue.empty()) {
// _notify_cond.wait(locker);
//}
msg_ptr = _msg_queue.front();
_msg_queue.pop();
return msg_ptr;
}
void srt2rtmp::check_rtmp_alive() {
const int64_t CHECK_INTERVAL = 5*1000;
const int64_t ALIVE_TIMEOUT_MAX = 5*1000;
if (_lastcheck_ts == 0) {
_lastcheck_ts = now_ms();
return;
}
int64_t timenow_ms = now_ms();
if ((timenow_ms - _lastcheck_ts) > CHECK_INTERVAL) {
_lastcheck_ts = timenow_ms;
for (auto iter = _rtmp_client_map.begin();
iter != _rtmp_client_map.end();) {
RTMP_CLIENT_PTR rtmp_ptr = iter->second;
if ((timenow_ms - rtmp_ptr->get_last_live_ts()) >= ALIVE_TIMEOUT_MAX) {
srs_warn("srt2rtmp client is timeout, url:%s",
rtmp_ptr->get_url().c_str());
_rtmp_client_map.erase(iter++);
rtmp_ptr->close();
} else {
iter++;
}
}
}
return;
}
void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
RTMP_CLIENT_PTR rtmp_ptr;
auto iter = _rtmp_client_map.find(key_path);
if (iter == _rtmp_client_map.end()) {
srs_error("fail to close rtmp session fail, can't find session by key_path:%s",
key_path.c_str());
return;
}
rtmp_ptr = iter->second;
_rtmp_client_map.erase(iter);
srs_trace("close rtmp session which key_path is %s", key_path.c_str());
rtmp_ptr->close();
return;
}
//the cycle is running in srs coroutine
srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success;
_lastcheck_ts = 0;
int err_code = -1;
while(true) {
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
if (!msg_ptr) {
srs_usleep((30 * SRS_UTIME_MILLISECONDS));
} else {
switch (msg_ptr->msg_type()) {
case SRT_MSG_DATA_TYPE:
{
err_code = handle_ts_data(msg_ptr);
if (err_code != ERROR_SUCCESS) {
std::unique_lock<std::mutex> locker(_srt_error_mutex);
_srt_error_map[msg_ptr->get_path()] = err_code;
}
break;
}
case SRT_MSG_CLOSE_TYPE:
{
handle_close_rtmpsession(msg_ptr->get_path());
break;
}
case SRT_MSG_LOG_TYPE:
{
handle_log_data(msg_ptr);
break;
}
default:
{
srs_error("srt to rtmp get wrong message type(%u), path:%s",
msg_ptr->msg_type(), msg_ptr->get_path().c_str());
assert(0);
}
}
}
check_rtmp_alive();
if ((err = _trd_ptr->pull()) != srs_success) {
return srs_error_wrap(err, "forwarder");
}
}
}
int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
RTMP_CLIENT_PTR rtmp_ptr;
auto iter = _rtmp_client_map.find(data_ptr->get_path());
if (iter == _rtmp_client_map.end()) {
srs_trace("new rtmp client for srt upstream, key_path:%s", data_ptr->get_path().c_str());
rtmp_ptr = std::make_shared<rtmp_client>(data_ptr->get_path());
_rtmp_client_map.insert(std::make_pair(data_ptr->get_path(), rtmp_ptr));
} else {
rtmp_ptr = iter->second;
}
return rtmp_ptr->receive_ts_data(data_ptr);
}
void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
switch (data_ptr->get_log_level()) {
case SRT_LOGGER_INFO_LEVEL:
{
srs_info(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_TRACE_LEVEL:
{
srs_trace(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_WARN_LEVEL:
{
srs_warn(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_ERROR_LEVEL:
{
srs_error(data_ptr->get_log_string());
break;
}
default:
{
srs_trace(data_ptr->get_log_string());
}
}
return;
}
rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
, _connect_flag(false) {
const std::string DEF_VHOST = "DEFAULT_VHOST";
_ts_demux_ptr = std::make_shared<ts_demux>();
_avc_ptr = std::make_shared<SrsRawH264Stream>();
_aac_ptr = std::make_shared<SrsRawAacStream>();
std::vector<std::string> ret_vec;
string_split(key_path, "/", ret_vec);
if (ret_vec.size() >= 3) {
_vhost = ret_vec[0];
_appname = ret_vec[1];
_streamname = ret_vec[2];
} else {
_vhost = DEF_VHOST;
_appname = ret_vec[0];
_streamname = ret_vec[1];
}
std::stringstream url_ss;
std::vector<std::string> ip_ports = _srs_config->get_listens();
int port = 0;
std::string ip;
for (auto item : ip_ports) {
srs_parse_endpoint(item, ip, port);
if (port != 0) {
break;
}
}
port = (port == 0) ? SRS_CONSTS_RTMP_DEFAULT_PORT : port;
std::stringstream ss;
ss << "rtmp://" << SRS_CONSTS_LOCALHOST;
ss << ":" << port;
ss << "/" << _appname;
ss << "/" << _streamname;
ss << (_streamname.find("?") != std::string::npos ? "&" : "?") << "upstream=srt";
if (_vhost != DEF_VHOST) {
ss << "&vhost=" << _vhost;
}
_url = ss.str();
_h264_sps_changed = false;
_h264_pps_changed = false;
_h264_sps_pps_sent = false;
_last_live_ts = now_ms();
srs_trace("rtmp client construct url:%s", url_ss.str().c_str());
}
rtmp_client::~rtmp_client() {
}
void rtmp_client::close() {
_connect_flag = false;
if (!_rtmp_conn_ptr) {
return;
}
srs_trace("rtmp client close url:%s", _url.c_str());
_rtmp_conn_ptr->close();
_rtmp_conn_ptr = nullptr;
}
int64_t rtmp_client::get_last_live_ts() {
return _last_live_ts;
}
std::string rtmp_client::get_url() {
return _url;
}
srs_error_t rtmp_client::connect() {
srs_error_t err = srs_success;
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
_last_live_ts = now_ms();
if (_connect_flag) {
return srs_success;
}
if (_rtmp_conn_ptr.get() != nullptr) {
return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.",
_url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
_rtmp_conn_ptr = std::make_shared<SrsSimpleRtmpClient>(_url, cto, sto);
if ((err = _rtmp_conn_ptr->connect()) != srs_success) {
close();
return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.",
_url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = _rtmp_conn_ptr->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
close();
return srs_error_wrap(err, "rtmp client in srt2rtmp publish fail url:%s", _url.c_str());
}
_connect_flag = true;
return err;
}
int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
}
srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
srs_error_t err = srs_success;
// TODO: FIMXE: there exists bug, see following comments.
// when sps or pps changed, update the sequence header,
// for the pps maybe not changed while sps changed.
// so, we must check when each video ts message frame parsed.
if (!_h264_sps_changed || !_h264_pps_changed) {
return err;
}
// h264 raw to h264 packet.
std::string sh;
if ((err = _avc_ptr->mux_sequence_header(_h264_sps, _h264_pps, dts, pts, sh)) != srs_success) {
return srs_error_wrap(err, "mux sequence header");
}
// h264 packet to flv packet.
int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame;
int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader;
char* flv = NULL;
int nb_flv = 0;
if ((err = _avc_ptr->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
return srs_error_wrap(err, "avc to flv");
}
if (_srs_config->get_srt_mix_correct()) {
_rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo);
err = rtmp_write_work();
} else {
err = rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv);
}
// reset sps and pps.
_h264_sps_changed = false;
_h264_pps_changed = false;
_h264_sps_pps_sent = true;
return err;
}
srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) {
srs_error_t err = srs_success;
// when sps or pps not sent, ignore the packet.
// @see https://github.com/ossrs/srs/issues/203
if (!_h264_sps_pps_sent) {
return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps");
}
// 5bits, 7.3.1 NAL unit syntax,
// ISO_IEC_14496-10-AVC-2003.pdf, page 44.
// 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
// for IDR frame, the frame is keyframe.
SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame;
if (nal_unit_type == SrsAvcNaluTypeIDR) {
frame_type = SrsVideoAvcFrameTypeKeyFrame;
}
std::string ibp;
if ((err = _avc_ptr->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) {
return srs_error_wrap(err, "mux frame");
}
int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU;
char* flv = NULL;
int nb_flv = 0;
if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
return srs_error_wrap(err, "mux avc to flv");
}
if (_srs_config->get_srt_mix_correct()) {
_rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo);
err = rtmp_write_work();
} else {
err = rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv);
}
return err;
}
srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) {
srs_error_t err = srs_success;
char* data = NULL;
int size = 0;
if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) {
return srs_error_wrap(err, "mux aac to flv");
}
if (_srs_config->get_srt_mix_correct()) {
_rtmp_queue.insert_rtmp_data((unsigned char*)data, size, (int64_t)dts, SrsFrameTypeAudio);
err = rtmp_write_work();
} else {
err = rtmp_write_packet(SrsFrameTypeAudio, dts, data, size);
}
return err;
}
srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) {
srs_error_t err = srs_success;
SrsSharedPtrMessage* msg = NULL;
if (!_rtmp_conn_ptr) {
//when rtmp connection is closed, it's not error and just return;
srs_freepa(data);
return err;
}
if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) {
return srs_error_wrap(err, "create message fail, url:%s", _url.c_str());
}
srs_assert(msg);
// send out encoded msg.
if ((err = _rtmp_conn_ptr->send_and_free_message(msg)) != srs_success) {
close();
return srs_error_wrap(err, "rtmp client in srt2rtmp send message fail, url:%s", _url.c_str());
}
return err;
}
srs_error_t rtmp_client::rtmp_write_work() {
srs_error_t err = srs_success;
rtmp_packet_info_s packet_info;
bool ret = false;
do {
ret = _rtmp_queue.get_rtmp_data(packet_info);
if (ret) {
err = rtmp_write_packet(packet_info._type, packet_info._dts, (char*)packet_info._data, packet_info._len);
if (err != srs_success) {
break;
}
}
} while(ret);
return err;
}
srs_error_t rtmp_client::on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts) {
srs_error_t err = srs_success;
// ensure rtmp connected.
if ((err = connect()) != srs_success) {
return err;
}
dts = dts / 90;
pts = pts / 90;
if (dts == 0) {
dts = pts;
}
// send each frame.
while (!avs_ptr->empty()) {
char* frame = NULL;
int frame_size = 0;
if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) {
return srs_error_wrap(err, "demux annexb");
}
// 5bits, 7.3.1 NAL unit syntax,
// ISO_IEC_14496-10-AVC-2003.pdf, page 44.
// 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
// ignore the nalu type aud(9), pad(12)
if ((nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter)
|| (nal_unit_type == SrsAvcNaluTypeFilterData)) {
continue;
}
// TODO: FIXME: Should cache this config, it's better not to get it for each video frame.
if (_srs_config->get_srt_sei_filter()) {
if (nal_unit_type == SrsAvcNaluTypeSEI) {
continue;
}
}
// for sps
if (_avc_ptr->is_sps(frame, frame_size)) {
std::string sps;
if ((err = _avc_ptr->sps_demux(frame, frame_size, sps)) != srs_success) {
return srs_error_wrap(err, "demux sps");
}
if (_h264_sps == sps) {
continue;
}
_h264_sps_changed = true;
_h264_sps = sps;
if ((err = write_h264_sps_pps(dts, pts)) != srs_success) {
return srs_error_wrap(err, "write sps/pps");
}
continue;
}
// for pps
if (_avc_ptr->is_pps(frame, frame_size)) {
std::string pps;
if ((err = _avc_ptr->pps_demux(frame, frame_size, pps)) != srs_success) {
return srs_error_wrap(err, "demux pps");
}
if (_h264_pps == pps) {
continue;
}
_h264_pps_changed = true;
_h264_pps = pps;
if ((err = write_h264_sps_pps(dts, pts)) != srs_success) {
return srs_error_wrap(err, "write sps/pps");
}
continue;
}
// ibp frame.
// for Issue: https://github.com/ossrs/srs/issues/2390
// we only skip pps/sps frame and send left nalus.
srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", avs_ptr->left() + frame_size, dts);
if ((err = write_h264_ipb_frame(avs_ptr->head() - frame_size, avs_ptr->left() + frame_size, dts, pts)) != srs_success) {
return srs_error_wrap(err, "write frame");
}
_last_live_ts = now_ms();
break;
}
return err;
}
int rtmp_client::get_sample_rate(char sample_index) {
int sample_rate = 44100;
if ((sample_index >= 0) && (sample_index < SrsAAcSampleRateNumbers)) {
sample_rate = srs_aac_srates[(uint8_t)sample_index];
}
return sample_rate;
}
srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts) {
srs_error_t err = srs_success;
uint64_t base_dts;
uint64_t real_dts;
uint64_t first_dts;
int index = 0;
int sample_size = 1024;
// ensure rtmp connected.
if ((err = connect()) != srs_success) {
return srs_error_wrap(err, "connect");
}
base_dts = dts/90;
if (base_dts == 0) {
base_dts = pts/90;
}
// send each frame.
while (!avs_ptr->empty()) {
char* frame = NULL;
int frame_size = 0;
SrsRawAacStreamCodec codec;
if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) {
return srs_error_wrap(err, "demux adts");
}
if (frame_size <= 0) {
continue;
}
int sample_rate = get_sample_rate(codec.sound_rate);
if (codec.aac_packet_type > SrsAudioOpusFrameTraitRaw) {
sample_size = 2048;
} else {
sample_size = 1024;
}
real_dts = base_dts + index * 1000.0 * sample_size / sample_rate;
if (index == 0) {
first_dts = real_dts;
}
index++;
// generate sh.
if (_aac_specific_config.empty()) {
std::string sh;
if ((err = _aac_ptr->mux_sequence_header(&codec, sh)) != srs_success) {
return srs_error_wrap(err, "mux sequence header");
}
_aac_specific_config = sh;
codec.aac_packet_type = 0;
if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, real_dts)) != srs_success) {
return srs_error_wrap(err, "write raw audio frame");
}
}
// audio raw data.
codec.aac_packet_type = 1;
if ((err = write_audio_raw_frame(frame, frame_size, &codec, real_dts)) != srs_success) {
return srs_error_wrap(err, "write audio raw frame");
}
_last_live_ts = now_ms();
}
uint64_t diff_t = real_dts - first_dts;
diff_t += 100;
if ((diff_t > 200) && (diff_t < 600)) {
srs_info("set_queue_timeout timeout:%lu", diff_t);
_rtmp_queue.set_queue_timeout(diff_t);
}
return err;
}
int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
uint64_t dts, uint64_t pts)
{
srs_error_t err = srs_success;
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
assert(0);
return 0;
}
auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
if (media_type == STREAM_TYPE_VIDEO_H264) {
err = on_ts_video(avs_ptr, dts, pts);
} else if (media_type == STREAM_TYPE_AUDIO_AAC) {
err = on_ts_audio(avs_ptr, dts, pts);
} else {
srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type);
return 0;
}
if (err != srs_success) {
srs_error("send media data error:%s", srs_error_desc(err).c_str());
int err_code = srs_error_code(err);
srs_freep(err);
return err_code;
}
return 0;
}
rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT)
,_queue_maxlen(QUEUE_LEN_MAX)
,_first_packet_t(-1)
,_first_local_t(-1) {
}
rtmp_packet_queue::~rtmp_packet_queue() {
for (auto item : _send_map) {
rtmp_packet_info_s info = item.second;
if (info._data) {
delete info._data;
}
}
_send_map.clear();
}
void rtmp_packet_queue::set_queue_timeout(int64_t queue_timeout) {
_queue_timeout = queue_timeout;
}
void rtmp_packet_queue::insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type) {
rtmp_packet_info_s packet_info;
packet_info._data = data;
packet_info._len = len;
packet_info._dts = dts;
packet_info._type = media_type;
if (_first_packet_t == -1) {
_first_packet_t = dts;
_first_local_t = (int64_t)now_ms();
}
_send_map.insert(std::make_pair(dts, packet_info));
return;
}
bool rtmp_packet_queue::is_ready() {
if (!_srs_config->get_srt_mix_correct() && !_send_map.empty()) {
return true;
}
if (_send_map.size() < 2) {
return false;
}
if (_send_map.size() >= (size_t)_queue_maxlen) {
return true;
}
auto first_item = _send_map.begin();
int64_t now_t = (int64_t)now_ms();
int64_t diff_t = (now_t - _first_local_t) - (first_item->first - _first_packet_t);
if (diff_t >= _queue_timeout) {
return true;
}
return false;
}
bool rtmp_packet_queue::get_rtmp_data(rtmp_packet_info_s& packet_info) {
if (!is_ready()) {
return false;
}
auto iter = _send_map.begin();
packet_info = iter->second;
_send_map.erase(iter);
return true;
}

@ -1,162 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRT_TO_RTMP_H
#define SRT_TO_RTMP_H
#include <srs_core.hpp>
#include <memory>
#include <string>
#include <thread>
#include <queue>
#include <map>
#include <mutex>
#include <condition_variable>
#include <srs_kernel_ts.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_raw_avc.hpp>
#include <srs_protocol_utility.hpp>
#include <unordered_map>
#include "srt_data.hpp"
#include "ts_demux.hpp"
#include "srt_log.hpp"
#define SRT_VIDEO_MSG_TYPE 0x01
#define SRT_AUDIO_MSG_TYPE 0x02
typedef std::shared_ptr<SrsSimpleRtmpClient> RTMP_CONN_PTR;
typedef std::shared_ptr<SrsRawH264Stream> AVC_PTR;
typedef std::shared_ptr<SrsRawAacStream> AAC_PTR;
#define DEFAULT_VHOST "__default_host__"
#define QUEUE_DEF_TIMEOUT 500
#define QUEUE_LEN_MAX 100
typedef struct {
unsigned char* _data;
int _len;
int64_t _dts;
char _type;
char reserve[3];
} rtmp_packet_info_s;
class rtmp_packet_queue {
public:
rtmp_packet_queue();
~rtmp_packet_queue();
void set_queue_timeout(int64_t queue_timeout);
void insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type);
bool get_rtmp_data(rtmp_packet_info_s& packet_info);
private:
bool is_ready();
private:
int64_t _queue_timeout;
int64_t _queue_maxlen;
int64_t _first_packet_t;
int64_t _first_local_t;
std::multimap<int64_t, rtmp_packet_info_s> _send_map;//key:dts, value:rtmp_packet_info
};
class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this<rtmp_client> {
public:
rtmp_client(std::string key_path);
virtual ~rtmp_client();
int receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int64_t get_last_live_ts();
std::string get_url();
srs_error_t connect();
void close();
private:
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
private:
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
srs_error_t on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts);
virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts);
virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts);
int get_sample_rate(char sound_rate);
srs_error_t rtmp_write_work();
private:
virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size);
private:
std::string _key_path;
std::string _url;
std::string _vhost;
std::string _appname;
std::string _streamname;
TS_DEMUX_PTR _ts_demux_ptr;
private:
AVC_PTR _avc_ptr;
std::string _h264_sps;
bool _h264_sps_changed;
std::string _h264_pps;
bool _h264_pps_changed;
bool _h264_sps_pps_sent;
private:
std::string _aac_specific_config;
AAC_PTR _aac_ptr;
private:
RTMP_CONN_PTR _rtmp_conn_ptr;
bool _connect_flag;
int64_t _last_live_ts;
private:
rtmp_packet_queue _rtmp_queue;
};
typedef std::shared_ptr<rtmp_client> RTMP_CLIENT_PTR;
class srt2rtmp : public ISrsCoroutineHandler {
public:
static std::shared_ptr<srt2rtmp> get_instance();
srt2rtmp();
virtual ~srt2rtmp();
srs_error_t init();
void release();
void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
void insert_ctrl_message(unsigned int msg_type, const std::string& key_path);
void insert_log_message(LOGGER_LEVEL level, const std::string& log_content);
private:
SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle();
int handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void handle_close_rtmpsession(const std::string& key_path);
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
void check_rtmp_alive();
private:
static std::shared_ptr<srt2rtmp> s_srt2rtmp_ptr;
std::shared_ptr<SrsCoroutine> _trd_ptr;
std::mutex _mutex;
//std::condition_variable_any _notify_cond;
std::queue<SRT_DATA_MSG_PTR> _msg_queue;
std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
int64_t _lastcheck_ts;
public:
static std::mutex _srt_error_mutex;
static std::map<std::string, int> _srt_error_map;
};
#endif

@ -1,48 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef STRING_EX_H
#define STRING_EX_H
#include <srs_core.hpp>
#include <iostream>
#include <string.h>
#include <vector>
#include <algorithm>
#include <iterator>
#include <cctype>
inline int string_split(const std::string& input_str, const std::string& split_str, std::vector<std::string>& output_vec) {
if (input_str.length() == 0) {
return 0;
}
std::string tempString(input_str);
do {
size_t pos = tempString.find(split_str);
if (pos == tempString.npos) {
output_vec.push_back(tempString);
break;
}
std::string seg_str = tempString.substr(0, pos);
tempString = tempString.substr(pos+split_str.size());
output_vec.push_back(seg_str);
} while(tempString.size() > 0);
return output_vec.size();
}
inline std::string string_lower(const std::string input_str) {
std::string output_str(input_str);
std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower);
return output_str;
}
#endif//STRING_EX_H

@ -1,19 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef TIME_HELP_H
#define TIME_HELP_H
#include <srs_core.hpp>
#include <chrono>
inline long long now_ms() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
#endif //TIME_HELP_H

@ -1,603 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "ts_demux.hpp"
#include "srt_log.hpp"
#include <string.h>
ts_demux::ts_demux():_data_total(0)
,_last_pid(0)
,_last_dts(0)
,_last_pts(0)
{
}
ts_demux::~ts_demux() {
}
int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback)
{
int pos = 0;
int npos = 0;
ts_header ts_header_info;
ts_header_info._sync_byte = data_p[pos];
pos++;
ts_header_info._transport_error_indicator = (data_p[pos]&0x80)>>7;
ts_header_info._payload_unit_start_indicator = (data_p[pos]&0x40)>>6;
ts_header_info._transport_priority = (data_p[pos]&0x20)>>5;
ts_header_info._PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF;
pos += 2;
ts_header_info._transport_scrambling_control = (data_p[pos]&0xC0)>>6;
ts_header_info._adaptation_field_control = (data_p[pos]&0x30)>>4;
ts_header_info._continuity_counter = (data_p[pos]&0x0F);
pos++;
npos = pos;
adaptation_field* field_p = &(ts_header_info._adaptation_field_info);
// adaptation field
// 0x01 No adaptation_field, payload only
// 0x02 Adaptation_field only, no payload
// 0x03 Adaptation_field followed by payload
if( ts_header_info._adaptation_field_control == 2
|| ts_header_info._adaptation_field_control == 3 ){
// adaptation_field()
field_p->_adaptation_field_length = data_p[pos];
pos++;
if( field_p->_adaptation_field_length > 0 ){
field_p->_discontinuity_indicator = (data_p[pos]&0x80)>>7;
field_p->_random_access_indicator = (data_p[pos]&0x40)>>6;
field_p->_elementary_stream_priority_indicator = (data_p[pos]&0x20)>>5;
field_p->_PCR_flag = (data_p[pos]&0x10)>>4;
field_p->_OPCR_flag = (data_p[pos]&0x08)>>3;
field_p->_splicing_point_flag = (data_p[pos]&0x04)>>2;
field_p->_transport_private_data_flag = (data_p[pos]&0x02)>>1;
field_p->_adaptation_field_extension_flag = (data_p[pos]&0x01);
pos++;
if( field_p->_PCR_flag == 1 ) { // PCR info
//program_clock_reference_base 33 uimsbf
//reserved 6 bslbf
//program_clock_reference_extension 9 uimsbf
pos += 6;
}
if( field_p->_OPCR_flag == 1 ) {
//original_program_clock_reference_base 33 uimsbf
//reserved 6 bslbf
//original_program_clock_reference_extension 9 uimsbf
pos += 6;
}
if( field_p->_splicing_point_flag == 1 ) {
//splice_countdown 8 tcimsbf
pos++;
}
if( field_p->_transport_private_data_flag == 1 ) {
//transport_private_data_length 8 uimsbf
field_p->_transport_private_data_length = data_p[pos];
pos++;
memcpy(field_p->_private_data_byte, data_p + pos, field_p->_transport_private_data_length);
pos += field_p->_transport_private_data_length;
}
if( field_p->_adaptation_field_extension_flag == 1 ) {
//adaptation_field_extension_length 8 uimsbf
field_p->_adaptation_field_extension_length = data_p[pos];
pos++;
//ltw_flag 1 bslbf
field_p->_ltw_flag = (data_p[pos]&0x80)>>7;
//piecewise_rate_flag 1 bslbf
field_p->_piecewise_rate_flag = (data_p[pos]&0x40)>>6;
//seamless_splice_flag 1 bslbf
field_p->_seamless_splice_flag = (data_p[pos]&0x20)>>5;
//reserved 5 bslbf
pos++;
if (field_p->_ltw_flag == 1) {
//ltw_valid_flag 1 bslbf
//ltw_offset 15 uimsbf
pos += 2;
}
if (field_p->_piecewise_rate_flag == 1) {
//reserved 2 bslbf
//piecewise_rate 22 uimsbf
pos += 3;
}
if (field_p->_seamless_splice_flag == 1) {
//splice_type 4 bslbf
//DTS_next_AU[32..30] 3 bslbf
//marker_bit 1 bslbf
//DTS_next_AU[29..15] 15 bslbf
//marker_bit 1 bslbf
//DTS_next_AU[14..0] 15 bslbf
//marker_bit 1 bslbf
pos += 5;
}
}
}
npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length;
pos = npos;//must consider the 'stuffing_byte' in adaptation field
}
if(ts_header_info._adaptation_field_control == 1
|| ts_header_info._adaptation_field_control == 3 ) {
// data_byte with placeholder
// payload parser
if(ts_header_info._PID == 0x00){
// PAT // program association table
if(ts_header_info._payload_unit_start_indicator) {
pos++;
}
_pat._table_id = data_p[pos];
pos++;
_pat._section_syntax_indicator = (data_p[pos]>>7)&0x01;
// skip 3 bits of 1 zero and 2 reserved
_pat._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;
pos += 2;
_pat._transport_stream_id = (data_p[pos]<<8)|data_p[pos+1];
pos += 2;
// reserved 2 bits
_pat._version_number = (data_p[pos]&0x3E)>>1;
_pat._current_next_indicator = data_p[pos]&0x01;
pos++;
_pat._section_number = data_p[pos];
pos++;
_pat._last_section_number = data_p[pos];
if (_pat._table_id != 0x00) {
srt_log_error("pat table id(0x%02x) error, it must be 0x00", _pat._table_id);
return -1;
}
// PAT = section_length + 3
if((188 - npos) <= (_pat._section_length + 3)) {
srt_log_error("pat _section_length(%d) error, the left len:%d", _pat._section_length, (188 - npos));
return -1;
}
pos++;
_pat._pid_vec.clear();
for (;pos+4 <= _pat._section_length-5-4+9 + npos;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte )
PID_INFO pid_info;
//program_number 16 uimsbf
pid_info._program_number = data_p[pos]<<8|data_p[pos+1];
pos += 2;
// reserved 3 bslbf
if (pid_info._program_number == 0) {
// // network_PID 13 uimsbf
pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF;
pos += 2;
}
else {
// // program_map_PID 13 uimsbf
pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF;
pos += 2;
}
_pat._pid_vec.push_back(pid_info);
// network_PID and program_map_PID save to list
}
// CRC_32 use pat to calc crc32, eq
pos += 4;
}else if(ts_header_info._PID == 0x01){
// CAT // conditional access table
}else if(ts_header_info._PID == 0x02){
//TSDT // transport stream description table
}else if(ts_header_info._PID == 0x03){
//IPMP // IPMP control information table
// 0x0004-0x000F Reserved
// 0x0010-0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes
}else if(ts_header_info._PID == 0x11){
// SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream
}else if(is_pmt(ts_header_info._PID)) {
if(ts_header_info._payload_unit_start_indicator)
pos++;
_pmt._table_id = data_p[pos];
pos++;
_pmt._section_syntax_indicator = (data_p[pos]>>7)&0x01;
// skip 3 bits of 1 zero and 2 reserved
_pmt._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;
pos += 2;
_pmt._program_number = (data_p[pos]<<8)|data_p[pos+1];
pos += 2;
// reserved 2 bits
_pmt._version_number = (data_p[pos]&0x3E)>>1;
_pmt._current_next_indicator = data_p[pos]&0x01;
pos++;
_pmt._section_number = data_p[pos];
pos++;
_pmt._last_section_number = data_p[pos];
pos++;
// skip 3 bits for reserved 3 bslbf
_pmt._PCR_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //PCR_PID 13 uimsbf
pos += 2;
//reserved 4 bslbf
_pmt._program_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;//program_info_length 12 uimsbf
pos += 2;
//0x02, // TS_program_map_section
if (_pmt._table_id != 0x02) {
srt_log_error("pmt tableid(0x%02x) error, it must be 0x02", _pmt._table_id)
return -1;
}
memcpy(_pmt._dscr, data_p+pos, _pmt._program_info_length);
// for (i = 0; i < N; i++) {
// descriptor()
// }
pos += _pmt._program_info_length;
_pmt._stream_pid_vec.clear();
_pmt._pid2steamtype.clear();
for (; pos + 5 <= _pmt._section_length + 4 - 4 + npos; ) { // pos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32)
STREAM_PID_INFO pid_info;
pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
pos++;
//reserved 3 bslbf
pid_info._elementary_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //elementary_PID 13 uimsbf
pos += 2;
//reserved 4 bslbf
pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf
pos += 2;
if( pos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 + npos )
break;
int absES_info_length = pos + pid_info._ES_info_length;
for (; pos< absES_info_length; ) {
//descriptor()
int descriptor_tag = data_p[pos];
(void)descriptor_tag;
pos++;
int descriptor_length = data_p[pos];
pos++;
memcpy(pid_info._dscr, data_p + pos, descriptor_length);
pos += descriptor_length;
}
// save program_number(stream num) elementary_PID(PES PID) stream_type(stream codec)
_pmt._stream_pid_vec.push_back(pid_info);
_pmt._pid2steamtype.insert(std::make_pair((unsigned short)pid_info._elementary_PID, pid_info._stream_type));
}
pos += 4;//CRC_32
}else if(ts_header_info._PID == 0x0042){
// USER
}else if(ts_header_info._PID == 0x1FFF){
// Null packet
}else{//pes packet or pure data packet
//bool isFound = false;
for (size_t i = 0; i < _pmt._stream_pid_vec.size(); i++) {
if(ts_header_info._PID == _pmt._stream_pid_vec[i]._elementary_PID){
//isFound = true;
if(ts_header_info._payload_unit_start_indicator){
unsigned char* ret_data_p = nullptr;
size_t ret_size = 0;
uint64_t dts = 0;
uint64_t pts = 0;
//callback last media data in data buffer
int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
if (err_code != 0)
return err_code;
int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts);
if (ret > 188) {
srt_log_error("pes length(%d) error", ret);
return -1;
}
_last_pts = pts;
_last_dts = (dts == 0) ? pts : dts;
if ((ret_data_p != nullptr) && (ret_size > 0)) {
insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID);
}
}else{
//fwrite(p, 1, 188-(npos+pos), pes_info[i].fd);
insert_into_databuf(data_p + npos, 188-npos, key_path, ts_header_info._PID);
}
}
}
}
}
return 0;
}
int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
{
int ret = -1;
std::string path;
if (!data_ptr || (data_ptr->data_len() < 188) || (data_ptr->data_len()%188 != 0))
{
return -1;
}
unsigned int count = data_ptr->data_len()/188;
path = data_ptr->get_path();
for (unsigned int index = 0; index < count; index++)
{
unsigned char* data = data_ptr->get_data() + 188*index;
if (data[0] != 0x47) {
continue;
}
ret = decode_unit(data, path, callback);
if (ret != 0) // srs_error_code is positive
{
break;
}
}
return ret;
}
void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid) {
_last_pid = pid;
_data_total += data_size;
_data_buffer_vec.push_back(std::make_shared<SRT_DATA_MSG>(data_p, data_size, key_path));
return;
}
int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
uint64_t dts, uint64_t pts) {
if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
return 0;
}
auto iter = _pmt._pid2steamtype.find(pid);
if (iter == _pmt._pid2steamtype.end()) {
return 0;
}
unsigned char stream_type = iter->second;
auto total_data_ptr = std::make_shared<SRT_DATA_MSG>(_data_total, key_path);
size_t pos = 0;
for (size_t index = 0; index < _data_buffer_vec.size(); index++) {
memcpy(total_data_ptr->get_data() + pos,
_data_buffer_vec[index]->get_data(),
_data_buffer_vec[index]->data_len());
pos += _data_buffer_vec[index]->data_len();
}
_data_buffer_vec.clear();
_data_total = 0;
return callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
}
bool ts_demux::is_pmt(unsigned short pid) {
for (size_t index = 0; index < _pat._pid_vec.size(); index++) {
if (_pat._pid_vec[index]._program_number != 0) {
if (_pat._pid_vec[index]._pid == pid) {
return true;
}
}
}
return false;
}
int ts_demux::pes_parse(unsigned char* p, size_t npos,
unsigned char** ret_pp, size_t& ret_size,
uint64_t& dts, uint64_t& pts) {
int pos = 0;
int packet_start_code_prefix = (p[pos]<<16)|(p[pos+1]<<8)|p[pos+2]; //packet_start_code_prefix 24 bslbf
pos += 3;
int stream_id = p[pos]; //stream_id 8 uimsbf
pos++;
int PES_packet_length = ((unsigned int)p[pos]<<8)|p[pos+1]; //PES_packet_length 16 uimsbf
(void)PES_packet_length;
pos += 2;
if (0x00000001 != packet_start_code_prefix) {
srt_log_error("pes packet start code prefix(%06x) error, it must be 0x00 00 01", packet_start_code_prefix);
return 255;
}
if (stream_id != 188//program_stream_map 1011 1100
&& stream_id != 190//padding_stream 1011 1110
&& stream_id != 191//private_stream_2 1011 1111
&& stream_id != 240//ECM 1111 0000
&& stream_id != 241//EMM 1111 0001
&& stream_id != 255//program_stream_directory 1111 1111
&& stream_id != 242//DSMCC_stream 1111 0010
&& stream_id != 248//ITU-T Rec. H.222.1 type E stream 1111 1000
)
{
if (0x80 != (p[pos] & 0xc0)) {
srt_log_error("the first 2 bits:0x%02x error, it must be 0x80.", (p[pos] & 0xc0));
return 255;
}
//skip 2bits//'10' 2 bslbf
int PES_scrambling_control = (p[pos]&30)>>4; //PES_scrambling_control 2 bslbf
(void)PES_scrambling_control;
int PES_priority = (p[pos]&0x08)>>3; //PES_priority 1 bslbf
(void)PES_priority;
int data_alignment_indicator = (p[pos]&0x04)>>2;//data_alignment_indicator 1 bslbf
(void)data_alignment_indicator;
int copyright = (p[pos]&0x02)>>1; //copyright 1 bslbf
(void)copyright;
int original_or_copy = (p[pos]&0x01);//original_or_copy 1 bslbf
(void)original_or_copy;
pos++;
int PTS_DTS_flags = (p[pos]&0xC0)>>6; //PTS_DTS_flags 2 bslbf
int ESCR_flag = (p[pos]&0x20)>>5; // ESCR_flag 1 bslbf
int ES_rate_flag = (p[pos]&0x10)>>4;//ES_rate_flag 1 bslbf
int DSM_trick_mode_flag = (p[pos]&0x08)>>3;//DSM_trick_mode_flag 1 bslbf
int additional_copy_info_flag = (p[pos]&0x04)>>2; //additional_copy_info_flag 1 bslbf
int PES_CRC_flag = (p[pos]&0x02)>>1; //PES_CRC_flag 1 bslbf
int PES_extension_flag = (p[pos]&0x01);//PES_extension_flag 1 bslbf
pos++;
int PES_header_data_length = p[pos]; //PES_header_data_length 8 uimsbf
(void)PES_header_data_length;
pos++;
if (PTS_DTS_flags == 2) {
// skip 4 bits '0010' 4 bslbf
// PTS [32..30] 3 bslbf
// marker_bit 1 bslbf
// PTS [29..15] 15 bslbf
// marker_bit 1 bslbf
// PTS [14..0] 15 bslbf
// marker_bit 1 bslbf
pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
pos += 5;
}
if (PTS_DTS_flags == 3) {
// '0011' 4 bslbf
// PTS [32..30] 3 bslbf
// marker_bit 1 bslbf
//PTS [29..15] 15 bslbf
//marker_bit 1 bslbf
// PTS [14..0] 15 bslbf
// marker_bit 1 bslbf
pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
pos += 5;
// '0001' 4 bslbf
// DTS [32..30] 3 bslbf
// marker_bit 1 bslbf
// DTS [29..15] 15 bslbf
// marker_bit 1 bslbf
// DTS [14..0] 15 bslbf
// marker_bit 1 bslbf
dts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
pos += 5;
}
if (ESCR_flag == 1) {
// reserved 2 bslbf
// ESCR_base[32..30] 3 bslbf
// marker_bit 1 bslbf
// ESCR_base[29..15] 15 bslbf
// marker_bit 1 bslbf
// ESCR_base[14..0] 15 bslbf
// marker_bit 1 bslbf
// ESCR_extension 9 uimsbf
// marker_bit 1 bslbf
uint64_t ESCR_base = ((((uint64_t)p[pos] >> 3) & 0x07) << 30) | (((uint64_t)p[pos] & 0x03) << 28) | ((uint64_t)p[pos + 1] << 20) | ((((uint64_t)p[pos + 2] >> 3) & 0x1F) << 15) | (((uint64_t)p[pos + 2] & 0x3) << 13) | ((uint64_t)p[pos + 3] << 5) | ((p[pos + 4] >> 3) & 0x1F);
int ESCR_extension = ((p[pos + 4] & 0x03) << 7) | ((p[pos + 5] >> 1) & 0x7F);
(void)ESCR_base;
(void)ESCR_extension;
pos += 6;
}
if (ES_rate_flag == 1) {
// marker_bit 1 bslbf
// ES_rate 22 uimsbf
// marker_bit 1 bslbf
int ES_rate = (p[pos]&0x7F)<<15 | (p[pos+1])<<7 | (p[pos+2]&0x7F)>>1;
(void)ES_rate;
pos += 3;
}
if (DSM_trick_mode_flag == 1) { // ignore
int trick_mode_control = (p[pos]&0xE0)>>5;//trick_mode_control 3 uimsbf
if ( trick_mode_control == 0/*fast_forward*/ ) {
// field_id 2 bslbf
// intra_slice_refresh 1 bslbf
// frequency_truncation 2 bslbf
}
else if ( trick_mode_control == 1/*slow_motion*/ ) {
//rep_cntrl 5 uimsbf
}
else if ( trick_mode_control == 2/*freeze_frame*/ ) {
// field_id 2 uimsbf
// reserved 3 bslbf
}
else if ( trick_mode_control == 3/*fast_reverse*/ ) {
// field_id 2 bslbf
// intra_slice_refresh 1 bslbf
// frequency_truncation 2 bslbf
}else if ( trick_mode_control == 4/*slow_reverse*/ ) {
// rep_cntrl 5 uimsbf
}
else{
//reserved 5 bslbf
}
pos++;
}
if ( additional_copy_info_flag == 1) { // ignore
// marker_bit 1 bslbf
// additional_copy_info 7 bslbf
pos++;
}
if ( PES_CRC_flag == 1) { // ignore
// previous_PES_packet_CRC 16 bslbf
pos += 2;
}
if ( PES_extension_flag == 1) { // ignore
int PES_private_data_flag = (p[pos]&0x80)>>7;// PES_private_data_flag 1 bslbf
int pack_header_field_flag = (p[pos]&0x40)>>6;// pack_header_field_flag 1 bslbf
int program_packet_sequence_counter_flag = (p[pos]&0x20)>>5;// program_packet_sequence_counter_flag 1 bslbf
int P_STD_buffer_flag = (p[pos]&0x10)>>4; // P-STD_buffer_flag 1 bslbf
// reserved 3 bslbf
int PES_extension_flag_2 = (p[pos]&0x01);// PES_extension_flag_2 1 bslbf
pos++;
if ( PES_private_data_flag == 1) {
// PES_private_data 128 bslbf
pos += 16;
}
if (pack_header_field_flag == 1) {
// pack_field_length 8 uimsbf
// pack_header()
}
if (program_packet_sequence_counter_flag == 1) {
// marker_bit 1 bslbf
// program_packet_sequence_counter 7 uimsbf
// marker_bit 1 bslbf
// MPEG1_MPEG2_identifier 1 bslbf
// original_stuff_length 6 uimsbf
pos += 2;
}
if ( P_STD_buffer_flag == 1) {
// '01' 2 bslbf
// P-STD_buffer_scale 1 bslbf
// P-STD_buffer_size 13 uimsbf
pos += 2;
}
if ( PES_extension_flag_2 == 1) {
// marker_bit 1 bslbf
int PES_extension_field_length = (p[pos]&0x7F);// PES_extension_field_length 7 uimsbf
pos++;
for (int i = 0; i < PES_extension_field_length; i++) {
// reserved 8 bslbf
pos++;
}
}
}
// for (int i = 0; i < N1; i++) {
//stuffing_byte 8 bslbf
// rpos++;
// }
// for (int i = 0; i < N2; i++) {
//PES_packet_data_byte 8 bslbf
// rpos++;
// }
*ret_pp = p+pos;
ret_size = 188-(npos+pos);
}
else if ( stream_id == 188//program_stream_map 1011 1100 BC
|| stream_id == 191//private_stream_2 1011 1111 BF
|| stream_id == 240//ECM 1111 0000 F0
|| stream_id == 241//EMM 1111 0001 F1
|| stream_id == 255//program_stream_directory 1111 1111 FF
|| stream_id == 242//DSMCC_stream 1111 0010 F2
|| stream_id == 248//ITU-T Rec. H.222.1 type E stream 1111 1000 F8
) {
// for (i = 0; i < PES_packet_length; i++) {
//PES_packet_data_byte 8 bslbf
// rpos++;
// }
*ret_pp = p+pos;
ret_size = 188-(npos+pos);
//fwrite(p, 1, 188-(npos+rpos), fd);
}
else if ( stream_id == 190//padding_stream 1011 1110
) {
// for (i = 0; i < PES_packet_length; i++) {
// padding_byte 8 bslbf
// rpos++;
*ret_pp = p+pos;
ret_size = 188-(npos+pos);
// }
}
return pos;
}

@ -1,247 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef TS_DEMUX_H
#define TS_DEMUX_H
#include <srs_core.hpp>
#include "srt_data.hpp"
#include <string>
#include <memory>
#include <vector>
#include <unordered_map>
/* mpegts stream type in ts pmt
Value Description
0x00 ITU-T | ISO/IEC Reserved
0x01 ISO/IEC 11172-2 Video (mpeg video v1)
0x02 ITU-T Rec. H.262 | ISO/IEC 13818-2 Video(mpeg video v2)or ISO/IEC 11172-2 constrained parameter video stream
0x03 ISO/IEC 11172-3 Audio (MPEG 1 Audio codec Layer I, Layer II and Layer III audio specifications)
0x04 ISO/IEC 13818-3 Audio (BC Audio Codec)
0x05 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 private_sections
0x06 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 PES packets containing private data
0x07 ISO/IEC 13522 MHEG
0x08 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Annex A DSM-CC
0x09 ITU-T Rec. H.222.1
0x0A ISO/IEC 13818-6 type A
0x0B ISO/IEC 13818-6 type B
0x0C ISO/IEC 13818-6 type C
0x0D ISO/IEC 13818-6 type D
0x0E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 auxiliary
0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
0x10 ISO/IEC 14496-2 Visual
0x11 ISO/IEC 14496-3 Audio with the LATM transport syntax as defined in ISO/IEC 14496-3/Amd.1
0x12 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in PES packets
0x13 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in ISO/IEC 14496_sections
0x14 ISO/IEC 13818-6 Synchronized Download Protocol
0x15 Metadata carried in PES packets
0x16 Metadata carried in metadata_sections
0x17 Metadata carried in ISO/IEC 13818-6 Data Carousel
0x18 Metadata carried in ISO/IEC 13818-6 Object Carousel
0x19 Metadata carried in ISO/IEC 13818-6 Synchronized Download Protocol
0x1A IPMP stream (defined in ISO/IEC 13818-11, MPEG-2 IPMP)
0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video (h.264)
0x1C ISO/IEC 14496-3 Audio, without using any additional transport syntax, such as DST, ALS and SLS
0x1D ISO/IEC 14496-17 Text
0x1E Auxiliary video stream as defined in ISO/IEC 23002-3 (AVS)
0x1F-0x7E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Reserved
0x7F IPMP stream 0x80-0xFF User Private
*/
#define STREAM_TYPE_VIDEO_MPEG1 0x01
#define STREAM_TYPE_VIDEO_MPEG2 0x02
#define STREAM_TYPE_AUDIO_MPEG1 0x03
#define STREAM_TYPE_AUDIO_MPEG2 0x04
#define STREAM_TYPE_PRIVATE_SECTION 0x05
#define STREAM_TYPE_PRIVATE_DATA 0x06
#define STREAM_TYPE_AUDIO_AAC 0x0f
#define STREAM_TYPE_AUDIO_AAC_LATM 0x11
#define STREAM_TYPE_VIDEO_MPEG4 0x10
#define STREAM_TYPE_METADATA 0x15
#define STREAM_TYPE_VIDEO_H264 0x1b
#define STREAM_TYPE_VIDEO_HEVC 0x24
#define STREAM_TYPE_VIDEO_CAVS 0x42
#define STREAM_TYPE_VIDEO_VC1 0xea
#define STREAM_TYPE_VIDEO_DIRAC 0xd1
#define STREAM_TYPE_AUDIO_AC3 0x81
#define STREAM_TYPE_AUDIO_DTS 0x82
#define STREAM_TYPE_AUDIO_TRUEHD 0x83
#define STREAM_TYPE_AUDIO_EAC3 0x87
class ts_media_data_callback_I {
public:
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
};
typedef std::shared_ptr<ts_media_data_callback_I> TS_DATA_CALLBACK_PTR;
class adaptation_field {
public:
adaptation_field(){};
~adaptation_field(){};
public:
unsigned char _adaptation_field_length;
unsigned char _discontinuity_indicator:1;
unsigned char _random_access_indicator:1;
unsigned char _elementary_stream_priority_indicator:1;
unsigned char _PCR_flag:1;
unsigned char _OPCR_flag:1;
unsigned char _splicing_point_flag:1;
unsigned char _transport_private_data_flag:1;
unsigned char _adaptation_field_extension_flag:1;
//if(PCR_flag == '1')
unsigned long _program_clock_reference_base;//33 bits
unsigned short _program_clock_reference_extension;//9bits
//if (OPCR_flag == '1')
unsigned long _original_program_clock_reference_base;//33 bits
unsigned short _original_program_clock_reference_extension;//9bits
//if (splicing_point_flag == '1')
unsigned char _splice_countdown;
//if (transport_private_data_flag == '1')
unsigned char _transport_private_data_length;
unsigned char _private_data_byte[256];
//if (adaptation_field_extension_flag == '1')
unsigned char _adaptation_field_extension_length;
unsigned char _ltw_flag;
unsigned char _piecewise_rate_flag;
unsigned char _seamless_splice_flag;
unsigned char _reserved0;
//if (ltw_flag == '1')
unsigned short _ltw_valid_flag:1;
unsigned short _ltw_offset:15;
//if (piecewise_rate_flag == '1')
unsigned int _piecewise_rate;//22bits
//if (seamless_splice_flag == '1')
unsigned char _splice_type;//4bits
unsigned char _DTS_next_AU1;//3bits
unsigned char _marker_bit1;//1bit
unsigned short _DTS_next_AU2;//15bit
unsigned char _marker_bit2;//1bit
unsigned short _DTS_next_AU3;//15bit
unsigned char _marker_bit3;//1bit
};
class ts_header {
public:
ts_header(){}
~ts_header(){}
public:
unsigned char _sync_byte;
unsigned short _transport_error_indicator:1;
unsigned short _payload_unit_start_indicator:1;
unsigned short _transport_priority:1;
unsigned short _PID:13;
unsigned char _transport_scrambling_control:2;
unsigned char _adaptation_field_control:2;
unsigned char _continuity_counter:4;
adaptation_field _adaptation_field_info;
};
typedef struct {
unsigned short _program_number;
unsigned short _pid;
unsigned short _network_id;
} PID_INFO;
class pat_info {
public:
pat_info(){};
~pat_info(){};
public:
unsigned char _table_id;
unsigned short _section_syntax_indicator:1;
unsigned short _reserved0:1;
unsigned short _reserved1:2;
unsigned short _section_length:12;
unsigned short _transport_stream_id;
unsigned char _reserved3:2;
unsigned char _version_number:5;
unsigned char _current_next_indicator:1;
unsigned char _section_number;
unsigned char _last_section_number;
std::vector<PID_INFO> _pid_vec;
};
typedef struct {
unsigned char _stream_type;
unsigned short _reserved1:3;
unsigned short _elementary_PID:13;
unsigned short _reserved:4;
unsigned short _ES_info_length;
unsigned char _dscr[4096];
unsigned int _crc_32;
} STREAM_PID_INFO;
class pmt_info {
public:
pmt_info(){};
~pmt_info(){};
public:
unsigned char _table_id;
unsigned short _section_syntax_indicator:1;
unsigned short _reserved1:1;
unsigned short _reserved2:2;
unsigned short _section_length:12;
unsigned short _program_number:16;
unsigned char _reserved:2;
unsigned char _version_number:5;
unsigned char _current_next_indicator:5;
unsigned char _section_number;
unsigned char _last_section_number;
unsigned short _reserved3:3;
unsigned short _PCR_PID:13;
unsigned short _reserved4:4;
unsigned short _program_info_length:12;
unsigned char _dscr[4096];
std::unordered_map<unsigned short, unsigned char> _pid2steamtype;
std::vector<STREAM_PID_INFO> _stream_pid_vec;
};
class ts_demux {
public:
ts_demux();
~ts_demux();
int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback);
private:
int decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback);
bool is_pmt(unsigned short pmt_id);
int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
uint64_t& dts, uint64_t& pts);
void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
std::string key_path, uint64_t dts, uint64_t pts);
private:
std::string _key_path;//only for srt
pat_info _pat;
pmt_info _pmt;
std::vector<SRT_DATA_MSG_PTR> _data_buffer_vec;
size_t _data_total;
unsigned short _last_pid;
uint64_t _last_dts;
uint64_t _last_pts;
};
typedef std::shared_ptr<ts_demux> TS_DEMUX_PTR;
#endif

@ -1,61 +0,0 @@
//
// Copyright (c) 2013-2021 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#include "ts_demux.hpp"
#include <string>
#include <memory>
#define TS_MAX 188
class media_data_get : public ts_media_data_callback_I {
public:
media_data_get() {};
virtual ~media_data_get() {};
public:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type
, uint64_t dts, uint64_t pts) {
printf("media type:%d, data len:%d, key_path:%s, dts:%lu(%lu), pts:%lu(%lu)\r\n",
media_type, data_ptr->data_len(), data_ptr->get_path().c_str(), dts, dts/90, pts, pts/90);
FILE* file_p;
char filename[80];
sprintf(filename, "%u.media", media_type);
file_p = fopen(filename, "ab+");
if (file_p) {
fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p);
fclose(file_p);
}
return;
}
};
int main(int argn, char** argv) {
unsigned char data[TS_MAX];
ts_demux demux_obj;
auto callback_ptr = std::make_shared<media_data_get>();
FILE* file_p;
if (argn < 2) {
printf("please input ts name.\r\n");
return 0;
}
const char* file_name = argv[1];
printf("input ts name:%s.\r\n", file_name);
file_p = fopen(file_name, "r");
fseek(file_p, 0L, SEEK_END); /* 定位到文件末尾 */
size_t flen = ftell(file_p); /* 得到文件大小 */
fseek(file_p, 0L, SEEK_SET); /* 定位到文件开头 */
do {
fread(data, TS_MAX, 1, file_p);
auto input_ptr = std::make_shared<SRT_DATA_MSG>((unsigned char*)data, (unsigned int)TS_MAX, std::string("live/shiwei"));
demux_obj.decode(input_ptr, callback_ptr);
flen -= TS_MAX;
} while(flen > 0);
return 1;
}
Loading…
Cancel
Save