From 6f2b78f16afd57cabb5d2438e850e460826aa5c4 Mon Sep 17 00:00:00 2001 From: winlin <winlin@vip.126.com> Date: Sat, 2 May 2020 09:15:49 +0800 Subject: [PATCH] Refactor code to keep sample function order --- trunk/src/app/srs_app_rtc_conn.cpp | 1090 +++++++++++++----------- trunk/src/app/srs_app_rtc_conn.hpp | 45 +- trunk/src/app/srs_app_rtp_queue.cpp | 74 +- trunk/src/app/srs_app_rtp_queue.hpp | 23 +- trunk/src/kernel/srs_kernel_buffer.cpp | 15 + trunk/src/kernel/srs_kernel_buffer.hpp | 54 +- trunk/src/kernel/srs_kernel_flv.cpp | 40 + trunk/src/kernel/srs_kernel_flv.hpp | 16 +- trunk/src/kernel/srs_kernel_rtp.cpp | 242 +++++- trunk/src/kernel/srs_kernel_rtp.hpp | 67 +- 10 files changed, 1030 insertions(+), 636 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index cab12402e..686ccbb88 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -125,41 +125,6 @@ static std::vector<std::string> get_candidate_ips() return candidate_ips; } - -static int cal_rtp_frame_size(const vector<SrsRtpSharedPacket*>& frame, int& frame_size, vector<uint32_t>& nalu_lens) -{ - uint32_t nalu_len = 0; - for (size_t n = 0; n < frame.size(); ++n) { - SrsRtpH264Header* rtp_h264_header = dynamic_cast<SrsRtpH264Header*>(frame[n]->rtp_payload_header); - for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) { - if (rtp_h264_header->nalu_type != kFuA) { - uint8_t* p = reinterpret_cast<uint8_t*>(frame[n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first); - if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) && - ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI) && - ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSPS) && - ((p[0] & kNalTypeMask) != SrsAvcNaluTypePPS)) { - frame_size += rtp_h264_header->nalu_offset[j].second + 4; - nalu_lens.push_back(rtp_h264_header->nalu_offset[j].second); - } - } else { - if (frame[n]->rtp_payload_header->is_first_packet_of_frame) { - frame_size += 5; - nalu_len += 1; - } - frame_size += rtp_h264_header->nalu_offset[j].second; - nalu_len += rtp_h264_header->nalu_offset[j].second; - - if (frame[n]->rtp_payload_header->is_last_packet_of_frame) { - nalu_lens.push_back(nalu_len); - nalu_len = 0; - } - } - } - } - - return frame_size; -} - uint64_t SrsNtp::kMagicNtpFractionalUnit = 1ULL << 32; SrsNtp::SrsNtp() @@ -1501,7 +1466,6 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) rtc_session = session; rtp_h264_demuxer = new SrsRtpH264Demuxer(); - rtp_opus_demuxer = new SrsRtpOpusDemuxer(); rtp_video_queue = new SrsRtpQueue(1000); rtp_audio_queue = new SrsRtpQueue(100, true); @@ -1519,7 +1483,6 @@ SrsRtcPublisher::~SrsRtcPublisher() srs_freep(report_timer); srs_freep(rtp_h264_demuxer); - srs_freep(rtp_opus_demuxer); srs_freep(rtp_video_queue); srs_freep(rtp_audio_queue); } @@ -1904,31 +1867,54 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) { srs_error_t err = srs_success; - SrsRtpSharedPacket* pkt = new SrsRtpSharedPacket(); - SrsAutoFree(SrsRtpSharedPacket, pkt); - if ((err = pkt->decode(buf, nb_buf)) != srs_success) { - return srs_error_wrap(err, "rtp packet decode failed"); + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + + pkt->set_decode_handler(this); + pkt->set_original_bytes(buf, nb_buf); + + SrsBuffer b(buf, nb_buf); + if ((err = pkt->decode(&b)) != srs_success) { + return srs_error_wrap(err, "decode rtp packet"); } uint32_t ssrc = pkt->rtp_header.get_ssrc(); - if (ssrc == audio_ssrc) { return on_audio(pkt); } else if (ssrc == video_ssrc) { return on_video(pkt); + } else { + srs_freep(pkt); + return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } - - return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } -srs_error_t SrsRtcPublisher::on_audio(SrsRtpSharedPacket* pkt) +void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) { - srs_error_t err = srs_success; + // No payload, ignore. + if (buf->empty()) { + return; + } - pkt->rtp_payload_header = new SrsRtpOpusHeader(); - if ((err = rtp_opus_demuxer->parse(pkt)) != srs_success) { - return srs_error_wrap(err, "rtp opus demux failed"); + uint32_t ssrc = pkt->rtp_header.get_ssrc(); + if (ssrc == audio_ssrc) { + *ppayload = pkt->reuse_raw(); + } else if (ssrc == video_ssrc) { + uint8_t v = (uint8_t)pkt->nalu_type; + if (v == kStapA) { + *ppayload = new SrsRtpSTAPPayload(); + } else if (v == kFuA) { + *ppayload = pkt->reuse_fua(); + } else { + *ppayload = pkt->reuse_raw(); + } } +} + +srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) +{ + pkt->is_first_packet_of_frame = true; + pkt->is_last_packet_of_frame = true; + pkt->is_key_frame = true; // TODO: FIXME: Error check. rtp_audio_queue->consume(pkt); @@ -1940,42 +1926,24 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpSharedPacket* pkt) check_send_nacks(rtp_audio_queue, audio_ssrc); - return collect_audio_frame(); + return collect_audio_frames(); } -srs_error_t SrsRtcPublisher::collect_audio_frame() +srs_error_t SrsRtcPublisher::collect_audio_frames() { srs_error_t err = srs_success; - std::vector<std::vector<SrsRtpSharedPacket*> > framess; - rtp_audio_queue->collect_frames(framess); - - for (size_t i = 0; i < framess.size(); ++i) { - vector<SrsRtpSharedPacket*> frames = framess[i]; - - for (size_t j = 0; j < frames.size(); ++j) { - SrsRtpSharedPacket* pkt = frames[j]; + std::vector<std::vector<SrsRtpPacket2*> > frames; + rtp_audio_queue->collect_frames(frames); - // TODO: FIXME: Transcode OPUS to AAC. - if (pkt->rtp_payload_size() > 0) { - SrsMessageHeader header; - header.message_type = RTMP_MSG_AudioMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = pkt->rtp_header.get_timestamp() / 90; - - SrsSharedPtrMessage msg; - // TODO: FIXME: Check error. - msg.create(&header, NULL, 0); + for (size_t i = 0; i < frames.size(); ++i) { + vector<SrsRtpPacket2*>& packets = frames[i]; - SrsSample sample; - sample.size = pkt->rtp_payload_size(); - sample.bytes = new char[sample.size]; - memcpy((void*)sample.bytes, pkt->rtp_payload(), sample.size); - msg.set_extra_payloads(&sample, 1); + for (size_t j = 0; j < packets.size(); ++j) { + SrsRtpPacket2* pkt = packets[j]; - // TODO: FIXME: Check error. - source->on_rtc_audio(&msg); - } + // TODO: FIXME: Check error. + do_collect_audio_frame(pkt); srs_freep(pkt); } @@ -1984,14 +1952,64 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() return err; } -srs_error_t SrsRtcPublisher::on_video(SrsRtpSharedPacket* pkt) +srs_error_t SrsRtcPublisher::do_collect_audio_frame(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; - pkt->rtp_payload_header = new SrsRtpH264Header(); + SrsRtpRawPayload* payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload); - if ((err = rtp_h264_demuxer->parse(pkt)) != srs_success) { - return srs_error_wrap(err, "rtp h264 demux failed"); + if (!payload) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "OPUS payload"); + } + + // TODO: FIXME: Transcode OPUS to AAC. + if (!payload->nn_payload) { + return err; + } + + SrsMessageHeader header; + header.message_type = RTMP_MSG_AudioMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = pkt->rtp_header.get_timestamp() / 90; + + SrsSharedPtrMessage msg; + // TODO: FIXME: Check error. + msg.create(&header, NULL, 0); + + SrsSample sample; + sample.size = payload->nn_payload; + sample.bytes = new char[sample.size]; + memcpy((void*)sample.bytes, payload->payload, sample.size); + msg.set_extra_payloads(&sample, 1); + + // TODO: FIXME: Check error. + source->on_rtc_audio(&msg); + + return err; +} + +srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) +{ + uint8_t v = (uint8_t)pkt->nalu_type; + if (v == kFuA) { + SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); + if (!payload) { + srs_freep(pkt); + return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); + } + + pkt->is_first_packet_of_frame = payload->start; + pkt->is_last_packet_of_frame = payload->end; + pkt->is_key_frame = (payload->nalu_type == SrsAvcNaluTypeIDR); + } else { + pkt->is_first_packet_of_frame = true; + pkt->is_last_packet_of_frame = true; + + if (v == kStapA) { + pkt->is_key_frame = true; + } else { + pkt->is_key_frame = (pkt->nalu_type == SrsAvcNaluTypeIDR); + } } // TODO: FIXME: Error check. @@ -2004,155 +2022,200 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpSharedPacket* pkt) check_send_nacks(rtp_video_queue, video_ssrc); - return collect_video_frame(); + return collect_video_frames(); } -srs_error_t SrsRtcPublisher::collect_video_frame() +srs_error_t SrsRtcPublisher::collect_video_frames() { - srs_error_t err = srs_success; - - std::vector<std::vector<SrsRtpSharedPacket*> > frames; + std::vector<std::vector<SrsRtpPacket2*> > frames; rtp_video_queue->collect_frames(frames); for (size_t i = 0; i < frames.size(); ++i) { - if (!frames[i].empty()) { - srs_verbose("collect %d video frames, seq range %u,%u", - frames.size(), frames[i].front()->rtp_header.get_sequence(), - frames[i].back()->rtp_header.get_sequence()); - } - - int frame_size = 5; - vector<uint32_t> nalu_lens; - cal_rtp_frame_size(frames[i], frame_size, nalu_lens); - - uint8_t* frame_buffer = new uint8_t[frame_size]; - // Skip flv video tag header. - int frame_buffer_index = 5; - - bool video_header_change = false; - int64_t timestamp = 0; - - bool idr = false; - size_t len_index = 0; - for (size_t n = 0; n < frames[i].size(); ++n) { - SrsRtpH264Header* rtp_h264_header = dynamic_cast<SrsRtpH264Header*>(frames[i][n]->rtp_payload_header); - for (size_t j = 0; j < rtp_h264_header->nalu_offset.size(); ++j) { - timestamp = frames[i][n]->rtp_header.get_timestamp(); - - uint8_t* p = reinterpret_cast<uint8_t*>(frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first); - if (rtp_h264_header->nalu_type != kFuA) { - if ((p[0] & kNalTypeMask) == SrsAvcNaluTypeSPS) { - string cur_sps = string((char*)p, rtp_h264_header->nalu_offset[j].second); - if (!cur_sps.empty() && sps != cur_sps) { - video_header_change = true; - sps = cur_sps; - } - } else if ((p[0] & kNalTypeMask) == SrsAvcNaluTypePPS) { - string cur_pps = string((char*)p, rtp_h264_header->nalu_offset[j].second); - if (!cur_pps.empty() && pps != cur_pps) { - video_header_change = true; - pps = cur_pps; - } - } else if (((p[0] & kNalTypeMask) != SrsAvcNaluTypeAccessUnitDelimiter) && ((p[0] & kNalTypeMask) != SrsAvcNaluTypeSEI)) { - uint32_t len = nalu_lens[len_index++]; - SrsBuffer stream((char*)frame_buffer + frame_buffer_index, 4); - stream.write_4bytes(len); - frame_buffer_index += 4; - memcpy(frame_buffer + frame_buffer_index, p, rtp_h264_header->nalu_offset[j].second); - frame_buffer_index += rtp_h264_header->nalu_offset[j].second; - } - } else { - if (frames[i][n]->rtp_payload_header->is_first_packet_of_frame) { - uint32_t len = nalu_lens[len_index++]; - SrsBuffer stream((char*)frame_buffer + frame_buffer_index, 4); - stream.write_4bytes(len); - frame_buffer_index += 4; - frame_buffer[frame_buffer_index++] = rtp_h264_header->nalu_header; - - if ((rtp_h264_header->nalu_header & kNalTypeMask) == SrsAvcNaluTypeIDR) { - idr = true; - } - } - memcpy(frame_buffer + frame_buffer_index, frames[i][n]->rtp_payload() + rtp_h264_header->nalu_offset[j].first, - rtp_h264_header->nalu_offset[j].second); - frame_buffer_index += rtp_h264_header->nalu_offset[j].second; - } - } - srs_freep(frames[i][n]); - } - - if (video_header_change) { - srs_verbose("sps/pps change or init"); - - uint8_t* video_header = new uint8_t[1500]; - SrsBuffer *stream = new SrsBuffer((char*)video_header, 1500); - SrsAutoFree(SrsBuffer, stream); - stream->write_1bytes(0x17); - stream->write_1bytes(0x00); - stream->write_1bytes(0x00); - stream->write_1bytes(0x00); - stream->write_1bytes(0x00); - - // FIXME: Replace magic number. - stream->write_1bytes(0x01); - stream->write_1bytes(0x42); - stream->write_1bytes(0xC0); - stream->write_1bytes(0x1E); - stream->write_1bytes(0xFF); - stream->write_1bytes(0xE1); - - stream->write_2bytes(sps.size()); - stream->write_string(sps); - - stream->write_1bytes(0x01); - - stream->write_2bytes(pps.size()); - stream->write_string(pps); - - SrsMessageHeader header; - header.message_type = RTMP_MSG_VideoMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = timestamp / 90; - SrsCommonMessage* shared_video = new SrsCommonMessage(); - SrsAutoFree(SrsCommonMessage, shared_video); - // TODO: FIXME: Check error. - shared_video->create(&header, reinterpret_cast<char*>(video_header), stream->pos()); - srs_error_t e = source->on_video(shared_video); - if (e != srs_success) { - srs_warn("on video header err=%s", srs_error_desc(e).c_str()); - srs_error_reset(e); - } + vector<SrsRtpPacket2*>& packets = frames[i]; + if (packets.empty()) { + continue; + } + + // TODO: FIXME: Check error. + do_collect_video_frame(packets); - srs_verbose("rtp on video header"); + for (size_t j = 0; j < packets.size(); ++j) { + SrsRtpPacket2* pkt = packets[j]; + srs_freep(pkt); } + } - if (!sps.empty() && !pps.empty()) { - if (idr) { - frame_buffer[0] = 0x17; - } else { - frame_buffer[0] = 0x27; - } - frame_buffer[1] = 0x01; - frame_buffer[2] = 0x00; - frame_buffer[3] = 0x00; - frame_buffer[4] = 0x00; - - SrsMessageHeader header; - header.message_type = RTMP_MSG_VideoMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = timestamp / 90; - SrsCommonMessage* shared_video = new SrsCommonMessage(); - SrsAutoFree(SrsCommonMessage, shared_video); - // TODO: FIXME: Check error. - shared_video->create(&header, reinterpret_cast<char*>(frame_buffer), frame_buffer_index); - srs_error_t e = source->on_video(shared_video); - if (e != srs_success) { - srs_warn("on video err=%s", srs_error_desc(e).c_str()); + return srs_success; +} + +srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector<SrsRtpPacket2*>& packets) +{ + srs_error_t err = srs_success; + + // Although a video frame may contain many packets, they share the same NALU type. + SrsRtpPacket2* head = packets.at(0); + SrsAvcNaluType nalu_type = head->nalu_type; + int64_t timestamp = head->rtp_header.get_timestamp(); + + if (nalu_type == (SrsAvcNaluType)kFuA) { + // For FU-A, there must be more than one packets. + if (packets.size() < 2) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A %d packets", packets.size()); + } + } else { + // For others type, should be one packet for one frame. + if (packets.size() != 1) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "NonFU-A %d packets", packets.size()); + } + } + + // For FU-A, group packets to one video frame. + if (nalu_type == (SrsAvcNaluType)kFuA) { + int nn_payload = 0; + for (size_t i = 0; i < packets.size(); ++i) { + SrsRtpPacket2* pkt = packets[i]; + SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); + if (!payload) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); } + nn_payload += payload->size; + } + if (!nn_payload) { + return err; + } + + // TODO: FIXME: Directly covert to sample for performance. + // 1 byte NALU header. + // 5 bytes FLV tag header. + nn_payload += 1 + 5; + char* data = new char[nn_payload]; + SrsRtpFUAPayload2* head_payload = dynamic_cast<SrsRtpFUAPayload2*>(head->payload); + + char* p = data + 5; + *p++ = head_payload->nri | head_payload->nalu_type; + + for (size_t i = 0; i < packets.size(); ++i) { + SrsRtpPacket2* pkt = packets[i]; + SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); + memcpy(p, payload->payload, payload->size); + p += payload->size; } + + if (head_payload->nalu_type == SrsAvcNaluTypeIDR) { + data[0] = 0x17; + } else { + data[0] = 0x27; + } + data[1] = 0x01; + data[2] = 0x00; + data[3] = 0x00; + data[4] = 0x00; + + SrsMessageHeader header; + header.message_type = RTMP_MSG_VideoMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp / 90; + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + // TODO: FIXME: Check error. + shared_video->create(&header, data, nn_payload); + return source->on_video(shared_video); + } + + // For STAP-A, it must be SPS/PPS, and only one packet. + if (nalu_type == (SrsAvcNaluType)kStapA) { + SrsRtpPacket2* pkt = head; + SrsRtpSTAPPayload* payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload); + if (!payload) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload"); + } + if (payload->nalus.size() != 2) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload %d nalus", payload->nalus.size()); + } + + SrsSample* sps = payload->nalus[0]; + SrsSample* pps = payload->nalus[1]; + if (!sps->size || !pps->size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload %d sps, %d pps", sps->size, pps->size); + } + + // TODO: FIXME: Directly covert to sample for performance. + // 5 bytes flv tag header. + // 6 bytes sps/pps sequence header. + // 1 byte seperator between sps and pps. + int nn_payload = sps->size + pps->size + 5 + 6 + 1; + char* data = new char[nn_payload]; + SrsBuffer buf(data, nn_payload); + buf.write_1bytes(0x17); + buf.write_1bytes(0x00); + buf.write_1bytes(0x00); + buf.write_1bytes(0x00); + buf.write_1bytes(0x00); + + // FIXME: Replace magic number for avc_demux_sps_pps. + buf.write_1bytes(0x01); + buf.write_1bytes(0x42); + buf.write_1bytes(0xC0); + buf.write_1bytes(0x1E); + buf.write_1bytes(0xFF); + buf.write_1bytes(0xE1); + + buf.write_2bytes(sps->size); + buf.write_string(sps->bytes); + + buf.write_1bytes(0x01); + + buf.write_2bytes(pps->size); + buf.write_string(pps->bytes); + + SrsMessageHeader header; + header.message_type = RTMP_MSG_VideoMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp / 90; + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + // TODO: FIXME: Check error. + shared_video->create(&header, data, nn_payload); + return source->on_video(shared_video); } - return err; + // For RAW NALU, should be one RAW packet. + SrsRtpPacket2* pkt = head; + SrsRtpRawPayload* payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload); + if (!payload) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "RAW-NALU payload"); + } + if (!payload->nn_payload) { + return err; + } + + // TODO: FIXME: Directly covert to sample for performance. + // 1 byte NALU header. + // 5 bytes FLV tag header. + int nn_payload = payload->nn_payload + 1 + 5; + char* data = new char[nn_payload]; + + if (nalu_type == SrsAvcNaluTypeIDR) { + data[0] = 0x17; + } else { + data[0] = 0x27; + } + data[1] = 0x01; + data[2] = 0x00; + data[3] = 0x00; + data[4] = 0x00; + + memcpy(data + 5, payload->payload, payload->nn_payload); + + SrsMessageHeader header; + header.message_type = RTMP_MSG_VideoMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp / 90; + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + // TODO: FIXME: Check error. + shared_video->create(&header, data, nn_payload); + return source->on_video(shared_video); } void SrsRtcPublisher::request_keyframe() @@ -2211,16 +2274,67 @@ SrsRtcSession::~SrsRtcSession() srs_freep(sendonly_skt); } +SrsSdp* SrsRtcSession::get_local_sdp() +{ + return &local_sdp; +} + void SrsRtcSession::set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } +SrsSdp* SrsRtcSession::get_remote_sdp() +{ + return &remote_sdp; +} + +void SrsRtcSession::set_remote_sdp(const SrsSdp& sdp) +{ + remote_sdp = sdp; +} + +SrsRtcSessionStateType SrsRtcSession::get_session_state() +{ + return session_state; +} + +void SrsRtcSession::set_session_state(SrsRtcSessionStateType state) +{ + session_state = state; +} + +std::string SrsRtcSession::id() const +{ + return peer_id + "_" + username; +} + + +std::string SrsRtcSession::get_peer_id() const +{ + return peer_id; +} + +void SrsRtcSession::set_peer_id(const std::string& id) +{ + peer_id = id; +} + +void SrsRtcSession::set_encrypt(bool v) +{ + encrypt = v; +} + void SrsRtcSession::switch_to_context() { _srs_context->set_id(cid); } +int SrsRtcSession::context_id() +{ + return cid; +} + srs_error_t SrsRtcSession::initialize() { srs_error_t err = srs_success; @@ -2290,76 +2404,292 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) return err; } -#ifdef SRS_OSX -// These functions are similar to the older byteorder(3) family of functions. -// For example, be32toh() is identical to ntohl(). -// @see https://linux.die.net/man/3/be32toh -#define be32toh ntohl -#endif +srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data) +{ + return dtls_session->on_dtls(data, nb_data); +} -srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r) +srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; - bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); - if (strict_check && r->get_ice_controlled()) { - // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 - // TODO: Send 487 (Role Conflict) error response. - return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); + if (dtls_session == NULL) { + return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); } - SrsStunPacket stun_binding_response; - char buf[kRtpPacketSize]; - SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); - SrsAutoFree(SrsBuffer, stream); - - stun_binding_response.set_message_type(BindingResponse); - stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); - stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); - stun_binding_response.set_transcation_id(r->get_transcation_id()); - // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); - - if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { - return srs_error_wrap(err, "stun binding response encode failed"); + char unprotected_buf[kRtpPacketSize]; + int nb_unprotected_buf = nb_data; + if ((err = dtls_session->unprotect_rtcp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtcp unprotect failed"); } - if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { - return srs_error_wrap(err, "stun binding response send failed"); + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = unprotected_buf; int len = nb_unprotected_buf; + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); } - if (get_session_state() == WAITING_STUN) { - set_session_state(DOING_DTLS_HANDSHAKE); + char* ph = unprotected_buf; + int nb_left = nb_unprotected_buf; + while (nb_left) { + uint8_t payload_type = ph[1]; + uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - peer_id = sendonly_skt->get_peer_id(); - rtc_server->insert_into_id_sessions(peer_id, this); + int length = (length_4bytes + 1) * 4; - set_session_state(DOING_DTLS_HANDSHAKE); - srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str()); - } + if (length > nb_unprotected_buf) { + return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length); + } - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = stream->data(); int len = stream->pos(); - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + srs_verbose("on rtcp, payload_type=%u", payload_type); + + switch (payload_type) { + case kSR: { + err = on_rtcp_sender_report(ph, length); + break; + } + case kRR: { + err = on_rtcp_receiver_report(ph, length); + break; + } + case kSDES: { + break; + } + case kBye: { + break; + } + case kApp: { + break; + } + case kRtpFb: { + err = on_rtcp_feedback(ph, length); + break; + } + case kPsFb: { + err = on_rtcp_ps_feedback(ph, length); + break; + } + case kXR: { + err = on_rtcp_xr(ph, length); + break; + } + default:{ + return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); + break; + } + } + + if (err != srs_success) { + return srs_error_wrap(err, "rtcp"); + } + + ph += length; + nb_left -= length; } return err; } -srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf) +srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data) { srs_error_t err = srs_success; - if (nb_buf < 12) { - return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + if (publisher == NULL) { + return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); } - SrsBuffer* stream = new SrsBuffer(buf, nb_buf); - SrsAutoFree(SrsBuffer, stream); + if (dtls_session == NULL) { + return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); + } - // @see: https://tools.ietf.org/html/rfc4585#section-6.1 + int nb_unprotected_buf = nb_data; + char* unprotected_buf = new char[kRtpPacketSize]; + if ((err = dtls_session->unprotect_rtp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { + srs_freepa(unprotected_buf); + return srs_error_wrap(err, "rtp unprotect failed"); + } + + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = unprotected_buf; int len = nb_unprotected_buf; + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + return publisher->on_rtp(unprotected_buf, nb_unprotected_buf); +} + +srs_error_t SrsRtcSession::on_connection_established() +{ + srs_error_t err = srs_success; + + srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); + + if (!local_sdp.media_descs_.empty() && + (local_sdp.media_descs_.back().recvonly_ || local_sdp.media_descs_.back().sendrecv_)) { + if ((err = start_publish()) != srs_success) { + return srs_error_wrap(err, "start publish"); + } + } + + if (!local_sdp.media_descs_.empty() && + (local_sdp.media_descs_.back().sendonly_ || local_sdp.media_descs_.back().sendrecv_)) { + if ((err = start_play()) != srs_success) { + return srs_error_wrap(err, "start play"); + } + } + + return err; +} + +srs_error_t SrsRtcSession::start_play() +{ + srs_error_t err = srs_success; + + srs_freep(sender); + sender = new SrsRtcSenderThread(this, _srs_context->get_id()); + + uint32_t video_ssrc = 0; + uint32_t audio_ssrc = 0; + uint16_t video_payload_type = 0; + uint16_t audio_payload_type = 0; + for (size_t i = 0; i < local_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& media_desc = local_sdp.media_descs_[i]; + if (media_desc.is_audio()) { + audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; + audio_payload_type = media_desc.payload_types_[0].payload_type_; + } else if (media_desc.is_video()) { + video_ssrc = media_desc.ssrc_infos_[0].ssrc_; + video_payload_type = media_desc.payload_types_[0].payload_type_; + } + } + + if ((err = sender->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { + return srs_error_wrap(err, "SrsRtcSenderThread init"); + } + + if ((err = sender->start()) != srs_success) { + return srs_error_wrap(err, "start SrsRtcSenderThread"); + } + + return err; +} + +srs_error_t SrsRtcSession::start_publish() +{ + srs_error_t err = srs_success; + + srs_freep(publisher); + publisher = new SrsRtcPublisher(this); + + uint32_t video_ssrc = 0; + uint32_t audio_ssrc = 0; + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { + const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; + if (media_desc.is_audio()) { + if (!media_desc.ssrc_infos_.empty()) { + audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; + } + } else if (media_desc.is_video()) { + if (!media_desc.ssrc_infos_.empty()) { + video_ssrc = media_desc.ssrc_infos_[0].ssrc_; + } + } + } + + // FIXME: err process. + if ((err = publisher->initialize(video_ssrc, audio_ssrc, req)) != srs_success) { + return srs_error_wrap(err, "rtc publisher init"); + } + + return err; +} + +bool SrsRtcSession::is_stun_timeout() +{ + return last_stun_time + sessionStunTimeout < srs_get_system_time(); +} + +void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt) +{ + if (sendonly_skt) { + srs_trace("session %s address changed, update %s -> %s", + id().c_str(), sendonly_skt->get_peer_id().c_str(), skt->get_peer_id().c_str()); + } + + srs_freep(sendonly_skt); + sendonly_skt = skt->copy_sendonly(); +} + +#ifdef SRS_OSX +// These functions are similar to the older byteorder(3) family of functions. +// For example, be32toh() is identical to ntohl(). +// @see https://linux.die.net/man/3/be32toh +#define be32toh ntohl +#endif + +srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r) +{ + srs_error_t err = srs_success; + + bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); + if (strict_check && r->get_ice_controlled()) { + // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 + // TODO: Send 487 (Role Conflict) error response. + return srs_error_new(ERROR_RTC_STUN, "Peer must not in ice-controlled role in ice-lite mode."); + } + + SrsStunPacket stun_binding_response; + char buf[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); + stun_binding_response.set_transcation_id(r->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); + + if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); + } + + if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (get_session_state() == WAITING_STUN) { + set_session_state(DOING_DTLS_HANDSHAKE); + + peer_id = sendonly_skt->get_peer_id(); + rtc_server->insert_into_id_sessions(peer_id, this); + + set_session_state(DOING_DTLS_HANDSHAKE); + srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str()); + } + + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = stream->data(); int len = stream->pos(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + + if (nb_buf < 12) { + return srs_error_new(ERROR_RTC_RTCP_CHECK, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + // @see: https://tools.ietf.org/html/rfc4585#section-6.1 /* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 @@ -2571,221 +2901,6 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -srs_error_t SrsRtcSession::on_connection_established() -{ - srs_error_t err = srs_success; - - srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); - - if (!local_sdp.media_descs_.empty() && - (local_sdp.media_descs_.back().recvonly_ || local_sdp.media_descs_.back().sendrecv_)) { - if ((err = start_publish()) != srs_success) { - return srs_error_wrap(err, "start publish"); - } - } - - if (!local_sdp.media_descs_.empty() && - (local_sdp.media_descs_.back().sendonly_ || local_sdp.media_descs_.back().sendrecv_)) { - if ((err = start_play()) != srs_success) { - return srs_error_wrap(err, "start play"); - } - } - - return err; -} - -srs_error_t SrsRtcSession::start_play() -{ - srs_error_t err = srs_success; - - srs_freep(sender); - sender = new SrsRtcSenderThread(this, _srs_context->get_id()); - - uint32_t video_ssrc = 0; - uint32_t audio_ssrc = 0; - uint16_t video_payload_type = 0; - uint16_t audio_payload_type = 0; - for (size_t i = 0; i < local_sdp.media_descs_.size(); ++i) { - const SrsMediaDesc& media_desc = local_sdp.media_descs_[i]; - if (media_desc.is_audio()) { - audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; - audio_payload_type = media_desc.payload_types_[0].payload_type_; - } else if (media_desc.is_video()) { - video_ssrc = media_desc.ssrc_infos_[0].ssrc_; - video_payload_type = media_desc.payload_types_[0].payload_type_; - } - } - - if ((err = sender->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { - return srs_error_wrap(err, "SrsRtcSenderThread init"); - } - - if ((err = sender->start()) != srs_success) { - return srs_error_wrap(err, "start SrsRtcSenderThread"); - } - - return err; -} - -srs_error_t SrsRtcSession::start_publish() -{ - srs_error_t err = srs_success; - - srs_freep(publisher); - publisher = new SrsRtcPublisher(this); - - uint32_t video_ssrc = 0; - uint32_t audio_ssrc = 0; - for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { - const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; - if (media_desc.is_audio()) { - if (!media_desc.ssrc_infos_.empty()) { - audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; - } - } else if (media_desc.is_video()) { - if (!media_desc.ssrc_infos_.empty()) { - video_ssrc = media_desc.ssrc_infos_[0].ssrc_; - } - } - } - - // FIXME: err process. - if ((err = publisher->initialize(video_ssrc, audio_ssrc, req)) != srs_success) { - return srs_error_wrap(err, "rtc publisher init"); - } - - return err; -} - -bool SrsRtcSession::is_stun_timeout() -{ - return last_stun_time + sessionStunTimeout < srs_get_system_time(); -} - -srs_error_t SrsRtcSession::on_dtls(char* data, int nb_data) -{ - return dtls_session->on_dtls(data, nb_data); -} - -srs_error_t SrsRtcSession::on_rtcp(char* data, int nb_data) -{ - srs_error_t err = srs_success; - - if (dtls_session == NULL) { - return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); - } - - char unprotected_buf[kRtpPacketSize]; - int nb_unprotected_buf = nb_data; - if ((err = dtls_session->unprotect_rtcp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { - return srs_error_wrap(err, "rtcp unprotect failed"); - } - - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = unprotected_buf; int len = nb_unprotected_buf; - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); - } - - char* ph = unprotected_buf; - int nb_left = nb_unprotected_buf; - while (nb_left) { - uint8_t payload_type = ph[1]; - uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - - int length = (length_4bytes + 1) * 4; - - if (length > nb_unprotected_buf) { - return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length); - } - - srs_verbose("on rtcp, payload_type=%u", payload_type); - - switch (payload_type) { - case kSR: { - err = on_rtcp_sender_report(ph, length); - break; - } - case kRR: { - err = on_rtcp_receiver_report(ph, length); - break; - } - case kSDES: { - break; - } - case kBye: { - break; - } - case kApp: { - break; - } - case kRtpFb: { - err = on_rtcp_feedback(ph, length); - break; - } - case kPsFb: { - err = on_rtcp_ps_feedback(ph, length); - break; - } - case kXR: { - err = on_rtcp_xr(ph, length); - break; - } - default:{ - return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); - break; - } - } - - if (err != srs_success) { - return srs_error_wrap(err, "rtcp"); - } - - ph += length; - nb_left -= length; - } - - return err; -} - -srs_error_t SrsRtcSession::on_rtp(char* data, int nb_data) -{ - srs_error_t err = srs_success; - - if (publisher == NULL) { - return srs_error_new(ERROR_RTC_RTCP, "rtc publisher null"); - } - - if (dtls_session == NULL) { - return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); - } - - char* unprotected_buf = new char[kRtpPacketSize]; - int nb_unprotected_buf = nb_data; - if ((err = dtls_session->unprotect_rtp(unprotected_buf, data, nb_unprotected_buf)) != srs_success) { - return srs_error_wrap(err, "rtp unprotect failed"); - } - - if (blackhole && blackhole_addr && blackhole_stfd) { - // Ignore any error for black-hole. - void* p = unprotected_buf; int len = nb_unprotected_buf; - srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); - } - - return publisher->on_rtp(unprotected_buf, nb_unprotected_buf); -} - -void SrsRtcSession::update_sendonly_socket(SrsUdpMuxSocket* skt) -{ - if (sendonly_skt) { - srs_trace("session %s address changed, update %s -> %s", - id().c_str(), sendonly_skt->get_peer_id().c_str(), skt->get_peer_id().c_str()); - } - - srs_freep(sendonly_skt); - sendonly_skt = skt->copy_sendonly(); -} - SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) { lfd = NULL; @@ -3161,6 +3276,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) rtc_session->switch_to_context(); } + // For STUN, the peer address may change. if (is_stun((uint8_t*)data, size)) { SrsStunPacket sr; if ((err = sr.decode(data, size)) != srs_success) { @@ -3181,22 +3297,20 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) } return rtc_session->on_stun(skt, &sr); - } else if (is_dtls((uint8_t*)data, size)) { - if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, peer_id=%s", skt->get_peer_id().c_str()); - } + } + // For DTLS, RTCP or RTP, which does not support peer address changing. + if (rtc_session == NULL) { + return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, peer_id=%s", skt->get_peer_id().c_str()); + } + + if (is_dtls((uint8_t*)data, size)) { return rtc_session->on_dtls(data, size); } else if (is_rtp_or_rtcp((uint8_t*)data, size)) { - if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, peer_id=%s", skt->get_peer_id().c_str()); - } - if (is_rtcp((uint8_t*)data, size)) { return rtc_session->on_rtcp(data, size); - } else { - return rtc_session->on_rtp(data, size); } + return rtc_session->on_rtp(data, size); } return srs_error_new(ERROR_RTC_UDP, "unknown udp packet type"); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 927ad78d6..65e959e32 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -33,6 +33,7 @@ #include <srs_app_hourglass.hpp> #include <srs_app_sdp.hpp> #include <srs_app_reload.hpp> +#include <srs_kernel_rtp.hpp> #include <string> #include <map> @@ -54,6 +55,8 @@ class ISrsUdpSender; class SrsRtpQueue; class SrsRtpH264Demuxer; class SrsRtpOpusDemuxer; +class SrsRtpPacket2; +class ISrsCodec; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -248,7 +251,7 @@ private: srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets); }; -class SrsRtcPublisher : virtual public ISrsHourGlass +class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler { private: SrsHourGlass* report_timer; @@ -258,7 +261,6 @@ private: uint32_t audio_ssrc; private: SrsRtpH264Demuxer* rtp_h264_demuxer; - SrsRtpOpusDemuxer* rtp_opus_demuxer; SrsRtpQueue* rtp_video_queue; SrsRtpQueue* rtp_audio_queue; private: @@ -283,11 +285,14 @@ private: srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: srs_error_t on_rtp(char* buf, int nb_buf); + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload); private: - srs_error_t on_audio(SrsRtpSharedPacket* pkt); - srs_error_t collect_audio_frame(); - srs_error_t on_video(SrsRtpSharedPacket* pkt); - srs_error_t collect_video_frame(); + srs_error_t on_audio(SrsRtpPacket2* pkt); + srs_error_t collect_audio_frames(); + srs_error_t do_collect_audio_frame(SrsRtpPacket2* packet); + srs_error_t on_video(SrsRtpPacket2* pkt); + srs_error_t collect_video_frames(); + srs_error_t do_collect_video_frame(std::vector<SrsRtpPacket2*>& packets); public: void request_keyframe(); // interface ISrsHourGlass @@ -334,25 +339,18 @@ public: SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id); virtual ~SrsRtcSession(); public: - SrsSdp* get_local_sdp() { return &local_sdp; } + SrsSdp* get_local_sdp(); void set_local_sdp(const SrsSdp& sdp); - - SrsSdp* get_remote_sdp() { return &remote_sdp; } - void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } - - SrsRtcSessionStateType get_session_state() { return session_state; } - void set_session_state(SrsRtcSessionStateType state) { session_state = state; } - - std::string id() const { return peer_id + "_" + username; } - - std::string get_peer_id() const { return peer_id; } - void set_peer_id(const std::string& id) { peer_id = id; } - - void set_encrypt(bool v) { encrypt = v; } - + SrsSdp* get_remote_sdp(); + void set_remote_sdp(const SrsSdp& sdp); + SrsRtcSessionStateType get_session_state(); + void set_session_state(SrsRtcSessionStateType state); + std::string id() const; + std::string get_peer_id() const; + void set_peer_id(const std::string& id); + void set_encrypt(bool v); void switch_to_context(); - int context_id() { return cid; } - + int context_id(); public: srs_error_t initialize(); // The peer address may change, we can identify that by STUN messages. @@ -361,7 +359,6 @@ public: srs_error_t on_rtcp(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); public: - srs_error_t send_client_hello(); srs_error_t on_connection_established(); srs_error_t start_play(); srs_error_t start_publish(); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 99c1dc6b1..ad399ea4a 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -141,8 +141,8 @@ SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity) capacity_ = capacity; initialized_ = false; - queue_ = new SrsRtpSharedPacket*[capacity_]; - memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity); + queue_ = new SrsRtpPacket2*[capacity_]; + memset(queue_, 0, sizeof(SrsRtpPacket2*) * capacity); } SrsRtpRingBuffer::~SrsRtpRingBuffer() @@ -150,9 +150,9 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer() srs_freepa(queue_); } -void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt) +void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) { - SrsRtpSharedPacket* p = queue_[at % capacity_]; + SrsRtpPacket2* p = queue_[at % capacity_]; if (p) { srs_freep(p); @@ -161,9 +161,11 @@ void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt) queue_[at % capacity_] = pkt; } -void SrsRtpRingBuffer::remove(uint16_t at) +void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) { - set(at, NULL); + for (uint16_t s = low; s != high; ++s) { + queue_[s % capacity_] = NULL; + } } uint16_t SrsRtpRingBuffer::next_start_of_frame() @@ -173,8 +175,8 @@ uint16_t SrsRtpRingBuffer::next_start_of_frame() } for (uint16_t s = low_ + 1 ; s != high_; ++s) { - SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; - if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) { + SrsRtpPacket2*& pkt = queue_[s % capacity_]; + if (pkt && pkt->is_first_packet_of_frame) { return s; } } @@ -189,8 +191,8 @@ uint16_t SrsRtpRingBuffer::next_keyframe() } for (uint16_t s = low_ + 1 ; s != high_; ++s) { - SrsRtpSharedPacket*& pkt = queue_[s % capacity_]; - if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) { + SrsRtpPacket2*& pkt = queue_[s % capacity_]; + if (pkt && pkt->is_key_frame && pkt->is_first_packet_of_frame) { return s; } } @@ -269,7 +271,7 @@ SrsRtpQueue::~SrsRtpQueue() srs_freep(nack_); } -srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt) +srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -337,8 +339,8 @@ srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt) queue_->advance_to(next + 1); } - // TODO: FIXME: Change to ptr of ptr. - queue_->set(seq, pkt->copy()); + // Save packet at the position seq. + queue_->set(seq, pkt); // Collect packets to frame when: // 1. Marker bit means the last packet of frame received. @@ -351,7 +353,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpSharedPacket* pkt) return err; } -void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames) +void SrsRtpQueue::collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames) { frames.swap(frames_); } @@ -446,33 +448,39 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) void SrsRtpQueue::collect_packet() { - vector<SrsRtpSharedPacket*> frame; - for (uint16_t s = queue_->low(); s != queue_->high(); ++s) { - SrsRtpSharedPacket* pkt = queue_->at(s); + while (queue_->low() != queue_->high()) { + vector<SrsRtpPacket2*> frame; + for (uint16_t s = queue_->low(); s != queue_->high(); ++s) { + SrsRtpPacket2* pkt = queue_->at(s); - if (nack_->find(s) != NULL) { - srs_verbose("seq=%u, found in nack list when collect frame", s); - break; - } + // In NACK, never collect frame. + if (nack_->find(s) != NULL) { + srs_verbose("seq=%u, found in nack list when collect frame", s); + return; + } - // We must collect frame from first packet to last packet. - if (s == queue_->low() && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) { - break; - } + // Ignore when the first packet not the start. + if (s == queue_->low() && pkt->nn_original_payload && !pkt->is_first_packet_of_frame) { + return; + } - frame.push_back(pkt->copy()); - if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { + // OK, collect packet to frame. + frame.push_back(pkt); + + // Not the last packet, continue to process next one. + if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { + continue; + } + + // Done, we got the last packet of frame. nn_collected_frames++; frames_.push_back(frame); - frame.clear(); + + // Reset the range of packets to NULL in buffer. + queue_->reset(queue_->low(), s); srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1); queue_->advance_to(s + 1); } } - - // remove the tmp buffer - for (size_t i = 0; i < frame.size(); ++i) { - srs_freep(frame[i]); - } } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 33c1d3057..2e948d88e 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -30,7 +30,7 @@ #include <vector> #include <map> -class SrsRtpSharedPacket; +class SrsRtpPacket2; class SrsRtpQueue; struct SrsNackOption @@ -119,7 +119,7 @@ private: // Capacity of the ring-buffer. uint16_t capacity_; // Ring bufer. - SrsRtpSharedPacket** queue_; + SrsRtpPacket2** queue_; // Increase one when uint16 flip back, for get_extended_highest_sequence. uint64_t nn_seq_flip_backs; // Whether initialized, because we use uint16 so we can't use -1. @@ -132,12 +132,17 @@ public: SrsRtpRingBuffer(size_t capacity); virtual ~SrsRtpRingBuffer(); public: + // Move the position of buffer. uint16_t low() { return low_; } uint16_t high() { return high_; } void advance_to(uint16_t seq) { low_ = seq; } - void set(uint16_t at, SrsRtpSharedPacket* pkt); - void remove(uint16_t at); - bool overflow() { return low_ + capacity_ < high_; } + // Free the packet at position. + void set(uint16_t at, SrsRtpPacket2* pkt); + void remove(uint16_t at) { set(at, NULL); } + // Directly reset range [low, high] to NULL. + void reset(uint16_t low, uint16_t high); + // Whether queue overflow or heavy(too many packets and need clear). + bool overflow() { return high_ - low_ < capacity_; } bool is_heavy() { return high_ - low_ >= capacity_ / 2; } // Get the next start packet of frame. // @remark If not found, return the low_, which should never be the "next" one, @@ -151,7 +156,7 @@ public: // Update the sequence, got the nack range by [low, high]. void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high); // Get the packet by seq. - SrsRtpSharedPacket* at(uint16_t seq) { return queue_[seq % capacity_]; } + SrsRtpPacket2* at(uint16_t seq) { return queue_[seq % capacity_]; } }; class SrsRtpQueue @@ -170,14 +175,14 @@ private: uint64_t num_of_packet_received_; uint64_t number_of_packet_lossed_; private: - std::vector<std::vector<SrsRtpSharedPacket*> > frames_; + std::vector<std::vector<SrsRtpPacket2*> > frames_; bool request_key_frame_; public: SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false); virtual ~SrsRtpQueue(); public: - srs_error_t consume(SrsRtpSharedPacket* pkt); - void collect_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames); + srs_error_t consume(SrsRtpPacket2* pkt); + void collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames); bool should_request_key_frame(); void notify_drop_seq(uint16_t seq); void notify_nack_list_full(); diff --git a/trunk/src/kernel/srs_kernel_buffer.cpp b/trunk/src/kernel/srs_kernel_buffer.cpp index 7f62d6b5a..ced0f776e 100644 --- a/trunk/src/kernel/srs_kernel_buffer.cpp +++ b/trunk/src/kernel/srs_kernel_buffer.cpp @@ -58,11 +58,26 @@ SrsBuffer::~SrsBuffer() { } +char* SrsBuffer::data() +{ + return bytes; +} + +char* SrsBuffer::head() +{ + return p; +} + int SrsBuffer::size() { return nb_bytes; } +void SrsBuffer::set_size(int v) +{ + nb_bytes = v; +} + int SrsBuffer::pos() { return (int)(p - bytes); diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index ce8852c87..f2f73bf70 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -105,62 +105,62 @@ public: // Create buffer with data b and size nn. // @remark User must free the data b. SrsBuffer(char* b, int nn); - virtual ~SrsBuffer(); + ~SrsBuffer(); public: // Get the data and head of buffer. // current-bytes = head() = data() + pos() - inline char* data() { return bytes; } - inline char* head() { return p; } + char* data(); + char* head(); // Get the total size of buffer. // left-bytes = size() - pos() - virtual int size(); - void set_size(int v) { nb_bytes = v; } + int size(); + void set_size(int v); // Get the current buffer position. - virtual int pos(); + int pos(); // Left bytes in buffer, total size() minus the current pos(). - virtual int left(); + int left(); // Whether buffer is empty. - virtual bool empty(); + bool empty(); // Whether buffer is able to supply required size of bytes. // @remark User should check buffer by require then do read/write. // @remark Assert the required_size is not negative. - virtual bool require(int required_size); + bool require(int required_size); public: // Skip some size. // @param size can be any value. positive to forward; nagetive to backward. // @remark to skip(pos()) to reset buffer. // @remark assert initialized, the data() not NULL. - virtual void skip(int size); + void skip(int size); public: // Read 1bytes char from buffer. - virtual int8_t read_1bytes(); + int8_t read_1bytes(); // Read 2bytes int from buffer. - virtual int16_t read_2bytes(); + int16_t read_2bytes(); // Read 3bytes int from buffer. - virtual int32_t read_3bytes(); + int32_t read_3bytes(); // Read 4bytes int from buffer. - virtual int32_t read_4bytes(); + int32_t read_4bytes(); // Read 8bytes int from buffer. - virtual int64_t read_8bytes(); + int64_t read_8bytes(); // Read string from buffer, length specifies by param len. - virtual std::string read_string(int len); + std::string read_string(int len); // Read bytes from buffer, length specifies by param len. - virtual void read_bytes(char* data, int size); + void read_bytes(char* data, int size); public: // Write 1bytes char to buffer. - virtual void write_1bytes(int8_t value); + void write_1bytes(int8_t value); // Write 2bytes int to buffer. - virtual void write_2bytes(int16_t value); + void write_2bytes(int16_t value); // Write 4bytes int to buffer. - virtual void write_4bytes(int32_t value); + void write_4bytes(int32_t value); // Write 3bytes int to buffer. - virtual void write_3bytes(int32_t value); + void write_3bytes(int32_t value); // Write 8bytes int to buffer. - virtual void write_8bytes(int64_t value); + void write_8bytes(int64_t value); // Write string to buffer - virtual void write_string(std::string value); + void write_string(std::string value); // Write bytes to buffer - virtual void write_bytes(char* data, int size); + void write_bytes(char* data, int size); }; /** @@ -175,10 +175,10 @@ private: SrsBuffer* stream; public: SrsBitBuffer(SrsBuffer* b); - virtual ~SrsBitBuffer(); + ~SrsBitBuffer(); public: - virtual bool empty(); - virtual int8_t read_bit(); + bool empty(); + int8_t read_bit(); }; #endif diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 594f5598e..f10bb608a 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -388,6 +388,36 @@ void SrsSharedPtrMessage::set_extra_payloads(SrsSample* payloads, int nn_payload memcpy((void*)ptr->extra_payloads, payloads, nn_payloads * sizeof(SrsSample)); } +int SrsSharedPtrMessage::nn_extra_payloads() +{ + return ptr->nn_extra_payloads; +} + +SrsSample* SrsSharedPtrMessage::extra_payloads() +{ + return ptr->extra_payloads; +} + +void SrsSharedPtrMessage::set_max_extra_payload(int v) +{ + ptr->nn_max_extra_payloads = v; +} + +int SrsSharedPtrMessage::nn_max_extra_payloads() +{ + return ptr->nn_max_extra_payloads; +} + +bool SrsSharedPtrMessage::has_idr() +{ + return ptr->has_idr; +} + +void SrsSharedPtrMessage::set_has_idr(bool v) +{ + ptr->has_idr = v; +} + void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples) { srs_assert(nn_samples); @@ -398,6 +428,16 @@ void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples) ptr->samples = new SrsSample[nn_samples]; memcpy((void*)ptr->samples, samples, nn_samples * sizeof(SrsSample)); } + +int SrsSharedPtrMessage::nn_samples() +{ + return ptr->nn_samples; +} + +SrsSample* SrsSharedPtrMessage::samples() +{ + return ptr->samples; +} #endif SrsFlvTransmuxer::SrsFlvTransmuxer() diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 80d3ce542..7545aa5b7 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -367,18 +367,18 @@ public: // Set extra samples, for example, when we transcode an AAC audio packet to OPUS, // we may get more than one OPUS packets, we set these OPUS packets in extra payloads. void set_extra_payloads(SrsSample* payloads, int nn_payloads); - int nn_extra_payloads() { return ptr->nn_extra_payloads; } - SrsSample* extra_payloads() { return ptr->extra_payloads; } + int nn_extra_payloads(); + SrsSample* extra_payloads(); // The max extra payload size. - void set_max_extra_payload(int v) { ptr->nn_max_extra_payloads = v; } - int nn_max_extra_payloads() { return ptr->nn_max_extra_payloads; } + void set_max_extra_payload(int v); + int nn_max_extra_payloads(); // Whether samples has idr. - bool has_idr() { return ptr->has_idr; } - void set_has_idr(bool v) { ptr->has_idr = v; } + bool has_idr(); + void set_has_idr(bool v); // Set samples, each sample points to the address of payload. void set_samples(SrsSample* samples, int nn_samples); - int nn_samples() { return ptr->nn_samples; } - SrsSample* samples() { return ptr->samples; } + int nn_samples(); + SrsSample* samples(); #endif }; diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index fda53ef63..105f75a43 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -61,11 +61,11 @@ SrsRtpHeader::~SrsRtpHeader() { } -srs_error_t SrsRtpHeader::decode(SrsBuffer* stream) +srs_error_t SrsRtpHeader::decode(SrsBuffer* buf) { srs_error_t err = srs_success; - if (stream->size() < kRtpHeaderFixedSize) { + if (buf->size() < kRtpHeaderFixedSize) { return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); } @@ -84,60 +84,58 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* stream) +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ - uint8_t first = stream->read_1bytes(); + uint8_t first = buf->read_1bytes(); padding = (first & 0x20); extension = (first & 0x10); cc = (first & 0x0F); - uint8_t second = stream->read_1bytes(); + uint8_t second = buf->read_1bytes(); marker = (second & 0x80); payload_type = (second & 0x7F); - sequence = stream->read_2bytes(); - timestamp = stream->read_4bytes(); - ssrc = stream->read_4bytes(); + sequence = buf->read_2bytes(); + timestamp = buf->read_4bytes(); + ssrc = buf->read_4bytes(); - if ((size_t)stream->size() < header_size()) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); + if (!buf->require(nb_bytes())) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d+ bytes", nb_bytes()); } for (uint8_t i = 0; i < cc; ++i) { - csrc[i] = stream->read_4bytes(); + csrc[i] = buf->read_4bytes(); } if (extension) { - // TODO: - uint16_t profile_id = stream->read_2bytes(); - extension_length = stream->read_2bytes(); - // @see: https://tools.ietf.org/html/rfc3550#section-5.3.1 - stream->skip(extension_length * 4); + uint16_t profile_id = buf->read_2bytes(); + extension_length = buf->read_2bytes(); - srs_verbose("extension, profile_id=%u, length=%u", profile_id, extension_length); + // TODO: FIXME: Read extensions. + // @see: https://tools.ietf.org/html/rfc3550#section-5.3.1 + buf->skip(extension_length * 4); // @see: https://tools.ietf.org/html/rfc5285#section-4.2 if (profile_id == 0xBEDE) { + // TODO: FIXME: Implements it. } } if (padding) { - padding_length = *(reinterpret_cast<uint8_t*>(stream->data() + stream->size() - 1)); - if (padding_length > (stream->size() - stream->pos())) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "rtp payload incorrect"); + padding_length = *(reinterpret_cast<uint8_t*>(buf->data() + buf->size() - 1)); + if (!buf->require(padding_length)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "padding requires %d bytes", padding_length); } - - srs_verbose("offset=%d, padding_length=%u", stream->size(), padding_length); } return err; } -srs_error_t SrsRtpHeader::encode(SrsBuffer* stream) +srs_error_t SrsRtpHeader::encode(SrsBuffer* buf) { srs_error_t err = srs_success; // Encode the RTP fix header, 12bytes. // @see https://tools.ietf.org/html/rfc1889#section-5.1 - char* op = stream->head(); + char* op = buf->head(); char* p = op; // The version, padding, extension and cc, total 1 byte. @@ -190,24 +188,42 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* stream) } // Consume the data. - stream->skip(p - op); + buf->skip(p - op); return err; } -size_t SrsRtpHeader::header_size() +int SrsRtpHeader::nb_bytes() { return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0); } +ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler() +{ +} + +ISrsRtpPacketDecodeHandler::~ISrsRtpPacketDecodeHandler() +{ +} + SrsRtpPacket2::SrsRtpPacket2() { - payload = NULL; padding = 0; + payload = NULL; + decode_handler = NULL; + + is_first_packet_of_frame = false; + is_last_packet_of_frame = false; + is_key_frame = false; + nalu_type = SrsAvcNaluTypeReserved; cache_raw = new SrsRtpRawPayload(); cache_fua = new SrsRtpFUAPayload2(); cache_payload = 0; + + original_bytes = NULL; + nn_original_bytes = 0; + nn_original_payload = 0; } SrsRtpPacket2::~SrsRtpPacket2() @@ -220,6 +236,8 @@ SrsRtpPacket2::~SrsRtpPacket2() srs_freep(payload); srs_freep(cache_raw); srs_freep(cache_fua); + + srs_freepa(original_bytes); } void SrsRtpPacket2::set_padding(int size) @@ -270,7 +288,7 @@ SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua() int SrsRtpPacket2::nb_bytes() { if (!cache_payload) { - cache_payload = rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; + cache_payload = rtp_header.nb_bytes() + (payload? payload->nb_bytes():0) + padding; } return cache_payload; } @@ -284,7 +302,7 @@ srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) } if (payload && (err = payload->encode(buf)) != srs_success) { - return srs_error_wrap(err, "encode payload"); + return srs_error_wrap(err, "rtp payload"); } if (padding > 0) { @@ -298,6 +316,44 @@ srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) return err; } +srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) +{ + srs_error_t err = srs_success; + + if ((err = rtp_header.decode(buf)) != srs_success) { + return srs_error_wrap(err, "rtp header"); + } + + // We must skip the padding bytes before parsing payload. + padding = rtp_header.get_padding_length(); + if (!buf->require(padding)) { + return srs_error_wrap(err, "requires padding %d bytes", padding); + } + buf->set_size(buf->size() - padding); + + // Try to parse the NALU type for video decoder. + if (!buf->empty()) { + nalu_type = SrsAvcNaluType((uint8_t)(buf->head()[0] & 0x3f)); + } + + // If user set the decode handler, call it to set the payload. + if (decode_handler) { + decode_handler->on_before_decode_payload(this, buf, &payload); + nn_original_payload = buf->left(); + } + + // By default, we always use the RAW payload. + if (!payload) { + payload = reuse_raw(); + } + + if ((err = payload->decode(buf)) != srs_success) { + return srs_error_wrap(err, "rtp payload"); + } + + return err; +} + SrsRtpRawPayload::SrsRtpRawPayload() { payload = NULL; @@ -328,6 +384,18 @@ srs_error_t SrsRtpRawPayload::encode(SrsBuffer* buf) return srs_success; } +srs_error_t SrsRtpRawPayload::decode(SrsBuffer* buf) +{ + if (buf->empty()) { + return srs_success; + } + + payload = buf->head(); + nn_payload = buf->left(); + + return srs_success; +} + SrsRtpRawNALUs::SrsRtpRawNALUs() { cursor = 0; @@ -336,12 +404,10 @@ SrsRtpRawNALUs::SrsRtpRawNALUs() SrsRtpRawNALUs::~SrsRtpRawNALUs() { - if (true) { - int nn_nalus = (int)nalus.size(); - for (int i = 0; i < nn_nalus; i++) { - SrsSample* p = nalus[i]; - srs_freep(p); - } + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + srs_freep(p); } } @@ -436,6 +502,22 @@ srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf) return srs_success; } +srs_error_t SrsRtpRawNALUs::decode(SrsBuffer* buf) +{ + if (buf->empty()) { + return srs_success; + } + + SrsSample* sample = new SrsSample(); + sample->bytes = buf->head(); + sample->size = buf->left(); + buf->skip(sample->size); + + nalus.push_back(sample); + + return srs_success; +} + SrsRtpSTAPPayload::SrsRtpSTAPPayload() { nri = (SrsAvcNaluType)0; @@ -491,6 +573,39 @@ srs_error_t SrsRtpSTAPPayload::encode(SrsBuffer* buf) return srs_success; } +srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf) +{ + if (!buf->require(1)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + // STAP header, RTP payload format for aggregation packets + // @see https://tools.ietf.org/html/rfc6184#section-5.7 + uint8_t v = buf->read_1bytes(); + nri = SrsAvcNaluType(v & kNalTypeMask); + + // NALUs. + while (!buf->empty()) { + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2); + } + + int size = buf->read_2bytes(); + if (!buf->require(size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", size); + } + + SrsSample* sample = new SrsSample(); + sample->bytes = buf->head(); + sample->size = size; + buf->skip(size); + + nalus.push_back(sample); + } + + return srs_success; +} + SrsRtpFUAPayload::SrsRtpFUAPayload() { start = end = false; @@ -555,6 +670,36 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) return srs_success; } +srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf) +{ + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2); + } + + // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t v = buf->read_1bytes(); + nri = SrsAvcNaluType(v & kNalTypeMask); + + // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 + v = buf->read_1bytes(); + start = v & kStart; + end = v & kEnd; + nalu_type = SrsAvcNaluType(v & 0x3f); + + if (!buf->require(1)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + SrsSample* sample = new SrsSample(); + sample->bytes = buf->head(); + sample->size = buf->left(); + buf->skip(sample->size); + + nalus.push_back(sample); + + return srs_success; +} + SrsRtpFUAPayload2::SrsRtpFUAPayload2() { start = end = false; @@ -606,6 +751,33 @@ srs_error_t SrsRtpFUAPayload2::encode(SrsBuffer* buf) return srs_success; } +srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf) +{ + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2); + } + + // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t v = buf->read_1bytes(); + nri = SrsAvcNaluType(v & (~kNalTypeMask)); + + // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 + v = buf->read_1bytes(); + start = v & kStart; + end = v & kEnd; + nalu_type = SrsAvcNaluType(v & 0x3f); + + if (!buf->require(1)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + payload = buf->head(); + size = buf->left(); + buf->skip(size); + + return srs_success; +} + SrsRtpPayloadHeader::SrsRtpPayloadHeader() { is_first_packet_of_frame = false; @@ -721,7 +893,7 @@ srs_error_t SrsRtpSharedPacket::create(SrsRtpHeader* rtp_h, SrsRtpPayloadHeader* this->rtp_payload_header = rtp_ph; // TODO: rtp header padding. - size_t buffer_size = rtp_header.header_size() + s; + int buffer_size = rtp_header.nb_bytes() + s; char* buffer = new char[buffer_size]; SrsBuffer stream(buffer, buffer_size); diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 42fda10c0..d97abdb6e 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -31,6 +31,8 @@ #include <string> +class SrsRtpPacket2; + const int kRtpHeaderFixedSize = 12; const uint8_t kRtpMarker = 0x80; @@ -72,10 +74,9 @@ public: virtual ~SrsRtpHeader(); void reset(); public: - srs_error_t decode(SrsBuffer* stream); - srs_error_t encode(SrsBuffer* stream); -public: - size_t header_size(); + virtual srs_error_t decode(SrsBuffer* buf); + virtual srs_error_t encode(SrsBuffer* buf); + virtual int nb_bytes(); public: inline void set_marker(bool v) { marker = v; } bool get_marker() const { return marker; } @@ -92,16 +93,48 @@ public: uint8_t get_padding_length() const { return padding_length; } }; +class ISrsRtpPacketDecodeHandler +{ +public: + ISrsRtpPacketDecodeHandler(); + virtual ~ISrsRtpPacketDecodeHandler(); +public: + // We don't know the actual payload, so we depends on external handler. + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) = 0; +}; + class SrsRtpPacket2 { +// RTP packet fields. public: + // TODO: FIXME: Rename to header. SrsRtpHeader rtp_header; - ISrsEncoder* payload; + ISrsCodec* payload; + // TODO: FIXME: Merge into rtp_header. int padding; +// Decoder helper. +public: + // Helper information for decoder. + bool is_first_packet_of_frame; + bool is_last_packet_of_frame; + bool is_key_frame; + // The first byte as nalu type, for video decoder only. + SrsAvcNaluType nalu_type; + // The original payload bytes length. + int nn_original_payload; +// Decoder helper. +private: + // The original bytes for decoder only, we will free it. + char* original_bytes; + int nn_original_bytes; +// Fast cache for performance. private: + // Cache frequently used payload for performance. SrsRtpRawPayload* cache_raw; SrsRtpFUAPayload2* cache_fua; int cache_payload; + // The helper handler for decoder, use RAW payload if NULL. + ISrsRtpPacketDecodeHandler* decode_handler; public: SrsRtpPacket2(); virtual ~SrsRtpPacket2(); @@ -116,14 +149,19 @@ public: SrsRtpRawPayload* reuse_raw(); // Reuse the cached fua message as payload. SrsRtpFUAPayload2* reuse_fua(); + // Set the decode handler. + void set_decode_handler(ISrsRtpPacketDecodeHandler* h) { decode_handler = h; } + // Set the original bytes. + void set_original_bytes(char* buf, int nn_buf) { original_bytes = buf; nn_original_bytes = nn_buf; } // interface ISrsEncoder public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; // Single payload data. -class SrsRtpRawPayload : public ISrsEncoder +class SrsRtpRawPayload : public ISrsCodec { public: // The RAW payload, directly point to the shared memory. @@ -137,10 +175,11 @@ public: public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; // Multiple NALUs, automatically insert 001 between NALUs. -class SrsRtpRawNALUs : public ISrsEncoder +class SrsRtpRawNALUs : public ISrsCodec { private: // We will manage the samples, but the sample itself point to the shared memory. @@ -160,10 +199,11 @@ public: public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; // STAP-A, for multiple NALUs. -class SrsRtpSTAPPayload : public ISrsEncoder +class SrsRtpSTAPPayload : public ISrsCodec { public: // The NRI in NALU type. @@ -178,11 +218,12 @@ public: public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; // FU-A, for one NALU with multiple fragments. // With more than one payload. -class SrsRtpFUAPayload : public ISrsEncoder +class SrsRtpFUAPayload : public ISrsCodec { public: // The NRI in NALU type. @@ -201,11 +242,12 @@ public: public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; // FU-A, for one NALU with multiple fragments. // With only one payload. -class SrsRtpFUAPayload2 : public ISrsEncoder +class SrsRtpFUAPayload2 : public ISrsCodec { public: // The NRI in NALU type. @@ -224,6 +266,7 @@ public: public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); + virtual srs_error_t decode(SrsBuffer* buf); }; class SrsRtpPayloadHeader @@ -290,8 +333,8 @@ public: srs_error_t decode(char* buf, int nb_buf); SrsRtpSharedPacket* copy(); public: - char* rtp_payload() { return payload + rtp_header.header_size(); } - int rtp_payload_size() { return size - rtp_header.header_size() - rtp_header.get_padding_length(); } + char* rtp_payload() { return payload + rtp_header.nb_bytes(); } + int rtp_payload_size() { return size - rtp_header.nb_bytes() - rtp_header.get_padding_length(); } // Interface to modify rtp header public: srs_error_t modify_rtp_header_marker(bool marker);