From c70ed1cd74eb4cb2862c1d31ebf3713003731ccb Mon Sep 17 00:00:00 2001 From: xialixin Date: Sun, 5 Apr 2020 13:49:47 +0800 Subject: [PATCH] add sip session status check --- trunk/src/app/srs_app_gb28181_sip.cpp | 226 ++++++++++++++++++++------ trunk/src/app/srs_app_gb28181_sip.hpp | 40 +++-- 2 files changed, 202 insertions(+), 64 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index f734c648b..2c819ad17 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -47,11 +47,34 @@ using namespace std; #include +std::string srs_get_sip_session_status_str(SrsGb28181SipSessionStatusType status) +{ + switch(status){ + case SrsGb28181SipSessionRegisterOk: + return "RegisterOk"; + case SrsGb28181SipSessionAliveOk: + return "AliveOk"; + case SrsGb28181SipSessionInviteOk: + return "InviteOk"; + case SrsGb28181SipSessionTrying: + return "InviteTrying"; + case SrsGb28181SipSessionBye: + return "InviteBye"; + default: + return "Unknow"; + } +} + SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r) { - caster = c; + servcie = c; req = new SrsSipRequest(); req->copy(r); + _session_id = req->sip_auth_id; + _reg_expires = 3600 * SRS_UTIME_SECONDS; + + trd = new SrsSTCoroutine("gb28181sip", this); + pprint = SrsPithyPrint::create_caster(); _register_status = SrsGb28181SipSessionUnkonw; _alive_status = SrsGb28181SipSessionUnkonw; @@ -59,9 +82,7 @@ SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipReques _register_time = 0; _alive_time = 0; _invite_time = 0; - _recv_rtp_time = 0; - _reg_expires = 0; - + _peer_ip = ""; _peer_port = 0; @@ -71,6 +92,103 @@ SrsGb28181SipSession::SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipReques SrsGb28181SipSession::~SrsGb28181SipSession() { srs_freep(req); + srs_freep(trd); + srs_freep(pprint); +} + +srs_error_t SrsGb28181SipSession::serve() +{ + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "gb28181sip"); + } + + return err; +} + +srs_error_t SrsGb28181SipSession::do_cycle() +{ + srs_error_t err = srs_success; + _register_time = srs_get_system_time(); + _alive_time = srs_get_system_time(); + _invite_time = srs_get_system_time(); + + while (true) { + + pprint->elapse(); + + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "gb28181 sip session cycle"); + } + + srs_utime_t now = srs_get_system_time(); + srs_utime_t reg_duration = now - _register_time; + srs_utime_t alive_duration = now - _alive_time; + srs_utime_t invite_duration = now - _invite_time; + SrsGb28181Config *config = servcie->get_config(); + + + if (_register_status == SrsGb28181SipSessionRegisterOk && + reg_duration > _reg_expires){ + srs_trace("gb28181: sip session=%s register expire", _session_id.c_str()); + break; + } + + if (_register_status == SrsGb28181SipSessionRegisterOk && + _alive_status == SrsGb28181SipSessionAliveOk && + alive_duration > config->sip_keepalive_timeout){ + srs_trace("gb28181: sip session=%s keepalive timeout", _session_id.c_str()); + break; + } + + if (_invite_status == SrsGb28181SipSessionTrying && + invite_duration > config->sip_ack_timeout){ + _invite_status == SrsGb28181SipSessionUnkonw; + } + + if (pprint->can_print()){ + srs_trace("gb28181: sip session=%s peer(%s, %d) status(%s,%s,%s) duration(%u,%u,%u)", + _session_id.c_str(), _peer_ip.c_str(), _peer_port, + srs_get_sip_session_status_str(_register_status).c_str(), + srs_get_sip_session_status_str(_alive_status).c_str(), + srs_get_sip_session_status_str(_invite_status).c_str(), + (reg_duration / SRS_UTIME_SECONDS), + (alive_duration / SRS_UTIME_SECONDS), + (invite_duration / SRS_UTIME_SECONDS)); + + //It is possible that the camera head keeps pushing and opening, + //and the duration will be very large. It will take 1 day to update + if (invite_duration > 24 * SRS_UTIME_HOURS){ + _invite_time = srs_get_system_time(); + } + } + srs_usleep(5* SRS_UTIME_SECONDS); + } + + return err; +} + +std::string SrsGb28181SipSession::remote_ip() +{ + return _peer_ip; +} + +srs_error_t SrsGb28181SipSession::cycle() +{ + srs_error_t err = do_cycle(); + + servcie->remove_session(_session_id); + srs_trace("gb28181: client id=%s sip session is remove", _session_id.c_str()); + + if (err == srs_success) { + srs_trace("gb28181: sip client finished."); + } else if (srs_is_client_gracefully_close(err)) { + srs_warn("gb28181: sip client disconnect code=%d", srs_error_code(err)); + srs_freep(err); + } + + return err; } //gb28181 sip Service @@ -92,6 +210,11 @@ SrsGb28181SipService::~SrsGb28181SipService() srs_freep(config); } +SrsGb28181Config* SrsGb28181SipService::get_config() +{ + return config; +} + void SrsGb28181SipService::set_stfd(srs_netfd_t fd) { lfd = fd; @@ -122,11 +245,8 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, { srs_error_t err = srs_success; - if (config->print_sip_message) - { - srs_trace("gb28181: request peer_ip=%s, peer_port=%d nbbuf=%d", peer_ip.c_str(), peer_port, nb_buf); - srs_trace("gb28181: request recv message=%s", buf); - } + srs_info("gb28181: request peer(%s, %d) nbbuf=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_info("gb28181: request recv message=%s", buf); if (nb_buf < 10) { return err; @@ -137,14 +257,7 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, if ((err = sip->parse_request(&req, buf, nb_buf)) != srs_success) { return srs_error_wrap(err, "parse sip request"); } - - if (config->print_sip_message) - { - srs_trace("gb28181: %s method=%s, uri=%s, version=%s ", - req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28181: request client id=%s", req->sip_auth_id.c_str()); - } - + req->peer_ip = peer_ip; req->peer_port = peer_port; SrsAutoFree(SrsSipRequest, req); @@ -154,21 +267,22 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, if (req->is_register()) { std::vector serial = srs_string_split(srs_string_replace(req->uri,"sip:", ""), "@"); if (serial.at(0) != config->sip_serial){ - srs_trace("gb28181: client:%s request serial and server serial inconformity(%s:%s)", + srs_warn("gb28181: client:%s request serial and server serial inconformity(%s:%s)", req->sip_auth_id.c_str(), serial.at(0).c_str(), config->sip_serial.c_str()); return err; } - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); - srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", - req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28181: request client id=%s", req->sip_auth_id.c_str()); + srs_trace("gb28181: request client id=%s peer(%s, %d)", req->sip_auth_id.c_str(), peer_ip.c_str(), peer_port); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s expires=%d", + req->get_cmdtype_str().c_str(), req->method.c_str(), + req->uri.c_str(), req->version.c_str(), req->expires); - SrsGb28181SipSession* sip_session = create_sip_session(req); - if (!sip_session) { - srs_trace("gb28181: create sip session faild:%s", req->uri.c_str()); + SrsGb28181SipSession* sip_session = NULL; + if ((err = fetch_or_create_sip_session(req, &sip_session)) != srs_success) { + srs_error_wrap(err, "create sip session error!"); return err; } + srs_assert(sip_session); send_status(req, from, fromlen); sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); @@ -187,8 +301,8 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, //reponse status send_status(req, from, fromlen); - sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); - sip_session->set_register_time(srs_get_system_time()); + //sip_session->set_register_status(SrsGb28181SipSessionRegisterOk); + //sip_session->set_register_time(srs_get_system_time()); sip_session->set_alive_status(SrsGb28181SipSessionAliveOk); sip_session->set_alive_time(srs_get_system_time()); sip_session->set_sockaddr((sockaddr)*from); @@ -202,8 +316,10 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, sip_session->alive_status() == SrsGb28181SipSessionAliveOk && sip_session->invite_status() == SrsGb28181SipSessionUnkonw) { - //stop the possible stream and push a new stream - //send_bye(req, from, fromlen); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), + req->method.c_str(), req->uri.c_str(), req->version.c_str()); SrsGb28181StreamChannel ch; ch.set_channel_id(session_id); @@ -229,10 +345,10 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, }else if (req->is_invite()) { SrsGb28181SipSession* sip_session = fetch(session_id); - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28181: request client id=%s", req->sip_auth_id.c_str()); if (!sip_session){ srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); @@ -257,14 +373,14 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, sip_session->set_request(req); }else{ sip_session->set_invite_status(SrsGb28181SipSessionUnkonw); - sip_session->set_invite_time(0); + sip_session->set_invite_time(srs_get_system_time()); } }else if (req->is_bye()) { - srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request client id=%s, peer(%s, %d)", req->sip_auth_id.c_str(), + peer_ip.c_str(), peer_port); srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); - srs_trace("gb28181: request client id=%s", req->sip_auth_id.c_str()); - + SrsGb28181SipSession* sip_session = fetch(session_id); send_status(req, from, fromlen); @@ -277,7 +393,7 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, sip_session->set_sockaddr_len(fromlen); sip_session->set_invite_status(SrsGb28181SipSessionBye); - sip_session->set_invite_time(0); + sip_session->set_invite_time(srs_get_system_time()); }else{ srs_trace("gb28181: ingor request method=%s", req->method.c_str()); @@ -289,8 +405,7 @@ srs_error_t SrsGb28181SipService::on_udp_sip(string peer_ip, int peer_port, int SrsGb28181SipService::send_message(sockaddr* from, int fromlen, std::stringstream& ss) { std::string str = ss.str(); - if (config->print_sip_message) - srs_trace("gb28181: send_message:%s", str.c_str()); + srs_info("gb28181: send_message:%s", str.c_str()); srs_assert(!str.empty()); int ret = srs_sendto(lfd, (char*)str.c_str(), (int)str.length(), from, fromlen, SRS_UTIME_NO_TIMEOUT); @@ -402,11 +517,8 @@ int SrsGb28181SipService::send_bye(SrsSipRequest *req) } return ERROR_SUCCESS; - - } - int SrsGb28181SipService::send_sip_raw_data(SrsSipRequest *req, std::string data) { srs_assert(req); @@ -429,19 +541,25 @@ int SrsGb28181SipService::send_sip_raw_data(SrsSipRequest *req, std::string dat return ERROR_SUCCESS; } -SrsGb28181SipSession* SrsGb28181SipService::create_sip_session(SrsSipRequest *req) +srs_error_t SrsGb28181SipService::fetch_or_create_sip_session(SrsSipRequest *req, SrsGb28181SipSession** sip_session) { - SrsGb28181SipSession *sess = NULL; + srs_error_t err = srs_success; + + SrsGb28181SipSession* sess = NULL; + if ((sess = fetch(req->sip_auth_id)) != NULL) { + *sip_session = sess; + return err; + } - std::map::iterator it = sessions.find(req->sip_auth_id); - if (it == sessions.end()){ - sess = new SrsGb28181SipSession(this, req); - }else{ - return it->second; + sess = new SrsGb28181SipSession(this, req);; + if ((err = sess->serve()) != srs_success) { + return srs_error_wrap(err, "gb28181: sip serssion serve %s", req->sip_auth_id.c_str()); } - + sessions[req->sip_auth_id] = sess; - return sess; + *sip_session = sess; + + return err; } SrsGb28181SipSession* SrsGb28181SipService::fetch(std::string sid) @@ -458,7 +576,9 @@ void SrsGb28181SipService::remove_session(std::string sid) { std::map::iterator it = sessions.find(sid); if (it != sessions.end()){ - srs_freep(it->second); + //srs_freep(it->second); + //thread exit management by gb28181 manger + _srs_gb28181->remove_sip_session(it->second); sessions.erase(it); } } @@ -469,7 +589,9 @@ void SrsGb28181SipService::destroy() //destory all sip session std::map::iterator it; for (it = sessions.begin(); it != sessions.end(); ++it) { - srs_freep(it->second); + //srs_freep(it->second); + //thread exit management by gb28181 manger + _srs_gb28181->remove_sip_session(it->second); } sessions.clear(); } diff --git a/trunk/src/app/srs_app_gb28181_sip.hpp b/trunk/src/app/srs_app_gb28181_sip.hpp index bed361cd6..d42d59e41 100644 --- a/trunk/src/app/srs_app_gb28181_sip.hpp +++ b/trunk/src/app/srs_app_gb28181_sip.hpp @@ -33,6 +33,7 @@ #include #include #include +#include class SrsConfDirective; @@ -50,12 +51,14 @@ enum SrsGb28181SipSessionStatusType{ SrsGb28181SipSessionBye = 5, }; -class SrsGb28181SipSession +class SrsGb28181SipSession: public ISrsCoroutineHandler, public ISrsConnection { private: //SrsSipRequest *req; - SrsGb28181SipService *caster; - std::string session_id; + SrsGb28181SipService *servcie; + std::string _session_id; + SrsCoroutine* trd; + SrsPithyPrint* pprint; private: SrsGb28181SipSessionStatusType _register_status; SrsGb28181SipSessionStatusType _alive_status; @@ -63,8 +66,7 @@ private: srs_utime_t _register_time; srs_utime_t _alive_time; srs_utime_t _invite_time; - srs_utime_t _recv_rtp_time; - int _reg_expires; + srs_utime_t _reg_expires; std::string _peer_ip; int _peer_port; @@ -73,6 +75,10 @@ private: int _fromlen; SrsSipRequest *req; +public: + SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r); + virtual ~SrsGb28181SipSession(); + public: void set_register_status(SrsGb28181SipSessionStatusType s) { _register_status = s;} void set_alive_status(SrsGb28181SipSessionStatusType s) { _alive_status = s;} @@ -80,8 +86,8 @@ public: void set_register_time(srs_utime_t t) { _register_time = t;} void set_alive_time(srs_utime_t t) { _alive_time = t;} void set_invite_time(srs_utime_t t) { _invite_time = t;} - void set_recv_rtp_time(srs_utime_t t) { _recv_rtp_time = t;} - void set_reg_expires(int e) { _reg_expires = e;} + //void set_recv_rtp_time(srs_utime_t t) { _recv_rtp_time = t;} + void set_reg_expires(int e) { _reg_expires = e*SRS_UTIME_SECONDS;} void set_peer_ip(std::string i) { _peer_ip = i;} void set_peer_port(int o) { _peer_port = o;} void set_sockaddr(sockaddr f) { _from = f;} @@ -95,7 +101,7 @@ public: srs_utime_t register_time() { return _register_time;} srs_utime_t alive_time() { return _alive_time;} srs_utime_t invite_time() { return _invite_time;} - srs_utime_t recv_rtp_time() { return _recv_rtp_time;} + //srs_utime_t recv_rtp_time() { return _recv_rtp_time;} int reg_expires() { return _reg_expires;} std::string peer_ip() { return _peer_ip;} int peer_port() { return _peer_port;} @@ -103,10 +109,17 @@ public: int sockaddr_fromlen() { return _fromlen;} SrsSipRequest request() { return *req;} -public: - SrsGb28181SipSession(SrsGb28181SipService *c, SrsSipRequest* r); - virtual ~SrsGb28181SipSession(); + std::string session_id() { return _session_id;} +public: + virtual srs_error_t serve(); + +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); + virtual std::string remote_ip(); +private: + virtual srs_error_t do_cycle(); }; class SrsGb28181SipService : public ISrsUdpHandler @@ -153,9 +166,12 @@ public: // int send_sip_raw_data(SrsSipRequest *req, std::string data); - SrsGb28181SipSession* create_sip_session(SrsSipRequest *req); +public: + srs_error_t fetch_or_create_sip_session(SrsSipRequest *req, SrsGb28181SipSession** sess); SrsGb28181SipSession* fetch(std::string id); void remove_session(std::string id); + SrsGb28181Config* get_config(); + }; #endif