From 3e7536c49377b22894de4dcb6067803ec1a283fc Mon Sep 17 00:00:00 2001 From: cfw11 <34058899+cfw11@users.noreply.github.com> Date: Fri, 28 May 2021 21:19:05 +0800 Subject: [PATCH] GB28181: fix parse rtp-tcp failed (#2378) * fix parse rtp-tcp failed * fix parse rtp-tcp failed Co-authored-by: cfw --- .gitignore | 2 + trunk/conf/push.gb28181.conf | 7 +- trunk/src/app/srs_app_gb28181.cpp | 470 ++++-------------------------- trunk/src/app/srs_app_gb28181.hpp | 34 +-- 4 files changed, 71 insertions(+), 442 deletions(-) diff --git a/.gitignore b/.gitignore index 50715e164..f7e6541ff 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ .idea .DS_Store +/cmake-build-debug/ +/CMakeLists.txt diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf index c64a6dde0..0890dd94f 100644 --- a/trunk/conf/push.gb28181.conf +++ b/trunk/conf/push.gb28181.conf @@ -33,8 +33,8 @@ stream_caster { # 接收设备端rtp流的多路复用端口 listen 9000; # 多路复用端口类型,on为tcp,off为udp - # 默认:off - tcp_enable off; + # 默认:on + tcp_enable on; # rtp接收监听端口范围,最小值 rtp_port_min 58200; @@ -64,7 +64,8 @@ stream_caster { # 是否开启rtp缓冲 # 开启之后能有效解决rtp乱序等问题 - jitterbuffer_enable on; + # tcp模式建议关闭 + jitterbuffer_enable off; # 服务器主机号,可以域名或ip地址 # 也就是设备端将媒体发送的地址,如果是服务器是内外网 diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 3fd610428..0fe9ff637 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -202,6 +202,11 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const return on_rtp_packet(from, fromlen, buf, nb_buf); } } +srs_error_t SrsGb28181PsRtpProcessor::on_tcp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + on_udp_packet(from, fromlen, buf, nb_buf); +} + srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) { @@ -460,334 +465,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from, return err; } -//SrsGb28181TcpPsRtpProcessor -SrsGb28181TcpPsRtpProcessor::SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string id) -{ - config = c; - pprint = SrsPithyPrint::create_caster(); - channel_id = id; -} - -SrsGb28181TcpPsRtpProcessor::~SrsGb28181TcpPsRtpProcessor() -{ - dispose(); - srs_freep(pprint); -} - -void SrsGb28181TcpPsRtpProcessor::dispose() -{ - map::iterator it2; - for (it2 = cache_ps_rtp_packet.begin(); it2 != cache_ps_rtp_packet.end(); ++it2) { - srs_freep(it2->second); - } - cache_ps_rtp_packet.clear(); - - clear_pre_packet(); - - return; -} - -void SrsGb28181TcpPsRtpProcessor::clear_pre_packet() -{ - map::iterator it; - for (it = pre_packet.begin(); it != pre_packet.end(); ++it) { - srs_freep(it->second); - } - pre_packet.clear(); -} - -srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp(char* buf, int nb_buf, std::string ip, int port) -{ - srs_error_t err = srs_success; - - if (config->jitterbuffer_enable) { - err = on_rtp_packet_jitter(buf, nb_buf, ip, port); - if (err != srs_success) { - srs_warn("SrsGb28181TcpPsRtpProcessor::on_rtp on_rtp_packet_jitter err"); - } - } - else { - return on_rtp_packet(buf, nb_buf, ip, port); - } - return err; -} - -srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet(char* buf, int nb_buf, std::string ip, int port) -{ - srs_error_t err = srs_success; - bool completed = false; - - pprint->elapse(); - - char address_string[64] = {0}; - char port_string[16] = {0}; - /*if (getnameinfo(from, fromlen, - (char*)&address_string, sizeof(address_string), - (char*)&port_string, sizeof(port_string), - NI_NUMERICHOST | NI_NUMERICSERV)) { - return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); - }*/ - - //itoa(port, port_string, 10); - int peer_port = port;// atoi(port_string); - - if (true) { - SrsBuffer stream(buf, nb_buf); - SrsPsRtpPacket pkt; - - if ((err = pkt.decode(&stream)) != srs_success) { - return srs_error_wrap(err, "ps rtp decode error"); - } - - //TODO: fixme: the same device uses the same SSRC to send with different local ports - std::stringstream ss; - ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port;// port_string; - std::string pkt_key = ss.str(); - - std::stringstream ss2; - ss2 << pkt.ssrc << ":" << port_string; - std::string pre_pkt_key = ss2.str(); - - if (pre_packet.find(pre_pkt_key) == pre_packet.end()) { - pre_packet[pre_pkt_key] = new SrsPsRtpPacket(); - pre_packet[pre_pkt_key]->copy(&pkt); - } - //cache pkt by ssrc and timestamp - if (cache_ps_rtp_packet.find(pkt_key) == cache_ps_rtp_packet.end()) { - cache_ps_rtp_packet[pkt_key] = new SrsPsRtpPacket(); - } - - //get previous timestamp by ssrc - uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp; - uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number; - - //TODO: check sequence number out of order - //it may be out of order, or multiple streaming ssrc are the same - if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number && - pre_sequence_number != pkt.sequence_number) { - srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)", - pkt.ssrc, pre_sequence_number, pkt.sequence_number, ip.c_str(), port_string); - //return err; - } - - //copy header to cache - cache_ps_rtp_packet[pkt_key]->copy(&pkt); - //accumulate one frame of data, to payload cache - cache_ps_rtp_packet[pkt_key]->payload->append(pkt.payload); - - //detect whether it is a completed frame - if (pkt.marker) {// rtp maker is true, is a completed frame - completed = true; - } - else if (pre_timestamp != pkt.timestamp) { - //current timestamp is different from previous timestamp - //previous timestamp, is a completed frame - std::stringstream ss; - ss << pkt.ssrc << ":" << pre_timestamp << ":" << port_string; - pkt_key = ss.str(); - if (cache_ps_rtp_packet.find(pkt_key) != cache_ps_rtp_packet.end()) { - completed = true; - } - } - - if (pprint->can_print()) { - srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", - channel_id.c_str(), ip.c_str(), peer_port, nb_buf, pprint->age(), pkt.version, - pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, - pkt.payload->length() - ); - } - - //current packet becomes previous packet - srs_freep(pre_packet[pre_pkt_key]); - pre_packet[pre_pkt_key] = new SrsPsRtpPacket(); - pre_packet[pre_pkt_key]->copy(&pkt);; - - if (!completed) { - return err; - } - //process completed frame data - //clear processed one ps frame - //on completed frame data rtp packet in muxer enqueue - map::iterator key = cache_ps_rtp_packet.find(pkt_key); - if (key != cache_ps_rtp_packet.end()) - { - SrsGb28181RtmpMuxer* muxer = NULL; - //First, search according to the channel_id. Otherwise, search according to the SSRC. - //Some channel_id are created by RTP pool, which are different ports. - //No channel_id are created by multiplexing ports, which are the same port - if (!channel_id.empty()) { - muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); - } - else { - muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(pkt.ssrc); - } - - //auto crate channel - if (!muxer && config->auto_create_channel) { - //auto create channel generated id - std::stringstream ss, ss1; - ss << "chid" << pkt.ssrc; - std::string tmp_id = ss.str(); - - SrsGb28181StreamChannel channel; - channel.set_channel_id(tmp_id); - channel.set_port_mode(RTP_PORT_MODE_FIXED); - channel.set_ssrc(pkt.ssrc); - - srs_error_t err2 = srs_success; - if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) { - srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); - srs_error_reset(err2); - }; - - muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); - } - - if (muxer) { - //TODO: fixme: the same device uses the same SSRC to send with different local ports - //record the first peer port - muxer->set_channel_peer_port(peer_port); - muxer->set_channel_peer_ip(address_string); - //not the first peer port's non processing - if (muxer->channel_peer_port() != peer_port) { - srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", - muxer->get_channel_id().c_str(), pkt.ssrc, muxer->channel_peer_port(), peer_port); - srs_freep(key->second); - } - else { - //put it in queue, wait for consumer to process, and then free - muxer->ps_packet_enqueue(key->second); - } - } - else { - //no consumer process it, discarded - srs_freep(key->second); - } - cache_ps_rtp_packet.erase(pkt_key); - } - } - return err; -} - -SrsGb28181RtmpMuxer* SrsGb28181TcpPsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc) -{ - if (true) { - SrsGb28181RtmpMuxer* muxer = NULL; - //First, search according to the channel_id. Otherwise, search according to the SSRC. - //Some channel_id are created by RTP pool, which are different ports. - //No channel_id are created by multiplexing ports, which are the same port - if (!channel_id.empty()) { - muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id); - } - else { - muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc); - } - - //auto crate channel - if (!muxer && config->auto_create_channel) { - //auto create channel generated id - std::stringstream ss, ss1; - ss << "chid" << ssrc; - std::string tmp_id = ss.str(); - - SrsGb28181StreamChannel channel; - channel.set_channel_id(tmp_id); - channel.set_port_mode(RTP_PORT_MODE_FIXED); - channel.set_ssrc(ssrc); - - srs_error_t err2 = srs_success; - if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) { - srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str()); - srs_error_reset(err2); - }; - - muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id); - } - - return muxer; - }//end if FoundFrame -} - -srs_error_t SrsGb28181TcpPsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, - int peer_port, std::string address_string, SrsPsRtpPacket *pkt) -{ - srs_error_t err = srs_success; - - if (!muxer) - return err; - - if (muxer) { - //TODO: fixme: the same device uses the same SSRC to send with different local ports - //record the first peer port - muxer->set_channel_peer_port(peer_port); - muxer->set_channel_peer_ip(address_string); - //not the first peer port's non processing - if (muxer->channel_peer_port() != peer_port) { - srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d", - muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port); - } - else { - //muxer->ps_packet_enqueue(pkt); - muxer->insert_jitterbuffer(pkt); - }//end if (muxer->channel_peer_port() != peer_port) - }//end if (muxer) - - return err; -} - -srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port) -{ - srs_error_t err = srs_success; - - pprint->elapse(); - - char address_string[64] = {0}; - /*char port_string[16] = {0}; - if (getnameinfo(from, fromlen, - (char*)&address_string, sizeof(address_string), - (char*)&port_string, sizeof(port_string), - NI_NUMERICHOST | NI_NUMERICSERV)) { - return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); - }*/ - - //itoa(port, port_string, 10); - int peer_port = port;// atoi(port_string); - - if (true) { - SrsBuffer stream(buf, nb_buf); - SrsPsRtpPacket *pkt = new SrsPsRtpPacket();; - - if ((err = pkt->decode(&stream)) != srs_success) { - srs_freep(pkt); - return srs_error_wrap(err, "ps rtp decode error"); - } - - std::stringstream ss3; - ss3 << pkt->ssrc << ":" << port;// port_string; - std::string jitter_key = ss3.str(); - - pkt->completed = pkt->marker; - - - if (pprint->can_print()) { - srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", - channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version, - pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc, - pkt->payload->length() - ); - } - - SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc); - if (muxer) { - rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, ip, pkt); - } - - SrsAutoFree(SrsPsRtpPacket, pkt); - } - - return err; -} //ISrsPsStreamHander ps stream raw video/audio hander interface ISrsPsStreamHander::ISrsPsStreamHander() @@ -2830,9 +2507,10 @@ srs_error_t SrsGb28181Manger::query_device_list(std::string id, SrsJsonArray* ar return sip_service->query_device_list(id, arr); } - -#define SRS_RTSP_BUFFER 262144 -SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor) +#define SRS_RTSP_BUFFER 8192 +#define RTP_TCP_HEADER 2 +#define MAX_PACKAGE_SIZE 1024 * 10 +SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181PsRtpProcessor *rtp_processor) { caster = c; stfd = fd; @@ -2877,90 +2555,64 @@ srs_error_t SrsGb28181Conn::do_cycle() { srs_error_t err = srs_success; - // retrieve ip of client. - int fd = srs_netfd_fileno(stfd); - std::string ip = srs_get_peer_ip(fd); - int port = srs_get_peer_port(fd); + // retrieve ip of client. + int fd = srs_netfd_fileno(stfd); + std::string ip = srs_get_peer_ip(fd); + int port = srs_get_peer_port(fd); + int addr_len = sizeof(sockaddr_in); + sockaddr_in *peer_sockaddr = (sockaddr_in*)malloc(addr_len); + peer_sockaddr->sin_family = AF_INET; //设置地址家族 + peer_sockaddr->sin_port = htons(port); //设置端口 + peer_sockaddr->sin_addr.s_addr = inet_addr(ip.c_str()); - if (ip.empty() && !_srs_config->empty_ip_ok()) { - srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); - } - srs_trace("rtsp: serve %s:%d", ip.c_str(), port); - - char* leftData = (char*)malloc(SRS_RTSP_BUFFER);; - uint32_t leftDataLength = 0; - int16_t length = 0; - char* pp = (char*)&length; - char* p = &(mbuffer[0]); - ssize_t nb_read = 0; - int16_t length2; - - // consume all rtp data. - while (true) { - if ((err = trd->pull()) != srs_success) { - free(leftData); - return srs_error_wrap(err, "rtsp cycle"); - } - - //memset(buffer, 0, SRS_RTSP_BUFFER); - nb_read = 0; - if ((err = skt->read(mbuffer + leftDataLength, SRS_RTSP_BUFFER - leftDataLength, &nb_read)) != srs_success) { - free(leftData); - return srs_error_wrap(err, "recv data"); - } - nb_read = nb_read + leftDataLength; - - pp = (char*)&length; - p = &(mbuffer[0]); - pp[1] = *p++; - pp[0] = *p++; + if (ip.empty() && !_srs_config->empty_ip_ok()) { + srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); + } + srs_trace("gb28181 new connect by rtp-tcp from: %s:%d", ip.c_str(), port); - if (nb_read < (length + 2)) {//Not enough one packet. - leftDataLength = leftDataLength + nb_read; - continue; - } + uint32_t left_data_len = 0; //缓存剩余数据 + ssize_t nb_read = 0; + uint16_t packet_len = 0; //rtp包长度 - memset(leftData, 0, SRS_RTSP_BUFFER); + // consume all rtp data. + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "rtsp cycle"); + } + nb_read = 0; + if ((err = skt->read(mbuffer + left_data_len, SRS_RTSP_BUFFER - left_data_len, &nb_read)) != srs_success) { + return srs_error_wrap(err, "recv data"); + } - while (length > 0) { - if ((length + 2) == nb_read) {//Only one packet. - nb_read = nb_read - 2; - processor->on_rtp(mbuffer + 2, nb_read, ip, port); - leftDataLength = 0; - break; - } - else { //multi packets. - pp = (char*)&length2; - p = &(mbuffer[length + 2]); - pp[1] = *p++; - pp[0] = *p++; - - processor->on_rtp(mbuffer + 2, length, ip, port); - - leftDataLength = nb_read - (length + 2); - nb_read = leftDataLength; - memcpy(leftData, mbuffer + length + 2, leftDataLength); - - pp = (char*)&length; - p = &(mbuffer[length + 2]); - pp[1] = *p++; - pp[0] = *p++; - - if (leftDataLength < (length + 2)) {//Not enough one packet. - memcpy(mbuffer, leftData, leftDataLength); - break; - } - else { - memcpy(mbuffer, leftData, leftDataLength); - } - } - } - } + left_data_len = nb_read + left_data_len; + char * buf = mbuffer; - free(leftData); + uint32_t index = 0; + for( ; index < left_data_len; ){ + if (index + RTP_TCP_HEADER >= left_data_len){ //less rtp package + break; + } + packet_len = (((uint8_t *) buf)[index] << 8) | ((uint8_t *) buf)[index + 1]; + if (packet_len > MAX_PACKAGE_SIZE){ + //FIXME 自动重新invite? + srs_error("abnormal RTP packet length:%d, close the tcp conn:%s", packet_len, remote_ip().c_str()); + return err; + } + if (index + RTP_TCP_HEADER + packet_len >= left_data_len){ + break; + } + processor->on_tcp_packet((sockaddr*)peer_sockaddr, addr_len, buf + index + RTP_TCP_HEADER, packet_len); + index = index + RTP_TCP_HEADER + packet_len; + } + if (index != 0) { //update left data + left_data_len = left_data_len - index; + memmove(mbuffer, buf + index, left_data_len); + } - return err; + } + free(peer_sockaddr); + return err; } srs_error_t SrsGb28181Conn::cycle() @@ -2996,7 +2648,7 @@ SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c) // TODO: FIXME: support reload. output = _srs_config->get_stream_caster_output(c); config = new SrsGb28181Config(c); - rtp_processor = new SrsGb28181TcpPsRtpProcessor(config, ""); + rtp_processor = new SrsGb28181PsRtpProcessor(config, ""); manager = new SrsResourceManager("GB28181TCP", true); } diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 07bd0cfa2..04283bc27 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -94,7 +94,6 @@ class SrsSipRequest; class SrsGb28181RtmpMuxer; class SrsGb28181Config; class SrsGb28181PsRtpProcessor; -class SrsGb28181TcpPsRtpProcessor; class SrsGb28181SipService; class SrsGb28181StreamChannel; class SrsGb28181SipSession; @@ -176,37 +175,12 @@ private: // Interface ISrsUdpHandler public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); + virtual srs_error_t on_tcp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); public: virtual srs_error_t on_rtp_packet_jitter(const sockaddr* from, const int fromlen, char* buf, int nb_buf); virtual srs_error_t on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); }; -class SrsGb28181TcpPsRtpProcessor -{ -private: - SrsPithyPrint* pprint; - SrsGb28181Config* config; - std::map cache_ps_rtp_packet; - std::map pre_packet; - std::string channel_id; - bool auto_create_channel; -public: - SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string sid); - virtual ~SrsGb28181TcpPsRtpProcessor(); -private: - bool can_send_ps_av_packet(); - void dispose(); - void clear_pre_packet(); - SrsGb28181RtmpMuxer* create_rtmpmuxer(std::string channel_id, uint32_t ssrc); - srs_error_t rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc, - int peer_port, std::string address_string, SrsPsRtpPacket *pkt); - // Interface ISrsTcpHandler -public: - virtual srs_error_t on_rtp(char* buf, int nb_buf, std::string ip, int port); -public: - virtual srs_error_t on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port); - virtual srs_error_t on_rtp_packet(char* buf, int nb_buf, std::string ip, int port); -}; //ps stream processing parsing interface class ISrsPsStreamHander @@ -581,9 +555,9 @@ private: SrsRtspStack* rtsp; SrsGb28181Caster* caster; SrsCoroutine* trd; - SrsGb28181TcpPsRtpProcessor *processor; + SrsGb28181PsRtpProcessor *processor; public: - SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor); + SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181PsRtpProcessor *rtp_processor); virtual ~SrsGb28181Conn(); public: virtual srs_error_t serve(); @@ -603,7 +577,7 @@ class SrsGb28181Caster : public ISrsTcpHandler private: std::string output; SrsGb28181Config *config; - SrsGb28181TcpPsRtpProcessor *rtp_processor; + SrsGb28181PsRtpProcessor *rtp_processor; private: std::vector clients; SrsResourceManager* manager;