RTC: Refine twcc to connection

pull/1908/head
winlin 5 years ago
parent 5f88dc357e
commit 3a3d908a63

@ -225,7 +225,6 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, SrsContextId parent_cid)
mw_msgs = 0;
realtime = true;
nn_simulate_nack_drop = 0;
nack_enabled_ = false;
_srs_config->subscribe(this);
@ -432,9 +431,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
return err;
}
// TODO: FIXME: find track when send
// TODO: FIXME: Fix the stat bug.
// TODO: FIXME: Any simple solution?
vector<SrsRtpPacket2*> send_pkts;
// Covert kernel messages to RTP packets.
for (int i = 0; i < (int)pkts.size(); i++) {
@ -445,16 +441,11 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
continue;
}
// Update stats.
info.nn_bytes += pkt->nb_bytes();
// For audio, we transcoded AAC to opus in extra payloads.
if (pkt->is_audio()) {
info.nn_audios++;
SrsRtcAudioSendTrack* audio_track = audio_tracks_[pkt->header.get_ssrc()];
// TODO: FIXME: Any simple solution?
if ((err = audio_track->on_rtp(send_pkts, pkt)) != srs_success) {
if ((err = audio_track->on_rtp(pkt, info)) != srs_success) {
return srs_error_wrap(err, "audio_track on rtp");
}
// TODO: FIXME: Padding audio to the max payload in RTP packets.
@ -463,7 +454,7 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
SrsRtcVideoSendTrack* video_track = video_tracks_[pkt->header.get_ssrc()];
// TODO: FIXME: Any simple solution?
if ((err = video_track->on_rtp(send_pkts, pkt)) != srs_success) {
if ((err = video_track->on_rtp(pkt, info)) != srs_success) {
return srs_error_wrap(err, "audio_track on rtp");
}
}
@ -473,80 +464,6 @@ srs_error_t SrsRtcPlayStream::send_packets(SrsRtcStream* source, const vector<Sr
pkt->header.get_timestamp(), pkt->nb_bytes());
}
// By default, we send packets by sendmmsg.
if ((err = do_send_packets(send_pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}
srs_error_t SrsRtcPlayStream::do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
// Cache the encrypt flag and sender.
bool encrypt = session_->encrypt;
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts.at(i);
// For this message, select the first iovec.
iovec* iov = new iovec();
SrsAutoFree(iovec, iov);
char* iov_base = new char[kRtpPacketSize];
SrsAutoFreeA(char, iov_base);
iov->iov_base = iov_base;
iov->iov_len = kRtpPacketSize;
// Marshal packet to bytes in iovec.
if (true) {
#ifdef SRS_CXX14
// should set twcc sn before packet encode.
if(twcc_id_) {
twcc_sn = twcc_controller.allocate_twcc_sn();
pkt->header.set_twcc_sequence_number(twcc_id_, twcc_sn);
}
#endif
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = pkt->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = session_->transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
info.nn_rtp_bytes += (int)iov->iov_len;
// When we send out a packet, increase the stat counter.
info.nn_rtp_pkts++;
// For NACK simulator, drop packet.
if (nn_simulate_nack_drop) {
simulate_drop_packet(&pkt->header, (int)iov->iov_len);
iov->iov_len = 0;
continue;
}
// TODO: FIXME: Handle error.
session_->sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
// Detail log, should disable it in release version.
srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len);
}
return err;
}
@ -579,20 +496,6 @@ void SrsRtcPlayStream::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, u
}
}
void SrsRtcPlayStream::simulate_nack_drop(int nn)
{
nn_simulate_nack_drop = nn;
}
void SrsRtcPlayStream::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u, ts=%u, %d bytes", nn_simulate_nack_drop,
h->get_sequence(), h->get_ssrc(), h->get_timestamp(),
nn_bytes);
nn_simulate_nack_drop--;
}
srs_error_t SrsRtcPlayStream::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
@ -697,10 +600,13 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf)
: Feedback Control Information (FCI) :
: :
*/
/*uint8_t first = */stream->read_1bytes();
uint8_t first = stream->read_1bytes();
//uint8_t version = first & 0xC0;
//uint8_t padding = first & 0x20;
//uint8_t fmt = first & 0x1F;
uint8_t fmt = first & 0x1F;
if(15 == fmt) {
return session_->on_rtcp_feedback(buf, nb_buf);
}
/*uint8_t payload_type = */stream->read_1bytes();
/*uint16_t length = */stream->read_2bytes();
@ -746,7 +652,7 @@ srs_error_t SrsRtcPlayStream::on_rtcp_feedback(char* buf, int nb_buf)
}
// By default, we send packets by sendmmsg.
if ((err = do_send_packets(resend_pkts, info)) != srs_success) {
if ((err = session_->do_send_packets(resend_pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
@ -1577,6 +1483,8 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s)
blackhole = false;
blackhole_addr = NULL;
blackhole_stfd = NULL;
twcc_id_ = 0;
nn_simulate_player_nack_drop = 0;
}
SrsRtcConnection::~SrsRtcConnection()
@ -1877,6 +1785,11 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
return err;
}
srs_error_t SrsRtcConnection::on_rtcp_feedback(char* data, int nb_data)
{
return srs_success;
}
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{
if (publisher_ == NULL) {
@ -2166,13 +2079,78 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc)
void SrsRtcConnection::simulate_nack_drop(int nn)
{
if (player_) {
player_->simulate_nack_drop(nn);
}
if (publisher_) {
publisher_->simulate_nack_drop(nn);
}
nn_simulate_player_nack_drop = nn;
}
void SrsRtcConnection::simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes)
{
srs_warn("RTC NACK simulator #%d player drop seq=%u, ssrc=%u, ts=%u, %d bytes", nn_simulate_player_nack_drop,
h->get_sequence(), h->get_ssrc(), h->get_timestamp(),
nn_bytes);
nn_simulate_player_nack_drop--;
}
srs_error_t SrsRtcConnection::do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts.at(i);
// For this message, select the first iovec.
iovec* iov = new iovec();
SrsAutoFree(iovec, iov);
char* iov_base = new char[kRtpPacketSize];
SrsAutoFreeA(char, iov_base);
iov->iov_base = iov_base;
iov->iov_len = kRtpPacketSize;
// Marshal packet to bytes in iovec.
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = pkt->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = transport_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
info.nn_rtp_bytes += (int)iov->iov_len;
// When we send out a packet, increase the stat counter.
info.nn_rtp_pkts++;
// For NACK simulator, drop packet.
if (nn_simulate_player_nack_drop) {
simulate_player_drop_packet(&pkt->header, (int)iov->iov_len);
iov->iov_len = 0;
continue;
}
// TODO: FIXME: Handle error.
sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
// Detail log, should disable it in release version.
srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
pkt->header.get_sequence(), pkt->header.get_timestamp(), pkt->nb_bytes(), iov->iov_len);
}
return err;
}
#ifdef SRS_OSX
@ -2834,6 +2812,22 @@ srs_error_t SrsRtcConnection::create_player(SrsRequest* req, std::map<uint32_t,
return srs_error_wrap(err, "SrsRtcPlayStream init");
}
// TODO: FIXME: Support reload.
// The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC.
// Whatever the ext-map, we will disable GCC when config disable it.
int twcc_id = 0;
if (true) {
std::map<uint32_t, SrsRtcTrackDescription*>::iterator it = sub_relations.begin();
while (it != sub_relations.end()) {
if (it->second->type_ == "video") {
SrsRtcTrackDescription* track = it->second;
twcc_id = track->get_rtp_extension_id(kTWCCExt);
}
++it;
}
}
srs_trace("RTC connection player gcc=%d", twcc_id);
return err;
}

@ -175,14 +175,13 @@ private:
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtcAudioSendTrack*> audio_tracks_;
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
// Simulators.
int nn_simulate_nack_drop;
private:
// For merged-write messages.
int mw_msgs;
bool realtime;
// Whether enabled nack.
bool nack_enabled_;
private:
// Whether palyer started.
bool is_started;
// statistic send packets.
@ -206,12 +205,8 @@ public:
virtual srs_error_t cycle();
private:
srs_error_t send_packets(SrsRtcStream* source, const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn);
private:
void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes);
public:
srs_error_t on_rtcp(char* data, int nb_data);
private:
@ -336,6 +331,11 @@ private:
bool blackhole;
sockaddr_in* blackhole_addr;
srs_netfd_t blackhole_stfd;
private:
// twcc handler
int twcc_id_;
// Simulators.
int nn_simulate_player_nack_drop;
public:
SrsRtcConnection(SrsRtcServer* s);
virtual ~SrsRtcConnection();
@ -366,6 +366,7 @@ public:
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
srs_error_t on_rtcp(char* data, int nb_data);
srs_error_t on_rtcp_feedback(char* buf, int nb_buf);
public:
srs_error_t on_connection_established();
srs_error_t start_play();
@ -381,6 +382,8 @@ public:
public:
// Simulate the NACK to drop nn packets.
void simulate_nack_drop(int nn);
void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes);
srs_error_t do_send_packets(const std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info);
private:
srs_error_t on_binding_request(SrsStunPacket* r);
// publish media capabilitiy negotiate

@ -1723,7 +1723,7 @@ std::string SrsRtcSendTrack::get_track_id()
return track_desc_->id_;
}
srs_error_t SrsRtcSendTrack::on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt)
srs_error_t SrsRtcSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info)
{
return srs_success;
}
@ -1742,7 +1742,7 @@ SrsRtcAudioSendTrack::~SrsRtcAudioSendTrack()
{
}
srs_error_t SrsRtcAudioSendTrack::on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt)
srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
@ -1750,16 +1750,25 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(std::vector<SrsRtpPacket2*>& send_packe
return err;
}
std::vector<SrsRtpPacket2*> pkts;
pkt->header.set_ssrc(track_desc_->ssrc_);
send_packets.push_back(pkt);
// Put rtp packet to NACK/ARQ queue
if (true) {
SrsRtpPacket2* nack = pkt->copy();
rtp_queue_->set(nack->header.get_sequence(), nack);
}
pkts.push_back(pkt);
// Update stats.
info.nn_bytes += pkt->nb_bytes();
info.nn_audios++;
if ((err = session_->do_send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}
@ -1779,7 +1788,7 @@ SrsRtcVideoSendTrack::~SrsRtcVideoSendTrack()
{
}
srs_error_t SrsRtcVideoSendTrack::on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt)
srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
@ -1787,15 +1796,24 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(std::vector<SrsRtpPacket2*>& send_packe
return err;
}
pkt->header.set_ssrc(track_desc_->ssrc_);
std::vector<SrsRtpPacket2*> pkts;
send_packets.push_back(pkt);
pkt->header.set_ssrc(track_desc_->ssrc_);
// Put rtp packet to NACK/ARQ queue
if (true) {
SrsRtpPacket2* nack = pkt->copy();
rtp_queue_->set(nack->header.get_sequence(), nack);
}
pkts.push_back(pkt);
// Update stats.
info.nn_bytes += pkt->nb_bytes();
info.nn_videos++;
if ((err = session_->do_send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err;
}

@ -54,6 +54,7 @@ class SrsRtcConnection;
class SrsRtpRingBuffer;
class SrsRtpNackForReceiver;
class SrsJsonObject;
class SrsRtcOutgoingInfo;
class SrsNtp
{
@ -470,7 +471,7 @@ public:
void set_track_status(bool active);
std::string get_track_id();
public:
virtual srs_error_t on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt);
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info);
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt);
};
@ -480,7 +481,7 @@ public:
SrsRtcAudioSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc);
virtual ~SrsRtcAudioSendTrack();
public:
virtual srs_error_t on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt);
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info);
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt);
};
@ -490,7 +491,7 @@ public:
SrsRtcVideoSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc);
virtual ~SrsRtcVideoSendTrack();
public:
virtual srs_error_t on_rtp(std::vector<SrsRtpPacket2*>& send_packets, SrsRtpPacket2* pkt);
virtual srs_error_t on_rtp(SrsRtpPacket2* pkt, SrsRtcOutgoingInfo& info);
virtual srs_error_t on_rtcp(SrsRtpPacket2* pkt);
};

Loading…
Cancel
Save