SRT: Use thread-safe log for multiple-threading SRT module. (#2474) v4.0.143

* solve srt push bugs

* solve h264 mutiple nalus in srt when obs is configured in zerolatency

* optimize error code

* optimize error code

* optimize error code

* add commemnt:we only skip pps/sps frame and send left nalus in srt

* add commemnt:we only skip pps/sps frame and send left nalus in srt

* optimize srt log system

* update conf

* update srt hpp

Co-authored-by: shiwei <shiwei05@kuaishou.com>
pull/2486/head
Alex.CR 4 years ago committed by winlin
parent 486b838397
commit af05f89925

@ -6,6 +6,7 @@ The changelog for SRS.
## SRS 4.0 Changelog
* v4.0, 2021-07-21, Merge [#2474](https://github.com/ossrs/srs/pull/2474), SRT: Use thread-safe log for multiple-threading SRT module. 4.0.143
* v4.0, 2021-07-17, Fix bugs and enhance code. 4.0.142
* v4.0, 2021-07-16, Support [CLion and cmake](https://github.com/ossrs/srs/wiki/v4_CN_IDE#clion) to build and debug SRS. 4.0.141
* v4.0, 2021-07-08, For [#2403](https://github.com/ossrs/srs/issues/2403), fix padding packets for RTMP2RTC. 4.0.140

2
trunk/configure vendored

@ -246,7 +246,7 @@ if [[ $SRS_SRT == YES ]]; then
MODULE_ID="SRT"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "APP")
ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot} ${LibSRTRoot})
MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data")
MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data" "srt_log")
SRT_INCS=(${LibSRTRoot} ${SrsSRTRoot}); MODULE_DIR=${SrsSRTRoot} . auto/modules.sh
SRT_OBJS="${MODULE_OBJS[@]}"
fi

@ -9,6 +9,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 142
#define VERSION_REVISION 143
#endif

@ -474,6 +474,7 @@ SrsRtcpRR::SrsRtcpRR(uint32_t sender_ssrc)
header_.version = kRtcpVersion;
header_.length = 7;
ssrc_ = sender_ssrc;
// TODO: FIXME: Fix warning.
memset(&rb_, 0, sizeof(SrsRtcpRB));
}

@ -7,6 +7,7 @@
#include "srt_conn.hpp"
#include "time_help.h"
#include "stringex.hpp"
#include "srt_log.hpp"
#include <vector>
#include <srs_app_config.hpp>
@ -134,7 +135,7 @@ srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn
} else {
_vhost = "__default_host__";
}
srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
srt_log_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str());
}
@ -182,7 +183,7 @@ int srt_conn::read(unsigned char* data, int len) {
ret = srt_recv(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
srt_log_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
@ -193,7 +194,7 @@ int srt_conn::write(unsigned char* data, int len) {
ret = srt_send(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
srt_log_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;

@ -29,6 +29,13 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s
memcpy(_data_p, data_p, len);
}
SRT_DATA_MSG::SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content): _msg_type(SRT_MSG_LOG_TYPE)
,_log_content(log_content)
,_log_level(log_level)
{
}
SRT_DATA_MSG::~SRT_DATA_MSG() {
if (_data_p && (_len > 0)) {
delete[] _data_p;
@ -39,6 +46,10 @@ unsigned int SRT_DATA_MSG::msg_type() {
return _msg_type;
}
void SRT_DATA_MSG::set_msg_type(unsigned int msg_type) {
_msg_type = msg_type;
}
std::string SRT_DATA_MSG::get_path() {
return _key_path;
}
@ -50,3 +61,11 @@ unsigned int SRT_DATA_MSG::data_len() {
unsigned char* SRT_DATA_MSG::get_data() {
return _data_p;
}
LOGGER_LEVEL SRT_DATA_MSG::get_log_level() {
return _log_level;
}
const char* SRT_DATA_MSG::get_log_string() {
return _log_content.c_str();
}

@ -7,6 +7,7 @@
#ifndef SRT_DATA_H
#define SRT_DATA_H
#include "srt_log.hpp"
#include <srs_core.hpp>
#include <string>
@ -14,24 +15,32 @@
#define SRT_MSG_DATA_TYPE 0x01
#define SRT_MSG_CLOSE_TYPE 0x02
#define SRT_MSG_LOG_TYPE 0x03
class SRT_DATA_MSG {
public:
SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content);
~SRT_DATA_MSG();
unsigned int msg_type();
unsigned int data_len();
unsigned char* get_data();
std::string get_path();
LOGGER_LEVEL get_log_level();
const char* get_log_string();
void set_msg_type(unsigned int msg_type);
private:
unsigned int _msg_type;
unsigned int _len;
unsigned char* _data_p;
unsigned int _len = 0;
unsigned char* _data_p = nullptr;
std::string _key_path;
std::string _log_content;
LOGGER_LEVEL _log_level = SRT_LOGGER_TRACE_LEVEL;
};
typedef std::shared_ptr<SRT_DATA_MSG> SRT_DATA_MSG_PTR;

@ -6,6 +6,13 @@
#include "srt_handle.hpp"
#include "time_help.h"
#include "srt_log.hpp"
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srt/udt.h>
#include <stdio.h>
#include <vector>
@ -14,11 +21,6 @@
#include <assert.h>
#include <list>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
static bool MONITOR_STATICS_ENABLE = false;
static long long MONITOR_TIMEOUT = 5000;
const unsigned int DEF_DATA_SIZE = 188*7;
@ -66,7 +68,7 @@ void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid)
output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl;
output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl;
srs_trace("\r\n%s", output.str().c_str());
srt_log_trace("\r\n%s", output.str().c_str());
return;
}
@ -79,10 +81,10 @@ void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
_streamid_map.insert(std::make_pair(stream_id, srtsocket_map));
srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
srt_log_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
} else {
iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("add new puller fd:%d, streamid:%s, size:%d",
srt_log_trace("add new puller fd:%d, streamid:%s, size:%d",
conn_ptr->get_conn(), stream_id.c_str(), iter->second.size());
}
@ -90,7 +92,7 @@ void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
}
void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) {
srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_log_trace("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_epoll_remove_usock(_handle_pollid, srtsocket);
auto streamid_iter = _streamid_map.find(stream_id);
@ -140,36 +142,36 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
int opt64_len = sizeof(int64_t);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_LATENCY=%d", val_i);
srt_log_trace("srto SRTO_LATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_PEERLATENCY=%d", val_i);
srt_log_trace("srto SRTO_PEERLATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_RCVLATENCY=%d", val_i);
srt_log_trace("srto SRTO_RCVLATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
srs_trace("srto SRTO_SNDBUF=%d", val_i);
srt_log_trace("srto SRTO_SNDBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
srs_trace("srto SRTO_RCVBUF=%d", val_i);
srt_log_trace("srto SRTO_RCVBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i64, &opt64_len);
srs_trace("srto SRTO_MAXBW=%d", val_i64);
srs_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable");
srs_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable");
srt_log_trace("srto SRTO_MAXBW=%d", val_i64);
srt_log_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable");
srt_log_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable");
if (conn_ptr->get_mode() == PULL_SRT_MODE) {
add_new_puller(conn_ptr, conn_ptr->get_subpath());
} else {
if(add_new_pusher(conn_ptr) == false) {
srs_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
srt_log_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_streamid().c_str());
conn_ptr->close();
return;
}
}
srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
srt_log_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);
if (ret < 0) {
srs_error("srt handle run add epoll error:%d", ret);
srt_log_error("srt handle run add epoll error:%d", ret);
return;
}
@ -183,19 +185,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
srt_conn_ptr = get_srt_conn(conn_fd);
if (!srt_conn_ptr) {
srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
srt_log_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
return;
}
if (status != SRTS_CONNECTED) {
srs_error("handle_push_data error status:%d fd:%d", status, conn_fd);
srt_log_error("handle_push_data error status:%d fd:%d", status, conn_fd);
close_push_conn(conn_fd);
return;
}
ret = srt_conn_ptr->read(data, DEF_DATA_SIZE);
if (ret <= 0) {
srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
srt_log_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
close_push_conn(conn_fd);
return;
}
@ -208,10 +210,10 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
auto streamid_iter = _streamid_map.find(subpath);
if (streamid_iter == _streamid_map.end()) {//no puler
srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
srt_log_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
return;
}
srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
srt_log_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
ret, conn_fd, streamid_iter->second.size());
for (auto puller_iter = streamid_iter->second.begin();
@ -219,11 +221,11 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
puller_iter++) {
auto player_conn = puller_iter->second;
if (!player_conn) {
srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
srt_log_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
continue;
}
int write_ret = player_conn->write(data, ret);
srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
srt_log_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
if (write_ret > 0) {
puller_iter->second->update_timestamp(srt_now_ms);
}
@ -261,15 +263,15 @@ void srt_handle::check_alive() {
{
SRT_CONN_PTR conn_ptr = *del_iter;
if (conn_ptr->get_mode() == PUSH_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_push_conn(conn_ptr->get_conn());
} else if (conn_ptr->get_mode() == PULL_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath());
} else {
srs_error("check_alive get unkown srt mode:%d, fd:%d",
srt_log_error("check_alive get unkown srt mode:%d, fd:%d",
conn_ptr->get_mode(), conn_ptr->get_conn());
assert(0);
}
@ -302,17 +304,17 @@ bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) {
}
_push_conn_map.insert(std::make_pair(conn_ptr->get_subpath(), conn_ptr));
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("srt_handle add new pusher streamid:%s, subpath:%s",
srt_log_trace("srt_handle add new pusher streamid:%s, subpath:%s",
conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str());
return true;
}
void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
srs_info("handle_pull_data status:%d, subpath:%s, fd:%d",
srt_log_info("handle_pull_data status:%d, subpath:%s, fd:%d",
status, subpath.c_str(), conn_fd);
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
srs_error("handle_pull_data fail to find fd(%d)", conn_fd);
srt_log_error("handle_pull_data fail to find fd(%d)", conn_fd);
assert(0);
return;
}
@ -327,7 +329,7 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
if (!conn_ptr) {
if (status != SRTS_CLOSED) {
srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
srt_log_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
conn_fd, status);
}
return;
@ -349,13 +351,13 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
}
case SRTS_BROKEN:
{
srs_warn("srt push disconnected event fd:%d, streamid:%s",
srt_log_warn("srt push disconnected event fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_push_conn(conn_fd);
break;
}
default:
srs_error("push mode unkown status:%d, fd:%d", status, conn_fd);
srt_log_error("push mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else if (mode == PULL_SRT_MODE) {
@ -368,13 +370,13 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
}
case SRTS_BROKEN:
{
srs_warn("srt pull disconnected fd:%d, streamid:%s",
srt_log_warn("srt pull disconnected fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_pull_conn(conn_fd, subpath);
break;
}
default:
srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
srt_log_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else {

@ -0,0 +1,36 @@
#include "srt_log.hpp"
#include "srt_to_rtmp.hpp"
#include <string>
#include <stdint.h>
#include <stdarg.h>
LOGGER_LEVEL s_log_level = SRT_LOGGER_TRACE_LEVEL;
static char* srt_log_buffer = new char[LOGGER_BUFFER_SIZE];
void snprintbuffer(char* buffer, size_t size, const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
vsnprintf(buffer, size, fmt, ap);
va_end(ap);
return;
}
void set_srt_log_level(LOGGER_LEVEL level) {
s_log_level = level;
}
LOGGER_LEVEL get_srt_log_level() {
return s_log_level;
}
char* get_srt_log_buffer() {
return srt_log_buffer;
}
void srt_log_output(LOGGER_LEVEL level, const char* buffer) {
std::string log_content(buffer);
srt2rtmp::get_instance()->insert_log_message(level, log_content);
return;
}

@ -0,0 +1,52 @@
#ifndef SRT_LOG_HPP
#define SRT_LOG_HPP
#include <stdint.h>
#include <stddef.h>
#define LOGGER_BUFFER_SIZE (10*1024)
typedef enum {
SRT_LOGGER_INFO_LEVEL,
SRT_LOGGER_TRACE_LEVEL,
SRT_LOGGER_WARN_LEVEL,
SRT_LOGGER_ERROR_LEVEL
} LOGGER_LEVEL;
void set_srt_log_level(LOGGER_LEVEL level);
LOGGER_LEVEL get_srt_log_level();
char* get_srt_log_buffer();
void srt_log_output(LOGGER_LEVEL level, const char* buffer);
void snprintbuffer(char* buffer, size_t size, const char* fmt, ...);
#define srt_log_error(...) \
if (get_srt_log_level() <= SRT_LOGGER_ERROR_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_ERROR_LEVEL, buffer); \
}
#define srt_log_warn(...) \
if (get_srt_log_level() <= SRT_LOGGER_WARN_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_WARN_LEVEL, buffer); \
}
#define srt_log_trace(...) \
if (get_srt_log_level() <= SRT_LOGGER_TRACE_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_TRACE_LEVEL, buffer); \
}
#define srt_log_info(...) \
if (get_srt_log_level() <= SRT_LOGGER_INFO_LEVEL) \
{ \
char* buffer = get_srt_log_buffer(); \
snprintbuffer(buffer, LOGGER_BUFFER_SIZE, __VA_ARGS__); \
srt_log_output(SRT_LOGGER_INFO_LEVEL, buffer); \
}
#endif //SRT_LOG_HPP

@ -6,6 +6,7 @@
#include "srt_server.hpp"
#include "srt_handle.hpp"
#include "srt_log.hpp"
#include <srt/udt.h>
#include <thread>
#include <netdb.h>
@ -20,6 +21,7 @@
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srs_app_utility.hpp>
srt_server::srt_server(unsigned short port):_listen_port(port)
,_server_socket(-1)
@ -74,17 +76,51 @@ int srt_server::init_srt_parameter() {
srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len);
}
srs_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \
srt_log_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \
send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d",
maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size,
latency, recv_latency, peer_latency);
return 0;
}
void srt_server::init_srt_log() {
SrsLogLevel level = srs_get_log_level(_srs_config->get_log_level());
switch (level) {
case SrsLogLevelInfo:
{
set_srt_log_level(SRT_LOGGER_INFO_LEVEL);
break;
}
case SrsLogLevelTrace:
{
set_srt_log_level(SRT_LOGGER_TRACE_LEVEL);
break;
}
case SrsLogLevelWarn:
{
set_srt_log_level(SRT_LOGGER_WARN_LEVEL);
break;
}
case SrsLogLevelError:
{
set_srt_log_level(SRT_LOGGER_ERROR_LEVEL);
break;
}
default:
{
set_srt_log_level(SRT_LOGGER_TRACE_LEVEL);
}
}
return;
}
int srt_server::init_srt() {
if (_server_socket != -1) {
return -1;
}
init_srt_log();
_server_socket = srt_create_socket();
sockaddr_in sa;
memset(&sa, 0, sizeof sa);
@ -97,7 +133,7 @@ int srt_server::init_srt() {
if ( ret == SRT_ERROR )
{
srt_close(_server_socket);
srs_error("srt bind error: %d", ret);
srt_log_error("srt bind error: %d", ret);
return -1;
}
@ -105,7 +141,7 @@ int srt_server::init_srt() {
if (ret == SRT_ERROR)
{
srt_close(_server_socket);
srs_error("srt listen error: %d", ret);
srt_log_error("srt listen error: %d", ret);
return -2;
}
@ -113,7 +149,7 @@ int srt_server::init_srt() {
_pollid = srt_epoll_create();
if (_pollid < -1) {
srs_error("srt server srt_epoll_create error, port=%d", _listen_port);
srt_log_error("srt server srt_epoll_create error, port=%d", _listen_port);
return -1;
}
_handle_ptr = std::make_shared<srt_handle>(_pollid);
@ -121,11 +157,11 @@ int srt_server::init_srt() {
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
ret = srt_epoll_add_usock(_pollid, _server_socket, &events);
if (ret < 0) {
srs_error("srt server run add epoll error:%d", ret);
srt_log_error("srt server run add epoll error:%d", ret);
return ret;
}
srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket);
srt_log_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket);
return 0;
}
@ -139,7 +175,7 @@ int srt_server::start()
}
run_flag = true;
srs_trace("srt server is starting... port(%d)", _listen_port);
srt_log_trace("srt server is starting... port(%d)", _listen_port);
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
return 0;
}
@ -171,18 +207,18 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
//add new srt connect into srt handle
std::string streamid = UDT::getstreamid(conn_fd);
if (!is_streamid_valid(streamid)) {
srs_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd);
srt_log_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd);
srt_close(conn_fd);
return;
}
SRT_CONN_PTR srt_conn_ptr = std::make_shared<srt_conn>(conn_fd, streamid);
std::string vhost_str = srt_conn_ptr->get_vhost();
srs_trace("new srt connection streamid:%s, fd:%d, vhost:%s",
srt_log_trace("new srt connection streamid:%s, fd:%d, vhost:%s",
streamid.c_str(), conn_fd, vhost_str.c_str());
SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true);
if (!vhost_p) {
srs_trace("srt streamid(%s): no vhost %s, fd:%d",
srt_log_trace("srt streamid(%s): no vhost %s, fd:%d",
streamid.c_str(), vhost_str.c_str(), conn_fd);
srt_conn_ptr->close();
return;
@ -193,7 +229,7 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
} else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) {
conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
} else {
srs_trace("stream mode error, it shoulde be m=push or m=pull, streamid:%s",
srt_log_trace("stream mode error, it shoulde be m=push or m=pull, streamid:%s",
srt_conn_ptr->get_streamid().c_str());
srt_conn_ptr->close();
return;
@ -204,19 +240,19 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
}
case SRTS_CONNECTED:
{
srs_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str());
srt_log_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str());
break;
}
case SRTS_BROKEN:
{
srt_epoll_remove_usock(_pollid, input_fd);
srt_close(input_fd);
srs_warn("srt close: socket=%d", input_fd);
srt_log_warn("srt close: socket=%d", input_fd);
break;
}
default:
{
srs_error("srt server unkown status:%d", status);
srt_log_error("srt server unkown status:%d", status);
}
}
}
@ -229,7 +265,7 @@ void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, cons
void srt_server::on_work()
{
const unsigned int SRT_FD_MAX = 100;
srs_trace("srt server is working port(%d)", _listen_port);
srt_log_trace("srt server is working port(%d)", _listen_port);
while (run_flag)
{
SRTSOCKET read_fds[SRT_FD_MAX];
@ -294,9 +330,9 @@ srs_error_t SrtServerAdapter::run()
// TODO: FIXME: We could start a coroutine to dispatch SRT task to processes.
if(_srs_config->get_srt_enabled()) {
srs_trace("srt server is enabled...");
srt_log_trace("srt server is enabled...");
unsigned short srt_port = _srs_config->get_srt_listen_port();
srs_trace("srt server listen port:%d", srt_port);
srt_log_trace("srt server listen port:%d", srt_port);
err = srt2rtmp::get_instance()->init();
if (err != srs_success) {
return srs_error_wrap(err, "srt start srt2rtmp error");
@ -307,7 +343,7 @@ srs_error_t SrtServerAdapter::run()
return srs_error_wrap(err, "srt listen %d", srt_port);
}
} else {
srs_trace("srt server is disabled...");
srt_log_trace("srt server is disabled...");
}
if(_srs_config->get_srt_enabled()) {

@ -30,6 +30,7 @@ private:
//init srt socket and srt epoll
int init_srt();
int init_srt_parameter();
void init_srt_log();
//srt main epoll loop
void on_work();

@ -76,6 +76,17 @@ void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key
//_notify_cond.notify_one();
return;
}
void srt2rtmp::insert_log_message(LOGGER_LEVEL level, const std::string& log_content) {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(level, log_content);
msg_ptr->set_msg_type(SRT_MSG_LOG_TYPE);
_msg_queue.push(msg_ptr);
return;
}
SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr;
@ -161,6 +172,11 @@ srs_error_t srt2rtmp::cycle() {
handle_close_rtmpsession(msg_ptr->get_path());
break;
}
case SRT_MSG_LOG_TYPE:
{
handle_log_data(msg_ptr);
break;
}
default:
{
srs_error("srt to rtmp get wrong message type(%u), path:%s",
@ -193,6 +209,36 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
return;
}
void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
switch (data_ptr->get_log_level()) {
case SRT_LOGGER_INFO_LEVEL:
{
srs_info(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_TRACE_LEVEL:
{
srs_trace(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_WARN_LEVEL:
{
srs_warn(data_ptr->get_log_string());
break;
}
case SRT_LOGGER_ERROR_LEVEL:
{
srs_error(data_ptr->get_log_string());
break;
}
default:
{
srs_trace(data_ptr->get_log_string());
}
}
return;
}
rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
, _connect_flag(false) {
const std::string DEF_VHOST = "DEFAULT_VHOST";

@ -24,6 +24,7 @@
#include "srt_data.hpp"
#include "ts_demux.hpp"
#include "srt_log.hpp"
#define SRT_VIDEO_MSG_TYPE 0x01
#define SRT_AUDIO_MSG_TYPE 0x02
@ -134,12 +135,14 @@ public:
void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
void insert_ctrl_message(unsigned int msg_type, const std::string& key_path);
void insert_log_message(LOGGER_LEVEL level, const std::string& log_content);
private:
SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle();
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void handle_close_rtmpsession(const std::string& key_path);
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
void check_rtmp_alive();
private:

Loading…
Cancel
Save