From 28bde1d448e7784ae4370775bf30f64efa6ed346 Mon Sep 17 00:00:00 2001 From: kyxlx550 Date: Sat, 11 Apr 2020 20:36:28 +0800 Subject: [PATCH] fix rtmp send timeout, first key frame wait --- trunk/src/app/srs_app_gb28181.cpp | 95 +++++++++++++++++++++---------- trunk/src/app/srs_app_gb28181.hpp | 14 +++-- 2 files changed, 73 insertions(+), 36 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index fe3fa90da..6a2fe471b 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -268,7 +268,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const } if (pprint->can_print()) { - srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version, pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, pkt.payload->length() @@ -357,6 +357,7 @@ SrsPsStreamDemixer::SrsPsStreamDemixer(ISrsPsStreamHander *h, std::string id, bo audio_enable = a; wait_first_keyframe = k; channel_id = id; + first_keyframe_flag = false; } SrsPsStreamDemixer::~SrsPsStreamDemixer() @@ -404,7 +405,6 @@ int64_t SrsPsStreamDemixer::parse_ps_timestamp(const uint8_t* p) srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp, uint32_t ssrc) { srs_error_t err = srs_success; - int complete_len = 0; int incomplete_len = ps_size; char *next_ps_pack = ps_data; @@ -589,8 +589,10 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ //ts=1000 seq=4 mark=true payload= audio incomplete_len = ps_size - complete_len; complete_len = complete_len + incomplete_len; + } + first_keyframe_flag = false; srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n", channel_id.c_str(), ssrc, timestamp, next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); @@ -599,7 +601,7 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ } if (complete_len != ps_size){ - srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n", + srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n", channel_id.c_str(), ssrc, timestamp, ps_size, complete_len); }else if (hander && video_stream.length() && can_send_ps_av_packet()) { if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) { @@ -646,6 +648,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c); sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c); sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c); + sip_query_catalog_interval = _srs_config->get_stream_caster_gb28181_sip_query_catalog_interval(c); } SrsGb28181Config::~SrsGb28181Config() @@ -653,7 +656,6 @@ SrsGb28181Config::~SrsGb28181Config() } - //SrsGb28181RtmpMuxer gb28181 rtmp muxer, process ps stream to rtmp SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bool a, bool k) { @@ -675,7 +677,8 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo wait_ps_queue = srs_cond_new(); stream_idle_timeout = -1; - recv_stream_time = 0; + recv_rtp_stream_time = 0; + send_rtmp_stream_time = 0; _rtmp_url = ""; @@ -768,7 +771,7 @@ std::string SrsGb28181RtmpMuxer::rtmp_url() srs_utime_t SrsGb28181RtmpMuxer::get_recv_stream_time() { - return recv_stream_time; + return recv_rtp_stream_time; } @@ -785,7 +788,8 @@ void SrsGb28181RtmpMuxer::destroy() srs_error_t SrsGb28181RtmpMuxer::do_cycle() { srs_error_t err = srs_success; - recv_stream_time = srs_get_system_time(); + recv_rtp_stream_time = srs_get_system_time(); + send_rtmp_stream_time = srs_get_system_time(); //consume ps stream, and check status while (true) { @@ -819,7 +823,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() } srs_utime_t now = srs_get_system_time(); - srs_utime_t duration = now - recv_stream_time; + srs_utime_t duration = now - recv_rtp_stream_time; //if no RTP data is received within 2 seconds, //the peer-port and peer-ip will be cleared and @@ -831,13 +835,28 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() channel->set_rtp_peer_port(0); channel->set_rtp_peer_ip(""); } - + SrsGb28181Config config = gb28181_manger->get_gb28181_config(); if (duration > config.rtp_idle_timeout){ srs_trace("gb28181: client id=%s, stream idle timeout, stop!!!", channel_id.c_str()); break; } + //RTMP connection is about to timeout without receiving any data., + //waiting for the next time there is data automatically connected + //it is related to the following two parameter settings of the rtmp server + //the publish 1st packet timeout in srs_utime_t + //publish_1stpkt_timeout default 20000ms + //the publish normal packet timeout in srs_utime_t + //publish_normal_timeout default 5000ms + duration = now - send_rtmp_stream_time; + bool will_timeout = duration > (5 * SRS_UTIME_SECONDS); + if (will_timeout && sdk){ + srs_warn("gb28181: client id=%s RTMP connection is about to time out without receiving any data", + channel_id.c_str()); + rtmp_close(); + } + if (ps_queue.empty()){ srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); }else { @@ -862,7 +881,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) { srs_assert(pkt); - recv_stream_time = srs_get_system_time(); + recv_rtp_stream_time = srs_get_system_time(); //prevent consumers from being unable to process data //and accumulating in the queue @@ -923,7 +942,7 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f uint32_t pts = (uint32_t)(fpts / 90); srs_info("gb28181rtmpmuxer: on_rtp_video dts=%u", dts); - recv_stream_time = srs_get_system_time(); + SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); SrsAutoFree(SrsBuffer, avs); @@ -1010,8 +1029,6 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_audio(SrsSimpleStream* stream, int64_t f return srs_error_wrap(err, "jitter"); } - recv_stream_time = srs_get_system_time(); - uint32_t dts = (uint32_t)(fdts / 90); // send each frame. @@ -1167,6 +1184,8 @@ srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp SrsSharedPtrMessage* msg = NULL; + send_rtmp_stream_time = srs_get_system_time(); + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { return srs_error_wrap(err, "create message"); } @@ -1197,8 +1216,9 @@ srs_error_t SrsGb28181RtmpMuxer::connect() srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; sdk = new SrsSimpleRtmpClient(url, cto, sto); - + srs_trace("gb28181: rtmp connect url=%s", url.c_str()); + if ((err = sdk->connect()) != srs_success) { close(); return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); @@ -1368,8 +1388,8 @@ uint32_t SrsGb28181Manger::generate_ssrc(std::string id) { srand(uint(time(0))); // TODO: SSRC rules can be customized, - //gb28281 live ssrc max value 0999999999(3B9AC9FF) - //gb28281 vod ssrc max value 1999999999(773593FF) + //gb28181 live ssrc max value 0999999999(3B9AC9FF) + //gb28181 vod ssrc max value 1999999999(773593FF) uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index; //uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); @@ -1660,7 +1680,7 @@ uint32_t SrsGb28181Manger::delete_stream_channel(std::string id) { //notify the device to stop streaming //if an internal sip service controlled channel - notify_sip_bye(id); + notify_sip_bye(id, id); SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); if (muxer){ @@ -1673,7 +1693,7 @@ uint32_t SrsGb28181Manger::delete_stream_channel(std::string id) } -uint32_t SrsGb28181Manger::queue_stream_channel(std::string id, SrsJsonArray* arr) +uint32_t SrsGb28181Manger::query_stream_channel(std::string id, SrsJsonArray* arr) { if (!id.empty()){ SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); @@ -1696,21 +1716,22 @@ uint32_t SrsGb28181Manger::queue_stream_channel(std::string id, SrsJsonArray* ar return ERROR_SUCCESS; } -uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc) +uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc, std::string chid) { if (!sip_service){ return ERROR_GB28181_SIP_NOT_RUN; } //if RTMP Muxer does not exist, you need to create - SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); + std::string key = id+"@"+chid; + SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(key); if (!muxer){ //if there is an invalid parameter, the channel will be created automatically if (ip.empty() || port == 0 || ssrc == 0){ //channel not exist SrsGb28181StreamChannel channel; - channel.set_channel_id(id); + channel.set_channel_id(key); int code = create_stream_channel(&channel); if (code != ERROR_SUCCESS){ return code; @@ -1730,24 +1751,18 @@ uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int SrsSipRequest req; req.sip_auth_id = id; - return sip_service->send_invite(&req, ip, port, ssrc); - + return sip_service->send_invite(&req, ip, port, ssrc, chid); } -uint32_t SrsGb28181Manger::notify_sip_bye(std::string id) +uint32_t SrsGb28181Manger::notify_sip_bye(std::string id, std::string chid) { if (!sip_service){ return ERROR_GB28181_SIP_NOT_RUN; } - SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); - if (muxer){ - muxer->rtmp_close(); - } - SrsSipRequest req; req.sip_auth_id = id; - return sip_service->send_bye(&req); + return sip_service->send_bye(&req, chid); } uint32_t SrsGb28181Manger::notify_sip_raw_data(std::string id, std::string data) @@ -1772,3 +1787,23 @@ uint32_t SrsGb28181Manger::notify_sip_unregister(std::string id) sip_service->remove_session(id); return ERROR_SUCCESS; } + +uint32_t SrsGb28181Manger::notify_sip_query_catalog(std::string id) +{ + if (!sip_service){ + return ERROR_GB28181_SIP_NOT_RUN; + } + + SrsSipRequest req; + req.sip_auth_id = id; + return sip_service->send_query_catalog(&req); +} + +uint32_t SrsGb28181Manger::query_sip_session(std::string id, SrsJsonArray* arr) +{ + if (!sip_service){ + return ERROR_GB28181_SIP_NOT_RUN; + } + + return sip_service->query_sip_session(id, arr); +} \ No newline at end of file diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index a7cde0b85..9a4607c59 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -218,7 +218,8 @@ private: SrsPithyPrint* pprint; SrsGb28181StreamChannel *channel; int stream_idle_timeout; - srs_utime_t recv_stream_time; + srs_utime_t recv_rtp_stream_time; + srs_utime_t send_rtmp_stream_time; private: std::string channel_id; std::string _rtmp_url; @@ -313,6 +314,7 @@ public: srs_utime_t sip_keepalive_timeout; bool sip_auto_play; bool sip_invite_port_fixed; + srs_utime_t sip_query_catalog_interval; public: SrsGb28181Config(SrsConfDirective* c); @@ -392,9 +394,7 @@ private: std::map rtmpmuxers_ssrc; std::map rtmpmuxers; SrsCoroutineManager* manager; - SrsGb28181SipService* sip_service; - public: SrsGb28181Manger(SrsConfDirective* c); virtual ~SrsGb28181Manger(); @@ -415,12 +415,14 @@ public: //stream channel api uint32_t create_stream_channel(SrsGb28181StreamChannel *channel); uint32_t delete_stream_channel(std::string id); - uint32_t queue_stream_channel(std::string id, SrsJsonArray* arr); + uint32_t query_stream_channel(std::string id, SrsJsonArray* arr); //sip api - uint32_t notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc); - uint32_t notify_sip_bye(std::string id); + uint32_t notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc, std::string chid); + uint32_t notify_sip_bye(std::string id, std::string chid); uint32_t notify_sip_raw_data(std::string id, std::string data); uint32_t notify_sip_unregister(std::string id); + uint32_t notify_sip_query_catalog(std::string id); + uint32_t query_sip_session(std::string id, SrsJsonArray* arr); private: void destroy();