RTC: Use shared message for RTP packet

pull/1804/head
winlin 5 years ago
parent 4e1935f678
commit f794a7d3a7

@ -844,6 +844,36 @@ srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vector<SrsRt
return err;
}
srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
pkt->rtp_header.set_timestamp(audio_timestamp);
pkt->rtp_header.set_sequence(audio_sequence++);
pkt->rtp_header.set_ssrc(audio_ssrc);
pkt->rtp_header.set_payload_type(audio_payload_type);
// TODO: FIXME: Padding audio to the max payload in RTP packets.
if (max_padding > 0) {
}
// TODO: FIXME: Why 960? Need Refactoring?
audio_timestamp += 960;
return err;
}
srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
pkt->rtp_header.set_sequence(video_sequence++);
pkt->rtp_header.set_ssrc(video_ssrc);
pkt->rtp_header.set_payload_type(video_payload_type);
return err;
}
srs_error_t SrsRtcPlayer::send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info)
{
srs_error_t err = srs_success;
@ -1159,36 +1189,6 @@ srs_error_t SrsRtcPlayer::send_packets_gso(vector<SrsRtpPacket2*>& pkts, SrsRtcO
return err;
}
srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
pkt->rtp_header.set_timestamp(audio_timestamp);
pkt->rtp_header.set_sequence(audio_sequence++);
pkt->rtp_header.set_ssrc(audio_ssrc);
pkt->rtp_header.set_payload_type(audio_payload_type);
// TODO: FIXME: Padding audio to the max payload in RTP packets.
if (max_padding > 0) {
}
// TODO: FIXME: Why 960? Need Refactoring?
audio_timestamp += 960;
return err;
}
srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
pkt->rtp_header.set_sequence(video_sequence++);
pkt->rtp_header.set_ssrc(video_ssrc);
pkt->rtp_header.set_payload_type(video_payload_type);
return err;
}
void SrsRtcPlayer::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq)
{
SrsRtpPacket2* pkt = NULL;
@ -1712,7 +1712,8 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->set_decode_handler(this);
pkt->original_bytes = buf;
pkt->original_msg = new SrsSharedPtrMessage();
pkt->original_msg->wrap(buf, nb_buf);
SrsBuffer b(buf, nb_buf);
if ((err = pkt->decode(&b)) != srs_success) {
@ -1737,7 +1738,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf)
}
}
void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload)
void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload)
{
// No payload, ignore.
if (buf->empty()) {

@ -257,11 +257,10 @@ public:
private:
srs_error_t send_messages(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t messages_to_packets(SrsRtcSource* source, std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t send_packets_gso(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
private:
srs_error_t package_opus(SrsRtpPacket2* pkt);
srs_error_t package_video(SrsRtpPacket2* pkt);
srs_error_t send_packets(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
srs_error_t send_packets_gso(std::vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingPackets& info);
public:
void nack_fetch(std::vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq);
void simulate_nack_drop(int nn);
@ -313,7 +312,7 @@ private:
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
public:
srs_error_t on_rtp(char* buf, int nb_buf);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload);
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload);
private:
srs_error_t on_audio(SrsRtpPacket2* pkt);
srs_error_t on_video(SrsRtpPacket2* pkt);

@ -424,9 +424,11 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
SrsAutoFree(SrsRtpPacket2, pkt);
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue2(pkt)) != srs_success) {
if ((err = consumer->enqueue2(pkt->copy())) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
@ -702,8 +704,8 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack
raw->nn_payload = size;
memcpy(raw->payload, data, size);
// When free the RTP packet, should free the bytes allocated here.
pkt->original_bytes = raw->payload;
pkt->original_msg = new SrsSharedPtrMessage();
pkt->original_msg->wrap(raw->payload, size);
*ppkt = pkt;
@ -846,27 +848,29 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsShare
stap->nri = (SrsAvcNaluType)header;
// Copy the SPS/PPS bytes, because it may change.
char* p = new char[sps.size() + pps.size()];
pkt->original_bytes = p;
int size = (int)(sps.size() + pps.size());
char* payload = new char[size];
pkt->original_msg = new SrsSharedPtrMessage();
pkt->original_msg->wrap(payload, size);
if (true) {
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->bytes = payload;
sample->size = (int)sps.size();
stap->nalus.push_back(sample);
memcpy(p, (char*)&sps[0], sps.size());
p += (int)sps.size();
memcpy(payload, (char*)&sps[0], sps.size());
payload += (int)sps.size();
}
if (true) {
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->bytes = payload;
sample->size = (int)pps.size();
stap->nalus.push_back(sample);
memcpy(p, (char*)&pps[0], pps.size());
p += (int)pps.size();
memcpy(payload, (char*)&pps[0], pps.size());
payload += (int)pps.size();
}
*ppkt = pkt;

@ -414,6 +414,7 @@ SrsSample* SrsSample::copy()
SrsSample* p = new SrsSample();
p->bytes = bytes;
p->size = size;
p->bframe = bframe;
return p;
}

@ -303,6 +303,18 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload
return err;
}
void SrsSharedPtrMessage::wrap(char* payload, int size)
{
srs_assert(!ptr);
ptr = new SrsSharedPtrPayload();
ptr->payload = payload;
ptr->size = size;
this->payload = ptr->payload;
this->size = ptr->size;
}
int SrsSharedPtrMessage::count()
{
srs_assert(ptr);

@ -341,6 +341,9 @@ public:
// @remark user should never free the payload.
// @param pheader, the header to copy to the message. NULL to ignore.
virtual srs_error_t create(SrsMessageHeader* pheader, char* payload, int size);
// Create shared ptr message from RAW payload.
// @remark Note that the header is set to zero.
virtual void wrap(char* payload, int size);
// Get current reference count.
// when this object created, count set to 0.
// if copy() this object, count increase 1.

@ -265,6 +265,14 @@ uint8_t SrsRtpHeader::get_padding_length() const
return padding_length;
}
ISrsRtpPayloader::ISrsRtpPayloader()
{
}
ISrsRtpPayloader::~ISrsRtpPayloader()
{
}
ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler()
{
}
@ -359,6 +367,24 @@ bool SrsRtpPacket2::is_audio()
return frame_type == SrsFrameTypeAudio;
}
SrsRtpPacket2* SrsRtpPacket2::copy()
{
SrsRtpPacket2* cp = new SrsRtpPacket2();
cp->rtp_header = rtp_header;
cp->payload = payload? payload->copy():NULL;
cp->padding = padding;
cp->nalu_type = nalu_type;
cp->original_msg = original_msg? original_msg->copy():NULL;
cp->frame_type = frame_type;
cp->cache_payload = cache_payload;
cp->decode_handler = decode_handler;
return cp;
}
int SrsRtpPacket2::nb_bytes()
{
if (!cache_payload) {
@ -469,6 +495,16 @@ srs_error_t SrsRtpRawPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpRawPayload::copy()
{
SrsRtpRawPayload* cp = new SrsRtpRawPayload();
cp->payload = payload;
cp->nn_payload = nn_payload;
return cp;
}
SrsRtpRawNALUs::SrsRtpRawNALUs()
{
cursor = 0;
@ -591,6 +627,22 @@ srs_error_t SrsRtpRawNALUs::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpRawNALUs::copy()
{
SrsRtpRawNALUs* cp = new SrsRtpRawNALUs();
cp->nn_bytes = nn_bytes;
cp->cursor = cursor;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpSTAPPayload::SrsRtpSTAPPayload()
{
nri = (SrsAvcNaluType)0;
@ -715,6 +767,21 @@ srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpSTAPPayload::copy()
{
SrsRtpSTAPPayload* cp = new SrsRtpSTAPPayload();
cp->nri = nri;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpFUAPayload::SrsRtpFUAPayload()
{
start = end = false;
@ -809,6 +876,24 @@ srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpFUAPayload::copy()
{
SrsRtpFUAPayload* cp = new SrsRtpFUAPayload();
cp->nri = nri;
cp->start = start;
cp->end = end;
cp->nalu_type = nalu_type;
int nn_nalus = (int)nalus.size();
for (int i = 0; i < nn_nalus; i++) {
SrsSample* p = nalus[i];
cp->nalus.push_back(p->copy());
}
return cp;
}
SrsRtpFUAPayload2::SrsRtpFUAPayload2()
{
start = end = false;
@ -886,3 +971,17 @@ srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf)
return srs_success;
}
ISrsRtpPayloader* SrsRtpFUAPayload2::copy()
{
SrsRtpFUAPayload2* cp = new SrsRtpFUAPayload2();
cp->nri = nri;
cp->start = start;
cp->end = end;
cp->nalu_type = nalu_type;
cp->payload = payload;
cp->size = size;
return cp;
}

@ -97,6 +97,15 @@ public:
uint8_t get_padding_length() const;
};
class ISrsRtpPayloader : public ISrsCodec
{
public:
ISrsRtpPayloader();
virtual ~ISrsRtpPayloader();
public:
virtual ISrsRtpPayloader* copy() = 0;
};
class ISrsRtpPacketDecodeHandler
{
public:
@ -104,7 +113,7 @@ public:
virtual ~ISrsRtpPacketDecodeHandler();
public:
// We don't know the actual payload, so we depends on external handler.
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) = 0;
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) = 0;
};
class SrsRtpPacket2
@ -113,7 +122,7 @@ class SrsRtpPacket2
public:
// TODO: FIXME: Rename to header.
SrsRtpHeader rtp_header;
ISrsCodec* payload;
ISrsRtpPayloader* payload;
// TODO: FIXME: Merge into rtp_header.
int padding;
// Helper fields.
@ -121,6 +130,7 @@ public:
// The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type;
// The original bytes for decoder or bridger only, we will free it.
// TODO: FIXME: Should covert to shared prt message.
char* original_bytes;
// The original msg for bridger only, we will free it.
SrsSharedPtrMessage* original_msg;
@ -152,6 +162,8 @@ public:
void set_decode_handler(ISrsRtpPacketDecodeHandler* h);
// Whether the packet is Audio packet.
bool is_audio();
// Copy the RTP packet.
SrsRtpPacket2* copy();
// interface ISrsEncoder
public:
virtual int nb_bytes();
@ -160,7 +172,7 @@ public:
};
// Single payload data.
class SrsRtpRawPayload : public ISrsCodec
class SrsRtpRawPayload : public ISrsRtpPayloader
{
public:
// The RAW payload, directly point to the shared memory.
@ -170,15 +182,16 @@ public:
public:
SrsRtpRawPayload();
virtual ~SrsRtpRawPayload();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// Multiple NALUs, automatically insert 001 between NALUs.
class SrsRtpRawNALUs : public ISrsCodec
class SrsRtpRawNALUs : public ISrsRtpPayloader
{
private:
// We will manage the samples, but the sample itself point to the shared memory.
@ -194,15 +207,16 @@ public:
uint8_t skip_first_byte();
// We will manage the returned samples, if user want to manage it, please copy it.
srs_error_t read_samples(std::vector<SrsSample*>& samples, int packet_size);
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// STAP-A, for multiple NALUs.
class SrsRtpSTAPPayload : public ISrsCodec
class SrsRtpSTAPPayload : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -216,16 +230,17 @@ public:
public:
SrsSample* get_sps();
SrsSample* get_pps();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// FU-A, for one NALU with multiple fragments.
// With more than one payload.
class SrsRtpFUAPayload : public ISrsCodec
class SrsRtpFUAPayload : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -240,16 +255,17 @@ public:
public:
SrsRtpFUAPayload();
virtual ~SrsRtpFUAPayload();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
// FU-A, for one NALU with multiple fragments.
// With only one payload.
class SrsRtpFUAPayload2 : public ISrsCodec
class SrsRtpFUAPayload2 : public ISrsRtpPayloader
{
public:
// The NRI in NALU type.
@ -264,11 +280,12 @@ public:
public:
SrsRtpFUAPayload2();
virtual ~SrsRtpFUAPayload2();
// interface ISrsEncoder
// interface ISrsRtpPayloader
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
virtual ISrsRtpPayloader* copy();
};
#endif

Loading…
Cancel
Save