diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index ff5285d32..ec0b7022b 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -412,7 +412,7 @@ srs_error_t SrsGb28181PsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port); }else { //muxer->ps_packet_enqueue(pkt); - muxer->insert_jitterbuffer(pkt); + muxer->insert_jitterbuffer(pkt); }//end if (muxer->channel_peer_port() != peer_port) }//end if (muxer) @@ -593,19 +593,6 @@ int64_t SrsPsStreamDemixer::parse_ps_timestamp(const uint8_t* p) return val; } -bool SrsPsStreamDemixer::is_aac(){ - // SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); - // SrsAutoFree(SrsBuffer, avs); - // if (!avs->empty()) { - // char* frame = NULL; - // int frame_size = 0; - // SrsRawAacStreamCodec codec; - // if ((err = aac->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) { - // return srs_error_wrap(err, "demux adts"); - // } - return true; -} - srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp, uint32_t ssrc) { srs_error_t err = srs_success; @@ -976,8 +963,13 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo source_publish = true; jitter_buffer = new SrsPsJitterBuffer(id); + jitter_buffer_audio = new SrsPsJitterBuffer(id); + ps_buflen = 0; ps_buffer = NULL; + + ps_buflen_auido = 0; + ps_buffer_audio = NULL; } SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() @@ -988,7 +980,10 @@ SrsGb28181RtmpMuxer::~SrsGb28181RtmpMuxer() srs_cond_destroy(wait_ps_queue); srs_freep(jitter_buffer); + srs_freep(jitter_buffer_audio); srs_freepa(ps_buffer); + srs_freepa(ps_buffer_audio); + srs_freep(channel); srs_freep(ps_demixer); srs_freep(trd); @@ -1094,6 +1089,14 @@ srs_error_t SrsGb28181RtmpMuxer::initialize(SrsServer *s, SrsRequest* r) jitter_buffer->SetNackMode(kNack, -1, -1); jitter_buffer->SetNackSettings(250, 450, 0); + if (!jitter_buffer_audio) { + jitter_buffer_audio = new SrsPsJitterBuffer(channel_id); + } + + jitter_buffer_audio->SetDecodeErrorMode(kSelectiveErrors); + jitter_buffer_audio->SetNackMode(kNack, -1, -1); + jitter_buffer_audio->SetNackSettings(250, 450, 0); + if (!source_publish) return err; req = r; @@ -1146,6 +1149,17 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() }; } } + + if(jitter_buffer_audio->FoundFrame(cur_timestamp)){ + jitter_buffer_audio->GetPsFrame(&ps_buffer_audio, ps_buflen_auido, buffer_size, cur_timestamp); + + if (buffer_size > 0){ + if ((err = ps_demixer->on_ps_stream(ps_buffer_audio, buffer_size, cur_timestamp, 0)) != srs_success){ + srs_warn("gb28181: demix ps stream error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + }; + } + } }else { //demix ps to h264/aac, to rtmp while(!ps_queue.empty()){ @@ -1224,8 +1238,39 @@ void SrsGb28181RtmpMuxer::stop() void SrsGb28181RtmpMuxer::insert_jitterbuffer(SrsPsRtpPacket *pkt) { + if (!pkt){ + return; + } + recv_rtp_stream_time = srs_get_system_time(); - jitter_buffer->InsertPacket(*pkt, pkt->payload->bytes(), pkt->payload->length(), NULL); + + char *payload = pkt->payload->bytes(); + + uint8_t p1 = (uint8_t)(payload[0]); + uint8_t p2 = (uint8_t)(payload[1]); + uint8_t p3 = (uint8_t)(payload[2]); + uint8_t p4 = (uint8_t)(payload[3]); + + + //check for rtp ps audio streaming + bool av_same_ts = true; + + if (p1 == 0x00 && p2 == 0x00 && p3 == 0x01 && p4 == 0xC0 && + ps_rtp_video_ts != pkt->timestamp) { + av_same_ts = false; + } + + //if audio and video are the same clock, + //if both audio and video use jitter_buffer, + //otherwise audio uses jitter_buffer_audio, and video uses jitter_buffer + if (av_same_ts){ + pkt->marker = false; + jitter_buffer->InsertPacket(*pkt, pkt->payload->bytes(), pkt->payload->length(), NULL); + ps_rtp_video_ts = pkt->timestamp; + }else { + jitter_buffer_audio->InsertPacket(*pkt, pkt->payload->bytes(), pkt->payload->length(), NULL); + } + //srs_cond_signal(wait_ps_queue); } diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 9a0f70ed6..b50685070 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -260,7 +260,6 @@ private: public: int64_t parse_ps_timestamp(const uint8_t* p); std::string get_ps_map_type_str(uint8_t); - bool is_aac(); virtual srs_error_t on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp, uint32_t ssrc); }; @@ -303,8 +302,16 @@ private: SrsServer* server; SrsPsJitterBuffer *jitter_buffer; + SrsPsJitterBuffer *jitter_buffer_audio; + char *ps_buffer; + char *ps_buffer_audio; + int ps_buflen; + int ps_buflen_auido; + + uint32_t ps_rtp_video_ts; + uint32_t ps_rtp_audio_ts; bool source_publish; diff --git a/trunk/src/app/srs_app_gb28181_jitbuffer.cpp b/trunk/src/app/srs_app_gb28181_jitbuffer.cpp index 9e8f9c0c0..27a40b85e 100644 --- a/trunk/src/app/srs_app_gb28181_jitbuffer.cpp +++ b/trunk/src/app/srs_app_gb28181_jitbuffer.cpp @@ -306,11 +306,11 @@ PsFrameBufferEnum SrsPsFrameBuffer::InsertPacket(const VCMPacket& packet, const } //TODO: not check marker, check a complete frame with timestamp - // if (packet.markerBit && - // (last_packet_seq_num_ == -1 || - // IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_))) { - // last_packet_seq_num_ = packet.seqNum; - // } + if (packet.markerBit && + (last_packet_seq_num_ == -1 || + IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_))) { + last_packet_seq_num_ = packet.seqNum; + } // The insert operation invalidates the iterator |rit|. PacketIterator packet_list_it = packets_.insert(rit.base(), packet); diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index 68aa39a92..7137f8117 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -163,7 +163,8 @@ srs_error_t SrsGb28181SipSession::do_cycle() if (_register_status == SrsGb28181SipSessionRegisterOk && _alive_status == SrsGb28181SipSessionAliveOk) { - + std::list auto_play_list; + std::map::iterator it; for (it = _device_list.begin(); it != _device_list.end(); it++) { SrsGb28181Device *device = it->second; @@ -191,8 +192,15 @@ srs_error_t SrsGb28181SipSession::do_cycle() //offline or already invite device does not need to send invite if (device->device_status != "ON" || device->invite_status != SrsGb28181SipSessionUnkonw) continue; + + auto_play_list.push_back(chid); + }//end for (it) - + //auto send sip invite and create stream chennal + while(auto_play_list.size() > 0){ + std::string chid = auto_play_list.front(); + auto_play_list.pop_front(); + SrsGb28181StreamChannel ch; ch.set_channel_id(_session_id + "@" + chid); @@ -226,7 +234,7 @@ srs_error_t SrsGb28181SipSession::do_cycle() srs_trace("gb28181: %s clients device=%s send invite code=%d", _session_id.c_str(), chid.c_str(), code); - }//end for (it) + }//end while (auto_play_list.size()) }//end if (config) if (_register_status == SrsGb28181SipSessionRegisterOk && @@ -911,7 +919,7 @@ srs_error_t SrsGb28181SipService::fetch_or_create_sip_session(SrsSipRequest *req if ((sess = fetch(req->sip_auth_id)) != NULL) { *sip_session = sess; return err; - } + } sess = new SrsGb28181SipSession(this, req);; if ((err = sess->serve()) != srs_success) {