From fe65c7bf8471ab5ca4f179fefc54f924c3bfacb2 Mon Sep 17 00:00:00 2001 From: yinjiaoyuan Date: Sun, 15 Nov 2020 22:50:59 +0800 Subject: [PATCH] For 2034, GB28181: Support transport over TCP --- AUTHORS.txt | 1 + trunk/conf/full.conf | 3 + trunk/conf/push.gb28181.conf | 3 + trunk/src/app/srs_app_config.cpp | 18 +- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_gb28181.cpp | 605 +++++++++++++++++++++++++- trunk/src/app/srs_app_gb28181.hpp | 80 ++++ trunk/src/app/srs_app_gb28181_sip.cpp | 2 +- trunk/src/app/srs_app_server.cpp | 64 ++- trunk/src/app/srs_app_server.hpp | 15 + trunk/src/protocol/srs_sip_stack.cpp | 58 ++- trunk/src/protocol/srs_sip_stack.hpp | 2 +- 12 files changed, 807 insertions(+), 46 deletions(-) diff --git a/AUTHORS.txt b/AUTHORS.txt index 28e13cb16..31b47a193 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -72,3 +72,4 @@ CONTRIBUTORS ordered by first contribution. * chenhaibo * jasongwq * xialixin@kanzhun.com +* yinjiaoyuan diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 79d596172..142dfc233 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -349,6 +349,9 @@ stream_caster { # @remark We can bundle all gb28181 to this port, to reuse this port. # User can choose to bundle port in API port_mode or SIP invite_port_fixed. listen 9000; + # Listen as TCP if on; otherwise, listen as UDP. + # default: off + tcp_enable off; # If not bundle ports, use specified ports for each stream. rtp_port_min 58200; rtp_port_max 58300; diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf index a1b1a593c..64578d801 100644 --- a/trunk/conf/push.gb28181.conf +++ b/trunk/conf/push.gb28181.conf @@ -25,6 +25,9 @@ stream_caster { # 接收设备端rtp流的多路复用端口 listen 9000; + # 多路复用端口类型,on为tcp,off为udp + # 默认:off + tcp_enable on; # rtp接收监听端口范围,最小值 rtp_port_min 58200; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index af774d152..a07b81d2b 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3746,7 +3746,7 @@ srs_error_t SrsConfig::check_normal_config() SrsConfDirective* conf = stream_caster->at(i); string n = conf->name; if (n != "enabled" && n != "caster" && n != "output" - && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max" + && n != "listen" && n != "tcp_enable" && n != "rtp_port_min" && n != "rtp_port_max" && n != "rtp_idle_timeout" && n != "sip" && n != "audio_enable" && n != "wait_keyframe" && n != "jitterbuffer_enable" && n != "host" && n != "auto_create_channel") { @@ -4370,6 +4370,22 @@ int SrsConfig::get_stream_caster_listen(SrsConfDirective* conf) return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_stream_caster_tcp_enable(SrsConfDirective* conf) +{ + static bool DEFAULT = false; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tcp_enable"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + int SrsConfig::get_stream_caster_rtp_port_min(SrsConfDirective* conf) { static int DEFAULT = 0; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index d1e3da877..485725912 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -499,6 +499,8 @@ public: virtual std::string get_stream_caster_output(SrsConfDirective* conf); // Get the listen port of stream caster. virtual int get_stream_caster_listen(SrsConfDirective* conf); + // Get the listen port type of stream caster. + virtual bool get_stream_caster_tcp_enable(SrsConfDirective* conf); // Get the min udp port for rtp of stream caster rtsp. virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf); // Get the max udp port for rtp of stream caster rtsp. diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index bbcd78db6..1c4f4bc70 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -472,6 +472,338 @@ srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from, return err; } +//SrsGb28181TcpPsRtpProcessor +SrsGb28181TcpPsRtpProcessor::SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string id) +{ + config = c; + pprint = SrsPithyPrint::create_caster(); + channel_id = id; +} + +SrsGb28181TcpPsRtpProcessor::~SrsGb28181TcpPsRtpProcessor() +{ + dispose(); + srs_freep(pprint); +} + +void SrsGb28181TcpPsRtpProcessor::dispose() +{ + map::iterator it2; + for (it2 = cache_ps_rtp_packet.begin(); it2 != cache_ps_rtp_packet.end(); ++it2) { + srs_freep(it2->second); + } + cache_ps_rtp_packet.clear(); + + clear_pre_packet(); + + return; +} + +void SrsGb28181TcpPsRtpProcessor::clear_pre_packet() +{ + map::iterator it; + for (it = pre_packet.begin(); it != pre_packet.end(); ++it) { + srs_freep(it->second); + } + pre_packet.clear(); +} + +srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp(char* buf, int nb_buf, std::string ip, int port) +{ + srs_error_t err = srs_success; + + if (config->jitterbuffer_enable) { + err = on_rtp_packet_jitter(buf, nb_buf, ip, port); + if (err != srs_success) { + srs_warn("SrsGb28181TcpPsRtpProcessor::on_rtp on_rtp_packet_jitter err"); + } + } + else { + return on_rtp_packet(buf, nb_buf, ip, port); + } + return err; +} + +srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet(char* buf, int nb_buf, std::string ip, int port) +{ + srs_error_t err = srs_success; + bool completed = false; + + pprint->elapse(); + + char address_string[64] = {0}; + char port_string[16] = {0}; + /*if (getnameinfo(from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST | NI_NUMERICSERV)) { + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); + }*/ + + //itoa(port, port_string, 10); + + int peer_port = port;// atoi(port_string); + + if (true) { + SrsBuffer stream(buf, nb_buf); + SrsPsRtpPacket pkt; + + if ((err = pkt.decode(&stream)) != srs_success) { + return srs_error_wrap(err, "ps rtp decode error"); + } + + //TODO: fixme: the same device uses the same SSRC to send with different local ports + std::stringstream ss; + ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port;// port_string; + std::string pkt_key = ss.str(); + + std::stringstream ss2; + ss2 << pkt.ssrc << ":" << port_string; + std::string pre_pkt_key = ss2.str(); + + if (pre_packet.find(pre_pkt_key) == pre_packet.end()) { + pre_packet[pre_pkt_key] = new SrsPsRtpPacket(); + pre_packet[pre_pkt_key]->copy(&pkt); + } + //cache pkt by ssrc and timestamp + if (cache_ps_rtp_packet.find(pkt_key) == cache_ps_rtp_packet.end()) { + cache_ps_rtp_packet[pkt_key] = new SrsPsRtpPacket(); + } + + //get previous timestamp by ssrc + uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp; + uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; + + //TODO: check sequence number out of order + //it may be out of order, or multiple streaming ssrc are the same + if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number && + pre_sequence_number != pkt.sequence_number) { + srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", + pkt.ssrc, pre_sequence_number, pkt.sequence_number, ip.c_str(), port_string); + //return err; + } + + //copy header to cache + cache_ps_rtp_packet[pkt_key]->copy(&pkt); + //accumulate one frame of data, to payload cache + cache_ps_rtp_packet[pkt_key]->payload->append(pkt.payload); + + //detect whether it is a completed frame + if (pkt.marker) {// rtp maker is true, is a completed frame + completed = true; + } + else if (pre_timestamp != pkt.timestamp) { + //current timestamp is different from previous timestamp + //previous timestamp, is a completed frame + std::stringstream ss; + ss << pkt.ssrc << ":" << pre_timestamp << ":" << port_string; + pkt_key = ss.str(); + if (cache_ps_rtp_packet.find(pkt_key) != cache_ps_rtp_packet.end()) { + completed = true; + } + } + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + channel_id.c_str(), ip.c_str(), peer_port, nb_buf, pprint->age(), pkt.version, + pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, + pkt.payload->length() + ); + } + + //current packet becomes previous packet + srs_freep(pre_packet[pre_pkt_key]); + pre_packet[pre_pkt_key] = new SrsPsRtpPacket(); + pre_packet[pre_pkt_key]->copy(&pkt);; + + if (!completed) { + return err; + } + //process completed frame data + //clear processed one ps frame + //on completed frame data rtp packet in muxer enqueue + map::iterator key = cache_ps_rtp_packet.find(pkt_key); + if (key != cache_ps_rtp_packet.end()) + { + SrsGb28181RtmpMuxer* muxer = NULL; + //First, search according to the channel_id. Otherwise, search according to the SSRC. + //Some channel_id are created by RTP pool, which are different ports. + //No channel_id are created by multiplexing ports, which are the same port + if (!channel_id.empty()) { + muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); + } + else { + muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(pkt.ssrc); + } + + //auto crate channel + if (!muxer && config->auto_create_channel) { + //auto create channel generated id + std::stringstream ss, ss1; + ss << "chid" << pkt.ssrc; + std::string tmp_id = ss.str(); + + SrsGb28181StreamChannel channel; + channel.set_channel_id(tmp_id); + channel.set_port_mode(RTP_PORT_MODE_FIXED); + channel.set_ssrc(pkt.ssrc); + + srs_error_t err2 = srs_success; + if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) { + srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); + srs_error_reset(err2); + }; + + muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); + } + + if (muxer) { + //TODO: fixme: the same device uses the same SSRC to send with different local ports + //record the first peer port + muxer->set_channel_peer_port(peer_port); + muxer->set_channel_peer_ip(address_string); + //not the first peer port's non processing + if (muxer->channel_peer_port() != peer_port) { + srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", + muxer->get_channel_id().c_str(), pkt.ssrc, muxer->channel_peer_port(), peer_port); + srs_freep(key->second); + } + else { + //put it in queue, wait for consumer to process, and then free + muxer->ps_packet_enqueue(key->second); + } + } + else { + //no consumer process it, discarded + srs_freep(key->second); + } + cache_ps_rtp_packet.erase(pkt_key); + } + } + return err; +} + +SrsGb28181RtmpMuxer* SrsGb28181TcpPsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc) +{ + if (true) { + SrsGb28181RtmpMuxer* muxer = NULL; + //First, search according to the channel_id. Otherwise, search according to the SSRC. + //Some channel_id are created by RTP pool, which are different ports. + //No channel_id are created by multiplexing ports, which are the same port + if (!channel_id.empty()) { + muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); + } + else { + muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc); + } + + //auto crate channel + if (!muxer && config->auto_create_channel) { + //auto create channel generated id + std::stringstream ss, ss1; + ss << "chid" << ssrc; + std::string tmp_id = ss.str(); + + SrsGb28181StreamChannel channel; + channel.set_channel_id(tmp_id); + channel.set_port_mode(RTP_PORT_MODE_FIXED); + channel.set_ssrc(ssrc); + + srs_error_t err2 = srs_success; + if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) { + srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); + srs_error_reset(err2); + }; + + muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); + } + + return muxer; + }//end if FoundFrame +} + +srs_error_t SrsGb28181TcpPsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, + int peer_port, std::string address_string, SrsPsRtpPacket *pkt) +{ + srs_error_t err = srs_success; + + if (!muxer) + return err; + + if (muxer) { + //TODO: fixme: the same device uses the same SSRC to send with different local ports + //record the first peer port + muxer->set_channel_peer_port(peer_port); + muxer->set_channel_peer_ip(address_string); + //not the first peer port's non processing + if (muxer->channel_peer_port() != peer_port) { + srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", + muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port); + } + else { + //muxer->ps_packet_enqueue(pkt); + muxer->insert_jitterbuffer(pkt); + }//end if (muxer->channel_peer_port() != peer_port) + }//end if (muxer) + + return err; +} + +srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port) +{ + srs_error_t err = srs_success; + bool completed = false; + + pprint->elapse(); + + char address_string[64] = {0}; + char port_string[16] = {0}; + /*if (getnameinfo(from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST | NI_NUMERICSERV)) { + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); + }*/ + + //itoa(port, port_string, 10); + + int peer_port = port;// atoi(port_string); + + if (true) { + SrsBuffer stream(buf, nb_buf); + SrsPsRtpPacket *pkt = new SrsPsRtpPacket();; + + if ((err = pkt->decode(&stream)) != srs_success) { + srs_freep(pkt); + return srs_error_wrap(err, "ps rtp decode error"); + } + + std::stringstream ss3; + ss3 << pkt->ssrc << ":" << port;// port_string; + std::string jitter_key = ss3.str(); + + pkt->completed = pkt->marker; + + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version, + pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc, + pkt->payload->length() + ); + } + + SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc); + if (muxer) { + rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, ip, pkt); + } + + SrsAutoFree(SrsPsRtpPacket, pkt); + } + + return err; +} + //ISrsPsStreamHander ps stream raw video/audio hander interface ISrsPsStreamHander::ISrsPsStreamHander() { @@ -901,6 +1233,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) host = get_host_candidate_ips(c); output = _srs_config->get_stream_caster_output(c); rtp_mux_port = _srs_config->get_stream_caster_listen(c); + rtp_mux_tcp_enable = _srs_config->get_stream_caster_tcp_enable(c); rtp_port_min = _srs_config->get_stream_caster_rtp_port_min(c); rtp_port_max = _srs_config->get_stream_caster_rtp_port_max(c); rtp_idle_timeout = _srs_config->get_stream_caster_gb28181_rtp_idle_timeout(c); @@ -2042,13 +2375,15 @@ void SrsGb28181Manger::rtmpmuxer_unmap_by_ssrc(uint32_t ssrc) void SrsGb28181Manger::destroy() { - //destory ps rtp listen - std::map::iterator it; - for (it = rtp_pool.begin(); it != rtp_pool.end(); ++it) { - SrsPsRtpListener* listener = it->second; - srs_freep(listener); - } - rtp_pool.clear(); + if (!config->rtp_mux_tcp_enable) { + //destory ps rtp listen + std::map::iterator it; + for (it = rtp_pool.begin(); it != rtp_pool.end(); ++it) { + SrsPsRtpListener* listener = it->second; + srs_freep(listener); + } + rtp_pool.clear(); + } //destory gb28181 muxer std::map::iterator it2; @@ -2092,17 +2427,19 @@ srs_error_t SrsGb28181Manger::start_ps_rtp_listen(std::string id, int port) return srs_error_wrap(err, "start rtp listen port rtmp muxer is null"); } - if (rtp_pool.find(port) == rtp_pool.end()) - { - SrsPsRtpListener* rtp = new SrsPsRtpListener(this->config, port, id); - rtp_pool[port] = rtp; - if ((err = rtp_pool[port]->listen()) != srs_success) { - stop_rtp_listen(id); - return srs_error_wrap(err, "rtp listen"); - } + if (!config->rtp_mux_tcp_enable) { + if (rtp_pool.find(port) == rtp_pool.end()) + { + SrsPsRtpListener* rtp = new SrsPsRtpListener(this->config, port, id); + rtp_pool[port] = rtp; + if ((err = rtp_pool[port]->listen()) != srs_success) { + stop_rtp_listen(id); + return srs_error_wrap(err, "rtp listen"); + } - srs_trace("gb28181: start rtp ps stream over server-port=%d", port); - } + srs_trace("gb28181: start rtp ps stream over server-port=%d", port); + } + } return err; } @@ -2122,12 +2459,13 @@ void SrsGb28181Manger::stop_rtp_listen(std::string id) return; } - map::iterator it2 = rtp_pool.find(port); - if (it2 != rtp_pool.end()){ - srs_freep(it2->second); - rtp_pool.erase(it2); - } - + if (!config->rtp_mux_tcp_enable) { + map::iterator it2 = rtp_pool.find(port); + if (it2 != rtp_pool.end()) { + srs_freep(it2->second); + rtp_pool.erase(it2); + } + } free_port(port, port+1); } @@ -2411,3 +2749,224 @@ srs_error_t SrsGb28181Manger::query_sip_session(std::string id, SrsJsonArray* ar return sip_service->query_sip_session(id, arr); } + + + + +#define SRS_RTSP_BUFFER 262144 +SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor) +{ + caster = c; + stfd = fd; + skt = new SrsStSocket(); + rtsp = new SrsRtspStack(skt); + trd = new SrsSTCoroutine("rtsp", this); + mbuffer = (char*)malloc(SRS_RTSP_BUFFER); + processor = rtp_processor; +} + +SrsGb28181Conn::~SrsGb28181Conn() +{ + free(mbuffer); + srs_close_stfd(stfd); + + srs_freep(trd); + srs_freep(skt); + srs_freep(rtsp); +} + +srs_error_t SrsGb28181Conn::serve() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize(stfd)) != srs_success) { + return srs_error_wrap(err, "socket initialize"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtsp connection"); + } + return err; +} + +std::string SrsGb28181Conn::remote_ip() +{ + // TODO: FIXME: Implement it. + return ""; +} + +srs_error_t SrsGb28181Conn::do_cycle() +{ + srs_error_t err = srs_success; + + // retrieve ip of client. + int fd = srs_netfd_fileno(stfd); + std::string ip = srs_get_peer_ip(fd); + int port = srs_get_peer_port(fd); + + if (ip.empty() && !_srs_config->empty_ip_ok()) { + srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); + } + srs_trace("rtsp: serve %s:%d", ip.c_str(), port); + + char* leftData = (char*)malloc(SRS_RTSP_BUFFER);; + uint32_t leftDataLength = 0; + int16_t length = 0; + char* pp = (char*)&length; + char* p = &(mbuffer[0]); + ssize_t nb_read = 0; + int16_t length2; + + // consume all rtp data. + while (true) { + if ((err = trd->pull()) != srs_success) { + free(leftData); + return srs_error_wrap(err, "rtsp cycle"); + } + + //memset(buffer, 0, SRS_RTSP_BUFFER); + nb_read = 0; + if ((err = skt->read(mbuffer + leftDataLength, SRS_RTSP_BUFFER - leftDataLength, &nb_read)) != srs_success) { + free(leftData); + return srs_error_wrap(err, "recv data"); + } + + nb_read = nb_read + leftDataLength; + + length; + pp = (char*)&length; + p = &(mbuffer[0]); + pp[1] = *p++; + pp[0] = *p++; + + if (nb_read < (length + 2)) {//Not enough one packet. + leftDataLength = leftDataLength + nb_read; + continue; + } + + memset(leftData, 0, SRS_RTSP_BUFFER); + + while (length > 0) { + if ((length + 2) == nb_read) {//Only one packet. + nb_read = nb_read - 2; + processor->on_rtp(mbuffer + 2, nb_read, ip, port); + leftDataLength = 0; + break; + } + else { //multi packets. + pp = (char*)&length2; + p = &(mbuffer[length + 2]); + pp[1] = *p++; + pp[0] = *p++; + + processor->on_rtp(mbuffer + 2, length, ip, port); + + leftDataLength = nb_read - (length + 2); + nb_read = leftDataLength; + memcpy(leftData, mbuffer + length + 2, leftDataLength); + + pp = (char*)&length; + p = &(mbuffer[length + 2]); + pp[1] = *p++; + pp[0] = *p++; + + if (leftDataLength < (length + 2)) {//Not enough one packet. + memcpy(mbuffer, leftData, leftDataLength); + break; + } + else { + memcpy(mbuffer, leftData, leftDataLength); + } + } + } + } + + free(leftData); + + return err; +} + +srs_error_t SrsGb28181Conn::cycle() +{ + // serve the rtsp client. + srs_error_t err = do_cycle(); + + caster->remove(this); + + if (err == srs_success) { + srs_trace("client finished."); + } + else if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. code=%d", srs_error_code(err)); + srs_freep(err); + } + + return err; +} + +std::string SrsGb28181Conn::desc() +{ + return "GB28181TcpConn"; +} + +const SrsContextId& SrsGb28181Conn::get_id() +{ + return trd->cid(); +} + +SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c) +{ + // TODO: FIXME: support reload. + output = _srs_config->get_stream_caster_output(c); + config = new SrsGb28181Config(c); + rtp_processor = new SrsGb28181TcpPsRtpProcessor(config, ""); + manager = new SrsResourceManager("GB28181TCP", true); +} + +SrsGb28181Caster::~SrsGb28181Caster() +{ + std::vector::iterator it; + for (it = clients.begin(); it != clients.end(); ++it) { + SrsGb28181Conn* conn = *it; + manager->remove(conn); + } + clients.clear(); + + srs_freep(manager); +} + +srs_error_t SrsGb28181Caster::initialize() +{ + srs_error_t err = srs_success; + if ((err = manager->start()) != srs_success) { + return srs_error_wrap(err, "start manager"); + } + return err; +} + +srs_error_t SrsGb28181Caster::on_tcp_client(srs_netfd_t stfd) +{ + srs_error_t err = srs_success; + + SrsGb28181Conn* conn = new SrsGb28181Conn(this, stfd, rtp_processor); + + if ((err = conn->serve()) != srs_success) { + srs_freep(conn); + return srs_error_wrap(err, "serve conn"); + } + + clients.push_back(conn); + + return err; +} + +void SrsGb28181Caster::remove(SrsGb28181Conn* conn) +{ + std::vector::iterator it = find(clients.begin(), clients.end(), conn); + if (it != clients.end()) { + clients.erase(it); + } + srs_info("rtsp: remove connection from caster."); + + manager->remove(conn); +} diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index aaadda8bb..cc9e1c304 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -95,6 +95,7 @@ class SrsSipRequest; class SrsGb28181RtmpMuxer; class SrsGb28181Config; class SrsGb28181PsRtpProcessor; +class SrsGb28181TcpPsRtpProcessor; class SrsGb28181SipService; class SrsGb28181StreamChannel; class SrsGb28181SipSession; @@ -103,6 +104,8 @@ class SrsServer; class SrsSource; class SrsRequest; class SrsResourceManager; +class SrsGb28181Conn; +class SrsGb28181Caster; //ps rtp header packet parse class SrsPsRtpPacket: public SrsRtpPacket @@ -178,6 +181,33 @@ public: virtual srs_error_t on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); }; +class SrsGb28181TcpPsRtpProcessor +{ +private: + SrsPithyPrint* pprint; + SrsGb28181Config* config; + std::map cache_ps_rtp_packet; + std::map pre_packet; + std::string channel_id; + bool auto_create_channel; +public: + SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string sid); + virtual ~SrsGb28181TcpPsRtpProcessor(); +private: + bool can_send_ps_av_packet(); + void dispose(); + void clear_pre_packet(); + SrsGb28181RtmpMuxer* create_rtmpmuxer(std::string channel_id, uint32_t ssrc); + srs_error_t rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, + int peer_port, std::string address_string, SrsPsRtpPacket *pkt); + // Interface ISrsTcpHandler +public: + virtual srs_error_t on_rtp(char* buf, int nb_buf, std::string ip, int port); +public: + virtual srs_error_t on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port); + virtual srs_error_t on_rtp_packet(char* buf, int nb_buf, std::string ip, int port); +}; + //ps stream processing parsing interface class ISrsPsStreamHander { @@ -385,6 +415,7 @@ public: int rtp_port_min; int rtp_port_max; int rtp_mux_port; + bool rtp_mux_tcp_enable; bool auto_create_channel; bool jitterbuffer_enable; @@ -529,5 +560,54 @@ public: void remove_sip_session(SrsGb28181SipSession* sess); }; +// The gb28181 tcp connection serve the fd. +class SrsGb28181Conn : public ISrsCoroutineHandler, public ISrsConnection +{ +private: + char* mbuffer; + srs_netfd_t stfd; + SrsStSocket* skt; + SrsRtspStack* rtsp; + SrsGb28181Caster* caster; + SrsCoroutine* trd; + SrsGb28181TcpPsRtpProcessor *processor; +public: + SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor); + virtual ~SrsGb28181Conn(); +public: + virtual srs_error_t serve(); + virtual std::string remote_ip(); +private: + virtual srs_error_t do_cycle(); + // Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); + virtual std::string desc(); + virtual const SrsContextId& get_id(); +}; + +// The caster for gb28181. +class SrsGb28181Caster : public ISrsTcpHandler +{ +private: + std::string output; + SrsGb28181Config *config; + SrsGb28181TcpPsRtpProcessor *rtp_processor; +private: + std::vector clients; + SrsResourceManager* manager; +public: + SrsGb28181Caster(SrsConfDirective* c); + virtual ~SrsGb28181Caster(); +public: + virtual srs_error_t initialize(); + // Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); + // internal methods. +public: + virtual void remove(SrsGb28181Conn* conn); +}; + #endif diff --git a/trunk/src/app/srs_app_gb28181_sip.cpp b/trunk/src/app/srs_app_gb28181_sip.cpp index a5b7acb82..b4bf2a3be 100644 --- a/trunk/src/app/srs_app_gb28181_sip.cpp +++ b/trunk/src/app/srs_app_gb28181_sip.cpp @@ -699,7 +699,7 @@ srs_error_t SrsGb28181SipService::send_invite(SrsSipRequest *req, string ip, i req->from_realm = config->sip_realm; std::stringstream ss; - sip->req_invite(ss, req, ip, port, ssrc); + sip->req_invite(ss, req, ip, port, ssrc, config->rtp_mux_tcp_enable); sockaddr addr = sip_session->sockaddr_from(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index dee0e72b7..5c4aca1d3 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -373,6 +373,62 @@ SrsGb28181Listener::~SrsGb28181Listener() srs_freep(caster); } +SrsGb28181TcpListener::SrsGb28181TcpListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t) +{ + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListenerGb28181RtpMux); + + caster = new SrsGb28181Caster(c); + listener = NULL; +} + +SrsGb28181TcpListener::~SrsGb28181TcpListener() +{ + srs_freep(caster); + srs_freep(listener); +} + +srs_error_t SrsGb28181TcpListener::listen(std::string i, int p) +{ + srs_error_t err = srs_success; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListenerGb28181RtpMux); + + ip = i; + port = p; + + if ((err = caster->initialize()) != srs_success) { + return srs_error_wrap(err, "init caster"); + } + + srs_freep(listener); + listener = new SrsTcpListener(this, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "rtsp listen %s:%d", ip.c_str(), port); + } + + string v = srs_listener_type2string(type); + + return err; +} + +srs_error_t SrsGb28181TcpListener::on_tcp_client(srs_netfd_t stfd) +{ + int fd = srs_netfd_fileno(stfd); + string ip = srs_get_peer_ip(fd); + + srs_error_t err = caster->on_tcp_client(stfd); + if (err != srs_success) { + srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + return srs_success; +} + #endif SrsSignalManager* SrsSignalManager::instance = NULL; @@ -1460,8 +1516,8 @@ srs_error_t SrsServer::listen_stream_caster() listener = new SrsRtspListener(this, SrsListenerRtsp, stream_caster); } else if (srs_stream_caster_is_flv(caster)) { listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster); - } else if (srs_stream_caster_is_gb28181(caster)) { #ifdef SRS_GB28181 + } else if (srs_stream_caster_is_gb28181(caster)) { //init global gb28181 manger if (_srs_gb28181 == NULL){ _srs_gb28181 = new SrsGb28181Manger(this, stream_caster); @@ -1478,7 +1534,11 @@ srs_error_t SrsServer::listen_stream_caster() } //gb28181 stream listener - listener = new SrsGb28181Listener(this, SrsListenerGb28181RtpMux, stream_caster); + if (!_srs_config->get_stream_caster_tcp_enable(stream_caster)) { + listener = new SrsGb28181Listener(this, SrsListenerGb28181RtpMux, stream_caster); + } else { + listener = new SrsGb28181TcpListener(this, SrsListenerGb28181RtpMux, stream_caster); + } #else srs_warn("gb28181 is disabled, please enable it by: ./configure --with-gb28181"); continue; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index f599c26b4..60a69d517 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -177,6 +177,21 @@ public: virtual ~SrsGb28181Listener(); }; +class SrsGb28181TcpListener : virtual public SrsListener, virtual public ISrsTcpHandler +{ +private: + SrsTcpListener* listener; + SrsGb28181Caster* caster; +public: + SrsGb28181TcpListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); + virtual ~SrsGb28181TcpListener(); +public: + virtual srs_error_t listen(std::string i, int p); +// Interface ISrsTcpHandler +public: + virtual srs_error_t on_tcp_client(srs_netfd_t stfd); +}; + #endif // Convert signal to io, diff --git a/trunk/src/protocol/srs_sip_stack.cpp b/trunk/src/protocol/srs_sip_stack.cpp index 30235a159..97004d226 100644 --- a/trunk/src/protocol/srs_sip_stack.cpp +++ b/trunk/src/protocol/srs_sip_stack.cpp @@ -842,7 +842,7 @@ void SrsSipStack::resp_status(stringstream& ss, SrsSipRequest *req) } -void SrsSipStack::req_invite(stringstream& ss, SrsSipRequest *req, string ip, int port, uint32_t ssrc) +void SrsSipStack::req_invite(stringstream& ss, SrsSipRequest *req, string ip, int port, uint32_t ssrc, bool tcpFlag) { /* //request: sip-agent <-------INVITE------ sip-server @@ -919,23 +919,45 @@ void SrsSipStack::req_invite(stringstream& ss, SrsSipRequest *req, string ip, in sprintf(_ssrc, "%010d", ssrc); std::stringstream sdp; - sdp << "v=0" << SRS_RTSP_CRLF - << "o=" << req->serial << " 0 0 IN IP4 " << ip << SRS_RTSP_CRLF - << "s=Play" << SRS_RTSP_CRLF - << "c=IN IP4 " << ip << SRS_RTSP_CRLF - << "t=0 0" << SRS_RTSP_CRLF - //TODO 97 98 99 current no support - //<< "m=video " << port <<" RTP/AVP 96 97 98 99" << SRS_RTSP_CRLF - << "m=video " << port <<" RTP/AVP 96" << SRS_RTSP_CRLF - << "a=recvonly" << SRS_RTSP_CRLF - << "a=rtpmap:96 PS/90000" << SRS_RTSP_CRLF - //TODO: current no support - //<< "a=rtpmap:97 MPEG4/90000" << SRS_RTSP_CRLF - //<< "a=rtpmap:98 H264/90000" << SRS_RTSP_CRLF - //<< "a=rtpmap:99 H265/90000" << SRS_RTSP_CRLF - //<< "a=streamMode:MAIN\r\n" - //<< "a=filesize:0\r\n" - << "y=" << _ssrc << SRS_RTSP_CRLF; + if (!tcpFlag){ + sdp << "v=0" << SRS_RTSP_CRLF + << "o=" << req->serial << " 0 0 IN IP4 " << ip << SRS_RTSP_CRLF + << "s=Play" << SRS_RTSP_CRLF + << "c=IN IP4 " << ip << SRS_RTSP_CRLF + << "t=0 0" << SRS_RTSP_CRLF + //TODO 97 98 99 current no support + //<< "m=video " << port <<" RTP/AVP 96 97 98 99" << SRS_RTSP_CRLF + << "m=video " << port <<" RTP/AVP 96" << SRS_RTSP_CRLF + << "a=recvonly" << SRS_RTSP_CRLF + << "a=rtpmap:96 PS/90000" << SRS_RTSP_CRLF + //TODO: current no support + //<< "a=rtpmap:97 MPEG4/90000" << SRS_RTSP_CRLF + //<< "a=rtpmap:98 H264/90000" << SRS_RTSP_CRLF + //<< "a=rtpmap:99 H265/90000" << SRS_RTSP_CRLF + //<< "a=streamMode:MAIN\r\n" + //<< "a=filesize:0\r\n" + << "y=" << _ssrc << SRS_RTSP_CRLF; + } else { + sdp << "v=0" << SRS_RTSP_CRLF + << "o=" << req->serial << " 0 0 IN IP4 " << ip << SRS_RTSP_CRLF + << "s=Play" << SRS_RTSP_CRLF + << "c=IN IP4 " << ip << SRS_RTSP_CRLF + << "t=0 0" << SRS_RTSP_CRLF + //TODO 97 98 99 current no support + //<< "m=video " << port <<" RTP/AVP 96 97 98 99" << SRS_RTSP_CRLF + //<< "m=video " << port <<" RTP/AVP 96" << SRS_RTSP_CRLF + << "m=video " << port << " TCP/RTP/AVP 96" << SRS_RTSP_CRLF + //<< "m=video " << port << " TCP/RTP/AVP 98" << SRS_RTSP_CRLF + << "a=recvonly" << SRS_RTSP_CRLF + << "a=rtpmap:96 PS/90000" << SRS_RTSP_CRLF + //TODO: current no support + //<< "a=rtpmap:97 MPEG4/90000" << SRS_RTSP_CRLF + //<< "a=rtpmap:98 H264/90000" << SRS_RTSP_CRLF + //<< "a=rtpmap:99 H265/90000" << SRS_RTSP_CRLF + //<< "a=streamMode:MAIN\r\n" + //<< "a=filesize:0\r\n" + << "y=" << _ssrc << SRS_RTSP_CRLF; + } std::stringstream from, to, uri; diff --git a/trunk/src/protocol/srs_sip_stack.hpp b/trunk/src/protocol/srs_sip_stack.hpp index a91897d18..18b726436 100644 --- a/trunk/src/protocol/srs_sip_stack.hpp +++ b/trunk/src/protocol/srs_sip_stack.hpp @@ -169,7 +169,7 @@ public: //request: request sent by the sip-server, wait for sip-agent response virtual void req_invite(std::stringstream& ss, SrsSipRequest *req, std::string ip, - int port, uint32_t ssrc); + int port, uint32_t ssrc, bool tcpFlag); virtual void req_ack(std::stringstream& ss, SrsSipRequest *req); virtual void req_bye(std::stringstream& ss, SrsSipRequest *req); virtual void req_401_unauthorized(std::stringstream& ss, SrsSipRequest *req);