Copy 4.0.71 at 2021-02-18

min
winlin 4 years ago
parent b6234a9c88
commit 6cdb08cc87

@ -8282,7 +8282,7 @@ bool SrsConfig::get_heartbeat_enabled()
srs_utime_t SrsConfig::get_heartbeat_interval()
{
static srs_utime_t DEFAULT = (srs_utime_t)(9.9 * SRS_UTIME_SECONDS);
static srs_utime_t DEFAULT = (srs_utime_t)(10 * SRS_UTIME_SECONDS);
SrsConfDirective* conf = get_heartbeart();
if (!conf) {
@ -8294,7 +8294,7 @@ srs_utime_t SrsConfig::get_heartbeat_interval()
return DEFAULT;
}
return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
string SrsConfig::get_heartbeat_url()

@ -36,6 +36,12 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_core_autofree.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_ids = new SrsPps(_srs_clock);
SrsPps* _srs_pps_fids = new SrsPps(_srs_clock);
SrsPps* _srs_pps_fids_level0 = new SrsPps(_srs_clock);
ISrsDisposingHandler::ISrsDisposingHandler()
{
}
@ -52,6 +58,9 @@ SrsResourceManager::SrsResourceManager(const std::string& label, bool verbose)
trd = NULL;
p_disposing_ = NULL;
removing_ = false;
nn_level0_cache_ = 100000;
conns_level0_cache_ = new SrsResourceFastIdItem[nn_level0_cache_];
}
SrsResourceManager::~SrsResourceManager()
@ -65,6 +74,8 @@ SrsResourceManager::~SrsResourceManager()
}
clear();
srs_freepa(conns_level0_cache_);
}
srs_error_t SrsResourceManager::start()
@ -114,10 +125,14 @@ srs_error_t SrsResourceManager::cycle()
return err;
}
void SrsResourceManager::add(ISrsResource* conn)
void SrsResourceManager::add(ISrsResource* conn, bool* exists)
{
if (std::find(conns_.begin(), conns_.end(), conn) == conns_.end()) {
conns_.push_back(conn);
} else {
if (exists) {
*exists = false;
}
}
}
@ -127,6 +142,39 @@ void SrsResourceManager::add_with_id(const std::string& id, ISrsResource* conn)
conns_id_[id] = conn;
}
void SrsResourceManager::add_with_fast_id(uint64_t id, ISrsResource* conn)
{
bool exists = false;
add(conn, &exists);
conns_fast_id_[id] = conn;
if (exists) {
return;
}
// For new resource, build the level-0 cache for fast-id.
SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_];
// Ignore if exits item.
if (item->fast_id && item->fast_id == id) {
return;
}
// Fresh one, create the item.
if (!item->fast_id) {
item->fast_id = id;
item->impl = conn;
item->nn_collisions = 1;
item->available = true;
}
// Collision, increase the collisions.
if (item->fast_id != id) {
item->nn_collisions++;
item->available = false;
}
}
void SrsResourceManager::add_with_name(const std::string& name, ISrsResource* conn)
{
add(conn);
@ -140,12 +188,27 @@ ISrsResource* SrsResourceManager::at(int index)
ISrsResource* SrsResourceManager::find_by_id(std::string id)
{
++_srs_pps_ids->sugar;
map<string, ISrsResource*>::iterator it = conns_id_.find(id);
return (it != conns_id_.end())? it->second : NULL;
}
ISrsResource* SrsResourceManager::find_by_fast_id(uint64_t id)
{
SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_];
if (item->available && item->fast_id == id) {
++_srs_pps_fids_level0->sugar;
return item->impl;
}
++_srs_pps_fids->sugar;
map<uint64_t, ISrsResource*>::iterator it = conns_fast_id_.find(id);
return (it != conns_fast_id_.end())? it->second : NULL;
}
ISrsResource* SrsResourceManager::find_by_name(std::string name)
{
++_srs_pps_ids->sugar;
map<string, ISrsResource*>::iterator it = conns_name_.find(name);
return (it != conns_name_.end())? it->second : NULL;
}
@ -316,6 +379,24 @@ void SrsResourceManager::dispose(ISrsResource* c)
}
}
for (map<uint64_t, ISrsResource*>::iterator it = conns_fast_id_.begin(); it != conns_fast_id_.end();) {
if (c != it->second) {
++it;
} else {
// Update the level-0 cache for fast-id.
uint64_t id = it->first;
SrsResourceFastIdItem* item = &conns_level0_cache_[(id | id>>32) % nn_level0_cache_];
item->nn_collisions--;
if (!item->nn_collisions) {
item->fast_id = 0;
item->available = false;
}
// Use C++98 style: https://stackoverflow.com/a/4636230
conns_fast_id_.erase(it++);
}
}
vector<ISrsResource*>::iterator it = std::find(conns_.begin(), conns_.end(), c);
if (it != conns_.end()) {
conns_.erase(it);

@ -52,6 +52,27 @@ public:
virtual void on_disposing(ISrsResource* c) = 0;
};
// The item to identify the fast id object.
class SrsResourceFastIdItem
{
public:
// If available, use the resource in item.
bool available;
// How many resource have the same fast-id, which contribute a collision.
int nn_collisions;
// The first fast-id of resources.
uint64_t fast_id;
// The first resource object.
ISrsResource* impl;
public:
SrsResourceFastIdItem() {
available = false;
nn_collisions = 0;
fast_id = 0;
impl = NULL;
}
};
// The resource manager remove resource and delete it asynchronously.
class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public ISrsResourceManager
{
@ -76,6 +97,11 @@ private:
std::vector<ISrsResource*> conns_;
// The connections with resource id.
std::map<std::string, ISrsResource*> conns_id_;
// The connections with resource fast(int) id.
std::map<uint64_t, ISrsResource*> conns_fast_id_;
// The level-0 fast cache for fast id.
int nn_level0_cache_;
SrsResourceFastIdItem* conns_level0_cache_;
// The connections with resource name.
std::map<std::string, ISrsResource*> conns_name_;
public:
@ -89,11 +115,13 @@ public:
public:
virtual srs_error_t cycle();
public:
void add(ISrsResource* conn);
void add(ISrsResource* conn, bool* exists = NULL);
void add_with_id(const std::string& id, ISrsResource* conn);
void add_with_fast_id(uint64_t id, ISrsResource* conn);
void add_with_name(const std::string& name, ISrsResource* conn);
ISrsResource* at(int index);
ISrsResource* find_by_id(std::string id);
ISrsResource* find_by_fast_id(uint64_t id);
ISrsResource* find_by_name(std::string name);
public:
void subscribe(ISrsDisposingHandler* h);

@ -28,6 +28,10 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_timer = new SrsPps(_srs_clock);
ISrsHourGlass::ISrsHourGlass()
{
}
@ -36,8 +40,9 @@ ISrsHourGlass::~ISrsHourGlass()
{
}
SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution)
SrsHourGlass::SrsHourGlass(string label, ISrsHourGlass* h, srs_utime_t resolution)
{
label_ = label;
handler = h;
_resolution = resolution;
total_elapse = 0;
@ -60,6 +65,11 @@ srs_error_t SrsHourGlass::start()
return err;
}
void SrsHourGlass::stop()
{
trd->stop();
}
srs_error_t SrsHourGlass::tick(srs_utime_t interval)
{
return tick(0, interval);
@ -94,6 +104,8 @@ srs_error_t SrsHourGlass::cycle()
srs_utime_t interval = it->second;
if (interval == 0 || (total_elapse % interval) == 0) {
++_srs_pps_timer->sugar;
if ((err = handler->notify(event, interval, total_elapse)) != srs_success) {
return srs_error_wrap(err, "notify");
}

@ -29,6 +29,7 @@
#include <srs_app_st.hpp>
#include <map>
#include <string>
class SrsCoroutine;
@ -59,17 +60,18 @@ public:
// It's a complex and high-performance timer.
//
// Usage:
// SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS);
// SrsHourGlass* hg = new SrsHourGlass("nack", handler, 100 * SRS_UTIME_MILLISECONDS);
//
// hg->tick(1, 3 * SRS_UTIME_MILLISECONDS);
// hg->tick(2, 5 * SRS_UTIME_MILLISECONDS);
// hg->tick(3, 7 * SRS_UTIME_MILLISECONDS);
// hg->tick(1, 300 * SRS_UTIME_MILLISECONDS);
// hg->tick(2, 500 * SRS_UTIME_MILLISECONDS);
// hg->tick(3, 700 * SRS_UTIME_MILLISECONDS);
//
// // The hg will create a thread for timer.
// hg->start();
class SrsHourGlass : virtual public ISrsCoroutineHandler
{
private:
std::string label_;
SrsCoroutine* trd;
ISrsHourGlass* handler;
srs_utime_t _resolution;
@ -81,11 +83,13 @@ private:
// for each cycle, we increase it with a resolution.
srs_utime_t total_elapse;
public:
SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution);
// TODO: FIMXE: Refine to SrsHourGlass(std::string label);
SrsHourGlass(std::string label, ISrsHourGlass* h, srs_utime_t resolution);
virtual ~SrsHourGlass();
public:
// Start the hourglass.
// Start or stop the hourglass.
virtual srs_error_t start();
virtual void stop();
public:
// Add a pair of tick(event, interval).
// @param event the event of tick, default is 0.

@ -27,9 +27,18 @@
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_service_st.hpp>
#include <srs_app_utility.hpp>
using namespace std;
extern SrsPps* _srs_pps_cids_get;
extern SrsPps* _srs_pps_cids_set;
extern SrsPps* _srs_pps_timer;
extern SrsPps* _srs_pps_pub;
extern SrsPps* _srs_pps_conn;
extern SrsPps* _srs_pps_dispose;
ISrsHybridServer::ISrsHybridServer()
{
}
@ -91,8 +100,8 @@ srs_error_t SrsServerAdapter::run()
return srs_error_wrap(err, "ingest");
}
if ((err = srs->cycle()) != srs_success) {
return srs_error_wrap(err, "main cycle");
if ((err = srs->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
return err;
@ -109,6 +118,7 @@ SrsServer* SrsServerAdapter::instance()
SrsHybridServer::SrsHybridServer()
{
timer_ = NULL;
}
SrsHybridServer::~SrsHybridServer()
@ -135,6 +145,10 @@ srs_error_t SrsHybridServer::initialize()
return srs_error_wrap(err, "initialize st failed");
}
if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
@ -151,29 +165,17 @@ srs_error_t SrsHybridServer::run()
{
srs_error_t err = srs_success;
// TODO: FIXME: Identify master server directly.
// Run master server in this main thread.
SrsServerAdapter* master_server = NULL;
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
if (!master_server) {
master_server = dynamic_cast<SrsServerAdapter*>(server);
if (master_server) {
continue;
}
}
if ((err = server->run()) != srs_success) {
return srs_error_wrap(err, "run server");
}
}
if (master_server) {
return master_server->run();
}
// Wait for all server to quit.
srs_thread_exit(NULL);
return err;
}
@ -197,5 +199,41 @@ SrsServerAdapter* SrsHybridServer::srs()
return NULL;
}
srs_error_t SrsHybridServer::setup_ticks()
{
srs_error_t err = srs_success;
timer_ = new SrsHourGlass("hybrid", this, 1 * SRS_UTIME_SECONDS);
if ((err = timer_->tick(1, 5 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
return err;
}
srs_error_t SrsHybridServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
// Show statistics for RTC server.
SrsProcSelfStat* u = srs_get_self_proc_stat();
// Resident Set Size: number of pages the process has in real memory.
int memory = (int)(u->rss * 4 / 1024);
_srs_pps_timer->update(); _srs_pps_pub->update(); _srs_pps_conn->update();
srs_trace("Hybrid cpu=%.2f%%,%dMB, timer=%d,%d,%d",
u->percent * 100, memory,
_srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s()
);
return err;
}
SrsHybridServer* _srs_hybrid = new SrsHybridServer();

@ -28,6 +28,8 @@
#include <vector>
#include <srs_app_hourglass.hpp>
class SrsServer;
// The hibrid server interfaces, we could register many servers.
@ -62,10 +64,11 @@ public:
};
// The hybrid server manager.
class SrsHybridServer
class SrsHybridServer : public ISrsHourGlass
{
private:
std::vector<ISrsHybridServer*> servers;
SrsHourGlass* timer_;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
@ -77,6 +80,10 @@ public:
virtual void stop();
public:
virtual SrsServerAdapter* srs();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
};
extern SrsHybridServer* _srs_hybrid;

@ -41,6 +41,13 @@ using namespace std;
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_pkts = new SrsPps(_srs_clock);
SrsPps* _srs_pps_addrs = new SrsPps(_srs_clock);
SrsPps* _srs_pps_fast_addrs = new SrsPps(_srs_clock);
// set the max packet size.
#define SRS_UDP_MAX_PACKET_SIZE 65535
@ -293,17 +300,29 @@ SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd)
fromlen = 0;
peer_port = 0;
fast_id_ = 0;
address_changed_ = false;
cache_buffer_ = new SrsBuffer(buf, nb_buf);
}
SrsUdpMuxSocket::~SrsUdpMuxSocket()
{
srs_freepa(buf);
srs_freep(cache_buffer_);
}
int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
{
fromlen = sizeof(from);
nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout);
if (nread <= 0) {
return nread;
}
// Reset the fast cache buffer size.
cache_buffer_->set_size(nread);
cache_buffer_->skip(-1 * cache_buffer_->pos());
// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
@ -313,21 +332,18 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
return 0;
}
if (nread > 0) {
// TODO: FIXME: Maybe we should not covert to string for each packet.
char address_string[64];
char port_string[16];
if (getnameinfo((sockaddr*)&from, fromlen,
(char*)&address_string, sizeof(address_string),
(char*)&port_string, sizeof(port_string),
NI_NUMERICHOST|NI_NUMERICSERV)) {
return -1;
}
peer_ip = std::string(address_string);
peer_port = atoi(port_string);
// Parse address from cache.
if (from.ss_family == AF_INET) {
sockaddr_in* addr = (sockaddr_in*)&from;
fast_id_ = uint64_t(addr->sin_port)<<48 | uint64_t(addr->sin_addr.s_addr);
}
// We will regenerate the peer_ip, peer_port and peer_id.
address_changed_ = true;
// Update the stat.
++_srs_pps_pkts->sugar;
return nread;
}
@ -385,10 +401,63 @@ int SrsUdpMuxSocket::get_peer_port() const
std::string SrsUdpMuxSocket::peer_id()
{
char id_buf[1024];
int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port);
if (address_changed_) {
address_changed_ = false;
// Parse address from cache.
bool parsed = false;
if (from.ss_family == AF_INET) {
sockaddr_in* addr = (sockaddr_in*)&from;
// Load from fast cache, previous ip.
std::map<uint32_t, string>::iterator it = cache_.find(addr->sin_addr.s_addr);
if (it == cache_.end()) {
peer_ip = inet_ntoa(addr->sin_addr);
cache_[addr->sin_addr.s_addr] = peer_ip;
} else {
peer_ip = it->second;
}
peer_port = ntohs(addr->sin_port);
parsed = true;
}
if (!parsed) {
// TODO: FIXME: Maybe we should not covert to string for each packet.
char address_string[64];
char port_string[16];
if (getnameinfo((sockaddr*)&from, fromlen,
(char*)&address_string, sizeof(address_string),
(char*)&port_string, sizeof(port_string),
NI_NUMERICHOST|NI_NUMERICSERV)) {
return "";
}
peer_ip = std::string(address_string);
peer_port = atoi(port_string);
}
// Build the peer id.
static char id_buf[128];
int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port);
peer_id_ = string(id_buf, len);
// Update the stat.
++_srs_pps_addrs->sugar;
}
return peer_id_;
}
uint64_t SrsUdpMuxSocket::fast_id()
{
++_srs_pps_fast_addrs->sugar;
return fast_id_;
}
return string(id_buf, len);
SrsBuffer* SrsUdpMuxSocket::buffer()
{
return cache_buffer_;
}
SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
@ -405,6 +474,11 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
sendonly->peer_ip = peer_ip;
sendonly->peer_port = peer_port;
// Copy the fast id.
sendonly->peer_id_ = peer_id_;
sendonly->fast_id_ = fast_id_;
sendonly->address_changed_ = address_changed_;
return sendonly;
}
@ -514,6 +588,11 @@ srs_error_t SrsUdpMuxListener::cycle()
SrsAutoFree(SrsErrorPithyPrint, pp_pkt_handler_err);
set_socket_buffer();
// Because we have to decrypt the cipher of received packet payload,
// and the size is not determined, so we think there is at least one copy,
// and we can reuse the plaintext h264/opus with players when got plaintext.
SrsUdpMuxSocket skt(lfd);
while (true) {
if ((err = trd->pull()) != srs_success) {
@ -522,12 +601,6 @@ srs_error_t SrsUdpMuxListener::cycle()
nn_loop++;
// TODO: FIXME: Refactor the memory cache for receiver.
// Because we have to decrypt the cipher of received packet payload,
// and the size is not determined, so we think there is at least one copy,
// and we can reuse the plaintext h264/opus with players when got plaintext.
SrsUdpMuxSocket skt(lfd);
int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT);
if (nread <= 0) {
if (nread < 0) {
@ -540,15 +613,16 @@ srs_error_t SrsUdpMuxListener::cycle()
nn_msgs++;
nn_msgs_stage++;
// Restore context when packets processed.
if (true) {
SrsContextRestore(cid);
err = handler->on_udp_packet(&skt);
}
// Handle the UDP packet.
err = handler->on_udp_packet(&skt);
// Use pithy print to show more smart information.
if (err != srs_success) {
uint32_t nn = 0;
if (pp_pkt_handler_err->can_print(err, &nn)) {
// For performance, only restore context when output log.
_srs_context->set_id(cid);
// Append more information.
err = srs_error_wrap(err, "size=%u, data=[%s]", skt.size(), srs_string_dumps_hex(skt.data(), skt.size(), 8).c_str());
srs_warn("handle udp pkt, count=%u/%u, err: %s", pp_pkt_handler_err->nn_count, nn, srs_error_desc(err).c_str());
@ -558,6 +632,9 @@ srs_error_t SrsUdpMuxListener::cycle()
pprint->elapse();
if (pprint->can_print()) {
// For performance, only restore context when output log.
_srs_context->set_id(cid);
int pps_average = 0; int pps_last = 0;
if (true) {
if (srs_get_system_time() > srs_get_system_startup_time()) {

@ -29,12 +29,14 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <map>
#include <string>
#include <srs_app_st.hpp>
struct sockaddr;
class SrsBuffer;
class SrsUdpMuxSocket;
// The udp packet handler.
@ -135,6 +137,9 @@ public:
// TODO: FIXME: Rename it. Refine it for performance issue.
class SrsUdpMuxSocket
{
private:
std::map<uint32_t, std::string> cache_;
SrsBuffer* cache_buffer_;
private:
char* buf;
int nb_buf;
@ -142,8 +147,16 @@ private:
srs_netfd_t lfd;
sockaddr_storage from;
int fromlen;
private:
std::string peer_ip;
int peer_port;
private:
// Cache for peer id.
std::string peer_id_;
// If the address changed, we should generate the peer_id.
bool address_changed_;
// For IPv4 client, we use 8 bytes int id to find it fastly.
uint64_t fast_id_;
public:
SrsUdpMuxSocket(srs_netfd_t fd);
virtual ~SrsUdpMuxSocket();
@ -158,6 +171,8 @@ public:
std::string get_peer_ip() const;
int get_peer_port() const;
std::string peer_id();
uint64_t fast_id();
SrsBuffer* buffer();
SrsUdpMuxSocket* copy_sendonly();
};

@ -58,8 +58,23 @@ using namespace std;
#include <srs_app_rtc_source.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_kbps.hpp>
SrsPps* _srs_pps_pli = new SrsPps(_srs_clock);
SrsPps* _srs_pps_twcc = new SrsPps(_srs_clock);
SrsPps* _srs_pps_rr = new SrsPps(_srs_clock);
SrsPps* _srs_pps_pub = new SrsPps(_srs_clock);
SrsPps* _srs_pps_conn = new SrsPps(_srs_clock);
extern SrsPps* _srs_pps_snack;
extern SrsPps* _srs_pps_snack2;
extern SrsPps* _srs_pps_rnack;
extern SrsPps* _srs_pps_rnack2;
#define SRS_TICKID_RTCP 0
#define SRS_TICKID_TWCC 2
#define SRS_TICKID_TWCC 1
#define SRS_TICKID_SEND_NACKS 2
ISrsRtcTransport::ISrsRtcTransport()
{
@ -170,30 +185,24 @@ srs_error_t SrsSecurityTransport::srtp_initialize()
return err;
}
srs_error_t SrsSecurityTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsSecurityTransport::protect_rtp(void* packet, int* nb_cipher)
{
return srtp_->protect_rtp(plaintext, cipher, nb_cipher);
return srtp_->protect_rtp(packet, nb_cipher);
}
srs_error_t SrsSecurityTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsSecurityTransport::protect_rtcp(void* packet, int* nb_cipher)
{
return srtp_->protect_rtcp(plaintext, cipher, nb_cipher);
return srtp_->protect_rtcp(packet, nb_cipher);
}
// TODO: FIXME: Merge with protect_rtp.
srs_error_t SrsSecurityTransport::protect_rtp2(void* rtp_hdr, int* len_ptr)
srs_error_t SrsSecurityTransport::unprotect_rtp(void* packet, int* nb_plaintext)
{
return srtp_->protect_rtp2(rtp_hdr, len_ptr);
return srtp_->unprotect_rtp(packet, nb_plaintext);
}
srs_error_t SrsSecurityTransport::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext)
srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext)
{
return srtp_->unprotect_rtp(cipher, plaintext, nb_plaintext);
}
srs_error_t SrsSecurityTransport::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext)
{
return srtp_->unprotect_rtcp(cipher, plaintext, nb_plaintext);
return srtp_->unprotect_rtcp(packet, nb_plaintext);
}
SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s)
@ -204,17 +213,12 @@ SrsSemiSecurityTransport::~SrsSemiSecurityTransport()
{
}
srs_error_t SrsSemiSecurityTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsSemiSecurityTransport::protect_rtp(void* packet, int* nb_cipher)
{
return srs_success;
}
srs_error_t SrsSemiSecurityTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
{
return srs_success;
}
srs_error_t SrsSemiSecurityTransport::protect_rtp2(void* rtp_hdr, int* len_ptr)
srs_error_t SrsSemiSecurityTransport::protect_rtcp(void* packet, int* nb_cipher)
{
return srs_success;
}
@ -264,32 +268,23 @@ srs_error_t SrsPlaintextTransport::write_dtls_data(void* data, int size)
return srs_success;
}
srs_error_t SrsPlaintextTransport::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
{
memcpy(cipher, plaintext, nb_cipher);
return srs_success;
}
srs_error_t SrsPlaintextTransport::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsPlaintextTransport::protect_rtp(void* packet, int* nb_cipher)
{
memcpy(cipher, plaintext, nb_cipher);
return srs_success;
}
srs_error_t SrsPlaintextTransport::protect_rtp2(void* rtp_hdr, int* len_ptr)
srs_error_t SrsPlaintextTransport::protect_rtcp(void* packet, int* nb_cipher)
{
return srs_success;
}
srs_error_t SrsPlaintextTransport::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext)
srs_error_t SrsPlaintextTransport::unprotect_rtp(void* packet, int* nb_plaintext)
{
memcpy(plaintext, cipher, nb_plaintext);
return srs_success;
}
srs_error_t SrsPlaintextTransport::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext)
srs_error_t SrsPlaintextTransport::unprotect_rtcp(void* packet, int* nb_plaintext)
{
memcpy(plaintext, cipher, nb_plaintext);
return srs_success;
}
@ -351,6 +346,8 @@ srs_error_t SrsRtcPLIWorker::cycle()
uint32_t ssrc = it->first;
SrsContextId cid = it->second;
++_srs_pps_pli->sugar;
if ((err = handler_->do_request_keyframe(ssrc, cid)) != srs_success) {
srs_warn("PLI error, %s", srs_error_desc(err).c_str());
srs_error_reset(err);
@ -393,13 +390,18 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
nack_enabled_ = false;
_srs_config->subscribe(this);
timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS);
timer_ = new SrsHourGlass("play", this, 1000 * SRS_UTIME_MILLISECONDS);
nack_epp = new SrsErrorPithyPrint();
pli_worker_ = new SrsRtcPLIWorker(this);
}
SrsRtcPlayStream::~SrsRtcPlayStream()
{
// TODO: FIXME: Should not do callback in de-constructor?
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
}
_srs_config->unsubscribe(this);
srs_freep(nack_epp);
@ -670,53 +672,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
return err;
}
void SrsRtcPlayStream::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq)
{
for (map<uint32_t, SrsRtcAudioSendTrack*>::iterator it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) {
SrsRtcAudioSendTrack* track = it->second;
// If track is inactive, not process nack request.
if (!track->get_track_status()){
continue;
}
if (!track->has_ssrc(ssrc)) {
continue;
}
// update recv nack statistic
track->on_recv_nack();
SrsRtpPacket2* pkt = track->fetch_rtp_packet(seq);
if (pkt != NULL) {
pkts.push_back(pkt);
}
return;
}
for (map<uint32_t, SrsRtcVideoSendTrack*>::iterator it = video_tracks_.begin(); it != video_tracks_.end(); ++it) {
SrsRtcVideoSendTrack* track = it->second;
// If track is inactive, not process nack request.
if (!track->get_track_status()){
continue;
}
if (!track->has_ssrc(ssrc)) {
continue;
}
// update recv nack statistic
track->on_recv_nack();
SrsRtpPacket2* pkt = track->fetch_rtp_packet(seq);
if (pkt != NULL) {
pkts.push_back(pkt);
}
return;
}
}
void SrsRtcPlayStream::set_all_tracks_status(bool status)
{
std::ostringstream merged_log;
@ -806,36 +761,46 @@ srs_error_t SrsRtcPlayStream::on_rtcp_nack(SrsRtcpNack* rtcp)
{
srs_error_t err = srs_success;
++_srs_pps_rnack->sugar;
uint32_t ssrc = rtcp->get_media_ssrc();
// If NACK disabled, print a log.
if (!nack_enabled_) {
vector<uint16_t> sns = rtcp->get_lost_sns();
srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", rtcp->get_media_ssrc(), srs_join_vector_string(sns, ",").c_str());
srs_trace("RTC NACK ssrc=%u, seq=%s, ignored", ssrc, srs_join_vector_string(sns, ",").c_str());
return err;
}
// TODO: FIXME: Support ARQ.
vector<SrsRtpPacket2*> resend_pkts;
SrsRtcSendTrack* target = NULL;
// Try audio track first.
for (map<uint32_t, SrsRtcAudioSendTrack*>::iterator it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) {
SrsRtcAudioSendTrack* track = it->second;
if (!track->get_track_status() || !track->has_ssrc(ssrc)) {
continue;
}
vector<uint16_t> sns = rtcp->get_lost_sns();
for(int i = 0; i < (int)sns.size(); ++i) {
uint16_t seq = sns.at(i);
nack_fetch(resend_pkts, rtcp->get_media_ssrc(), seq);
target = track;
break;
}
for (int i = 0; i < (int)resend_pkts.size(); ++i) {
SrsRtpPacket2* pkt = resend_pkts[i];
info.nn_bytes += pkt->nb_bytes();
uint32_t nn = 0;
if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) {
srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes());
// If not found, try video track.
for (map<uint32_t, SrsRtcVideoSendTrack*>::iterator it = video_tracks_.begin(); !target && it != video_tracks_.end(); ++it) {
SrsRtcVideoSendTrack* track = it->second;
if (!track->get_track_status() || !track->has_ssrc(ssrc)) {
continue;
}
target = track;
break;
}
// Error if no track.
if (!target) {
return srs_error_new(ERROR_RTC_NO_TRACK, "no track for %u ssrc", ssrc);
}
// By default, we send packets by sendmmsg.
if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
vector<uint16_t> seqs = rtcp->get_lost_sns();
if((err = target->on_recv_nack(seqs, info)) != srs_success) {
return srs_error_wrap(err, "track response nack. id:%s, ssrc=%u", target->get_track_id().c_str(), ssrc);
}
session_->stat_->nn_nack++;
@ -909,13 +874,14 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{
timer_ = new SrsHourGlass(this, 10 * SRS_UTIME_MILLISECONDS);
timer_ = new SrsHourGlass("publish", this, 20 * SRS_UTIME_MILLISECONDS);
cid_ = cid;
is_started = false;
session_ = session;
request_keyframe_ = false;
pli_epp = new SrsErrorPithyPrint();
twcc_epp_ = new SrsErrorPithyPrint(3.0);
req = NULL;
source = NULL;
@ -933,16 +899,20 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_publish(session_, this, req);
}
// TODO: FIXME: Should remove and delete source.
if (source) {
source->set_publish_stream(NULL);
source->on_unpublish();
}
// TODO: FIXME: Should not do callback in de-constructor?
// NOTE: on_stop_publish lead to switch io,
// it must be called after source stream unpublish (set source stream is_created=false).
// if not, it lead to republish failed.
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_publish(session_, this, req);
}
for (int i = 0; i < (int)video_tracks_.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks_.at(i);
srs_freep(track);
@ -957,6 +927,7 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(timer_);
srs_freep(pli_worker_);
srs_freep(twcc_epp_);
srs_freep(pli_epp);
srs_freep(req);
}
@ -991,9 +962,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost);
pt_to_drop_ = (uint16_t)_srs_config->get_rtc_drop_for_pt(req->vhost);
bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost);
twcc_enabled_ = _srs_config->get_rtc_twcc_enabled(req->vhost);
srs_trace("RTC publisher nack=%d, pt-drop=%u, twcc=%u/%d", nack_enabled_, pt_to_drop_, twcc_enabled, twcc_id);
srs_trace("RTC publisher nack=%d, pt-drop=%u, twcc=%u/%d", nack_enabled_, pt_to_drop_, twcc_enabled_, twcc_id);
session_->stat_->nn_publishers++;
@ -1014,7 +985,7 @@ srs_error_t SrsRtcPublishStream::start()
return err;
}
if ((err = timer_->tick(SRS_TICKID_TWCC, 50 * SRS_UTIME_MILLISECONDS)) != srs_success) {
if ((err = timer_->tick(SRS_TICKID_TWCC, 40 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "twcc tick");
}
@ -1150,23 +1121,22 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
return err;
}
// Decode the header first.
SrsRtpHeader h;
if (pt_to_drop_ && twcc_id_) {
SrsBuffer b(data, nb_data);
h.ignore_padding(true); h.set_extensions(&extension_types_);
if ((err = h.decode(&b)) != srs_success) {
return srs_error_wrap(err, "twcc decode header");
// If payload type is configed to drop, ignore this packet.
if (pt_to_drop_) {
uint8_t pt = srs_rtp_fast_parse_pt(data, nb_data);
if (pt_to_drop_ == pt) {
return err;
}
}
// We must parse the TWCC from RTP header before SRTP unprotect, because:
// 1. Client may send some padding packets with invalid SequenceNumber, which causes the SRTP fail.
// 2. Server may send multiple duplicated NACK to client, and got more than one ARQ packet, which also fail SRTP.
// so, we must parse the header before SRTP unprotect(which may fail and drop packet).
// Decode the header first.
if (twcc_id_) {
// We must parse the TWCC from RTP header before SRTP unprotect, because:
// 1. Client may send some padding packets with invalid SequenceNumber, which causes the SRTP fail.
// 2. Server may send multiple duplicated NACK to client, and got more than one ARQ packet, which also fail SRTP.
// so, we must parse the header before SRTP unprotect(which may fail and drop packet).
uint16_t twcc_sn = 0;
if ((err = h.get_twcc_sequence_number(twcc_sn)) == srs_success) {
if ((err = srs_rtp_fast_parse_twcc(data, nb_data, &extension_types_, twcc_sn)) == srs_success) {
if((err = on_twcc(twcc_sn)) != srs_success) {
return srs_error_wrap(err, "on twcc");
}
@ -1175,34 +1145,36 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
}
}
// If payload type is configed to drop, ignore this packet.
if (pt_to_drop_ && pt_to_drop_ == h.get_payload_type()) {
return err;
}
// Decrypt the cipher to plaintext RTP data.
int nb_unprotected_buf = nb_data;
char* unprotected_buf = new char[kRtpPacketSize];
if ((err = session_->transport_->unprotect_rtp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) {
if ((err = session_->transport_->unprotect_rtp(data, &nb_unprotected_buf)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
err = srs_error_wrap(err, "marker=%u, pt=%u, seq=%u, ts=%u, ssrc=%u, pad=%u, payload=%uB", h.get_marker(), h.get_payload_type(),
h.get_sequence(), h.get_timestamp(), h.get_ssrc(), h.get_padding(), nb_data - b.pos());
srs_freepa(unprotected_buf);
return err;
}
srs_assert(nb_unprotected_buf > 0);
char* unprotected_buf = new char[nb_unprotected_buf];
memcpy(unprotected_buf, data, nb_unprotected_buf);
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
// Handle the plaintext RTP packet.
if ((err = do_on_rtp(unprotected_buf, nb_unprotected_buf)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
int nb_header = h.nb_bytes();
const char* body = unprotected_buf + nb_header;
int nb_body = nb_unprotected_buf - nb_header;
const char* body = data + nb_header;
int nb_body = nb_data - nb_header;
return srs_error_wrap(err, "cipher=%u, plaintext=%u, body=[%s]", nb_data, nb_unprotected_buf,
srs_string_dumps_hex(body, nb_body, 8).c_str());
}
@ -1239,28 +1211,20 @@ srs_error_t SrsRtcPublishStream::do_on_rtp(char* plaintext, int nb_plaintext)
SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);
if (audio_track) {
pkt->frame_type = SrsFrameTypeAudio;
if ((err = audio_track->on_rtp(source, pkt)) != srs_success) {
if ((err = audio_track->on_rtp(source, pkt, nack_enabled_)) != srs_success) {
return srs_error_wrap(err, "on audio");
}
} else if (video_track) {
pkt->frame_type = SrsFrameTypeVideo;
if ((err = video_track->on_rtp(source, pkt)) != srs_success) {
if ((err = video_track->on_rtp(source, pkt, nack_enabled_)) != srs_success) {
return srs_error_wrap(err, "on video");
}
} else {
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
}
// Check then send NACK every each RTP packet, to make it more efficient.
// For example, NACK of video track maybe triggered by audio RTP packets.
if ((err = check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
if (_srs_rtc_hijacker) {
// TODO: FIXME: copy pkt by hijacker itself
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt->copy())) != srs_success) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req, pkt)) != srs_success) {
return srs_error_wrap(err, "on rtp packet");
}
}
@ -1272,6 +1236,10 @@ srs_error_t SrsRtcPublishStream::check_send_nacks()
{
srs_error_t err = srs_success;
if (!nack_enabled_) {
return err;
}
for (int i = 0; i < (int)video_tracks_.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks_.at(i);
if ((err = track->check_send_nacks()) != srs_success) {
@ -1297,17 +1265,13 @@ void SrsRtcPublishStream::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer
}
uint32_t ssrc = pkt->header.get_ssrc();
if (get_audio_track(ssrc)) {
*ppayload = new SrsRtpRawPayload();
} else if (get_video_track(ssrc)) {
uint8_t v = (uint8_t)pkt->nalu_type;
if (v == kStapA) {
*ppayload = new SrsRtpSTAPPayload();
} else if (v == kFuA) {
*ppayload = new SrsRtpFUAPayload2();
} else {
*ppayload = new SrsRtpRawPayload();
}
SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc);
SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);
if (audio_track) {
audio_track->on_before_decode_payload(pkt, buf, ppayload);
} else if (video_track) {
video_track->on_before_decode_payload(pkt, buf, ppayload);
}
}
@ -1316,9 +1280,11 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc()
srs_error_t err = srs_success;
if (last_time_send_twcc_) {
uint32_t nn = 0;
srs_utime_t duration = srs_duration(last_time_send_twcc_, srs_get_system_time());
if (duration > 80 * SRS_UTIME_MILLISECONDS) {
srs_warn2(TAG_LARGE_TIMER, "send_twcc interval exceeded %dms > 100ms", srsu2msi(duration));
if (duration > 80 * SRS_UTIME_MILLISECONDS && twcc_epp_->can_print(0, &nn)) {
srs_warn2(TAG_LARGE_TIMER, "send_twcc interval exceeded %dms > 80ms, count=%u/%u",
srsu2msi(duration), nn, twcc_epp_->nn_count);
}
}
last_time_send_twcc_ = srs_get_system_time();
@ -1339,12 +1305,11 @@ srs_error_t SrsRtcPublishStream::send_periodic_twcc()
}
int nb_protected_buf = buffer->pos();
char protected_buf[kRtpPacketSize];
if ((err = session_->transport_->protect_rtcp(pkt, protected_buf, nb_protected_buf)) != srs_success) {
if ((err = session_->transport_->protect_rtcp(pkt, &nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp, size=%u", nb_protected_buf);
}
return session_->sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
return session_->sendonly_skt->sendto(pkt, nb_protected_buf, 0);
}
srs_error_t SrsRtcPublishStream::on_rtcp(SrsRtcpCommon* rtcp)
@ -1485,7 +1450,7 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
}
}
if (type == SRS_TICKID_TWCC) {
if (twcc_enabled_ && type == SRS_TICKID_TWCC) {
// We should not depends on the received packet,
// instead we should send feedback every Nms.
if ((err = send_periodic_twcc()) != srs_success) {
@ -1619,7 +1584,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
req = NULL;
cid_ = cid;
stat_ = new SrsRtcConnectionStatistic();
timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS);
timer_ = new SrsHourGlass("conn", this, 20 * SRS_UTIME_MILLISECONDS);
hijacker_ = NULL;
sendonly_skt = NULL;
@ -1776,6 +1741,7 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRequest* req, const SrsSdp& remot
SrsRtcStreamDescription* stream_desc = new SrsRtcStreamDescription();
SrsAutoFree(SrsRtcStreamDescription, stream_desc);
// TODO: FIXME: Change to api of stream desc.
if ((err = negotiate_publish_capability(req, remote_sdp, stream_desc)) != srs_success) {
return srs_error_wrap(err, "publish negotiate");
}
@ -1922,6 +1888,10 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
return srs_error_wrap(err, "init");
}
if ((err = timer_->tick(SRS_TICKID_SEND_NACKS, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "tick nack");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
@ -1969,12 +1939,12 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
char unprotected_buf[kRtpPacketSize];
int nb_unprotected_buf = nb_data;
if ((err = transport_->unprotect_rtcp(data, unprotected_buf, nb_unprotected_buf)) != srs_success) {
if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect");
}
char* unprotected_buf = data;
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf);
}
@ -2107,27 +2077,36 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{
srs_error_t err = srs_success;
SrsRtcPublishStream* publisher = NULL;
if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {
return srs_error_wrap(err, "find");
}
srs_assert(publisher);
return publisher->on_rtp(data, nb_data);
}
srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher)
{
srs_error_t err = srs_success;
if (publishers_.size() == 0) {
return srs_error_new(ERROR_RTC_RTCP, "no publisher");
}
SrsRtpHeader header;
if (true) {
SrsBuffer* buffer = new SrsBuffer(data, nb_data);
SrsAutoFree(SrsBuffer, buffer);
header.ignore_padding(true);
if(srs_success != (err = header.decode(buffer))) {
return srs_error_wrap(err, "decode rtp header");
}
uint32_t ssrc = srs_rtp_fast_parse_ssrc(buf, size);
if (ssrc == 0) {
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "invalid ssrc");
}
map<uint32_t, SrsRtcPublishStream*>::iterator it = publishers_ssrc_map_.find(header.get_ssrc());
map<uint32_t, SrsRtcPublishStream*>::iterator it = publishers_ssrc_map_.find(ssrc);
if(it == publishers_ssrc_map_.end()) {
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", header.get_ssrc());
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", ssrc);
}
SrsRtcPublishStream* publisher = it->second;
return publisher->on_rtp(data, nb_data);
*ppublisher = it->second;
return err;
}
srs_error_t SrsRtcConnection::on_connection_established()
@ -2228,7 +2207,7 @@ srs_error_t SrsRtcConnection::start_publish(std::string stream_uri)
bool SrsRtcConnection::is_alive()
{
return last_stun_time + session_timeout < srs_get_system_time();
return last_stun_time + session_timeout > srs_get_system_time();
}
void SrsRtcConnection::alive()
@ -2272,7 +2251,12 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
// If no cache, build cache and setup the relations in connection.
if (!addr_cache) {
peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly();
server_->insert_into_id_sessions(peer_id, this);
_srs_rtc_manager->add_with_id(peer_id, this);
uint64_t fast_id = skt->fast_id();
if (fast_id) {
_srs_rtc_manager->add_with_fast_id(fast_id, this);
}
}
// Update the transport.
@ -2282,6 +2266,24 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
++_srs_pps_conn->sugar;
// For publisher to send NACK.
if (type == SRS_TICKID_SEND_NACKS) {
srs_update_system_time();
std::map<std::string, SrsRtcPublishStream*>::iterator it;
for (it = publishers_.begin(); it != publishers_.end(); it++) {
SrsRtcPublishStream* publisher = it->second;
if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}
}
return err;
}
@ -2290,12 +2292,11 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
srs_error_t err = srs_success;
int nb_buf = nb_data;
char protected_buf[kRtpPacketSize];
if ((err = transport_->protect_rtcp(data, protected_buf, nb_buf)) != srs_success) {
if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp");
}
if ((err = sendonly_skt->sendto(protected_buf, nb_buf, 0)) != srs_success) {
if ((err = sendonly_skt->sendto(data, nb_buf, 0)) != srs_success) {
return srs_error_wrap(err, "send");
}
@ -2304,16 +2305,19 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc, uint32_t& sent_nacks, uint32_t& timeout_nacks)
{
++_srs_pps_snack->sugar;
SrsRtcpNack rtcpNack(ssrc);
rtcpNack.set_media_ssrc(ssrc);
nack->get_nack_seqs(rtcpNack, timeout_nacks);
sent_nacks = rtcpNack.get_lost_sns().size();
if(!sent_nacks){
if(rtcpNack.empty()){
return;
}
++_srs_pps_snack2->sugar;
char buf[kRtcpPacketSize];
SrsBuffer stream(buf, sizeof(buf));
@ -2321,12 +2325,11 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ss
rtcpNack.encode(&stream);
// TODO: FIXME: Check error.
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf);
transport_->protect_rtcp(stream.data(), &nb_protected_buf);
// TODO: FIXME: Check error.
sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
sendonly_skt->sendto(stream.data(), nb_protected_buf, 0);
}
srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue, const uint64_t& last_send_systime, const SrsNtp& last_send_ntp)
@ -2366,13 +2369,12 @@ srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_
srs_info("RR ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u",
ssrc, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp rr");
}
return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0);
}
srs_error_t SrsRtcConnection::send_rtcp_xr_rrtr(uint32_t ssrc)
@ -2419,13 +2421,12 @@ srs_error_t SrsRtcConnection::send_rtcp_xr_rrtr(uint32_t ssrc)
stream.write_4bytes(cur_ntp.ntp_second_);
stream.write_4bytes(cur_ntp.ntp_fractions_);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp xr");
}
return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0);
}
srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId& cid_of_subscriber)
@ -2450,13 +2451,12 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId
_srs_blackhole->sendto(stream.data(), stream.pos());
}
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if ((err = transport_->protect_rtcp(stream.data(), protected_buf, nb_protected_buf)) != srs_success) {
if ((err = transport_->protect_rtcp(stream.data(), &nb_protected_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp psfb pli");
}
return sendonly_skt->sendto(protected_buf, nb_protected_buf, 0);
return sendonly_skt->sendto(stream.data(), nb_protected_buf, 0);
}
void SrsRtcConnection::simulate_nack_drop(int nn)
@ -2507,7 +2507,7 @@ srs_error_t SrsRtcConnection::do_send_packets(const std::vector<SrsRtpPacket2*>&
// Cipher RTP to SRTP packet.
if (true) {
int nn_encrypt = (int)iov->iov_len;
if ((err = transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;

@ -97,11 +97,14 @@ public:
virtual srs_error_t on_dtls(char* data, int nb_data) = 0;
virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0;
public:
virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher) = 0;
virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher) = 0;
virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr) = 0;
virtual srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext) = 0;
virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext) = 0;
// Encrypt the packet(paintext) to cipher, which is aso the packet ptr.
// The nb_cipher should be initialized to the size of cipher, with some paddings.
virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0;
virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0;
// Decrypt the packet(cipher) to plaintext, which is also the packet ptr.
// The nb_plaintext should be initialized to the size of cipher.
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0;
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0;
};
// The security transport, use DTLS/SRTP to protect the data.
@ -122,19 +125,14 @@ public:
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_dtls_alert(std::string type, std::string desc);
public:
// Encrypt the input plaintext to output cipher with nb_cipher bytes.
// @remark Note that the nb_cipher is the size of input plaintext, and
// it also is the length of output cipher when return.
srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
// Encrypt the input rtp_hdr with *len_ptr bytes.
// @remark the input plaintext and out cipher reuse rtp_hdr.
srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
// Decrypt the input cipher to output cipher with nb_cipher bytes.
// @remark Note that the nb_plaintext is the size of input cipher, and
// it also is the length of output plaintext when return.
srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
// Encrypt the packet(paintext) to cipher, which is aso the packet ptr.
// The nb_cipher should be initialized to the size of cipher, with some paddings.
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
// Decrypt the packet(cipher) to plaintext, which is also the packet ptr.
// The nb_plaintext should be initialized to the size of cipher.
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
// implement ISrsDtlsCallback
public:
virtual srs_error_t on_dtls_handshake_done();
@ -151,9 +149,8 @@ public:
SrsSemiSecurityTransport(SrsRtcConnection* s);
virtual ~SrsSemiSecurityTransport();
public:
virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
};
// Plaintext transport, without DTLS or SRTP.
@ -173,11 +170,10 @@ public:
virtual srs_error_t on_dtls_application_data(const char* data, const int len);
virtual srs_error_t write_dtls_data(void* data, int size);
public:
virtual srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
virtual srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
virtual srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
virtual srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext);
virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
};
// The handler for PLI worker coroutine.
@ -289,7 +285,6 @@ public:
virtual srs_error_t cycle();
private:
srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcPlayStreamStatistic& info);
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
public:
// Directly set the status of track, generally for init to set the default value.
void set_all_tracks_status(bool status);
@ -318,11 +313,13 @@ private:
SrsHourGlass* timer_;
uint64_t nn_audio_frames;
SrsRtcPLIWorker* pli_worker_;
SrsErrorPithyPrint* twcc_epp_;
private:
SrsRtcConnection* session_;
uint16_t pt_to_drop_;
// Whether enabled nack.
bool nack_enabled_;
bool twcc_enabled_;
private:
bool request_keyframe_;
SrsErrorPithyPrint* pli_epp;
@ -358,6 +355,7 @@ public:
srs_error_t on_rtp(char* buf, int nb_buf);
private:
srs_error_t do_on_rtp(char* plaintext, int nb_plaintext);
public:
srs_error_t check_send_nacks();
public:
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload);
@ -415,8 +413,11 @@ public:
};
// A RTC Peer Connection, SDP level object.
class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsResource
, virtual public ISrsDisposingHandler
//
// For performance, we use non-virtual public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource
, virtual public ISrsHourGlass, virtual public ISrsDisposingHandler
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayStream;
@ -507,6 +508,10 @@ public:
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r);
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
private:
// Decode the RTP header from buf, find the publisher by SSRC.
srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher);
public:
srs_error_t on_rtcp(char* data, int nb_data);
private:
srs_error_t dispatch_rtcp(SrsRtcpCommon* rtcp);
@ -575,6 +580,8 @@ public:
virtual srs_error_t on_before_play(SrsRtcConnection* session, SrsRequest* req) = 0;
// When start player by RTC.
virtual srs_error_t on_start_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0;
// When stop player by RTC.
virtual void on_stop_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0;
// When start consuming for player for RTC.
virtual srs_error_t on_start_consume(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0;
};

@ -955,7 +955,7 @@ srs_error_t SrsSRTP::initialize(string recv_key, std::string send_key)
return err;
}
srs_error_t SrsSRTP::protect_rtp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsSRTP::protect_rtp(void* packet, int* nb_cipher)
{
srs_error_t err = srs_success;
@ -964,17 +964,15 @@ srs_error_t SrsSRTP::protect_rtp(const char* plaintext, char* cipher, int& nb_ci
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready");
}
memcpy(cipher, plaintext, nb_cipher);
srtp_err_status_t r0 = srtp_err_status_ok;
if ((r0 = srtp_protect(send_ctx_, cipher, &nb_cipher)) != srtp_err_status_ok) {
if ((r0 = srtp_protect(send_ctx_, packet, nb_cipher)) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect r0=%u", r0);
}
return err;
}
srs_error_t SrsSRTP::protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher)
srs_error_t SrsSRTP::protect_rtcp(void* packet, int* nb_cipher)
{
srs_error_t err = srs_success;
@ -983,34 +981,15 @@ srs_error_t SrsSRTP::protect_rtcp(const char* plaintext, char* cipher, int& nb_c
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready");
}
memcpy(cipher, plaintext, nb_cipher);
srtp_err_status_t r0 = srtp_err_status_ok;
if ((r0 = srtp_protect_rtcp(send_ctx_, cipher, &nb_cipher)) != srtp_err_status_ok) {
if ((r0 = srtp_protect_rtcp(send_ctx_, packet, nb_cipher)) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtcp protect r0=%u", r0);
}
return err;
}
srs_error_t SrsSRTP::protect_rtp2(void* rtp_hdr, int* len_ptr)
{
srs_error_t err = srs_success;
// If DTLS/SRTP is not ready, fail.
if (!send_ctx_) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "not ready");
}
srtp_err_status_t r0 = srtp_err_status_ok;
if ((r0 = srtp_protect(send_ctx_, rtp_hdr, len_ptr)) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect r0=%u", r0);
}
return err;
}
srs_error_t SrsSRTP::unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext)
srs_error_t SrsSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
{
srs_error_t err = srs_success;
@ -1019,17 +998,15 @@ srs_error_t SrsSRTP::unprotect_rtp(const char* cipher, char* plaintext, int& nb_
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}
memcpy(plaintext, cipher, nb_plaintext);
srtp_err_status_t r0 = srtp_err_status_ok;
if ((r0 = srtp_unprotect(recv_ctx_, plaintext, &nb_plaintext)) != srtp_err_status_ok) {
if ((r0 = srtp_unprotect(recv_ctx_, packet, nb_plaintext)) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtp unprotect r0=%u", r0);
}
return err;
}
srs_error_t SrsSRTP::unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext)
srs_error_t SrsSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
{
srs_error_t err = srs_success;
@ -1038,10 +1015,8 @@ srs_error_t SrsSRTP::unprotect_rtcp(const char* cipher, char* plaintext, int& nb
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}
memcpy(plaintext, cipher, nb_plaintext);
srtp_err_status_t r0 = srtp_err_status_ok;
if ((r0 = srtp_unprotect_rtcp(recv_ctx_, plaintext, &nb_plaintext)) != srtp_err_status_ok) {
if ((r0 = srtp_unprotect_rtcp(recv_ctx_, packet, nb_plaintext)) != srtp_err_status_ok) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect r0=%u", r0);
}

@ -224,19 +224,10 @@ public:
// Intialize srtp context with recv_key and send_key.
srs_error_t initialize(std::string recv_key, std::string send_key);
public:
// Encrypt the input plaintext to output cipher with nb_cipher bytes.
// @remark Note that the nb_cipher is the size of input plaintext, and
// it also is the length of output cipher when return.
srs_error_t protect_rtp(const char* plaintext, char* cipher, int& nb_cipher);
srs_error_t protect_rtcp(const char* plaintext, char* cipher, int& nb_cipher);
// Encrypt the input rtp_hdr with *len_ptr bytes.
// @remark the input plaintext and out cipher reuse rtp_hdr.
srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr);
// Decrypt the input cipher to output cipher with nb_cipher bytes.
// @remark Note that the nb_plaintext is the size of input cipher, and
// it also is the length of output plaintext when return.
srs_error_t unprotect_rtp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext);
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
};
#endif

@ -146,10 +146,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) {
void SrsRtpRingBuffer::notify_nack_list_full()
{
while(begin <= end) {
remove(begin);
++begin;
}
clear_all_histroy();
begin = end = 0;
initialized_ = false;
@ -161,6 +158,29 @@ void SrsRtpRingBuffer::notify_drop_seq(uint16_t seq)
advance_to(seq+1);
}
void SrsRtpRingBuffer::clear_histroy(uint16_t seq)
{
// TODO FIXME Did not consider loopback
for (uint16_t i = 0; i < capacity_; i++) {
SrsRtpPacket2* p = queue_[i];
if (p && p->header.get_sequence() < seq) {
srs_freep(p);
queue_[i] = NULL;
}
}
}
void SrsRtpRingBuffer::clear_all_histroy()
{
for (uint16_t i = 0; i < capacity_; i++) {
SrsRtpPacket2* p = queue_[i];
if (p) {
srs_freep(p);
queue_[i] = NULL;
}
}
}
SrsNackOption::SrsNackOption()
{
max_count = 15;
@ -170,7 +190,7 @@ SrsNackOption::SrsNackOption()
max_nack_interval = 500 * SRS_UTIME_MILLISECONDS;
min_nack_interval = 20 * SRS_UTIME_MILLISECONDS;
nack_check_interval = 3 * SRS_UTIME_MILLISECONDS;
nack_check_interval = 20 * SRS_UTIME_MILLISECONDS;
//TODO: FIXME: audio and video using diff nack strategy
// video:
@ -239,8 +259,7 @@ void SrsRtpNackForReceiver::check_queue_size()
void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks)
{
// TODO: FIXME: Use packet as tick count, not clock.
srs_utime_t now = srs_update_system_time();
srs_utime_t now = srs_get_system_time();
srs_utime_t interval = now - pre_check_time_;
if (interval < opts_.nack_check_interval) {

@ -87,6 +87,9 @@ public:
// TODO: FIXME: Refine it?
void notify_nack_list_full();
void notify_drop_seq(uint16_t seq);
public:
void clear_histroy(uint16_t seq);
void clear_all_histroy();
};
struct SrsNackOption

@ -45,6 +45,30 @@ using namespace std;
#include <srs_app_rtc_api.hpp>
#include <srs_protocol_utility.hpp>
extern SrsPps* _srs_pps_pkts;
extern SrsPps* _srs_pps_addrs;
extern SrsPps* _srs_pps_fast_addrs;
extern SrsPps* _srs_pps_ids;
extern SrsPps* _srs_pps_fids;
extern SrsPps* _srs_pps_fids_level0;
extern SrsPps* _srs_pps_pli;
extern SrsPps* _srs_pps_twcc;
extern SrsPps* _srs_pps_rr;
extern SrsPps* _srs_pps_timer;
extern SrsPps* _srs_pps_pub;
extern SrsPps* _srs_pps_conn;
extern SrsPps* _srs_pps_snack;
extern SrsPps* _srs_pps_snack2;
extern SrsPps* _srs_pps_sanack;
extern SrsPps* _srs_pps_svnack;
extern SrsPps* _srs_pps_rnack;
extern SrsPps* _srs_pps_rnack2;
SrsRtcBlackhole::SrsRtcBlackhole()
{
blackhole = false;
@ -217,7 +241,7 @@ SrsRtcServer::SrsRtcServer()
{
handler = NULL;
hijacker = NULL;
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
timer = new SrsHourGlass("server", this, 1 * SRS_UTIME_SECONDS);
}
SrsRtcServer::~SrsRtcServer()
@ -300,27 +324,32 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
string peer_id = skt->peer_id();
SrsRtcConnection* session = NULL;
char* data = skt->data(); int size = skt->size();
bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)data, size);
bool is_rtcp = srs_is_rtcp((uint8_t*)data, size);
SrsRtcConnection* session = NULL;
if (true) {
ISrsResource* conn = _srs_rtc_manager->find_by_id(peer_id);
if (conn) {
// Switch to the session to write logs to the context.
session = dynamic_cast<SrsRtcConnection*>(conn);
session->switch_to_context();
}
uint64_t fast_id = skt->fast_id();
// Try fast id first, if not found, search by long peer id.
if (fast_id) {
session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id);
}
if (!session) {
string peer_id = skt->peer_id();
session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id);
}
// When got any packet, the session is alive now.
if (session) {
// When got any packet, the session is alive now.
session->alive();
}
// Notify hijack to handle the UDP packet.
if (hijacker) {
if (hijacker && is_rtp_or_rtcp && is_rtcp) {
bool consumed = false;
if (session) {
session->switch_to_context();
}
if ((err = hijacker->on_udp_packet(skt, session, &consumed)) != srs_success) {
return srs_error_wrap(err, "hijack consumed=%u", consumed);
}
@ -331,27 +360,27 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
}
// For STUN, the peer address may change.
if (srs_is_stun((uint8_t*)data, size)) {
if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) {
string peer_id = skt->peer_id();
SrsStunPacket ping;
if ((err = ping.decode(data, size)) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
srs_info("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
peer_id.c_str(), ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
if (!session) {
session = find_session_by_username(ping.get_username());
// Switch to the session to write logs to the context.
if (session) {
session->switch_to_context();
}
}
if (session) {
session->switch_to_context();
}
srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session) {
return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s",
ping.get_username().c_str(), peer_id.c_str());
return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64,
ping.get_username().c_str(), peer_id.c_str(), fast_id);
}
return session->on_stun(skt, &ping);
@ -359,18 +388,26 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
// For DTLS, RTCP or RTP, which does not support peer address changing.
if (!session) {
return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s", peer_id.c_str());
string peer_id = skt->peer_id();
return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id);
}
if (srs_is_dtls((uint8_t*)data, size)) {
return session->on_dtls(data, size);
} else if (srs_is_rtp_or_rtcp((uint8_t*)data, size)) {
if (srs_is_rtcp((uint8_t*)data, size)) {
return session->on_rtcp(data, size);
// Note that we don't(except error) switch to the context of session, for performance issue.
if (is_rtp_or_rtcp && !is_rtcp) {
err = session->on_rtp(data, size);
if (err != srs_success) {
session->switch_to_context();
}
return session->on_rtp(data, size);
return err;
}
session->switch_to_context();
if (is_rtp_or_rtcp && is_rtcp) {
return session->on_rtcp(data, size);
}
if (srs_is_dtls((uint8_t*)data, size)) {
return session->on_dtls(data, size);
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
@ -469,8 +506,12 @@ srs_error_t SrsRtcServer::do_create_session(
// We allows to mock the eip of server.
if (!mock_eip.empty()) {
local_sdp.add_candidate(mock_eip, _srs_config->get_rtc_server_listen(), "host");
srs_trace("RTC: Use candidate mock_eip %s", mock_eip.c_str());
string host;
int port = _srs_config->get_rtc_server_listen();
srs_parse_hostport(mock_eip, host, port);
local_sdp.add_candidate(host, port, "host");
srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port);
} else {
std::vector<string> candidate_ips = get_candidate_ips();
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
@ -534,8 +575,12 @@ srs_error_t SrsRtcServer::create_session2(SrsRequest* req, SrsSdp& local_sdp, co
// We allows to mock the eip of server.
if (!mock_eip.empty()) {
local_sdp.add_candidate(mock_eip, _srs_config->get_rtc_server_listen(), "host");
srs_trace("RTC: Use candidate mock_eip %s", mock_eip.c_str());
string host;
int port = _srs_config->get_rtc_server_listen();
srs_parse_hostport(mock_eip, host, port);
local_sdp.add_candidate(host, port, "host");
srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port);
} else {
std::vector<string> candidate_ips = get_candidate_ips();
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
@ -574,11 +619,6 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest*
return err;
}
void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session)
{
_srs_rtc_manager->add_with_id(peer_id, session);
}
SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username)
{
ISrsResource* conn = _srs_rtc_manager->find_by_name(username);
@ -595,7 +635,13 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
// Check all sessions and dispose the dead sessions.
for (int i = 0; i < (int)_srs_rtc_manager->size(); i++) {
SrsRtcConnection* session = dynamic_cast<SrsRtcConnection*>(_srs_rtc_manager->at(i));
if (!session || !session->is_alive() || session->disposing_) {
// Ignore not session, or already disposing.
if (!session || session->disposing_) {
continue;
}
// Update stat if session is alive.
if (session->is_alive()) {
nn_rtc_conns++;
continue;
}
@ -616,12 +662,23 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
return err;
}
// Show statistics for RTC server.
SrsProcSelfStat* u = srs_get_self_proc_stat();
// Resident Set Size: number of pages the process has in real memory.
int memory = (int)(u->rss * 4 / 1024);
// Update the pps stat for UDP socket and adddresses.
_srs_pps_pkts->update(); _srs_pps_addrs->update(); _srs_pps_fast_addrs->update();
_srs_pps_ids->update(); _srs_pps_fids->update(); _srs_pps_fids_level0->update();
_srs_pps_pli->update(); _srs_pps_twcc->update(); _srs_pps_rr->update();
_srs_pps_timer->update(); _srs_pps_pub->update(); _srs_pps_conn->update();
_srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update();
_srs_pps_rnack->update(); _srs_pps_rnack2->update();
// TODO: FIXME: Show more data for RTC server.
srs_trace("RTC: Server conns=%u, cpu=%.2f%%, rss=%dMB", nn_rtc_conns, u->percent * 100, memory);
srs_trace("RTC: Server conns=%u, pkts=%d, addrs=%d,%d, fid=%d,%d,%d, rtcp=%d,%d,%d, snk=%d,%d,%d,%d, rnk=%d,%d",
nn_rtc_conns,
_srs_pps_pkts->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s(),
_srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(),
_srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s(),
_srs_pps_snack->r10s(), _srs_pps_snack2->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(),
_srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s()
);
return err;
}

@ -122,8 +122,6 @@ public:
// We start offering, create_session2 to generate offer, setup_session2 to handle answer.
srs_error_t create_session2(SrsRequest* req, SrsSdp& local_sdp, const std::string& mock_eip, bool unified_plan, SrsRtcConnection** psession);
srs_error_t setup_session2(SrsRtcConnection* session, SrsRequest* req, const SrsSdp& remote_sdp);
public:
void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session);
public:
SrsRtcConnection* find_session_by_username(const std::string& ufrag);
// interface ISrsHourGlass

@ -48,6 +48,17 @@
#include <srs_app_rtc_codec.hpp>
#endif
#include <srs_protocol_kbps.hpp>
// The NACK sent by us(SFU).
SrsPps* _srs_pps_snack = new SrsPps(_srs_clock);
SrsPps* _srs_pps_snack2 = new SrsPps(_srs_clock);
SrsPps* _srs_pps_sanack = new SrsPps(_srs_clock);
SrsPps* _srs_pps_svnack = new SrsPps(_srs_clock);
SrsPps* _srs_pps_rnack = new SrsPps(_srs_clock);
SrsPps* _srs_pps_rnack2 = new SrsPps(_srs_clock);
// Firefox defaults as 109, Chrome is 111.
const int kAudioPayloadType = 111;
const int kAudioChannel = 2;
@ -1831,7 +1842,17 @@ SrsRtcAudioRecvTrack::~SrsRtcAudioRecvTrack()
{
}
srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt)
void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
{
// No payload, ignore.
if (buf->empty()) {
return;
}
*ppayload = new SrsRtpRawPayload();
}
srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled)
{
srs_error_t err = srs_success;
@ -1847,7 +1868,7 @@ srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk
}
// For NACK to handle packet.
if ((err = on_nack(pkt)) != srs_success) {
if (nack_enabled && (err = on_nack(pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
@ -1858,6 +1879,8 @@ srs_error_t SrsRtcAudioRecvTrack::check_send_nacks()
{
srs_error_t err = srs_success;
++_srs_pps_sanack->sugar;
uint32_t timeout_nacks = 0;
if ((err = do_check_send_nacks(timeout_nacks)) != srs_success) {
return srs_error_wrap(err, "audio");
@ -1875,7 +1898,24 @@ SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack()
{
}
srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt)
void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
{
// No payload, ignore.
if (buf->empty()) {
return;
}
uint8_t v = (uint8_t)pkt->nalu_type;
if (v == kStapA) {
*ppayload = new SrsRtpSTAPPayload();
} else if (v == kFuA) {
*ppayload = new SrsRtpFUAPayload2();
} else {
*ppayload = new SrsRtpRawPayload();
}
}
srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled)
{
srs_error_t err = srs_success;
@ -1893,7 +1933,7 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk
}
// For NACK to handle packet.
if ((err = on_nack(pkt)) != srs_success) {
if (nack_enabled && (err = on_nack(pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
@ -1904,6 +1944,8 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks()
{
srs_error_t err = srs_success;
++_srs_pps_svnack->sugar;
uint32_t timeout_nacks = 0;
if ((err = do_check_send_nacks(timeout_nacks)) != srs_success) {
return srs_error_wrap(err, "video");
@ -1989,11 +2031,39 @@ std::string SrsRtcSendTrack::get_track_id()
return track_desc_->id_;
}
void SrsRtcSendTrack::on_recv_nack()
srs_error_t SrsRtcSendTrack::on_recv_nack(const vector<uint16_t>& lost_seqs, SrsRtcPlayStreamStatistic& info)
{
srs_error_t err = srs_success;
++_srs_pps_rnack2->sugar;
SrsRtcTrackStatistic* statistic = statistic_;
statistic->nacks++;
vector<SrsRtpPacket2*> resend_pkts;
for(int i = 0; i < (int)lost_seqs.size(); ++i) {
uint16_t seq = lost_seqs.at(i);
SrsRtpPacket2* pkt = fetch_rtp_packet(seq);
if (pkt == NULL) {
continue;
}
resend_pkts.push_back(pkt);
info.nn_bytes += pkt->nb_bytes();
uint32_t nn = 0;
if (nack_epp->can_print(pkt->header.get_ssrc(), &nn)) {
srs_trace("RTC NACK ARQ seq=%u, ssrc=%u, ts=%u, count=%u/%u, %d bytes", pkt->header.get_sequence(),
pkt->header.get_ssrc(), pkt->header.get_timestamp(), nn, nack_epp->nn_count, pkt->nb_bytes());
}
}
// By default, we send packets by sendmmsg.
if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}
SrsRtcAudioSendTrack::SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc)

@ -36,6 +36,7 @@
#include <srs_app_rtc_sdp.hpp>
#include <srs_service_st.hpp>
#include <srs_app_source.hpp>
#include <srs_kernel_rtc_rtp.hpp>
class SrsRequest;
class SrsMetaCache;
@ -525,29 +526,33 @@ public:
protected:
srs_error_t on_nack(SrsRtpPacket2* pkt);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt) = 0;
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled) = 0;
virtual srs_error_t check_send_nacks() = 0;
protected:
virtual srs_error_t do_check_send_nacks(uint32_t& timeout_nacks);
};
class SrsRtcAudioRecvTrack : public SrsRtcRecvTrack
class SrsRtcAudioRecvTrack : virtual public SrsRtcRecvTrack, virtual public ISrsRtpPacketDecodeHandler
{
public:
SrsRtcAudioRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc);
virtual ~SrsRtcAudioRecvTrack();
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled);
virtual srs_error_t check_send_nacks();
};
class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack
class SrsRtcVideoRecvTrack : virtual public SrsRtcRecvTrack, virtual public ISrsRtpPacketDecodeHandler
{
public:
SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs);
virtual ~SrsRtcVideoRecvTrack();
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload);
public:
virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt, bool nack_enabled);
virtual srs_error_t check_send_nacks();
};
@ -577,7 +582,7 @@ public:
public:
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcPlayStreamStatistic& info) = 0;
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt) = 0;
virtual void on_recv_nack();
virtual srs_error_t on_recv_nack(const std::vector<uint16_t>& lost_seqs, SrsRtcPlayStreamStatistic& info);
};
class SrsRtcAudioSendTrack : public SrsRtcSendTrack

@ -56,47 +56,6 @@ using namespace std;
#include <srs_app_gb28181.hpp>
#include <srs_app_gb28181_sip.hpp>
// system interval in srs_utime_t,
// all resolution times should be times togother,
// for example, system-interval is x=1s(1000ms),
// then rusage can be 3*x, for instance, 3*1=3s,
// the meminfo canbe 6*x, for instance, 6*1=6s,
// for performance refine, @see: https://github.com/ossrs/srs/issues/194
// @remark, recomment to 1000ms.
#define SRS_SYS_CYCLE_INTERVAL (1000 * SRS_UTIME_MILLISECONDS)
// update time interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES
#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES
#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3
// update network devices info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES
#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3
// update the disk iops interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES
#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES
#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6
// update platform info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES
#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9
// update network devices info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9
std::string srs_listener_type2string(SrsListenerType type)
{
switch (type) {
@ -728,6 +687,8 @@ SrsServer::SrsServer()
http_server = new SrsHttpServer(this);
http_heartbeat = new SrsHttpHeartbeat();
ingester = new SrsIngester();
trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id());
timer_ = NULL;
}
SrsServer::~SrsServer()
@ -738,7 +699,10 @@ SrsServer::~SrsServer()
void SrsServer::destroy()
{
srs_warn("start destroy server");
srs_freep(trd_);
srs_freep(timer_);
dispose();
srs_freep(http_api_mux);
@ -1101,6 +1065,25 @@ srs_error_t SrsServer::ingest()
return err;
}
srs_error_t SrsServer::start()
{
srs_error_t err = srs_success;
if ((err = _srs_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
}
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
return err;
}
srs_error_t SrsServer::cycle()
{
srs_error_t err = srs_success;
@ -1151,7 +1134,6 @@ srs_error_t SrsServer::cycle()
return err;
}
void SrsServer::on_signal(int signo)
{
if (signo == SRS_SIGNAL_RELOAD) {
@ -1221,134 +1203,132 @@ srs_error_t SrsServer::do_cycle()
{
srs_error_t err = srs_success;
// find the max loop
int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
// for asprocess.
bool asprocess = _srs_config->get_asprocess();
// the daemon thread, update the time cache
// TODO: FIXME: use SrsHourGlass.
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if (handler && (err = handler->on_cycle()) != srs_success) {
return srs_error_wrap(err, "handle callback");
}
// the interval in config.
int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
// dynamic fetch the max.
int dynamic_max = srs_max(max, heartbeat_max_resolution);
for (int i = 0; i < dynamic_max; i++) {
srs_usleep(SRS_SYS_CYCLE_INTERVAL);
// asprocess check.
if (asprocess && ::getppid() != ppid) {
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit || signal_gracefully_quit) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
// asprocess check.
if (asprocess && ::getppid() != ppid) {
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit || signal_gracefully_quit) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit, signal_gracefully_quit);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_GPERF_MC
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return err;
}
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return err;
}
#endif
// do persistence config to file.
if (signal_persistence_config) {
signal_persistence_config = false;
srs_info("get signal to persistence config to file.");
if ((err = _srs_config->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal to reload the config.");
if ((err = _srs_config->reload()) != srs_success) {
return srs_error_wrap(err, "config reload");
}
srs_trace("reload config success.");
}
// notice the stream sources to cycle.
if ((err = _srs_sources->cycle()) != srs_success) {
return srs_error_wrap(err, "source cycle");
}
// update the cache time
if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
srs_info("update current time cache.");
srs_update_system_time();
}
if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
srs_info("update resource info, rss.");
srs_update_system_rusage();
}
if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update cpu info, cpu usage.");
srs_update_proc_stat();
}
if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update disk info, disk iops.");
srs_update_disk_stat();
}
if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
srs_info("update memory info, usage/free.");
srs_update_meminfo();
}
if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
srs_info("update platform info, uptime/load.");
srs_update_platform_info();
}
if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
srs_info("update network devices info.");
srs_update_network_devices();
}
if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
srs_info("update network server kbps info.");
resample_kbps();
// do persistence config to file.
if (signal_persistence_config) {
signal_persistence_config = false;
srs_info("get signal to persistence config to file.");
if ((err = _srs_config->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
if (_srs_config->get_heartbeat_enabled()) {
if ((i % heartbeat_max_resolution) == 0) {
srs_info("do http heartbeat, for internal server to report.");
http_heartbeat->heartbeat();
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal to reload the config.");
if ((err = _srs_config->reload()) != srs_success) {
return srs_error_wrap(err, "config reload");
}
srs_info("server main thread loop");
srs_trace("reload config success.");
}
srs_usleep(1 * SRS_UTIME_SECONDS);
}
return err;
}
srs_error_t SrsServer::setup_ticks()
{
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS);
if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(3, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(4, 6 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(5, 6 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(6, 9 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(7, 9 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if (_srs_config->get_heartbeat_enabled()) {
if ((err = timer_->tick(9, _srs_config->get_heartbeat_interval())) != srs_success) {
return srs_error_wrap(err, "tick");
}
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
switch (event) {
case 1: srs_update_system_time(); break;
case 2: srs_update_system_rusage(); break;
case 3: srs_update_proc_stat(); break;
case 4: srs_update_disk_stat(); break;
case 5: srs_update_meminfo(); break;
case 6: srs_update_platform_info(); break;
case 7: srs_update_network_devices(); break;
case 8: resample_kbps(); break;
case 9: http_heartbeat->heartbeat(); break;
}
return err;
}
srs_error_t SrsServer::listen_rtmp()
{
srs_error_t err = srs_success;

@ -38,6 +38,7 @@
#include <srs_service_st.hpp>
#include <srs_app_gb28181.hpp>
#include <srs_app_gb28181_sip.hpp>
#include <srs_app_hourglass.hpp>
class SrsServer;
class SrsHttpServeMux;
@ -260,7 +261,9 @@ public:
// TODO: FIXME: Rename to SrsLiveServer.
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler, virtual public ISrsResourceManager
class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler
, virtual public ISrsResourceManager, virtual public ISrsCoroutineHandler
, virtual public ISrsHourGlass
{
private:
// TODO: FIXME: Extract an HttpApiServer.
@ -269,6 +272,8 @@ private:
SrsHttpHeartbeat* http_heartbeat;
SrsIngester* ingester;
SrsResourceManager* conn_manager;
SrsCoroutine* trd_;
SrsHourGlass* timer_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
@ -315,6 +320,9 @@ public:
virtual srs_error_t register_signal();
virtual srs_error_t http_handle();
virtual srs_error_t ingest();
virtual srs_error_t start();
// interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
// server utilities.
public:
@ -337,6 +345,11 @@ private:
// update the global static data, for instance, the current time,
// the cpu/mem/network statistic.
virtual srs_error_t do_cycle();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
private:
// listen at specified protocol.
virtual srs_error_t listen_rtmp();
virtual srs_error_t listen_http_api();

@ -1689,6 +1689,7 @@ SrsSourceManager* _srs_sources = new SrsSourceManager();
SrsSourceManager::SrsSourceManager()
{
lock = NULL;
timer_ = NULL;
}
SrsSourceManager::~SrsSourceManager()
@ -1696,6 +1697,11 @@ SrsSourceManager::~SrsSourceManager()
srs_mutex_destroy(lock);
}
srs_error_t SrsSourceManager::initialize()
{
return setup_ticks();
}
srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
srs_error_t err = srs_success;
@ -1789,28 +1795,37 @@ void SrsSourceManager::dispose()
return;
}
srs_error_t SrsSourceManager::cycle()
srs_error_t SrsSourceManager::setup_ticks()
{
SrsContextId cid = _srs_context->get_id();
srs_error_t err = do_cycle();
_srs_context->set_id(cid);
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
if ((err = timer_->tick(1, 1 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}
return err;
}
srs_error_t SrsSourceManager::do_cycle()
srs_error_t SrsSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSource* source = it->second;
// Do cycle source to cleanup components, such as hls dispose.
if ((err = source->cycle()) != srs_success) {
return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str());
}
// TODO: FIXME: support source cleanup.
// @see https://github.com/ossrs/srs/issues/713
// @see https://github.com/ossrs/srs/issues/714
@ -1825,7 +1840,7 @@ srs_error_t SrsSourceManager::do_cycle()
_srs_context->set_id(cid);
}
srs_trace("cleanup die source, total=%d", (int)pool.size());
srs_freep(source);
pool.erase(it++);
} else {
@ -1835,7 +1850,7 @@ srs_error_t SrsSourceManager::do_cycle()
++it;
#endif
}
return err;
}

@ -34,6 +34,7 @@
#include <srs_app_reload.hpp>
#include <srs_core_performance.hpp>
#include <srs_service_st.hpp>
#include <srs_app_hourglass.hpp>
class SrsFormat;
class SrsRtmpFormat;
@ -450,15 +451,17 @@ public:
};
// The source manager to create and refresh all stream sources.
class SrsSourceManager
class SrsSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map<std::string, SrsSource*> pool;
SrsHourGlass* timer_;
public:
SrsSourceManager();
virtual ~SrsSourceManager();
public:
virtual srs_error_t initialize();
// create source when fetch from cache failed.
// @param r the client request.
// @param h the event handler for source.
@ -471,9 +474,10 @@ private:
public:
// dispose and cycle all sources.
virtual void dispose();
virtual srs_error_t cycle();
// interface ISrsHourGlass
private:
virtual srs_error_t do_cycle();
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// when system exit, destroy th`e sources,
// For gmc to analysis mem leaks.

@ -24,6 +24,6 @@
#ifndef SRS_CORE_VERSION4_HPP
#define SRS_CORE_VERSION4_HPP
#define SRS_VERSION4_REVISION 62
#define SRS_VERSION4_REVISION 71
#endif

@ -1212,6 +1212,11 @@ vector<uint16_t> SrsRtcpNack::get_lost_sns() const
return sn;
}
bool SrsRtcpNack::empty()
{
return lost_sns_.empty();
}
void SrsRtcpNack::set_media_ssrc(uint32_t ssrc)
{
media_ssrc_ = ssrc;

@ -345,6 +345,7 @@ public:
uint32_t get_media_ssrc() const;
std::vector<uint16_t> get_lost_sns() const;
bool empty();
void set_media_ssrc(uint32_t ssrc);
void add_lost_sn(uint16_t sn);

@ -23,6 +23,7 @@
#include <srs_kernel_rtc_rtp.hpp>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sstream>
using namespace std;
@ -33,6 +34,106 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_kernel_flv.hpp>
/* @see https://tools.ietf.org/html/rfc1889#section-5.1
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC |M| PT | sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| synchronization source (SSRC) identifier |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| contributing source (CSRC) identifiers |
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint32_t srs_rtp_fast_parse_ssrc(char* buf, int size)
{
if (size < 12) {
return 0;
}
uint32_t value = 0;
char* pp = (char*)&value;
char* p = buf + 8;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
return value;
}
uint8_t srs_rtp_fast_parse_pt(char* buf, int size)
{
if (size < 12) {
return 0;
}
return buf[1] & 0x7f;
}
srs_error_t srs_rtp_fast_parse_twcc(char* buf, int size, SrsRtpExtensionTypes* ext_types, uint16_t& twcc_sn)
{
srs_error_t err = srs_success;
int need_size = 12 /*rtp head fix len*/ + 4 /* extension header len*/ + 3 /* twcc extension len*/;
if(size < (need_size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size);
}
uint8_t first = buf[0];
bool extension = (first & 0x10);
uint8_t cc = (first & 0x0F);
if(!extension) {
return srs_error_new(ERROR_RTC_RTP, "no extension in rtp");
}
need_size += cc * 4; // csrc size
if(size < (need_size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size);
}
buf += 12 + 4*cc;
uint16_t value = *((uint16_t*)buf);
value = ntohs(value);
if(0xBEDE != value) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "no support this type(0x%02x) extension", value);
}
buf += 2;
uint16_t extension_length = ntohs(*((uint16_t*)buf));
buf += 2;
extension_length *= 4;
need_size += extension_length; // entension size
if(size < (need_size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "required %d bytes, actual %d", need_size, size);
}
while(extension_length > 0) {
uint8_t v = buf[0];
buf++;
extension_length--;
if(0 == v) {
continue;
}
uint8_t id = (v & 0xF0) >>4;
uint8_t len = (v & 0x0F) + 1;
SrsRtpExtensionType xtype = ext_types->get_type(id);
if(xtype == kRtpExtensionTransportSequenceNumber) {
twcc_sn = ntohs(*((uint16_t*)buf));
return err;
} else {
buf += len;
extension_length -= len;
}
}
return err;
}
// If value is newer than pre_valuereturn true; otherwise false
bool srs_seq_is_newer(uint16_t value, uint16_t pre_value)
{
@ -179,6 +280,58 @@ void SrsRtpExtensionTwcc::set_sn(uint16_t sn)
has_twcc_ = true;
}
SrsRtpExtensionOneByte::SrsRtpExtensionOneByte() : has_ext_(false), id_(0), value_(0)
{
}
void SrsRtpExtensionOneByte::set_id(int id)
{
id_ = id;
has_ext_ = true;
}
void SrsRtpExtensionOneByte::set_value(uint8_t value)
{
value_ = value;
has_ext_ = true;
}
srs_error_t SrsRtpExtensionOneByte::decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
if (!buf->require(2)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2);
}
uint8_t v = buf->read_1bytes();
id_ = (v & 0xF0) >> 4;
uint8_t len = (v & 0x0F);
if(!id_ || len != 0) {
return srs_error_new(ERROR_RTC_RTP, "invalid rtp extension id=%d, len=%d", id_, len);
}
value_ = buf->read_1bytes();
has_ext_ = true;
return err;
}
srs_error_t SrsRtpExtensionOneByte::encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
if (!buf->require(2)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2);
}
uint8_t id_len = (id_ & 0x0F)<< 4 | 0x00;
buf->write_1bytes(id_len);
buf->write_1bytes(value_);
return err;
}
SrsRtpExtensions::SrsRtpExtensions() : has_ext_(false)
{
}
@ -252,6 +405,11 @@ srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf)
return srs_error_wrap(err, "decode twcc extension");
}
has_ext_ = true;
} else if (xtype == kRtpExtensionAudioLevel) {
if((err = audio_level_.decode(buf)) != srs_success) {
return srs_error_wrap(err, "decode audio level extension");
}
has_ext_ = true;
} else {
buf->skip(1 + (len + 1));
}
@ -263,6 +421,7 @@ srs_error_t SrsRtpExtensions::decode_0xbede(SrsBuffer* buf)
uint64_t SrsRtpExtensions::nb_bytes()
{
int size = 4 + (twcc_.has_twcc_ext() ? twcc_.nb_bytes() : 0);
size += (audio_level_.exists() ? audio_level_.nb_bytes() : 0);
// add padding
size += (size % 4 == 0) ? 0 : (4 - size % 4);
return size;
@ -281,6 +440,10 @@ srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf)
len += twcc_.nb_bytes();
}
if (audio_level_.exists()) {
len += audio_level_.nb_bytes();
}
int padding_count = (len % 4 == 0) ? 0 : (4 - len % 4);
len += padding_count;
@ -293,6 +456,12 @@ srs_error_t SrsRtpExtensions::encode(SrsBuffer* buf)
}
}
if (audio_level_.exists()) {
if (srs_success != (err = audio_level_.encode(buf))) {
return srs_error_wrap(err, "encode audio level extension");
}
}
// add padding
while(padding_count > 0) {
buf->write_1bytes(0);
@ -331,6 +500,23 @@ srs_error_t SrsRtpExtensions::set_twcc_sequence_number(uint8_t id, uint16_t sn)
return srs_success;
}
srs_error_t SrsRtpExtensions::get_audio_level(uint8_t& level)
{
if(audio_level_.exists()) {
level = audio_level_.get_value();
return srs_success;
}
return srs_error_new(ERROR_RTC_RTP_MUXER, "not find rtp extension audio level");
}
srs_error_t SrsRtpExtensions::set_audio_level(int id, uint8_t level)
{
has_ext_ = true;
audio_level_.set_id(id);
audio_level_.set_value(level);
return srs_success;
}
SrsRtpHeader::SrsRtpHeader()
{
padding_length = 0;

@ -57,6 +57,12 @@ class SrsBuffer;
class SrsRtpRawPayload;
class SrsRtpFUAPayload2;
class SrsSharedPtrMessage;
class SrsRtpExtensionTypes;
// Fast parse the SSRC from RTP packet. Return 0 if invalid.
uint32_t srs_rtp_fast_parse_ssrc(char* buf, int size);
uint8_t srs_rtp_fast_parse_pt(char* buf, int size);
srs_error_t srs_rtp_fast_parse_twcc(char* buf, int size, SrsRtpExtensionTypes* types, uint16_t& twcc_sn);
// The "distance" between two uint16 number, for example:
// distance(prev_value=3, value=5) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)5) === -2
@ -86,9 +92,12 @@ enum SrsRtpExtensionType
{
kRtpExtensionNone,
kRtpExtensionTransportSequenceNumber,
kRtpExtensionAudioLevel,
kRtpExtensionNumberOfExtensions // Must be the last entity in the enum.
};
const std::string kAudioLevelUri = "urn:ietf:params:rtp-hdrext:ssrc-audio-level";
struct SrsExtensionInfo
{
SrsRtpExtensionType type;
@ -96,7 +105,8 @@ struct SrsExtensionInfo
};
const SrsExtensionInfo kExtensions[] = {
{kRtpExtensionTransportSequenceNumber, std::string("http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01")}
{kRtpExtensionTransportSequenceNumber, std::string("http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01")},
{kRtpExtensionAudioLevel, kAudioLevelUri},
};
class SrsRtpExtensionTypes
@ -116,7 +126,8 @@ private:
uint8_t ids_[kRtpExtensionNumberOfExtensions];
};
class SrsRtpExtensionTwcc : public ISrsCodec
// Note that the extensions should never extends from any class, for performance.
class SrsRtpExtensionTwcc// : public ISrsCodec
{
bool has_twcc_;
uint8_t id_;
@ -138,12 +149,36 @@ public:
virtual uint64_t nb_bytes();
};
class SrsRtpExtensions : public ISrsCodec
// Note that the extensions should never extends from any class, for performance.
class SrsRtpExtensionOneByte// : public ISrsCodec
{
bool has_ext_;
int id_;
uint8_t value_;
public:
SrsRtpExtensionOneByte();
virtual ~SrsRtpExtensionOneByte() {}
bool exists() { return has_ext_; }
int get_id() { return id_; }
uint8_t get_value() { return value_; }
void set_id(int id);
void set_value(uint8_t value);
public:
// ISrsCodec
virtual srs_error_t decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual uint64_t nb_bytes() { return 2; };
};
// Note that the extensions should never extends from any class, for performance.
class SrsRtpExtensions// : public ISrsCodec
{
private:
bool has_ext_;
SrsRtpExtensionTypes types_;
SrsRtpExtensionTwcc twcc_;
SrsRtpExtensionOneByte audio_level_;
public:
SrsRtpExtensions();
virtual ~SrsRtpExtensions();
@ -152,6 +187,8 @@ public:
void set_types_(const SrsRtpExtensionTypes* types);
srs_error_t get_twcc_sequence_number(uint16_t& twcc_sn);
srs_error_t set_twcc_sequence_number(uint8_t id, uint16_t sn);
srs_error_t get_audio_level(uint8_t& level);
srs_error_t set_audio_level(int id, uint8_t level);
// ISrsCodec
public:
@ -163,7 +200,8 @@ public:
virtual uint64_t nb_bytes();
};
class SrsRtpHeader : public ISrsCodec
// Note that the header should never extends from any class, for performance.
class SrsRtpHeader// : public ISrsCodec
{
private:
uint8_t padding_length;
@ -257,7 +295,7 @@ public:
// Whether the packet is Audio packet.
bool is_audio();
// Copy the RTP packet.
SrsRtpPacket2* copy();
virtual SrsRtpPacket2* copy();
// Set RTP header extensions for encoding or decoding header extension
void set_extension_types(const SrsRtpExtensionTypes* v);
// interface ISrsEncoder

@ -25,24 +25,83 @@
#include <srs_kernel_utility.hpp>
SrsKbpsSample::SrsKbpsSample()
SrsRateSample::SrsRateSample()
{
bytes = time = -1;
kbps = 0;
total = time = -1;
rate = 0;
}
SrsKbpsSample::~SrsKbpsSample()
SrsRateSample::~SrsRateSample()
{
}
SrsKbpsSample* SrsKbpsSample::update(int64_t b, srs_utime_t t, int k)
SrsRateSample* SrsRateSample::update(int64_t nn, srs_utime_t t, int k)
{
bytes = b;
total = nn;
time = t;
kbps = k;
rate = k;
return this;
}
SrsPps::SrsPps(SrsWallClock* c)
{
clk_ = c;
sugar = 0;
}
SrsPps::~SrsPps()
{
}
void SrsPps::update()
{
update(sugar);
}
void SrsPps::update(int64_t nn)
{
srs_utime_t now = clk_->now();
if (sample_30s_.time < 0) {
sample_30s_.update(nn, now, 0);
}
if (sample_1m_.time < 0) {
sample_1m_.update(nn, now, 0);
}
if (sample_5m_.time < 0) {
sample_5m_.update(nn, now, 0);
}
if (sample_60m_.time < 0) {
sample_60m_.update(nn, now, 0);
}
if (now - sample_10s_.time >= 10 * SRS_UTIME_SECONDS) {
int kps = (int)((nn - sample_10s_.total) * 1000 / srsu2ms(now - sample_10s_.time));
sample_10s_.update(nn, now, kps);
}
if (now - sample_30s_.time >= 30 * SRS_UTIME_SECONDS) {
int kps = (int)((nn - sample_30s_.total) * 1000 / srsu2ms(now - sample_30s_.time));
sample_30s_.update(nn, now, kps);
}
if (now - sample_1m_.time >= 60 * SRS_UTIME_SECONDS) {
int kps = (int)((nn - sample_1m_.total) * 1000 / srsu2ms(now - sample_1m_.time));
sample_1m_.update(nn, now, kps);
}
if (now - sample_5m_.time >= 300 * SRS_UTIME_SECONDS) {
int kps = (int)((nn - sample_5m_.total) * 1000 / srsu2ms(now - sample_5m_.time));
sample_5m_.update(nn, now, kps);
}
if (now - sample_60m_.time >= 3600 * SRS_UTIME_SECONDS) {
int kps = (int)((nn - sample_60m_.total) * 1000 / srsu2ms(now - sample_60m_.time));
sample_60m_.update(nn, now, kps);
}
}
int SrsPps::r10s()
{
return sample_10s_.rate;
}
SrsKbpsSlice::SrsKbpsSlice(SrsWallClock* c)
{
clk = c;
@ -78,19 +137,19 @@ void SrsKbpsSlice::sample()
}
if (now - sample_30s.time >= 30 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_30s.bytes) * 8 / srsu2ms(now - sample_30s.time));
int kbps = (int)((total_bytes - sample_30s.total) * 8 / srsu2ms(now - sample_30s.time));
sample_30s.update(total_bytes, now, kbps);
}
if (now - sample_1m.time >= 60 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_1m.bytes) * 8 / srsu2ms(now - sample_1m.time));
int kbps = (int)((total_bytes - sample_1m.total) * 8 / srsu2ms(now - sample_1m.time));
sample_1m.update(total_bytes, now, kbps);
}
if (now - sample_5m.time >= 300 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_5m.bytes) * 8 / srsu2ms(now - sample_5m.time));
int kbps = (int)((total_bytes - sample_5m.total) * 8 / srsu2ms(now - sample_5m.time));
sample_5m.update(total_bytes, now, kbps);
}
if (now - sample_60m.time >= 3600 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_60m.bytes) * 8 / srsu2ms(now - sample_60m.time));
int kbps = (int)((total_bytes - sample_60m.total) * 8 / srsu2ms(now - sample_60m.time));
sample_60m.update(total_bytes, now, kbps);
}
}
@ -188,22 +247,22 @@ int SrsKbps::get_recv_kbps()
int SrsKbps::get_send_kbps_30s()
{
return os.sample_30s.kbps;
return os.sample_30s.rate;
}
int SrsKbps::get_recv_kbps_30s()
{
return is.sample_30s.kbps;
return is.sample_30s.rate;
}
int SrsKbps::get_send_kbps_5m()
{
return os.sample_5m.kbps;
return os.sample_5m.rate;
}
int SrsKbps::get_recv_kbps_5m()
{
return is.sample_5m.kbps;
return is.sample_5m.rate;
}
void SrsKbps::add_delta(int64_t in, int64_t out)
@ -297,3 +356,5 @@ int SrsKbps::size_memory()
return sizeof(SrsKbps);
}
SrsWallClock* _srs_clock = new SrsWallClock();

@ -30,21 +30,46 @@
class SrsWallClock;
/**
* a kbps sample, for example, the kbps at time,
* 10minute kbps sample.
*/
class SrsKbpsSample
// A sample for rate-based stat, such as kbps or kps.
class SrsRateSample
{
public:
int64_t bytes;
int64_t total;
srs_utime_t time;
int kbps;
// kbps or kps
int rate;
public:
SrsKbpsSample();
virtual ~SrsKbpsSample();
SrsRateSample();
virtual ~SrsRateSample();
public:
virtual SrsKbpsSample* update(int64_t b, srs_utime_t t, int k);
virtual SrsRateSample* update(int64_t nn, srs_utime_t t, int k);
};
// A pps manager every some duration.
class SrsPps
{
private:
SrsWallClock* clk_;
private:
// samples
SrsRateSample sample_10s_;
SrsRateSample sample_30s_;
SrsRateSample sample_1m_;
SrsRateSample sample_5m_;
SrsRateSample sample_60m_;
public:
// Sugar for target to stat.
int64_t sugar;
public:
SrsPps(SrsWallClock* clk);
virtual ~SrsPps();
public:
// Update with the nn which is target.
void update();
// Update with the nn.
void update(int64_t nn);
// Get the 10s average stat.
int r10s();
};
/**
@ -82,10 +107,10 @@ public:
// cache for io maybe freed.
int64_t last_bytes;
// samples
SrsKbpsSample sample_30s;
SrsKbpsSample sample_1m;
SrsKbpsSample sample_5m;
SrsKbpsSample sample_60m;
SrsRateSample sample_30s;
SrsRateSample sample_1m;
SrsRateSample sample_5m;
SrsRateSample sample_60m;
public:
// for the delta bytes.
int64_t delta_bytes;
@ -233,4 +258,7 @@ public:
virtual int size_memory();
};
// The global clock.
extern SrsWallClock* _srs_clock;
#endif

@ -151,6 +151,11 @@ srs_thread_t srs_thread_self()
return (srs_thread_t)st_thread_self();
}
void srs_thread_exit(void* retval)
{
st_thread_exit(retval);
}
srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;

@ -57,6 +57,7 @@ extern srs_error_t srs_fd_keepalive(int fd);
// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
extern void srs_thread_exit(void* retval);
// For client, to open socket and connect to server.
// @param tm The timeout in srs_utime_t.

@ -30,6 +30,119 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_st.hpp>
#include <srs_service_conn.hpp>
#include <srs_app_conn.hpp>
class MockIDResource : public ISrsResource
{
public:
int id;
MockIDResource(int v) {
id = v;
}
virtual ~MockIDResource() {
}
virtual const SrsContextId& get_id() {
return _srs_context->get_id();
}
virtual std::string desc() {
return "";
}
};
VOID TEST(AppResourceManagerTest, FindByFastID)
{
srs_error_t err = srs_success;
if (true) {
SrsResourceManager m("test");
HELPER_EXPECT_SUCCESS(m.start());
m.add_with_fast_id(101, new MockIDResource(1));
m.add_with_fast_id(102, new MockIDResource(2));
m.add_with_fast_id(103, new MockIDResource(3));
EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id);
EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(102))->id);
EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(103))->id);
}
if (true) {
SrsResourceManager m("test");
HELPER_EXPECT_SUCCESS(m.start());
MockIDResource* r1 = new MockIDResource(1);
MockIDResource* r2 = new MockIDResource(2);
MockIDResource* r3 = new MockIDResource(3);
m.add_with_fast_id(101, r1);
m.add_with_fast_id(102, r2);
m.add_with_fast_id(103, r3);
EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id);
EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(102))->id);
EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(103))->id);
m.remove(r2); srs_usleep(0);
EXPECT_TRUE(m.find_by_fast_id(102) == NULL);
}
if (true) {
SrsResourceManager m("test");
HELPER_EXPECT_SUCCESS(m.start());
MockIDResource* r1 = new MockIDResource(1);
MockIDResource* r2 = new MockIDResource(2);
MockIDResource* r3 = new MockIDResource(3);
m.add_with_fast_id(1, r1);
m.add_with_fast_id(100001, r2);
m.add_with_fast_id(1000001, r3);
EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(1))->id);
EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(100001))->id);
EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(1000001))->id);
m.remove(r2); srs_usleep(0);
EXPECT_TRUE(m.find_by_fast_id(100001) == NULL);
m.remove(r3); srs_usleep(0);
EXPECT_TRUE(m.find_by_fast_id(1000001) == NULL);
m.remove(r1); srs_usleep(0);
EXPECT_TRUE(m.find_by_fast_id(1) == NULL);
}
if (true) {
SrsResourceManager m("test");
HELPER_EXPECT_SUCCESS(m.start());
m.add_with_fast_id(101, new MockIDResource(1));
m.add_with_fast_id(10101, new MockIDResource(2));
m.add_with_fast_id(1010101, new MockIDResource(3));
m.add_with_fast_id(101010101, new MockIDResource(4));
m.add_with_fast_id(10101010101LL, new MockIDResource(5));
m.add_with_fast_id(1010101010101LL, new MockIDResource(6));
m.add_with_fast_id(101010101010101LL, new MockIDResource(7));
m.add_with_fast_id(10101010101010101LL, new MockIDResource(8));
m.add_with_fast_id(1010101010101010101ULL, new MockIDResource(9));
m.add_with_fast_id(11010101010101010101ULL, new MockIDResource(10));
EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id);
EXPECT_EQ(2, ((MockIDResource*)m.find_by_fast_id(10101))->id);
EXPECT_EQ(3, ((MockIDResource*)m.find_by_fast_id(1010101))->id);
EXPECT_EQ(4, ((MockIDResource*)m.find_by_fast_id(101010101))->id);
EXPECT_EQ(5, ((MockIDResource*)m.find_by_fast_id(10101010101LL))->id);
EXPECT_EQ(6, ((MockIDResource*)m.find_by_fast_id(1010101010101LL))->id);
EXPECT_EQ(7, ((MockIDResource*)m.find_by_fast_id(101010101010101LL))->id);
EXPECT_EQ(8, ((MockIDResource*)m.find_by_fast_id(10101010101010101LL))->id);
EXPECT_EQ(9, ((MockIDResource*)m.find_by_fast_id(1010101010101010101ULL))->id);
EXPECT_EQ(10, ((MockIDResource*)m.find_by_fast_id(11010101010101010101ULL))->id);
}
if (true) {
SrsResourceManager m("test");
HELPER_EXPECT_SUCCESS(m.start());
m.add_with_fast_id(101, new MockIDResource(1));
m.add_with_fast_id(101, new MockIDResource(4));
EXPECT_EQ(1, ((MockIDResource*)m.find_by_fast_id(101))->id);
}
}
VOID TEST(AppCoroutineTest, Dummy)
{

@ -1973,7 +1973,7 @@ VOID TEST(ConfigUnitTest, CheckDefaultValuesVhost)
if (true) {
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF));
EXPECT_EQ(srs_utime_t(9.9 * SRS_UTIME_SECONDS), conf.get_heartbeat_interval());
EXPECT_EQ(srs_utime_t(10 * SRS_UTIME_SECONDS), conf.get_heartbeat_interval());
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "heartbeat{interval 10;}"));
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_heartbeat_interval());

Loading…
Cancel
Save