rtc publish, with debug code

pull/1753/head
xiaozhihong 5 years ago
parent c654f1e06e
commit a061d5c3db

2
trunk/configure vendored

@ -279,7 +279,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers" "srs_app_hybrid")
if [[ $SRS_RTC == YES ]]; then
MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_audio_recode" "srs_app_sdp")
MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_audio_recode" "srs_app_sdp" "srs_app_rtp_queue")
fi
if [[ $SRS_GB28181 == YES ]]; then
MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip")

@ -1068,6 +1068,11 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
local_media_desc.session_info_.setup_ = "active";
} else if (remote_media_desc.session_info_.setup_ == "actpass") {
local_media_desc.session_info_.setup_ = "passive";
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
// is 'active' in the offer and 'passive' in the answer.
local_media_desc.session_info_.setup_ = "passive";
}
if (remote_media_desc.sendonly_) {
@ -1091,6 +1096,299 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str
return err;
}
uint32_t SrsGoApiRtcPublish::ssrc_num = 0;
SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsRtcServer* rtc_svr)
{
rtc_server = rtc_svr;
}
SrsGoApiRtcPublish::~SrsGoApiRtcPublish()
{
}
// Request:
// POST /rtc/v1/publish/
// {
// "sdp":"offer...", "streamurl":"webrtc://r.ossrs.net/live/livestream",
// "api":'http...", "clientip":"..."
// }
// Response:
// {"sdp":"answer...", "sid":"..."}
// @see https://github.com/rtcdn/rtcdn-draft
srs_error_t SrsGoApiRtcPublish::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);
if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("RTC error %s", srs_error_desc(err).c_str()); srs_freep(err);
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}
return srs_api_response(w, r, res->dumps());
}
srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
{
srs_error_t err = srs_success;
// For each RTC session, we use short-term HTTP connection.
SrsHttpHeader* hdr = w->header();
hdr->set("Connection", "Close");
// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_error_wrap(err, "read body");
}
SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (!json || !json->is_object()) {
return srs_error_wrap(err, "not json");
}
req = json->to_object();
}
// Fetch params from req object.
SrsJsonAny* prop = NULL;
if ((prop = req->ensure_property_string("sdp")) == NULL) {
return srs_error_wrap(err, "not sdp");
}
string remote_sdp_str = prop->to_str();
if ((prop = req->ensure_property_string("streamurl")) == NULL) {
return srs_error_wrap(err, "not streamurl");
}
string streamurl = prop->to_str();
string clientip;
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
api = prop->to_str();
}
// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
}
srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length());
// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
SrsSdp remote_sdp;
if ((err = remote_sdp.parse(remote_sdp_str)) != srs_success) {
return srs_error_wrap(err, "parse sdp failed: %s", remote_sdp_str.c_str());
}
if ((err = check_remote_sdp(remote_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp check failed");
}
SrsSdp local_sdp;
if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) {
return srs_error_wrap(err, "remote sdp have error or unsupport attributes");
}
SrsRequest request;
request.app = app;
request.stream = stream_name;
// TODO: FIXME: Maybe need a better name?
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, "");
ostringstream os;
if ((err = local_sdp.encode(os)) != srs_success) {
return srs_error_wrap(err, "encode sdp");
}
string local_sdp_str = os.str();
srs_trace("local_sdp=%s", local_sdp_str.c_str());
res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::integer(SrsStatistic::instance()->server_id()));
// TODO: add candidates in response json?
res->set("sdp", SrsJsonAny::str(local_sdp_str.c_str()));
res->set("sessionid", SrsJsonAny::str(rtc_session->id().c_str()));
srs_trace("RTC sid=%s, offer=%dB, answer=%dB", rtc_session->id().c_str(), remote_sdp_str.length(), local_sdp_str.length());
return err;
}
srs_error_t SrsGoApiRtcPublish::check_remote_sdp(const SrsSdp& remote_sdp)
{
srs_error_t err = srs_success;
if (remote_sdp.group_policy_ != "BUNDLE") {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "now only support BUNDLE, group policy=%s", remote_sdp.group_policy_.c_str());
}
if (remote_sdp.media_descs_.empty()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no media descriptions");
}
for (std::vector<SrsMediaDesc>::const_iterator iter = remote_sdp.media_descs_.begin(); iter != remote_sdp.media_descs_.end(); ++iter) {
if (iter->type_ != "audio" && iter->type_ != "video") {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "unsupport media type=%s", iter->type_.c_str());
}
if (! iter->rtcp_mux_) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "now only suppor rtcp-mux");
}
for (std::vector<SrsMediaPayloadType>::const_iterator iter_media = iter->payload_types_.begin(); iter_media != iter->payload_types_.end(); ++iter_media) {
if (iter->recvonly_) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "publish API only support sendrecv/sendonly");
}
}
}
return err;
}
srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
{
srs_error_t err = srs_success;
local_sdp.version_ = "0";
local_sdp.username_ = RTMP_SIG_SRS_SERVER;
local_sdp.session_id_ = srs_int2str((int64_t)this);
local_sdp.session_version_ = "2";
local_sdp.nettype_ = "IN";
local_sdp.addrtype_ = "IP4";
local_sdp.unicast_address_ = "0.0.0.0";
local_sdp.session_name_ = "live_publish_session";
local_sdp.msid_semantic_ = "WMS";
local_sdp.msids_.push_back(app + "/" + stream);
local_sdp.group_policy_ = "BUNDLE";
int mid = 0;
for (int i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i];
if (remote_media_desc.is_audio()) {
local_sdp.media_descs_.push_back(SrsMediaDesc("audio"));
} else if (remote_media_desc.is_video()) {
local_sdp.media_descs_.push_back(SrsMediaDesc("video"));
}
SrsMediaDesc& local_media_desc = local_sdp.media_descs_.back();
if (remote_media_desc.is_audio()) {
// TODO: check opus format specific param
std::vector<SrsMediaPayloadType> payloads = remote_media_desc.find_media_with_encoding_name("opus");
for (std::vector<SrsMediaPayloadType>::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) {
// Only choose one match opus codec.
local_media_desc.payload_types_.push_back(*iter);
break;
}
if (local_media_desc.payload_types_.empty()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no valid found opus payload type");
}
} else if (remote_media_desc.is_video()) {
std::deque<SrsMediaPayloadType> backup_payloads;
std::vector<SrsMediaPayloadType> payloads = remote_media_desc.find_media_with_encoding_name("H264");
for (std::vector<SrsMediaPayloadType>::iterator iter = payloads.begin(); iter != payloads.end(); ++iter) {
if (iter->format_specific_param_.empty()) {
backup_payloads.push_front(*iter);
continue;
}
H264SpecificParam h264_param;
if ((err = parse_h264_fmtp(iter->format_specific_param_, h264_param)) != srs_success) {
srs_error_reset(err); continue;
}
// Try to pick the "best match" H.264 payload type.
if (h264_param.packetization_mode == "1" && h264_param.level_asymmerty_allow == "1") {
// Only choose first match H.264 payload type.
local_media_desc.payload_types_.push_back(*iter);
break;
}
backup_payloads.push_back(*iter);
}
// Try my best to pick at least one media payload type.
if (local_media_desc.payload_types_.empty() && ! backup_payloads.empty()) {
srs_warn("choose backup H.264 payload type=%d", backup_payloads.front().payload_type_);
local_media_desc.payload_types_.push_back(backup_payloads.front());
}
if (local_media_desc.payload_types_.empty()) {
return srs_error_new(ERROR_RTC_SDP_EXCHANGE, "no found valid H.264 payload type");
}
local_media_desc.payload_types_.back().rtcp_fb_.push_back("rrtr");
}
local_media_desc.mid_ = remote_media_desc.mid_;
local_sdp.groups_.push_back(local_media_desc.mid_);
local_media_desc.port_ = 9;
local_media_desc.protos_ = "UDP/TLS/RTP/SAVPF";
if (remote_media_desc.session_info_.setup_ == "active") {
local_media_desc.session_info_.setup_ = "passive";
} else if (remote_media_desc.session_info_.setup_ == "passive") {
local_media_desc.session_info_.setup_ = "active";
} else if (remote_media_desc.session_info_.setup_ == "actpass") {
local_media_desc.session_info_.setup_ = "passive";
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
// is 'active' in the offer and 'passive' in the answer.
local_media_desc.session_info_.setup_ = "passive";
}
local_sdp.media_descs_.back().session_info_.ice_options_ = "trickle";
if (remote_media_desc.sendonly_) {
local_media_desc.recvonly_ = true;
} else if (remote_media_desc.recvonly_) {
local_media_desc.sendonly_ = true;
} else if (remote_media_desc.sendrecv_) {
local_media_desc.sendrecv_ = true;
}
local_media_desc.rtcp_mux_ = true;
SrsSSRCInfo ssrc_info;
ssrc_info.ssrc_ = ++ssrc_num;
ssrc_info.cname_ = "test_sdp_cname";
local_media_desc.ssrc_infos_.push_back(ssrc_info);
}
return err;
}
#endif
SrsGoApiClients::SrsGoApiClients()

@ -184,6 +184,23 @@ private:
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
};
class SrsGoApiRtcPublish : public ISrsHttpHandler
{
public:
static uint32_t ssrc_num;
private:
SrsRtcServer* rtc_server;
public:
SrsGoApiRtcPublish(SrsRtcServer* rtc_svr);
virtual ~SrsGoApiRtcPublish();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp);
srs_error_t check_remote_sdp(const SrsSdp& remote_sdp);
};
#endif
class SrsGoApiClients : public ISrsHttpHandler

@ -212,6 +212,80 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char*
return err;
}
SrsRtpH264Demuxer::SrsRtpH264Demuxer()
{
}
SrsRtpH264Demuxer::~SrsRtpH264Demuxer()
{
}
srs_error_t SrsRtpH264Demuxer::parse(SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
SrsRtpH264Header* rtp_h264_header = dynamic_cast<SrsRtpH264Header*>(rtp_pkt->rtp_payload_header);
if (rtp_h264_header == NULL) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid rtp packet");
}
uint8_t* rtp_payload = reinterpret_cast<uint8_t*>(rtp_pkt->rtp_payload());
int rtp_payload_size = rtp_pkt->rtp_payload_size();
if (rtp_payload_size == 0) {
srs_verbose("seq=%u, empty payload", rtp_pkt->rtp_header.get_sequence());
return err;
}
uint8_t nal_type = rtp_payload[0] & kNalTypeMask;
if (nal_type >= 1 && nal_type <= 23) {
srs_verbose("seq=%u, single nalu", rtp_pkt->rtp_header.get_sequence());
rtp_h264_header->is_first_packet_of_frame = true;
rtp_h264_header->is_last_packet_of_frame = true;
rtp_h264_header->nalu_type = nal_type;
rtp_h264_header->nalu_header = rtp_payload[0];
rtp_h264_header->nalu_offset.push_back(make_pair(0, rtp_payload_size));
} else if (nal_type == kFuA) {
srs_verbose("seq=%u, fu-a", rtp_pkt->rtp_header.get_sequence());
if ((rtp_payload[1] & kStart)) {
rtp_h264_header->is_first_packet_of_frame = true;
}
if ((rtp_payload[1] & kEnd)) {
rtp_h264_header->is_last_packet_of_frame = true;
}
rtp_h264_header->nalu_type = nal_type;
rtp_h264_header->nalu_header = (rtp_payload[0] & (~kNalTypeMask)) | (rtp_payload[1] & kNalTypeMask);
rtp_h264_header->nalu_offset.push_back(make_pair(2, rtp_payload_size - 2));
} else if (nal_type == kStapA) {
srs_verbose("seq=%u, stap-a", rtp_pkt->rtp_header.get_sequence());
int i = 1;
rtp_h264_header->is_first_packet_of_frame = true;
rtp_h264_header->is_last_packet_of_frame = true;
rtp_h264_header->nalu_type = nal_type;
while (i < rtp_payload_size) {
srs_verbose("stap-a cur index=%s", srs_string_dumps_hex(reinterpret_cast<const char*>(rtp_payload + i), 2).c_str());
uint16_t nal_len = (rtp_payload[i]) << 8 | rtp_payload[i + 1];
if (nal_len > rtp_payload_size - i) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet, nal len=%u, i=%d, rtp_payload_size=%d", nal_len, i, rtp_payload_size);
}
rtp_h264_header->nalu_offset.push_back(make_pair(i + 2, nal_len));
i += nal_len + 2;
}
if (i != rtp_payload_size) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid stap-a packet");
}
} else {
srs_verbose("payload size=%d, payload=%s", rtp_payload_size, srs_string_dumps_hex(rtp_pkt->payload, rtp_pkt->size).c_str());
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid h264 rtp packet");
}
return err;
}
SrsRtc::SrsRtc()
{
req = NULL;

@ -78,6 +78,15 @@ public:
srs_error_t transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio);
};
class SrsRtpH264Demuxer
{
public:
SrsRtpH264Demuxer();
virtual ~SrsRtpH264Demuxer();
public:
srs_error_t parse(SrsRtpSharedPacket* rtp_pkt);
};
class SrsRtc
{
private:

@ -54,6 +54,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_app_config.hpp>
#include <srs_app_rtc.hpp>
#include <srs_app_rtp_queue.hpp>
#include <srs_app_source.hpp>
#include <srs_app_server.hpp>
#include <srs_service_utility.hpp>
@ -123,6 +124,44 @@ static std::vector<std::string> get_candidate_ips()
return candidate_ips;
}
static map<uint32_t, uint64_t> ssrc_lxr;
uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32;
SrsNtp::SrsNtp()
{
system_ms_ = 0;
ntp_ = 0;
ntp_second_ = 0;
ntp_fractions_ = 0;
}
SrsNtp::~SrsNtp()
{
}
SrsNtp SrsNtp::from_time_ms(uint64_t ms)
{
SrsNtp srs_ntp;
srs_ntp.system_ms_ = ms;
srs_ntp.ntp_second_ = ms / 1000;
srs_ntp.ntp_fractions_ = (static_cast<double>(ms % 1000 / 1000.0)) * kMagicNtpFractionalUnit;
srs_ntp.ntp_ = (static_cast<uint64_t>(srs_ntp.ntp_second_) << 32) | srs_ntp.ntp_fractions_;
return srs_ntp;
}
SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
{
SrsNtp srs_ntp;
srs_ntp.ntp_ = ntp;
srs_ntp.ntp_second_ = (ntp & 0xFFFFFFFF00000000ULL) >> 32;
srs_ntp.ntp_fractions_ = (ntp & 0x00000000FFFFFFFFULL);
srs_ntp.system_ms_ = (static_cast<uint64_t>(srs_ntp.ntp_second_) * 1000) +
(static_cast<double>(static_cast<uint64_t>(srs_ntp.ntp_fractions_) * 1000.0) / kMagicNtpFractionalUnit);
return srs_ntp;
}
SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s)
{
rtc_session = s;
@ -1415,6 +1454,356 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes
return err;
}
SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
{
rtc_session = session;
rtp_h264_demuxer = new SrsRtpH264Demuxer();
rtp_video_queue = new SrsRtpQueue(1000);
rtp_audio_queue = new SrsRtpQueue(100, true);
source = NULL;
}
SrsRtcPublisher::~SrsRtcPublisher()
{
srs_freep(rtp_h264_demuxer);
srs_freep(rtp_video_queue);
srs_freep(rtp_audio_queue);
}
void SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsRequest request)
{
video_ssrc = vssrc;
audio_ssrc = assrc;
this->request = request;
srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc);
}
srs_error_t SrsRtcPublisher::on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf)
{
srs_error_t err = srs_success;
SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket();
SrsAutoFree(SrsRtpSharedPacket, rtp_shared_pkt);
if ((err = rtp_shared_pkt->decode(buf, nb_buf)) != srs_success) {
return srs_error_wrap(err, "rtp packet decode failed");
}
uint32_t ssrc = rtp_shared_pkt->rtp_header.get_ssrc();
if (ssrc == audio_ssrc) {
return on_audio(skt, rtp_shared_pkt);
} else if (ssrc == video_ssrc) {
return on_video(skt, rtp_shared_pkt);
}
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
}
srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
return err;
vector<uint16_t> nack_seqs;
rtp_audio_queue->nack_.get_nack_seqs(nack_seqs);
vector<uint16_t>::iterator iter = nack_seqs.begin();
while (iter != nack_seqs.end()) {
char buf[1024];
SrsBuffer stream(buf, sizeof(buf));
stream.write_1bytes(0x81);
stream.write_1bytes(kRtpFb);
stream.write_2bytes(3);
stream.write_4bytes(audio_ssrc);
stream.write_4bytes(audio_ssrc);
uint16_t pid = *iter;
uint16_t blp = 0;
srs_verbose("pid=%u", pid);
while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) {
blp |= (1 << (*(iter + 1) - pid - 1));
srs_verbose("blp=%u", *(iter+1));
++iter;
}
stream.write_2bytes(pid);
stream.write_2bytes(blp);
srs_verbose("nack dump=%s", srs_string_dumps_hex(stream.data(), stream.pos()).c_str());
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
skt->sendto(protected_buf, nb_protected_buf, 0);
//skt->sendto(stream.data(), stream.pos(), 0);
srs_verbose("send nack req, len=%d", nb_protected_buf);
} else {
srs_verbose("send nack failed, because of protect rtcp failed");
}
++iter;
}
rtp_pkt->rtp_payload_header = new SrsRtpOpusHeader();
rtp_pkt->rtp_payload_header->is_first_packet_of_frame = true;
rtp_pkt->rtp_payload_header->is_last_packet_of_frame = true;
rtp_audio_queue->insert(rtp_pkt);
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_audio_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
srs_verbose("collect %d audio frames, seq range %u,%u",
frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
for (size_t n = 0; n < frames[i].size(); ++n) {
srs_freep(frames[i][n]);
}
}
return err;
}
srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
vector<uint16_t> nack_seqs;
rtp_video_queue->nack_.get_nack_seqs(nack_seqs);
vector<uint16_t>::iterator iter = nack_seqs.begin();
while (iter != nack_seqs.end()) {
char buf[1024];
SrsBuffer stream(buf, sizeof(buf));
stream.write_1bytes(0x81);
stream.write_1bytes(kRtpFb);
stream.write_2bytes(3);
stream.write_4bytes(video_ssrc);
stream.write_4bytes(video_ssrc);
uint16_t pid = *iter;
uint16_t blp = 0;
srs_verbose("pid=%u", pid);
while (iter + 1 != nack_seqs.end() && (*(iter + 1) - pid <= 15)) {
blp |= (1 << (*(iter + 1) - pid - 1));
srs_verbose("blp=%u", *(iter+1));
++iter;
}
stream.write_2bytes(pid);
stream.write_2bytes(blp);
srs_verbose("nack dump=%s", srs_string_dumps_hex(stream.data(), stream.pos()).c_str());
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if (rtc_session->dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
skt->sendto(protected_buf, nb_protected_buf, 0);
//skt->sendto(stream.data(), stream.pos(), 0);
srs_verbose("send nack req, len=%d", nb_protected_buf);
} else {
srs_verbose("send nack failed, because of protect rtcp failed");
}
++iter;
}
rtp_pkt->rtp_payload_header = new SrsRtpH264Header();
if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) {
return srs_error_wrap(err, "rtp h264 demux failed");
}
rtp_video_queue->insert(rtp_pkt);
std::vector<std::vector<SrsRtpSharedPacket*> > frames;
rtp_video_queue->get_and_clean_collected_frames(frames);
for (size_t i = 0; i < frames.size(); ++i) {
if (! frames[i].empty()) {
srs_verbose("collect %d video frames, seq range %u,%u", frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence());
}
int frame_size = 5;
vector<uint32_t> nalu_len;
uint32_t len = 0;
for (size_t n = 0; n < frames[i].size(); ++n) {
SrsRtpH264Header* rtp_h264_header = dynamic_cast<SrsRtpH264Header*>(frames[i][n]->rtp_payload_header);
for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) {
if (rtp_h264_header->nalu_type != kFuA) {
uint8_t* p = reinterpret_cast<uint8_t*>(frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first);
if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) &&
((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI) &&
((p[0] & kNalTypeMask) != SrsAvcNaluTypeSPS) &&
((p[0] & kNalTypeMask) != SrsAvcNaluTypePPS)) {
frame_size += rtp_h264_header->nalu_offset[j].second + 4;
nalu_len.push_back(rtp_h264_header->nalu_offset[j].second);
}
} else {
if (frames[i][n]->rtp_payload_header->is_first_packet_of_frame) {
frame_size += 5;
len += 1;
}
frame_size += rtp_h264_header->nalu_offset[j].second;
len += rtp_h264_header->nalu_offset[j].second;
if (frames[i][n]->rtp_payload_header->is_last_packet_of_frame) {
nalu_len.push_back(len);
len = 0;
}
}
}
}
uint8_t* frame = new uint8_t[frame_size];
int frame_len = 5;
bool video_header_change = false;
int64_t timestamp = 0;
bool idr = false;
size_t len_index = 0;
for (size_t n = 0; n < frames[i].size(); ++n) {
SrsRtpH264Header* rtp_h264_header = dynamic_cast<SrsRtpH264Header*>(frames[i][n]->rtp_payload_header);
for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) {
timestamp = frames[i][n]->rtp_header.get_timestamp();
uint8_t* p = reinterpret_cast<uint8_t*>(frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first);
srs_verbose("nalu_type=%u, %02X", rtp_h264_header->nalu_type, p[0]);
if (rtp_h264_header->nalu_type != kFuA) {
if ((p[0] & kNalTypeMask) == SrsAvcNaluTypeSPS) {
srs_verbose("sps");
string cur_sps = string((char*)p, rtp_h264_header->nalu_offset[j].second);
if (! cur_sps.empty() && sps != cur_sps) {
video_header_change = true;
sps = cur_sps;
}
} else if ((p[0] & kNalTypeMask) == SrsAvcNaluTypePPS) {
srs_verbose("pps");
string cur_pps = string((char*)p, rtp_h264_header->nalu_offset[j].second);
if (! cur_pps.empty() && pps != cur_pps) {
video_header_change = true;
pps = cur_pps;
}
} else if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) && ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI)) {
uint32_t len = nalu_len[len_index++];
srs_verbose("nalu len=%u", len);
SrsBuffer stream((char*)frame + frame_len, 4);
stream.write_4bytes(len);
frame_len += 4;
memcpy(frame + frame_len, p, rtp_h264_header->nalu_offset[j].second);
frame_len += rtp_h264_header->nalu_offset[j].second;
}
} else {
if (frames[i][n]->rtp_payload_header->is_first_packet_of_frame) {
uint32_t len = nalu_len[len_index++];
srs_verbose("nalu len=%u", len);
SrsBuffer stream((char*)frame + frame_len, 4);
stream.write_4bytes(len);
frame_len += 4;
frame[frame_len++] = rtp_h264_header->nalu_header;
if ((rtp_h264_header->nalu_header & kNalTypeMask) == SrsAvcNaluTypeIDR) {
srs_verbose("idr");
idr = true;
}
}
memcpy(frame + frame_len, frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first,
rtp_h264_header->nalu_offset[j].second);
frame_len += rtp_h264_header->nalu_offset[j].second;
}
}
srs_freep(frames[i][n]);
}
if (video_header_change) {
srs_verbose("sps/pps change or init");
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->on_publish();
}
uint8_t* video_header = new uint8_t[1500];
SrsBuffer *stream = new SrsBuffer((char*)video_header, 1500);
SrsAutoFree(SrsBuffer, stream);
stream->write_1bytes(0x17);
stream->write_1bytes(0x00);
stream->write_1bytes(0x00);
stream->write_1bytes(0x00);
stream->write_1bytes(0x00);
// NAL SIZE 61 76 63 43 01 42 C0 1E FF E1 SPS_LEN SPS 01 PPS_LEN PPS
stream->write_1bytes(0x01);
stream->write_1bytes(0x42);
stream->write_1bytes(0xC0);
stream->write_1bytes(0x1E);
stream->write_1bytes(0xFF);
stream->write_1bytes(0xE1);
stream->write_2bytes(sps.size());
stream->write_string(sps);
stream->write_1bytes(0x01);
stream->write_2bytes(pps.size());
stream->write_string(pps);
SrsMessageHeader header;
header.message_type = 9;
header.timestamp = timestamp / 90;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
shared_video->create(&header, reinterpret_cast<char*>(video_header), stream->pos());
srs_error_t e = source->on_video(shared_video);
if (e != srs_success) {
srs_warn("on video header err=%s", srs_error_desc(e).c_str());
}
srs_verbose("rtp on video header");
}
if (! sps.empty() && ! pps.empty()) {
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->on_publish();
}
if (idr) {
frame[0] = 0x17;
} else {
frame[0] = 0x27;
}
frame[1] = 0x01;
frame[2] = 0x00;
frame[3] = 0x00;
frame[4] = 0x00;
SrsMessageHeader header;
header.message_type = 9;
header.timestamp = timestamp / 90;
SrsCommonMessage* shared_video = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, shared_video);
shared_video->create(&header, reinterpret_cast<char*>(frame), frame_len);
srs_error_t e = source->on_video(shared_video);
if (e != srs_success) {
srs_warn("on video err=%s", srs_error_desc(e).c_str());
}
srs_verbose("rtp on video");
}
}
return err;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
{
rtc_server = rtc_svr;
@ -1435,6 +1824,8 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const
// TODO: FIXME: Support reload.
sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req.vhost);
rtc_publisher = new SrsRtcPublisher(this);
}
SrsRtcSession::~SrsRtcSession()
@ -1686,6 +2077,263 @@ srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxS
return err;
}
srs_error_t SrsRtcSession::on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|reserved | PT=XR=207 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
: report blocks :
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
SrsBuffer stream(buf, nb_buf);
uint8_t first = stream.read_1bytes();
uint8_t pt = stream.read_1bytes();
uint16_t length = (stream.read_2bytes() + 1) * 4;
uint32_t ssrc = stream.read_4bytes();
if (length != nb_buf) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet, length=%u, nb_buf=%d", length, nb_buf);
}
while (stream.pos() + 4 < length) {
uint8_t bt = stream.read_1bytes();
stream.skip(1);
uint16_t block_length = (stream.read_2bytes() + 1) * 4;
srs_verbose("XR, bt=%u", bt);
if (stream.pos() + block_length - 4 > nb_buf) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid XR packet block, block_length=%u, nb_buf=%d", block_length, nb_buf);
}
if (bt == 5) {
for (int i = 4; i < block_length; i += 12) {
uint32_t ssrc = stream.read_4bytes();
uint32_t lrr = stream.read_4bytes();
uint32_t dlrr = stream.read_4bytes();
SrsNtp cur_ntp = SrsNtp::from_time_ms(srs_update_system_time()/1000);
uint32_t compact_ntp = (cur_ntp.ntp_second_ << 16) | (cur_ntp.ntp_fractions_ >> 16);
int rtt_ntp = compact_ntp - lrr - dlrr;
int rtt = ((rtt_ntp * 1000) >> 16) + ((rtt_ntp >> 16) * 1000);
/*
lsr = (srs_ntp.ntp_second_ << 16) | (srs_ntp.ntp_fractions_ >> 16);
uint32_t dlsr = (srs_update_system_time() - ssrc_lsr[ssrc_of_sender]) / 1000;
rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000);
*/
srs_verbose("ssrc=%u, compact_ntp=%u, lrr=%u, dlrr=%u, rtt=%d",
ssrc, compact_ntp, lrr, dlrr, rtt);
if (ssrc == rtc_publisher->video_ssrc) {
rtc_publisher->rtp_video_queue->update_rtt(rtt);
} else if (ssrc == rtc_publisher->audio_ssrc) {
rtc_publisher->rtp_audio_queue->update_rtt(rtt);
}
}
}
}
return err;
}
srs_error_t SrsRtcSession::on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
if (nb_buf < 28) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp sender report packet, nb_buf=%d", nb_buf);
}
SrsBuffer* stream = new SrsBuffer(buf, nb_buf);
SrsAutoFree(SrsBuffer, stream);
srs_verbose("SS=%s", srs_string_dumps_hex(buf, nb_buf).c_str());
// @see: https://tools.ietf.org/html/rfc3550#section-6.4.1
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
header |V=2|P| RC | PT=SR=200 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of sender |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
sender | NTP timestamp, most significant word |
info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, least significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| RTP timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| sender's packet count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| sender's octet count |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_1 (SSRC of first source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1 | fraction lost | cumulative number of packets lost |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| extended highest sequence number received |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| interarrival jitter |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| last SR (LSR) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| delay since last SR (DLSR) |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
report | SSRC_2 (SSRC of second source) |
block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
2 : ... :
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| profile-specific extensions |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
uint8_t rc = first & 0x1F;
/*uint8_t payload_type = */stream->read_1bytes();
uint16_t length = stream->read_2bytes();
if (((length + 1) * 4) != (rc * 24 + 28)) {
return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtcp sender report packet, length=%u, rc=%u", length, rc);
}
uint32_t ssrc_of_sender = stream->read_4bytes();
uint64_t ntp = stream->read_8bytes();
SrsNtp srs_ntp = SrsNtp::to_time_ms(ntp);
uint32_t rtp_time = stream->read_4bytes();
uint32_t sender_packet_count = stream->read_4bytes();
uint32_t sender_octec_count = stream->read_4bytes();
SrsNtp cur_ntp = SrsNtp::from_time_ms(srs_update_system_time()/1000);
srs_verbose("sender report, ssrc_of_sender=%u, ntp={%lu,%lu}, cur_ntp={%lu,%lu},rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u",
ssrc_of_sender, srs_ntp.ntp_, srs_ntp.system_ms_, cur_ntp.ntp_, cur_ntp.system_ms_, rtp_time, sender_packet_count, sender_octec_count);
for (int i = 0; i < rc; ++i) {
uint32_t ssrc = stream->read_4bytes();
uint8_t fraction_lost = stream->read_1bytes();
uint32_t cumulative_number_of_packets_lost = stream->read_3bytes();
uint32_t highest_seq = stream->read_4bytes();
uint32_t jitter = stream->read_4bytes();
uint32_t lst = stream->read_4bytes();
uint32_t dlsr = stream->read_4bytes();
(void)ssrc; (void)fraction_lost; (void)cumulative_number_of_packets_lost; (void)highest_seq; (void)jitter; (void)lst; (void)dlsr;
srs_verbose("sender report, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u",
ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr);
}
static map<uint32_t, uint64_t> ssrc_lsr;
// Response RR
{
char buf[1024];
SrsBuffer stream(buf, sizeof(buf));
stream.write_1bytes(0x81);
stream.write_1bytes(kRR);
stream.write_2bytes(7);
stream.write_4bytes(ssrc_of_sender);
SrsRtpQueue* rtp_queue = NULL;
string type = "unknown";
if (ssrc_of_sender == rtc_publisher->video_ssrc) {
rtp_queue = rtc_publisher->rtp_video_queue;
type = "video";
} else if (ssrc_of_sender == rtc_publisher->audio_ssrc) {
rtp_queue = rtc_publisher->rtp_audio_queue;
type = "audio";
}
uint8_t fraction_lost = rtp_queue->get_fraction_lost();
uint32_t cumulative_number_of_packets_lost = rtp_queue->get_cumulative_number_of_packets_lost() & 0x7FFFFF;
uint32_t extended_highest_sequence = rtp_queue->get_extended_highest_sequence();
uint32_t interarrival_jitter = rtp_queue->get_interarrival_jitter();
uint32_t lsr = 0;
uint32_t rr_dlsr = 0;
if (ssrc_lsr[ssrc_of_sender] > 0) {
lsr = (srs_ntp.ntp_second_ << 16) | (srs_ntp.ntp_fractions_ >> 16);
uint32_t dlsr = (srs_update_system_time() - ssrc_lsr[ssrc_of_sender]) / 1000;
rr_dlsr = ((dlsr / 1000) << 16) | ((dlsr % 1000) * 65536 / 1000);
}
stream.write_4bytes(ssrc_of_sender);
stream.write_1bytes(fraction_lost);
stream.write_3bytes(cumulative_number_of_packets_lost);
stream.write_4bytes(extended_highest_sequence);
stream.write_4bytes(interarrival_jitter);
stream.write_4bytes(lsr);
stream.write_4bytes(rr_dlsr);
srs_verbose("RR type=%s, ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, extended_highest_sequence=%u, interarrival_jitter=%u",
type.c_str(), ssrc_of_sender, fraction_lost, cumulative_number_of_packets_lost, extended_highest_sequence, interarrival_jitter);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if (dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
skt->sendto(protected_buf, nb_protected_buf, 0);
}
// XR
{
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|reserved | PT=XR=207 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
: report blocks :
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| BT=4 | reserved | block length = 2 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, most significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| NTP timestamp, least significant word |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
char buf[1024];
SrsBuffer stream(buf, sizeof(buf));
stream.write_1bytes(0x80);
stream.write_1bytes(kXR);
stream.write_2bytes(4);
stream.write_4bytes(ssrc_of_sender);
stream.write_1bytes(4);
stream.write_1bytes(0);
stream.write_2bytes(2);
stream.write_4bytes(cur_ntp.ntp_second_);
stream.write_4bytes(cur_ntp.ntp_fractions_);
ssrc_lxr[ssrc_of_sender] = srs_ntp.system_ms_;
//skt->sendto(stream.data(), stream.pos(), 0);
char protected_buf[kRtpPacketSize];
int nb_protected_buf = stream.pos();
if (dtls_session->protect_rtcp(protected_buf, stream.data(), nb_protected_buf) == srs_success) {
skt->sendto(protected_buf, nb_protected_buf, 0);
}
}
}
ssrc_lsr[ssrc_of_sender] = srs_update_system_time();
return err;
}
srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
@ -1757,6 +2405,31 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt)
{
// FIXME:
if (true)
{
uint32_t video_ssrc = 0;
uint32_t audio_ssrc = 0;
uint16_t video_payload_type = 0;
uint16_t audio_payload_type = 0;
for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) {
const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i];
if (media_desc.is_audio()) {
if (! media_desc.ssrc_infos_.empty()) {
audio_ssrc = media_desc.ssrc_infos_[0].ssrc_;
audio_payload_type = media_desc.payload_types_[0].payload_type_;
}
} else if (media_desc.is_video()) {
if (! media_desc.ssrc_infos_.empty()) {
video_ssrc = media_desc.ssrc_infos_[0].ssrc_;
video_payload_type = media_desc.payload_types_[0].payload_type_;
}
}
}
rtc_publisher->initialize(video_ssrc, audio_ssrc, request);
}
srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout));
return start_play(skt);
}
@ -1834,6 +2507,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt)
switch (payload_type) {
case kSR: {
err = on_rtcp_sender_report(ph, length, skt);
break;
}
case kRR: {
@ -1856,6 +2530,10 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt)
case kPsFb: {
err = on_rtcp_ps_feedback(ph, length, skt);
break;
}
case kXR: {
err = on_rtcp_xr(ph, length, skt);
break;
}
default:{
return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type);
@ -1874,6 +2552,23 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt)
return err;
}
srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
if (dtls_session == NULL) {
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
}
char* unprotected_buf = new char[1460];
int nb_unprotected_buf = skt->size();
if ((err = dtls_session->unprotect_rtp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtp unprotect failed");
}
return rtc_publisher->on_rtp(skt, unprotected_buf, nb_unprotected_buf);
}
SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s)
{
lfd = NULL;
@ -2256,7 +2951,11 @@ srs_error_t SrsRtcServer::listen_api()
// TODO: FIXME: Fetch api from hybrid manager.
SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle sdp");
return srs_error_wrap(err, "handle play");
}
if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}
return err;
@ -2369,9 +3068,7 @@ srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* skt)
if (is_rtcp(reinterpret_cast<const uint8_t*>(skt->data()), skt->size())) {
err = rtc_session->on_rtcp(skt);
} else {
// We disable it because no RTP for player.
// see https://github.com/ossrs/srs/blob/018577e685a07d9de7a47354e7a9c5f77f5f4202/trunk/src/app/srs_app_rtc_conn.cpp#L1081
// err = rtc_session->on_rtp(skt);
err = rtc_session->on_rtp(skt);
}
return err;

@ -51,6 +51,8 @@ class SrsSharedPtrMessage;
class SrsSource;
class SrsRtpPacket2;
class ISrsUdpSender;
class SrsRtpQueue;
class SrsRtpH264Demuxer;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -61,6 +63,7 @@ const uint8_t kApp = 204;
// @see: https://tools.ietf.org/html/rfc4585#section-6.1
const uint8_t kRtpFb = 205;
const uint8_t kPsFb = 206;
const uint8_t kXR = 207;
// @see: https://tools.ietf.org/html/rfc4585#section-6.3
const uint8_t kPLI = 1;
@ -68,6 +71,23 @@ const uint8_t kSLI = 2;
const uint8_t kRPSI = 3;
const uint8_t kAFB = 15;
class SrsNtp
{
public:
uint64_t system_ms_;
uint64_t ntp_;
uint32_t ntp_second_;
uint32_t ntp_fractions_;
public:
SrsNtp();
virtual ~SrsNtp();
public:
static SrsNtp from_time_ms(uint64_t ms);
static SrsNtp to_time_ms(uint64_t ntp);
public:
static uint64_t kMagicNtpFractionalUnit;
};
enum SrsRtcSessionStateType
{
// TODO: FIXME: Should prefixed by enum name.
@ -232,9 +252,37 @@ private:
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
};
class SrsRtcPublisher
{
friend class SrsRtcSession;
private:
SrsRtcSession* rtc_session;
uint32_t video_ssrc;
uint32_t audio_ssrc;
private:
SrsRtpH264Demuxer* rtp_h264_demuxer;
SrsRtpQueue* rtp_video_queue;
SrsRtpQueue* rtp_audio_queue;
private:
SrsRequest request;
SrsSource* source;
std::string sps;
std::string pps;
public:
SrsRtcPublisher(SrsRtcSession* session);
virtual ~SrsRtcPublisher();
public:
void initialize(uint32_t vssrc, uint32_t assrc, SrsRequest request);
srs_error_t on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf);
private:
srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt);
};
class SrsRtcSession
{
friend class SrsRtcSenderThread;
friend class SrsRtcPublisher;
private:
SrsRtcServer* rtc_server;
SrsSdp remote_sdp;
@ -258,6 +306,8 @@ private:
public:
SrsRequest request;
SrsSource* source;
private:
SrsRtcPublisher* rtc_publisher;
public:
SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id);
virtual ~SrsRtcSession();
@ -283,6 +333,7 @@ public:
srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req);
srs_error_t on_dtls(SrsUdpMuxSocket* skt);
srs_error_t on_rtcp(SrsUdpMuxSocket* skt);
srs_error_t on_rtp(SrsUdpMuxSocket* skt);
public:
srs_error_t send_client_hello(SrsUdpMuxSocket* skt);
srs_error_t on_connection_established(SrsUdpMuxSocket* skt);
@ -296,6 +347,8 @@ private:
private:
srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
};

@ -0,0 +1,390 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_rtp_queue.hpp>
#include <string.h>
#include <unistd.h>
#include <sstream>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_rtp.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
SrsRtpNackInfo::SrsRtpNackInfo()
{
generate_time_ = srs_update_system_time();
pre_req_nack_time_ = 0;
req_nack_count_ = 0;
}
SrsRtpNackList::SrsRtpNackList(SrsRtpQueue* rtp_queue)
{
rtp_queue_ = rtp_queue;
pre_check_time_ = 0;
srs_info("nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%ld, nack_interval=%ld"
opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval);
}
SrsRtpNackList::~SrsRtpNackList()
{
}
void SrsRtpNackList::insert(uint16_t seq)
{
// FIXME: full, drop packet, and request key frame.
SrsRtpNackInfo& nack_info = queue_[seq];
}
void SrsRtpNackList::remove(uint16_t seq)
{
queue_.erase(seq);
}
SrsRtpNackInfo* SrsRtpNackList::find(uint16_t seq)
{
std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.find(seq);
if (iter == queue_.end()) {
return NULL;
}
return &(iter->second);
}
void SrsRtpNackList::dump()
{
return;
srs_verbose("@debug, queue size=%u", queue_.size());
for (std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) {
srs_verbose("@debug, nack seq=%u", iter->first);
}
}
void SrsRtpNackList::get_nack_seqs(vector<uint16_t>& seqs)
{
srs_utime_t now = srs_update_system_time();
int interval = now - pre_check_time_;
if (interval < opts_.nack_interval / 2) {
return;
}
pre_check_time_ = now;
std::map<uint16_t, SrsRtpNackInfo>::iterator iter = queue_.begin();
while (iter != queue_.end()) {
const uint16_t& seq = iter->first;
SrsRtpNackInfo& nack_info = iter->second;
int alive_time = now - nack_info.generate_time_;
if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) {
srs_verbose("NACK, drop seq=%u alive time %d bigger than max_alive_time=%d OR nack count %d bigger than %d",
seq, alive_time, opts_.max_alive_time, nack_info.req_nack_count_, opts_.max_count);
rtp_queue_->notify_drop_seq(seq);
queue_.erase(iter++);
continue;
}
// TODO:Statistics unorder packet.
if (now - nack_info.generate_time_ < opts_.first_nack_interval) {
break;
}
if (now - nack_info.pre_req_nack_time_ >= opts_.nack_interval && nack_info.req_nack_count_ <= opts_.max_count) {
++nack_info.req_nack_count_;
nack_info.pre_req_nack_time_ = now;
seqs.push_back(seq);
srs_verbose("NACK, resend seq=%u, count=%d", seq, nack_info.req_nack_count_);
}
++iter;
}
}
void SrsRtpNackList::update_rtt(int rtt)
{
rtt_ = rtt;
srs_verbose("NACK, update rtt from %ld to %d", opts_.nack_interval, rtt);
// FIXME: limit min and max value.
opts_.nack_interval = rtt_;
}
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
: nack_(this)
{
capacity_ = capacity;
head_sequence_ = 0;
highest_sequence_ = 0;
initialized_ = false;
start_collected_ = false;
queue_ = new SrsRtpSharedPacket*[capacity_];
memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity);
cycle_ = 0;
jitter_ = 0;
last_trans_time_ = 0;
pre_number_of_packet_received_ = 0;
pre_number_of_packet_lossed_ = 0;
num_of_packet_received_ = 0;
number_of_packet_lossed_ = 0;
one_packet_per_frame_ = one_packet_per_frame;
}
SrsRtpQueue::~SrsRtpQueue()
{
srs_freepa(queue_);
}
srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
{
srs_error_t err = srs_success;
uint16_t seq = rtp_pkt->rtp_header.get_sequence();
srs_utime_t now = srs_update_system_time();
// First packet recv, init head_sequence and highest_sequence.
if (! initialized_) {
initialized_ = true;
head_sequence_ = seq;
highest_sequence_ = seq;
++num_of_packet_received_;
last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90;
} else {
SrsRtpNackInfo* nack_info = NULL;
if ((nack_info = nack_.find(seq)) != NULL) {
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success", seq, now - nack_info->generate_time_, nack_info->req_nack_count_);
nack_.remove(seq);
} else {
// Calc jitter.
{
int trans_time = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90;
int cur_jitter = trans_time - last_trans_time_;
if (cur_jitter < 0) {
cur_jitter = -cur_jitter;
}
last_trans_time_ = trans_time;
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
srs_verbose("jitter=%.2f", jitter_);
}
++num_of_packet_received_;
// seq > highest_sequence_
if (seq_cmp(highest_sequence_, seq)) {
insert_into_nack_list(highest_sequence_ + 1, seq);
if (seq < highest_sequence_) {
srs_verbose("warp around, cycle=%lu", cycle_);
++cycle_;
}
highest_sequence_ = seq;
} else {
// Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first paacet client sented.
if (! start_collected_) {
if (seq_cmp(seq, head_sequence_)) {
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq);
head_sequence_ = seq;
}
insert_into_nack_list(seq + 1, highest_sequence_);
} else {
srs_verbose("seq=%u, rtx success, too old", seq);
}
}
}
}
int delay = highest_sequence_ - head_sequence_ + 1;
srs_verbose("seqs range=[%u-%u], delay=%d", head_sequence_, highest_sequence_, delay);
// Check seqs out of range.
if (head_sequence_ + capacity_ < highest_sequence_) {
srs_verbose("try collect packet becuase seq out of range");
collect_packet();
}
while (head_sequence_ + capacity_ < highest_sequence_) {
srs_trace("seqs out of range, head seq=%u, hightest seq=%u", head_sequence_, highest_sequence_);
remove(head_sequence_);
uint16_t s = head_sequence_ + 1;
for ( ; s != highest_sequence_; ++s) {
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
// Choose the new head sequence. Must be the first packet of frame.
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
srs_trace("find except, update head seq from %u to %u when seqs out of range", head_sequence_, s);
head_sequence_ = s;
break;
}
// Drop the seq.
nack_.remove(s);
srs_verbose("seqs out of range, drop seq=%u", s);
if (pkt && pkt->rtp_header.get_sequence() == s) {
delete pkt;
pkt = NULL;
}
}
srs_trace("force update, update head seq from %u to %u when seqs out of range", head_sequence_, s);
head_sequence_ = s;
}
SrsRtpSharedPacket* old_pkt = queue_[seq % capacity_];
if (old_pkt) {
delete old_pkt;
}
queue_[seq % capacity_] = rtp_pkt->copy();
// Marker bit means the last packet of frame received.
if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) {
collect_packet();
}
return err;
}
srs_error_t SrsRtpQueue::remove(uint16_t seq)
{
srs_error_t err = srs_success;
SrsRtpSharedPacket*& pkt = queue_[seq % capacity_];
if (pkt && pkt->rtp_header.get_sequence() == seq) {
delete pkt;
pkt = NULL;
}
return err;
}
void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
{
frames.swap(frames_);
}
void SrsRtpQueue::notify_drop_seq(uint16_t seq)
{
uint16_t s = seq + 1;
for ( ; s != highest_sequence_; ++s) {
SrsRtpSharedPacket* pkt = queue_[s % capacity_];
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
break;
}
}
srs_verbose("drop seq=%u, highest seq=%u, update head seq %u to %u", seq, highest_sequence_, head_sequence_, s);
head_sequence_ = s;
}
uint32_t SrsRtpQueue::get_extended_highest_sequence()
{
return cycle_ * 65536 + highest_sequence_;
}
uint8_t SrsRtpQueue::get_fraction_lost()
{
int64_t total = (number_of_packet_lossed_ - pre_number_of_packet_lossed_ + num_of_packet_received_ - pre_number_of_packet_received_);
uint8_t loss = 0;
if (total > 0) {
loss = (number_of_packet_lossed_ - pre_number_of_packet_lossed_) * 256 / total;
}
pre_number_of_packet_lossed_ = number_of_packet_lossed_;
pre_number_of_packet_received_ = num_of_packet_received_;
return loss;
}
uint32_t SrsRtpQueue::get_cumulative_number_of_packets_lost()
{
return number_of_packet_lossed_;
}
uint32_t SrsRtpQueue::get_interarrival_jitter()
{
return static_cast<uint32_t>(jitter_);
}
void SrsRtpQueue::update_rtt(int rtt)
{
nack_.update_rtt(rtt);
}
void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end)
{
for (uint16_t s = seq_start; s != seq_end; ++s) {
srs_verbose("loss seq=%u, insert into nack list", s);
nack_.insert(s);
++number_of_packet_lossed_;
}
// FIXME: Record key frame sequence.
// FIXME: When nack list too long, clear and send PLI.
}
void SrsRtpQueue::collect_packet()
{
vector<SrsRtpSharedPacket*> frame;
for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) {
SrsRtpSharedPacket* pkt = queue_[s % capacity_];
nack_.dump();
if (nack_.find(s) != NULL) {
srs_verbose("seq=%u, found in nack list when collect frame", s);
break;
}
// We must collect frame from first packet to last packet.
if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && ! pkt->rtp_payload_header->is_first_packet_of_frame) {
break;
}
frame.push_back(pkt->copy());
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
if (! start_collected_) {
start_collected_ = true;
}
frames_.push_back(frame);
frame.clear();
srs_verbose("head seq=%u, update to %u because collect one full farme", head_sequence_, s + 1);
head_sequence_ = s + 1;
}
}
// remove the tmp buffer
for (size_t i = 0; i < frame.size(); ++i) {
srs_freep(frame[i]);
}
}

@ -0,0 +1,157 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2020 John
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_RTP_QUEUE_HPP
#define SRS_APP_RTP_QUEUE_HPP
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <map>
class SrsRtpSharedPacket;
class SrsRtpQueue;
struct SrsNackOption
{
SrsNackOption()
{
// Default nack option.
max_count = 5;
max_alive_time = 2 * SRS_UTIME_SECONDS;
first_nack_interval = 10 * SRS_UTIME_MILLISECONDS;
nack_interval = 400 * SRS_UTIME_MILLISECONDS;
}
int max_count;
srs_utime_t max_alive_time;
int64_t first_nack_interval;
int64_t nack_interval;
};
struct SrsRtpNackInfo
{
SrsRtpNackInfo();
// Use to control the time of first nack req and the life of seq.
srs_utime_t generate_time_;
// Use to control nack interval.
srs_utime_t pre_req_nack_time_;
// Use to control nack times.
int req_nack_count_;
};
inline bool seq_cmp(const uint16_t& l, const uint16_t& r)
{
return ((int16_t)(r - l)) > 0;
}
struct SeqComp
{
bool operator()(const uint16_t& l, const uint16_t& r) const
{
return seq_cmp(l, r);
}
};
class SrsRtpNackList
{
private:
// Nack queue, seq order, oldest to newest.
std::map<uint16_t, SrsRtpNackInfo, SeqComp> queue_;
SrsRtpQueue* rtp_queue_;
SrsNackOption opts_;
private:
srs_utime_t pre_check_time_;
private:
int rtt_;
public:
SrsRtpNackList(SrsRtpQueue* rtp_queue);
virtual ~SrsRtpNackList();
public:
void insert(uint16_t seq);
void remove(uint16_t seq);
SrsRtpNackInfo* find(uint16_t seq);
public:
void dump();
public:
void get_nack_seqs(std::vector<uint16_t>& seqs);
public:
void update_rtt(int rtt);
};
class SrsRtpQueue
{
private:
/*
*[seq1|seq2|seq3|seq4|seq5 ... seq10|seq11(loss)|seq12(loss)|seq13]
* \___(head_sequence_) \ \___(highest_sequence_)
* \___(no received, in nack list)
*/
// Capacity of the ring-buffer.
size_t capacity_;
// Thei highest sequence we have receive.
uint16_t highest_sequence_;
// The sequence waitting to read.
uint16_t head_sequence_;
bool initialized_;
bool start_collected_;
// Ring bufer.
SrsRtpSharedPacket** queue_;
private:
uint64_t cycle_;
double jitter_;
int64_t last_trans_time_;
uint64_t pre_number_of_packet_received_;
uint64_t pre_number_of_packet_lossed_;
uint64_t num_of_packet_received_;
uint64_t number_of_packet_lossed_;
private:
bool one_packet_per_frame_;
public:
SrsRtpNackList nack_;
private:
std::vector<std::vector<SrsRtpSharedPacket*> > frames_;
public:
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
virtual ~SrsRtpQueue();
public:
srs_error_t insert(SrsRtpSharedPacket* rtp_pkt);
srs_error_t remove(uint16_t seq);
public:
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
void notify_drop_seq(uint16_t seq);
public:
uint32_t get_extended_highest_sequence();
uint8_t get_fraction_lost();
uint32_t get_cumulative_number_of_packets_lost();
uint32_t get_interarrival_jitter();
public:
void update_rtt(int rtt);
private:
void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end);
private:
void collect_packet();
};
#endif

@ -32,19 +32,10 @@ using namespace std;
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_utility.hpp>
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kStapA = 24;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kFuA = 28;
// @see: https://tools.ietf.org/html/rfc6184#section-5.8
const uint8_t kStart = 0x80; // Fu-header start bit
const uint8_t kEnd = 0x40; // Fu-header end bit
SrsRtpHeader::SrsRtpHeader()
{
padding = false;
padding_length = 0;
extension = false;
cc = 0;
marker = false;
@ -72,9 +63,70 @@ SrsRtpHeader::~SrsRtpHeader()
srs_error_t SrsRtpHeader::decode(SrsBuffer* stream)
{
srs_error_t err = srs_success;
srs_error_t err = srs_success;
if (stream->size() < kRtpHeaderFixedSize) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect");
}
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC |M| PT | sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| synchronization source (SSRC) identifier |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| contributing source (CSRC) identifiers |
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
uint8_t first = stream->read_1bytes();
padding = (first & 0x20);
extension = (first & 0x10);
cc = (first & 0x0F);
uint8_t second = stream->read_1bytes();
marker = (second & 0x80);
payload_type = (second & 0x7F);
sequence = stream->read_2bytes();
timestamp = stream->read_4bytes();
ssrc = stream->read_4bytes();
if (stream->size() < header_size()) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect");
}
for (uint8_t i = 0; i < cc; ++i) {
csrc[i] = stream->read_4bytes();
}
if (extension) {
// TODO:
uint16_t profile_id = stream->read_2bytes();
extension_length = stream->read_2bytes();
// @see: https://tools.ietf.org/html/rfc3550#section-5.3.1
stream->skip(extension_length * 4);
srs_verbose("extension, profile_id=%u, length=%u", profile_id, extension_length);
// @see: https://tools.ietf.org/html/rfc5285#section-4.2
if (profile_id == 0xBEDE) {
}
}
if (padding) {
padding_length = *(reinterpret_cast<uint8_t*>(stream->data() + stream->size() - 1));
if (padding_length > (stream->size() - stream->pos())) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect");
}
// TODO: FIXME: Implements it.
srs_verbose("offset=%d, padding_length=%u", stream->size(), padding_length);
}
return err;
}
@ -173,6 +225,7 @@ SrsRtpPacket2::~SrsRtpPacket2()
void SrsRtpPacket2::set_padding(int size)
{
rtp_header.set_padding(size > 0);
rtp_header.set_padding_length(size);
if (cache_payload) {
cache_payload += size - padding;
}
@ -182,6 +235,7 @@ void SrsRtpPacket2::set_padding(int size)
void SrsRtpPacket2::add_padding(int size)
{
rtp_header.set_padding(padding + size > 0);
rtp_header.set_padding_length(rtp_header.get_padding_length() + size);
if (cache_payload) {
cache_payload += size;
}
@ -552,6 +606,69 @@ srs_error_t SrsRtpFUAPayload2::encode(SrsBuffer* buf)
return srs_success;
}
SrsRtpPayloadHeader::SrsRtpPayloadHeader()
{
is_first_packet_of_frame = false;
is_last_packet_of_frame = false;
}
SrsRtpPayloadHeader::~SrsRtpPayloadHeader()
{
}
SrsRtpPayloadHeader::SrsRtpPayloadHeader(const SrsRtpPayloadHeader& rhs)
{
operator=(rhs);
}
SrsRtpPayloadHeader& SrsRtpPayloadHeader::operator=(const SrsRtpPayloadHeader& rhs)
{
is_first_packet_of_frame = rhs.is_first_packet_of_frame;
is_last_packet_of_frame = rhs.is_last_packet_of_frame;
}
SrsRtpH264Header::SrsRtpH264Header() : SrsRtpPayloadHeader()
{
}
SrsRtpH264Header::~SrsRtpH264Header()
{
}
SrsRtpH264Header::SrsRtpH264Header(const SrsRtpH264Header& rhs)
{
operator=(rhs);
}
SrsRtpH264Header& SrsRtpH264Header::operator=(const SrsRtpH264Header& rhs)
{
SrsRtpPayloadHeader::operator=(rhs);
nalu_type = rhs.nalu_type;
nalu_header = rhs.nalu_header;
nalu_offset = rhs.nalu_offset;
return *this;
}
SrsRtpOpusHeader::SrsRtpOpusHeader() : SrsRtpPayloadHeader()
{
}
SrsRtpOpusHeader::~SrsRtpOpusHeader()
{
}
SrsRtpOpusHeader::SrsRtpOpusHeader(const SrsRtpOpusHeader& rhs)
{
operator=(rhs);
}
SrsRtpOpusHeader& SrsRtpOpusHeader::operator=(const SrsRtpOpusHeader& rhs)
{
SrsRtpPayloadHeader::operator=(rhs);
return *this;
}
SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload()
{
payload = NULL;
@ -570,6 +687,8 @@ SrsRtpSharedPacket::SrsRtpSharedPacket()
payload = NULL;
size = 0;
rtp_payload_header = NULL;
}
SrsRtpSharedPacket::~SrsRtpSharedPacket()
@ -581,9 +700,11 @@ SrsRtpSharedPacket::~SrsRtpSharedPacket()
--payload_ptr->shared_count;
}
}
srs_freep(rtp_payload_header);
}
srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uint32_t ssrc, uint16_t payload_type, char* p, int s)
srs_error_t SrsRtpSharedPacket::create(SrsRtpHeader* rtp_h, SrsRtpPayloadHeader* rtp_ph, char* p, int s)
{
srs_error_t err = srs_success;
@ -593,10 +714,8 @@ srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uin
srs_assert(!payload_ptr);
rtp_header.set_timestamp(timestamp);
rtp_header.set_sequence(sequence);
rtp_header.set_ssrc(ssrc);
rtp_header.set_payload_type(payload_type);
this->rtp_header = *rtp_h;
this->rtp_payload_header = rtp_ph;
// TODO: rtp header padding.
size_t buffer_size = rtp_header.header_size() + s;
@ -619,6 +738,25 @@ srs_error_t SrsRtpSharedPacket::create(int64_t timestamp, uint16_t sequence, uin
return err;
}
srs_error_t SrsRtpSharedPacket::decode(char* buf, int nb_buf)
{
srs_error_t err = srs_success;
SrsBuffer stream(buf, nb_buf);
if ((err = rtp_header.decode(&stream)) != srs_success) {
return srs_error_wrap(err, "rtp header decode failed");
}
payload_ptr = new SrsRtpSharedPacketPayload();
payload_ptr->payload = buf;
payload_ptr->size = nb_buf;
this->payload = payload_ptr->payload;
this->size = payload_ptr->size;
return err;
}
SrsRtpSharedPacket* SrsRtpSharedPacket::copy()
{
SrsRtpSharedPacket* copy = new SrsRtpSharedPacket();
@ -627,6 +765,11 @@ SrsRtpSharedPacket* SrsRtpSharedPacket::copy()
payload_ptr->shared_count++;
copy->rtp_header = rtp_header;
if (dynamic_cast<SrsRtpH264Header*>(rtp_payload_header)) {
copy->rtp_payload_header = new SrsRtpH264Header(*(dynamic_cast<SrsRtpH264Header*>(rtp_payload_header)));
} else if (dynamic_cast<SrsRtpOpusHeader*>(rtp_payload_header)) {
copy->rtp_payload_header = new SrsRtpOpusHeader(*(dynamic_cast<SrsRtpOpusHeader*>(rtp_payload_header)));
}
copy->payload = payload;
copy->size = size;

@ -37,6 +37,17 @@ const uint8_t kRtpMarker = 0x80;
// H.264 nalu header type mask.
const uint8_t kNalTypeMask = 0x1F;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kStapA = 24;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kFuA = 28;
// @see: https://tools.ietf.org/html/rfc6184#section-5.8
const uint8_t kStart = 0x80; // Fu-header start bit
const uint8_t kEnd = 0x40; // Fu-header end bit
class SrsBuffer;
class SrsRtpRawPayload;
class SrsRtpFUAPayload2;
@ -45,6 +56,7 @@ class SrsRtpHeader
{
private:
bool padding;
uint8_t padding_length;
bool extension;
uint8_t cc;
bool marker;
@ -76,6 +88,8 @@ public:
inline void set_ssrc(uint32_t v) { ssrc = v; }
uint32_t get_ssrc() const { return ssrc; }
inline void set_padding(bool v) { padding = v; }
inline void set_padding_length(uint8_t v) { padding_length = v; }
uint8_t get_padding_length() const { return padding_length; }
};
class SrsRtpPacket2
@ -212,6 +226,40 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
};
class SrsRtpPayloadHeader
{
public:
bool is_first_packet_of_frame;
bool is_last_packet_of_frame;
public:
SrsRtpPayloadHeader();
virtual ~SrsRtpPayloadHeader();
SrsRtpPayloadHeader(const SrsRtpPayloadHeader& rhs);
SrsRtpPayloadHeader& operator=(const SrsRtpPayloadHeader& rhs);
};
class SrsRtpOpusHeader : public SrsRtpPayloadHeader
{
public:
SrsRtpOpusHeader();
virtual ~SrsRtpOpusHeader();
SrsRtpOpusHeader(const SrsRtpOpusHeader& rhs);
SrsRtpOpusHeader& operator=(const SrsRtpOpusHeader& rhs);
};
class SrsRtpH264Header : public SrsRtpPayloadHeader
{
public:
uint8_t nalu_type;
uint8_t nalu_header;
std::vector<std::pair<size_t, size_t> > nalu_offset; // offset, size
public:
SrsRtpH264Header();
virtual ~SrsRtpH264Header();
SrsRtpH264Header(const SrsRtpH264Header& rhs);
SrsRtpH264Header& operator=(const SrsRtpH264Header& rhs);
};
class SrsRtpSharedPacket
{
private:
@ -230,14 +278,19 @@ private:
SrsRtpSharedPacketPayload* payload_ptr;
public:
SrsRtpHeader rtp_header;
SrsRtpPayloadHeader* rtp_payload_header;
char* payload;
int size;
public:
SrsRtpSharedPacket();
virtual ~SrsRtpSharedPacket();
public:
srs_error_t create(int64_t timestamp, uint16_t sequence, uint32_t ssrc, uint16_t payload_type, char* payload, int size);
srs_error_t create(SrsRtpHeader* rtp_h, SrsRtpPayloadHeader* rtp_ph, char* p, int s);
srs_error_t decode(char* buf, int nb_buf);
SrsRtpSharedPacket* copy();
public:
char* rtp_payload() { return payload + rtp_header.header_size(); }
int rtp_payload_size() { return size - rtp_header.header_size() - rtp_header.get_padding_length(); }
// Interface to modify rtp header
public:
srs_error_t modify_rtp_header_marker(bool marker);

Loading…
Cancel
Save