diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index a7e0fcc51..23af2665b 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -844,6 +844,36 @@ srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vectorrtp_header.set_timestamp(audio_timestamp); + pkt->rtp_header.set_sequence(audio_sequence++); + pkt->rtp_header.set_ssrc(audio_ssrc); + pkt->rtp_header.set_payload_type(audio_payload_type); + + // TODO: FIXME: Padding audio to the max payload in RTP packets. + if (max_padding > 0) { + } + + // TODO: FIXME: Why 960? Need Refactoring? + audio_timestamp += 960; + + return err; +} + +srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->rtp_header.set_sequence(video_sequence++); + pkt->rtp_header.set_ssrc(video_ssrc); + pkt->rtp_header.set_payload_type(video_payload_type); + + return err; +} + srs_error_t SrsRtcPlayer::send_packets(std::vector& pkts, SrsRtcOutgoingPackets& info) { srs_error_t err = srs_success; @@ -1159,36 +1189,6 @@ srs_error_t SrsRtcPlayer::send_packets_gso(vector& pkts, SrsRtcO return err; } -srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - pkt->rtp_header.set_timestamp(audio_timestamp); - pkt->rtp_header.set_sequence(audio_sequence++); - pkt->rtp_header.set_ssrc(audio_ssrc); - pkt->rtp_header.set_payload_type(audio_payload_type); - - // TODO: FIXME: Padding audio to the max payload in RTP packets. - if (max_padding > 0) { - } - - // TODO: FIXME: Why 960? Need Refactoring? - audio_timestamp += 960; - - return err; -} - -srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt) -{ - srs_error_t err = srs_success; - - pkt->rtp_header.set_sequence(video_sequence++); - pkt->rtp_header.set_ssrc(video_ssrc); - pkt->rtp_header.set_payload_type(video_payload_type); - - return err; -} - void SrsRtcPlayer::nack_fetch(vector& pkts, uint32_t ssrc, uint16_t seq) { SrsRtpPacket2* pkt = NULL; @@ -1712,7 +1712,8 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) SrsRtpPacket2* pkt = new SrsRtpPacket2(); pkt->set_decode_handler(this); - pkt->original_bytes = buf; + pkt->original_msg = new SrsSharedPtrMessage(); + pkt->original_msg->wrap(buf, nb_buf); SrsBuffer b(buf, nb_buf); if ((err = pkt->decode(&b)) != srs_success) { @@ -1737,7 +1738,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) } } -void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) +void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) { // No payload, ignore. if (buf->empty()) { diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 83e9a770a..a10492217 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -257,11 +257,10 @@ public: private: srs_error_t send_messages(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingPackets& info); srs_error_t messages_to_packets(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingPackets& info); - srs_error_t send_packets(std::vector& pkts, SrsRtcOutgoingPackets& info); - srs_error_t send_packets_gso(std::vector& pkts, SrsRtcOutgoingPackets& info); -private: srs_error_t package_opus(SrsRtpPacket2* pkt); srs_error_t package_video(SrsRtpPacket2* pkt); + srs_error_t send_packets(std::vector& pkts, SrsRtcOutgoingPackets& info); + srs_error_t send_packets_gso(std::vector& pkts, SrsRtcOutgoingPackets& info); public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); void simulate_nack_drop(int nn); @@ -313,7 +312,7 @@ private: srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: srs_error_t on_rtp(char* buf, int nb_buf); - virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload); + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); private: srs_error_t on_audio(SrsRtpPacket2* pkt); srs_error_t on_video(SrsRtpPacket2* pkt); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 6b62ea5ae..2084583d3 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -424,9 +424,11 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; + SrsAutoFree(SrsRtpPacket2, pkt); + for (int i = 0; i < (int)consumers.size(); i++) { SrsRtcConsumer* consumer = consumers.at(i); - if ((err = consumer->enqueue2(pkt)) != srs_success) { + if ((err = consumer->enqueue2(pkt->copy())) != srs_success) { return srs_error_wrap(err, "consume message"); } } @@ -702,8 +704,8 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack raw->nn_payload = size; memcpy(raw->payload, data, size); - // When free the RTP packet, should free the bytes allocated here. - pkt->original_bytes = raw->payload; + pkt->original_msg = new SrsSharedPtrMessage(); + pkt->original_msg->wrap(raw->payload, size); *ppkt = pkt; @@ -846,27 +848,29 @@ srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsShare stap->nri = (SrsAvcNaluType)header; // Copy the SPS/PPS bytes, because it may change. - char* p = new char[sps.size() + pps.size()]; - pkt->original_bytes = p; + int size = (int)(sps.size() + pps.size()); + char* payload = new char[size]; + pkt->original_msg = new SrsSharedPtrMessage(); + pkt->original_msg->wrap(payload, size); if (true) { SrsSample* sample = new SrsSample(); - sample->bytes = p; + sample->bytes = payload; sample->size = (int)sps.size(); stap->nalus.push_back(sample); - memcpy(p, (char*)&sps[0], sps.size()); - p += (int)sps.size(); + memcpy(payload, (char*)&sps[0], sps.size()); + payload += (int)sps.size(); } if (true) { SrsSample* sample = new SrsSample(); - sample->bytes = p; + sample->bytes = payload; sample->size = (int)pps.size(); stap->nalus.push_back(sample); - memcpy(p, (char*)&pps[0], pps.size()); - p += (int)pps.size(); + memcpy(payload, (char*)&pps[0], pps.size()); + payload += (int)pps.size(); } *ppkt = pkt; diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index e104790e0..e3d3ca4a3 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -414,6 +414,7 @@ SrsSample* SrsSample::copy() SrsSample* p = new SrsSample(); p->bytes = bytes; p->size = size; + p->bframe = bframe; return p; } diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index ff8be065c..76d808a44 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -303,6 +303,18 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload return err; } +void SrsSharedPtrMessage::wrap(char* payload, int size) +{ + srs_assert(!ptr); + ptr = new SrsSharedPtrPayload(); + + ptr->payload = payload; + ptr->size = size; + + this->payload = ptr->payload; + this->size = ptr->size; +} + int SrsSharedPtrMessage::count() { srs_assert(ptr); diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 4c8c39ea7..60d8ee1d3 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -341,6 +341,9 @@ public: // @remark user should never free the payload. // @param pheader, the header to copy to the message. NULL to ignore. virtual srs_error_t create(SrsMessageHeader* pheader, char* payload, int size); + // Create shared ptr message from RAW payload. + // @remark Note that the header is set to zero. + virtual void wrap(char* payload, int size); // Get current reference count. // when this object created, count set to 0. // if copy() this object, count increase 1. diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index d3af5de6c..2bdb5fda2 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -265,6 +265,14 @@ uint8_t SrsRtpHeader::get_padding_length() const return padding_length; } +ISrsRtpPayloader::ISrsRtpPayloader() +{ +} + +ISrsRtpPayloader::~ISrsRtpPayloader() +{ +} + ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler() { } @@ -359,6 +367,24 @@ bool SrsRtpPacket2::is_audio() return frame_type == SrsFrameTypeAudio; } +SrsRtpPacket2* SrsRtpPacket2::copy() +{ + SrsRtpPacket2* cp = new SrsRtpPacket2(); + + cp->rtp_header = rtp_header; + cp->payload = payload? payload->copy():NULL; + cp->padding = padding; + + cp->nalu_type = nalu_type; + cp->original_msg = original_msg? original_msg->copy():NULL; + cp->frame_type = frame_type; + + cp->cache_payload = cache_payload; + cp->decode_handler = decode_handler; + + return cp; +} + int SrsRtpPacket2::nb_bytes() { if (!cache_payload) { @@ -469,6 +495,16 @@ srs_error_t SrsRtpRawPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpRawPayload::copy() +{ + SrsRtpRawPayload* cp = new SrsRtpRawPayload(); + + cp->payload = payload; + cp->nn_payload = nn_payload; + + return cp; +} + SrsRtpRawNALUs::SrsRtpRawNALUs() { cursor = 0; @@ -591,6 +627,22 @@ srs_error_t SrsRtpRawNALUs::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpRawNALUs::copy() +{ + SrsRtpRawNALUs* cp = new SrsRtpRawNALUs(); + + cp->nn_bytes = nn_bytes; + cp->cursor = cursor; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpSTAPPayload::SrsRtpSTAPPayload() { nri = (SrsAvcNaluType)0; @@ -715,6 +767,21 @@ srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpSTAPPayload::copy() +{ + SrsRtpSTAPPayload* cp = new SrsRtpSTAPPayload(); + + cp->nri = nri; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpFUAPayload::SrsRtpFUAPayload() { start = end = false; @@ -809,6 +876,24 @@ srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpFUAPayload::copy() +{ + SrsRtpFUAPayload* cp = new SrsRtpFUAPayload(); + + cp->nri = nri; + cp->start = start; + cp->end = end; + cp->nalu_type = nalu_type; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpFUAPayload2::SrsRtpFUAPayload2() { start = end = false; @@ -886,3 +971,17 @@ srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf) return srs_success; } + +ISrsRtpPayloader* SrsRtpFUAPayload2::copy() +{ + SrsRtpFUAPayload2* cp = new SrsRtpFUAPayload2(); + + cp->nri = nri; + cp->start = start; + cp->end = end; + cp->nalu_type = nalu_type; + cp->payload = payload; + cp->size = size; + + return cp; +} diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index e348631aa..65ff51e2c 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -97,6 +97,15 @@ public: uint8_t get_padding_length() const; }; +class ISrsRtpPayloader : public ISrsCodec +{ +public: + ISrsRtpPayloader(); + virtual ~ISrsRtpPayloader(); +public: + virtual ISrsRtpPayloader* copy() = 0; +}; + class ISrsRtpPacketDecodeHandler { public: @@ -104,7 +113,7 @@ public: virtual ~ISrsRtpPacketDecodeHandler(); public: // We don't know the actual payload, so we depends on external handler. - virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) = 0; + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) = 0; }; class SrsRtpPacket2 @@ -113,7 +122,7 @@ class SrsRtpPacket2 public: // TODO: FIXME: Rename to header. SrsRtpHeader rtp_header; - ISrsCodec* payload; + ISrsRtpPayloader* payload; // TODO: FIXME: Merge into rtp_header. int padding; // Helper fields. @@ -121,6 +130,7 @@ public: // The first byte as nalu type, for video decoder only. SrsAvcNaluType nalu_type; // The original bytes for decoder or bridger only, we will free it. + // TODO: FIXME: Should covert to shared prt message. char* original_bytes; // The original msg for bridger only, we will free it. SrsSharedPtrMessage* original_msg; @@ -152,6 +162,8 @@ public: void set_decode_handler(ISrsRtpPacketDecodeHandler* h); // Whether the packet is Audio packet. bool is_audio(); + // Copy the RTP packet. + SrsRtpPacket2* copy(); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -160,7 +172,7 @@ public: }; // Single payload data. -class SrsRtpRawPayload : public ISrsCodec +class SrsRtpRawPayload : public ISrsRtpPayloader { public: // The RAW payload, directly point to the shared memory. @@ -170,15 +182,16 @@ public: public: SrsRtpRawPayload(); virtual ~SrsRtpRawPayload(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // Multiple NALUs, automatically insert 001 between NALUs. -class SrsRtpRawNALUs : public ISrsCodec +class SrsRtpRawNALUs : public ISrsRtpPayloader { private: // We will manage the samples, but the sample itself point to the shared memory. @@ -194,15 +207,16 @@ public: uint8_t skip_first_byte(); // We will manage the returned samples, if user want to manage it, please copy it. srs_error_t read_samples(std::vector& samples, int packet_size); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // STAP-A, for multiple NALUs. -class SrsRtpSTAPPayload : public ISrsCodec +class SrsRtpSTAPPayload : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -216,16 +230,17 @@ public: public: SrsSample* get_sps(); SrsSample* get_pps(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // FU-A, for one NALU with multiple fragments. // With more than one payload. -class SrsRtpFUAPayload : public ISrsCodec +class SrsRtpFUAPayload : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -240,16 +255,17 @@ public: public: SrsRtpFUAPayload(); virtual ~SrsRtpFUAPayload(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // FU-A, for one NALU with multiple fragments. // With only one payload. -class SrsRtpFUAPayload2 : public ISrsCodec +class SrsRtpFUAPayload2 : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -264,11 +280,12 @@ public: public: SrsRtpFUAPayload2(); virtual ~SrsRtpFUAPayload2(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; #endif