diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 99631237f..bf2d99852 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include using namespace std; @@ -57,6 +59,7 @@ using namespace std; SrsPsRtpPacket::SrsPsRtpPacket() { + isFirstPacket = false; } SrsPsRtpPacket::~SrsPsRtpPacket() @@ -191,6 +194,15 @@ void SrsGb28181PsRtpProcessor::clear_pre_packet() } srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + if (config->jitterbuffer_enable){ + return on_rtp_packet_jitter(from, fromlen, buf, nb_buf); + }else{ + return on_rtp_packet(from, fromlen, buf, nb_buf); + } +} + +srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { srs_error_t err = srs_success; bool completed = false; @@ -217,7 +229,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const } //TODO: fixme: the same device uses the same SSRC to send with different local ports - std::stringstream ss; ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port_string; std::string pkt_key = ss.str(); @@ -241,7 +252,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const //TODO: check sequence number out of order //it may be out of order, or multiple streaming ssrc are the same - if (pre_sequence_number + 1 != pkt.sequence_number && + if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number && pre_sequence_number != pkt.sequence_number){ srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); @@ -283,7 +294,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const if (!completed){ return err; } - //process completed frame data //clear processed one ps frame //on completed frame data rtp packet in muxer enqueue @@ -291,7 +301,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const if(key != cache_ps_rtp_packet.end()) { SrsGb28181RtmpMuxer* muxer = NULL; - //First, search according to the channel_id. Otherwise, search according to the SSRC. //Some channel_id are created by RTP pool, which are different ports. //No channel_id are created by multiplexing ports, which are the same port @@ -346,6 +355,123 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const return err; } +SrsGb28181RtmpMuxer* SrsGb28181PsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc) +{ + if(true){ + SrsGb28181RtmpMuxer* muxer = NULL; + //First, search according to the channel_id. Otherwise, search according to the SSRC. + //Some channel_id are created by RTP pool, which are different ports. + //No channel_id are created by multiplexing ports, which are the same port + if (!channel_id.empty()){ + muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); + }else { + muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc); + } + + //auto crate channel + if (!muxer && config->auto_create_channel){ + //auto create channel generated id + std::stringstream ss, ss1; + ss << "chid" << ssrc; + std::string tmp_id = ss.str(); + + SrsGb28181StreamChannel channel; + channel.set_channel_id(tmp_id); + channel.set_port_mode(RTP_PORT_MODE_FIXED); + channel.set_ssrc(ssrc); + + srs_error_t err2 = srs_success; + if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success){ + srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); + srs_error_reset(err2); + }; + + muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); + } + + return muxer; + }//end if FoundFrame +} + +srs_error_t SrsGb28181PsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, + int peer_port, std::string address_string, SrsPsRtpPacket *pkt) +{ + srs_error_t err = srs_success; + + if (!muxer) + return err; + + if (muxer){ + //TODO: fixme: the same device uses the same SSRC to send with different local ports + //record the first peer port + muxer->set_channel_peer_port(peer_port); + muxer->set_channel_peer_ip(address_string); + //not the first peer port's non processing + if (muxer->channel_peer_port() != peer_port){ + srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", + muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port); + }else { + //muxer->ps_packet_enqueue(pkt); + muxer->insert_jitterbuffer(pkt); + }//end if (muxer->channel_peer_port() != peer_port) + }//end if (muxer) + + return err; +} + +srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + bool completed = false; + + pprint->elapse(); + + char address_string[64]; + char port_string[16]; + if (getnameinfo(from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)){ + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); + } + + int peer_port = atoi(port_string); + + if (true) { + SrsBuffer stream(buf, nb_buf); + SrsPsRtpPacket *pkt = new SrsPsRtpPacket();; + + if ((err = pkt->decode(&stream)) != srs_success) { + srs_freep(pkt); + return srs_error_wrap(err, "ps rtp decode error"); + } + + std::stringstream ss3; + ss3 << pkt->ssrc << ":" << port_string; + std::string jitter_key = ss3.str(); + + pkt->completed = pkt->marker; + + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version, + pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc, + pkt->payload->length() + ); + } + + SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc); + if (muxer){ + rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, address_string, pkt); + } + + SrsAutoFree(SrsPsRtpPacket, pkt); + } + + return err; +} + //ISrsPsStreamHander ps stream raw video/audio hander interface ISrsPsStreamHander::ISrsPsStreamHander() { @@ -594,7 +720,6 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ //ts=1000 seq=4 mark=true payload= audio incomplete_len = ps_size - complete_len; complete_len = complete_len + incomplete_len; - } first_keyframe_flag = false; @@ -644,6 +769,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) wait_keyframe = _srs_config->get_stream_caster_gb28181_wait_keyframe(c); audio_enable = _srs_config->get_stream_caster_gb28181_audio_enable(c); auto_create_channel = _srs_config->get_stream_caster_gb28181_auto_create_channel(c); + jitterbuffer_enable = _srs_config->get_stream_caster_gb28181_jitterbuffer_enable(c); //sip config sip_enable = _srs_config->get_stream_caster_gb28181_sip_enable(c); @@ -692,14 +818,24 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo h264_pps = ""; aac_specific_config = ""; + req = NULL; + server = NULL; + source = NULL; + source_publish = true; + + jitter_buffer = new SrsPsJitterBuffer(id); + ps_buffer = new char[1024*200]; } SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() { + close(); - destroy(); + srs_cond_destroy(wait_ps_queue); - + + srs_freep(jitter_buffer); + srs_freepa(ps_buffer); srs_freep(channel); srs_freep(ps_demixer); srs_freep(trd); @@ -707,6 +843,8 @@ SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() srs_freep(vjitter); srs_freep(ajitter); srs_freep(pprint); + + destroy(); } srs_error_t SrsGb28181RtmpMuxer::serve() @@ -791,12 +929,48 @@ void SrsGb28181RtmpMuxer::destroy() } } +srs_error_t SrsGb28181RtmpMuxer::initialize(SrsServer *s, SrsRequest* r) +{ + srs_error_t err = srs_success; + + if (!jitter_buffer) { + jitter_buffer = new SrsPsJitterBuffer(channel_id); + } + + jitter_buffer->SetDecodeErrorMode(kSelectiveErrors); + jitter_buffer->SetNackMode(kNack, -1, -1); + jitter_buffer->SetNackSettings(250, 450, 0); + + if (!source_publish) return err; + + req = r; + server = s; + + if ((err = _srs_sources->fetch_or_create(req, (ISrsSourceHandler*)server, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + //TODO: ??? + // if (!source->can_publish(false)) { + // return srs_error_new(ERROR_GB28181_SESSION_IS_EXIST, "stream %s busy", req->get_stream_url().c_str()); + // } + + if ((err = source->on_publish()) != srs_success) { + return srs_error_wrap(err, "on publish"); + } + + return err; +} + + srs_error_t SrsGb28181RtmpMuxer::do_cycle() { srs_error_t err = srs_success; recv_rtp_stream_time = srs_get_system_time(); send_rtmp_stream_time = srs_get_system_time(); - + uint32_t cur_timestamp = 0; + int buffer_size = 0; + //consume ps stream, and check status while (true) { @@ -806,19 +980,35 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() return srs_error_wrap(err, "gb28181 rtmp muxer cycle"); } - //demix ps to h264/aac, to rtmp - while(!ps_queue.empty()){ - SrsPsRtpPacket* pkt = ps_queue.front(); - if (pkt){ - if ((err = ps_demixer->on_ps_stream(pkt->payload->bytes(), - pkt->payload->length(), pkt->timestamp, pkt->ssrc)) != srs_success){ - srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); - srs_freep(err); - }; + SrsGb28181Config config = gb28181_manger->get_gb28181_config(); + + if (config.jitterbuffer_enable){ + + if(jitter_buffer->FoundFrame(cur_timestamp)){ + jitter_buffer->GetPsFrame(ps_buffer, buffer_size, cur_timestamp); + + if (buffer_size > 0){ + if ((err = ps_demixer->on_ps_stream(ps_buffer, buffer_size, cur_timestamp, 0)) != srs_success){ + srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + }; + } + } + }else { + //demix ps to h264/aac, to rtmp + while(!ps_queue.empty()){ + SrsPsRtpPacket* pkt = ps_queue.front(); + if (pkt){ + if ((err = ps_demixer->on_ps_stream(pkt->payload->bytes(), + pkt->payload->length(), pkt->timestamp, pkt->ssrc)) != srs_success){ + srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + }; + } + ps_queue.pop(); + //must be free pkt + srs_freep(pkt); } - ps_queue.pop(); - //must be free pkt - srs_freep(pkt); } if (pprint->can_print()) { @@ -842,7 +1032,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() channel->set_rtp_peer_ip(""); } - SrsGb28181Config config = gb28181_manger->get_gb28181_config(); + if (duration > config.rtp_idle_timeout){ srs_trace("gb28181: client id=%s, stream idle timeout, stop!!!", channel_id.c_str()); break; @@ -864,7 +1054,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() } if (ps_queue.empty()){ - srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); + srs_cond_timedwait(wait_ps_queue, 20 * SRS_UTIME_MILLISECONDS); }else { srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS); } @@ -883,6 +1073,13 @@ void SrsGb28181RtmpMuxer::stop() close(); } + +void SrsGb28181RtmpMuxer::insert_jitterbuffer(SrsPsRtpPacket *pkt) +{ + recv_rtp_stream_time = srs_get_system_time(); + jitter_buffer->InsertPacket(*pkt, pkt->payload->bytes(), pkt->payload->length(), NULL); +} + void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) { srs_assert(pkt); @@ -894,7 +1091,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) uint32_t size = ps_queue.size(); if (size > 100){ srs_warn("gb28181: rtmpmuxer too much queue data, need to clear!!!"); - while(ps_queue.empty()) { + while(!ps_queue.empty()) { SrsPsRtpPacket* pkt = ps_queue.front(); ps_queue.pop(); srs_freep(pkt); @@ -929,14 +1126,16 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f { srs_error_t err = srs_success; - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - //after the connection fails, need to clear flag - //and send the av header again next time - h264_sps = ""; - h264_pps = ""; - aac_specific_config = ""; - return srs_error_wrap(err, "connect"); + if (!source_publish){ + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + //after the connection fails, need to clear flag + //and send the av header again next time + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; + return srs_error_wrap(err, "connect"); + } } if ((err = vjitter->correct(fpts)) != srs_success) { @@ -948,72 +1147,158 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f uint32_t pts = (uint32_t)(fpts / 90); srs_info("gb28181rtmpmuxer: on_rtp_video dts=%u", dts); + if (true) { + char *data = stream->bytes(); + int length = stream->length(); + + err = replace_startcode_with_nalulen(data, length, dts, pts); + } + return err; +} - SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); - SrsAutoFree(SrsBuffer, avs); - // send each frame. - while (!avs->empty()) { - char* frame = NULL; - int frame_size = 0; - if ((err = avc->annexb_demux(avs, &frame, &frame_size)) != srs_success) { - return srs_error_wrap(err, "demux annexb"); +srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame2(char *frame, int frame_size, uint32_t pts, uint32_t dts) +{ + srs_error_t err = srs_success; + + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + // ignore the nalu type sei(6) aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter || + nal_unit_type == SrsAvcNaluTypeSEI) { + return err; + } + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); } - // 5bits, 7.3.1 NAL unit syntax, - // ISO_IEC_14496-10-AVC-2003.pdf, page 44. - // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame - SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); - - // ignore the nalu type sei(6) aud(9) - if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter || - nal_unit_type == SrsAvcNaluTypeSEI) { - continue; + if (h264_sps == sps) { + return err; } - - // for sps - if (avc->is_sps(frame, frame_size)) { - std::string sps; - if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { - return srs_error_wrap(err, "demux sps"); - } - - if (h264_sps == sps) { - continue; - } - h264_sps = sps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; + h264_sps = sps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + return err; + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); } - // for pps - if (avc->is_pps(frame, frame_size)) { - std::string pps; - if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { - return srs_error_wrap(err, "demux pps"); - } - - if (h264_pps == pps) { - continue; - } - h264_pps = pps; - - if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { - return srs_error_wrap(err, "write sps/pps"); - } - continue; + if (h264_pps == pps) { + return err; } + h264_pps = pps; - // ibp frame. - srs_info("gb28181: demux avc ibp frame size=%d, dts=%d", frame_size, dts); - if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { - return srs_error_wrap(err, "write frame"); + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); } + return err; } - + + srs_info("gb28181: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); + } + + return err; +} + + srs_error_t SrsGb28181RtmpMuxer::replace_startcode_with_nalulen(char *video_data, int &size, uint32_t pts, uint32_t dts) + { + srs_error_t err = srs_success; + + int index = 0; + std::list list_index; + + for(; index < size; index++){ + if (video_data[index] == 0x00 && video_data[index+1] == 0x00 && + video_data[index+2] == 0x00 && video_data[index+3] == 0x01){ + list_index.push_back(index); + } + + if (index > (size-4)) + break; + } + + if (list_index.size() == 1){ + int cur_pos = list_index.front(); + list_index.pop_front(); + + //0001xxxxxxxxxx + //xxxx0001xxxxxxx + uint32_t naluLen = size - cur_pos; + char *p = (char*)&naluLen; + + video_data[cur_pos] = p[3]; + video_data[cur_pos+1] = p[2]; + video_data[cur_pos+2] = p[1]; + video_data[cur_pos+3] = p[0]; + + char *frame = video_data + cur_pos + 4; + int frame_size = naluLen; + + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + + }else if (list_index.size() > 1){ + int pre_pos = list_index.front(); + list_index.pop_front(); + int first_pos = pre_pos; + + while(list_index.size() > 0){ + int cur_pos = list_index.front(); + list_index.pop_front(); + + //pre=========cur====================== + //0001xxxxxxxx0001xxxxxxxx0001xxxxxxxxx + //xxxxxxxxxxxx0001xxxxxxxx0001xxxxxxxxx + uint32_t naluLen = cur_pos - pre_pos - 4; + char *p = (char*)&naluLen; + + video_data[pre_pos] = p[3]; + video_data[pre_pos+1] = p[2]; + video_data[pre_pos+2] = p[1]; + video_data[pre_pos+3] = p[0]; + + char *frame = video_data + pre_pos + 4; + int frame_size = naluLen; + + pre_pos = cur_pos; + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + + //========================pre========== + //0001xxxxxxxx0001xxxxxxxx0001xxxxxxxxx + if (first_pos != pre_pos){ + + uint32_t naluLen = size - pre_pos - 4; + char *p = (char*)&naluLen; + + video_data[pre_pos] = p[3]; + video_data[pre_pos+1] = p[2]; + video_data[pre_pos+2] = p[1]; + video_data[pre_pos+3] = p[0]; + + char *frame = video_data + pre_pos + 4; + int frame_size = naluLen; + + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + }else{ + //xxxxxxxxxxxxxxxxxxx + char *frame = video_data; + int frame_size = size; + err = write_h264_ipb_frame2(frame, frame_size, dts, pts); + } + return err; } @@ -1021,16 +1306,18 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_audio(SrsSimpleStream* stream, int64_t f { srs_error_t err = srs_success; - // ensure rtmp connected. - if ((err = connect()) != srs_success) { - //after the connection fails, need to clear flag - //and send the av header again next time - h264_sps = ""; - h264_pps = ""; - aac_specific_config = ""; - return srs_error_wrap(err, "connect"); + if (!source_publish){ + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + //after the connection fails, need to clear flag + //and send the av header again next time + h264_sps = ""; + h264_pps = ""; + aac_specific_config = ""; + return srs_error_wrap(err, "connect"); + } } - + if ((err = ajitter->correct(fdts)) != srs_success) { return srs_error_wrap(err, "jitter"); } @@ -1110,6 +1397,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) { srs_error_t err = srs_success; + if (h264_sps == "" || h264_pps == ""){ + return err; + } + // h264 raw to h264 packet. std::string sh; if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { @@ -1131,11 +1422,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_sps_pps(uint32_t dts, uint32_t pts) return srs_error_wrap(err, "write packet"); } - return err; } -srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts, bool writelen) { srs_error_t err = srs_success; @@ -1151,8 +1441,13 @@ srs_error_t SrsGb28181RtmpMuxer::write_h264_ipb_frame(char* frame, int frame_siz } std::string ibp; - if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { - return srs_error_wrap(err, "mux ibp frame"); + + if (writelen){ + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); + } + }else{ + ibp = string(frame, frame_size); } int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; @@ -1183,6 +1478,10 @@ srs_error_t SrsGb28181RtmpMuxer::write_audio_raw_frame(char* frame, int frame_si srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { srs_error_t err = srs_success; + + if (source_publish){ + return rtmp_write_packet_by_source(type, timestamp, data, size); + } if ((err = connect()) != srs_success) { return srs_error_wrap(err, "connect"); @@ -1202,6 +1501,41 @@ srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp close(); return srs_error_wrap(err, "write message"); } + return err; +} + +srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet_by_source(char type, uint32_t timestamp, char* data, int size) +{ + srs_error_t err = srs_success; + + send_rtmp_stream_time = srs_get_system_time(); + + //create a source that will process stream without the need for internal rtmpclient + if (type == SrsFrameTypeAudio) { + SrsMessageHeader header; + header.message_type = RTMP_MSG_AudioMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp & 0x3fffffff; + + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + + // TODO: FIXME: Check error. + shared_video->create(&header, data, size); + source->on_audio(shared_video); + }else if(type == SrsFrameTypeVideo) { + SrsMessageHeader header; + header.message_type = RTMP_MSG_VideoMessage; + // TODO: FIXME: Maybe the tbn is not 90k. + header.timestamp = timestamp & 0x3fffffff; + + SrsCommonMessage* shared_video = new SrsCommonMessage(); + SrsAutoFree(SrsCommonMessage, shared_video); + + // TODO: FIXME: Check error. + shared_video->create(&header, data, size); + source->on_video(shared_video); + } return err; } @@ -1248,6 +1582,10 @@ void SrsGb28181RtmpMuxer::close() h264_sps = ""; h264_pps = ""; aac_specific_config = ""; + + if (source_publish && !source){ + source->on_unpublish(); + } } void SrsGb28181RtmpMuxer::rtmp_close(){ @@ -1320,9 +1658,10 @@ void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) SrsGb28181Manger* _srs_gb28181 = NULL; //SrsGb28181Manger -SrsGb28181Manger::SrsGb28181Manger(SrsConfDirective* c) +SrsGb28181Manger::SrsGb28181Manger(SrsServer *s, SrsConfDirective* c) { // TODO: FIXME: support reload. + server = s; config = new SrsGb28181Config(c); manager = new SrsCoroutineManager(); } @@ -1330,11 +1669,10 @@ SrsGb28181Manger::SrsGb28181Manger(SrsConfDirective* c) SrsGb28181Manger::~SrsGb28181Manger() { used_ports.clear(); - + destroy(); + srs_freep(manager); srs_freep(config); - - destroy(); } srs_error_t SrsGb28181Manger::initialize() @@ -1403,7 +1741,7 @@ uint32_t SrsGb28181Manger::generate_ssrc(std::string id) return ssrc; } -srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsGb28181RtmpMuxer** gb28181) +srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsRequest *req, SrsGb28181RtmpMuxer** gb28181) { srs_error_t err = srs_success; @@ -1414,6 +1752,10 @@ srs_error_t SrsGb28181Manger::fetch_or_create_rtmpmuxer(std::string id, SrsGb28 } muxer = new SrsGb28181RtmpMuxer(this, id, config->audio_enable, config->wait_keyframe); + if ((err = muxer->initialize(server, req)) != srs_success) { + return srs_error_wrap(err, "gb28181: rtmp muxer initialize %s", id.c_str()); + } + if ((err = muxer->serve()) != srs_success) { return srs_error_wrap(err, "gb28181: rtmp muxer serve %s", id.c_str()); } @@ -1569,13 +1911,6 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha return err; } - //create on rtmp muxer, gb28181 stream to rtmp - - if ((err = fetch_or_create_rtmpmuxer(id, &muxer)) != srs_success){ - srs_warn("gb28181: create rtmp muxer error, %s", srs_error_desc(err).c_str()); - return err; - } - //Start RTP listening port, receive gb28181 stream, //fixed is mux port, //random is random allocation port @@ -1611,7 +1946,6 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha //of the string value of the id ssrc = generate_ssrc(id); } - rtmpmuxer_map_by_ssrc(muxer, ssrc); //generate RTMP push stream address, //if the app and stream in the API are empty, @@ -1621,6 +1955,8 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha string app = channel->get_app(); string stream = channel->get_stream(); + SrsRequest request; + if (true) { string tcUrl, stream_name; @@ -1670,8 +2006,19 @@ srs_error_t SrsGb28181Manger::create_stream_channel(SrsGb28181StreamChannel *cha channel->set_ip(config->host); std::string play_url = srs_generate_rtmp_url(config->host, rtmp_port, "", "", app, stream_name, ""); channel->set_rtmp_url(play_url); + + request.app = app; + request.stream = stream_name; + //request.vhost = config->host; } + //create on rtmp muxer, gb28181 stream to rtmp + if ((err = fetch_or_create_rtmpmuxer(id, &request, &muxer)) != srs_success){ + srs_warn("gb28181: create rtmp muxer error, %s", srs_error_desc(err).c_str()); + return err; + } + + rtmpmuxer_map_by_ssrc(muxer, ssrc); muxer->set_rtmp_url(url); srs_trace("gb28181: create new stream channel id:%s rtmp url=%s", id.c_str(), url.c_str());