From d2b8b937d69fb621e747d8e0f57802b889a448cb Mon Sep 17 00:00:00 2001 From: xialixin Date: Sun, 5 Apr 2020 13:53:14 +0800 Subject: [PATCH] fix generate ssrc, rtmp muxer cycle sleep, ps steam parase etc.. --- trunk/src/app/srs_app_gb28181.cpp | 106 +++++++++++++++++++++++------- trunk/src/app/srs_app_gb28181.hpp | 11 +++- 2 files changed, 91 insertions(+), 26 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 1f17c7c24..c964c9ac9 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -50,11 +50,10 @@ using namespace std; #include #include - - //#define W_PS_FILE //#define W_VIDEO_FILE //#define W_AUDIO_FILE +//#define W_UNKONW_FILE SrsPsRtpPacket::SrsPsRtpPacket() { @@ -177,9 +176,20 @@ void SrsGb28181PsRtpProcessor::dispose() } cache_ps_rtp_packet.clear(); + clear_pre_packet(); + return; } +void SrsGb28181PsRtpProcessor::clear_pre_packet() +{ + map::iterator it; + for (it = pre_packet.begin(); it != pre_packet.end(); ++it) { + srs_freep(it->second); + } + pre_packet.clear(); +} + srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -227,15 +237,16 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const //get previous timestamp by ssrc uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp; - //uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; + uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; //TODO: check sequence number out of order //it may be out of order, or multiple streaming ssrc are the same - // if (pre_sequence_number > pkt.sequence_number){ - // srs_info("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, addr=%s,port=%s", - // pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); - // //return err; - // } + if (pre_sequence_number + 1 != pkt.sequence_number && + pre_sequence_number != pkt.sequence_number){ + srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", + pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string); + //return err; + } //copy header to cache cache_ps_rtp_packet[pkt_key]->copy(&pkt); @@ -257,7 +268,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const } if (pprint->can_print()) { - srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(ip=%s, port=%d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version, pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, pkt.payload->length() @@ -556,15 +567,40 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_ } else { - srs_trace("gb28181: client_id %s, unkonw ps data %02x %02x %02x %02x\n", - channel_id.c_str(), next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); + +#ifdef W_UNKONW_FILE + if (!unknow_fw.is_open()) { + std::string filename = "test_unknow_" + channel_id + ".mpg"; + unknow_fw.open(filename.c_str()); + } + unknow_fw.write(next_ps_pack, incomplete_len, NULL); +#endif + //TODO: fixme unkonw ps data parse + if (next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x00 + && next_ps_pack[3] == (char)0x01){ + //dahua's PS header may lose packets. It is sent by an RTP packet of Dahua's PS header + //dahua rtp send format: + //ts=1000 seq=1 mark=false payload= ps header + //ts=1000 seq=2 mark=false payload= video + //ts=1000 seq=3 mark=true payload= video + //ts=1000 seq=4 mark=true payload= audio + incomplete_len = ps_size - complete_len; + complete_len = complete_len + incomplete_len; + } + + srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n", + channel_id.c_str(), ssrc, timestamp, + next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); break; } } if (complete_len != ps_size){ - srs_trace("gb28181: client_id %s decode ps packet error! ps_size=%d complete=%d \n", - channel_id.c_str(), ps_size, complete_len); + srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n", + channel_id.c_str(), ssrc, timestamp, ps_size, complete_len); }else if (hander && video_stream.length() && can_send_ps_av_packet()) { if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) { return srs_error_wrap(err, "process ps video packet"); @@ -609,7 +645,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) sip_auto_play = _srs_config->get_stream_caster_gb28181_sip_auto_play(c); sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c); sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c); - print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c); + //print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c); sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c); } @@ -706,6 +742,8 @@ void SrsGb28181RtmpMuxer::set_channel_peer_ip(std::string ip) void SrsGb28181RtmpMuxer::set_channel_peer_port(int port) { if (channel->get_rtp_peer_port() == 0){ + channel->set_recv_time_str(srs_sip_get_utc_date()); + channel->set_recv_time(srs_get_system_time()); channel->set_rtp_peer_port(port); } } @@ -775,7 +813,10 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() } if (pprint->can_print()) { - srs_trace("gb28181: client id=%s, rtmp muxer is alive", channel_id.c_str()); + srs_trace("gb28181: client id=%s, ssrc=%#x, peer(%s, %d), rtmp muxer is alive", + channel_id.c_str(), channel->get_ssrc(), + channel->get_rtp_peer_ip().c_str(), + channel->get_rtp_peer_port()); } srs_utime_t now = srs_get_system_time(); @@ -785,7 +826,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() //the peer-port and peer-ip will be cleared and //other port data will be received again if (duration > (2 * SRS_UTIME_SECONDS) && channel->get_rtp_peer_port() != 0){ - srs_warn("gb28181: client id=%s ssrc=%#x, peer(ip=%s port=%d), no rtp data %d in seconds, clean it, wait other port!", + srs_warn("gb28181: client id=%s ssrc=%#x, peer(%s, %d), no rtp data %d in seconds, clean it, wait other port!", channel_id.c_str(), channel->get_ssrc(), channel->get_rtp_peer_ip().c_str(), channel->get_rtp_peer_port(), duration/SRS_UTIME_SECONDS); channel->set_rtp_peer_port(0); @@ -798,8 +839,11 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle() break; } - srs_cond_timedwait(wait_ps_queue, 5 * SRS_UTIME_MILLISECONDS); - //srs_usleep(1000 * 5); + if (ps_queue.empty()){ + srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); + }else { + srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS); + } } return err; @@ -819,7 +863,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt) { srs_assert(pkt); - recv_stream_time = srs_update_system_time(); + recv_stream_time = srs_get_system_time(); //prevent consumers from being unable to process data //and accumulating in the queue @@ -1197,6 +1241,8 @@ SrsGb28181StreamChannel::SrsGb28181StreamChannel(){ rtp_peer_port = 0; rtp_peer_ip = ""; rtmp_url = ""; + recv_time = 0; + recv_time_str = ""; } SrsGb28181StreamChannel::~SrsGb28181StreamChannel() @@ -1219,6 +1265,10 @@ void SrsGb28181StreamChannel::copy(const SrsGb28181StreamChannel *s){ rtp_peer_port = s->get_rtp_peer_port(); rtmp_url = s->get_rtmp_url(); + + recv_time_str = s->get_recv_time_str(); + recv_time = s->get_recv_time(); + } void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) @@ -1235,6 +1285,8 @@ void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj) obj->set("port_mode", SrsJsonAny::str(port_mode.c_str())); obj->set("rtp_peer_port", SrsJsonAny::integer(rtp_peer_port)); obj->set("rtp_peer_ip", SrsJsonAny::str(rtp_peer_ip.c_str())); + obj->set("recv_time", SrsJsonAny::integer(recv_time/SRS_UTIME_SECONDS)); + obj->set("recv_time_str", SrsJsonAny::str(recv_time_str.c_str())); } @@ -1316,10 +1368,12 @@ uint32_t SrsGb28181Manger::hash_code(std::string str) uint32_t SrsGb28181Manger::generate_ssrc(std::string id) { srand(uint(time(0))); - // TODO: SSRC rules can be customized, - //uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); - //uint32_t ssrc = 0x00FFFFF0 & (hash_code(id) << 4) | index; - uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); + // TODO: SSRC rules can be customized, + //gb28281 live ssrc max value 0999999999(3B9AC9FF) + //gb28281 vod ssrc max value 1999999999(773593FF) + uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); + uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index; + //uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); srs_trace("gb28181: generate ssrc id=%s, ssrc=%u", id.c_str(), ssrc); return ssrc; } @@ -1417,6 +1471,10 @@ void SrsGb28181Manger::remove(SrsGb28181RtmpMuxer* muxer) manager->remove(muxer); } +void SrsGb28181Manger::remove_sip_session(SrsGb28181SipSession* sess) +{ + manager->remove(sess); +} srs_error_t SrsGb28181Manger::start_ps_rtp_listen(std::string id, int port) { @@ -1654,8 +1712,6 @@ uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int //channel not exist SrsGb28181StreamChannel channel; channel.set_channel_id(id); - channel.set_app("live"); - channel.set_stream(id); int code = create_stream_channel(&channel); if (code != ERROR_SUCCESS){ return code; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 945cf1c08..4600ba010 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -65,6 +65,7 @@ class SrsGb28181Config; class SrsGb28181PsRtpProcessor; class SrsGb28181SipService; class SrsGb28181StreamChannel; +class SrsGb28181SipSession; //ps rtp header packet parse class SrsPsRtpPacket: public SrsRtpPacket @@ -127,6 +128,7 @@ public: private: bool can_send_ps_av_packet(); void dispose(); + void clear_pre_packet(); // Interface ISrsUdpHandler public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); @@ -188,6 +190,7 @@ private: SrsFileWriter ps_fw; SrsFileWriter video_fw; SrsFileWriter audio_fw; + SrsFileWriter unknow_fw; bool first_keyframe_flag; bool wait_first_keyframe; @@ -330,6 +333,8 @@ private: int rtp_port; int rtmp_port; uint32_t ssrc; + srs_utime_t recv_time; + std::string recv_time_str; //send rtp stream client local port int rtp_peer_port; @@ -351,6 +356,8 @@ public: uint32_t get_rtp_peer_port() const { return rtp_peer_port; } std::string get_rtp_peer_ip() const { return rtp_peer_ip; } std::string get_rtmp_url() const { return rtmp_url; } + srs_utime_t get_recv_time() const { return recv_time; } + std::string get_recv_time_str() const { return recv_time_str; } void set_channel_id(const std::string &i) { channel_id = i; } void set_port_mode(const std::string &p) { port_mode = p; } @@ -363,6 +370,8 @@ public: void set_rtp_peer_ip( const std::string &p) { rtp_peer_ip = p; } void set_rtp_peer_port( const int &s) { rtp_peer_port = s;} void set_rtmp_url( const std::string &u) { rtmp_url = u; } + void set_recv_time( const srs_utime_t &u) { recv_time = u; } + void set_recv_time_str( const std::string &u) { recv_time_str = u; } void copy(const SrsGb28181StreamChannel *s); void dumps(SrsJsonObject* obj); @@ -431,7 +440,7 @@ public: public: void remove(SrsGb28181RtmpMuxer* conn); - + void remove_sip_session(SrsGb28181SipSession* sess); }; #endif