|
|
|
@ -202,6 +202,11 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const
|
|
|
|
|
return on_rtp_packet(from, fromlen, buf, nb_buf);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
srs_error_t SrsGb28181PsRtpProcessor::on_tcp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf)
|
|
|
|
|
{
|
|
|
|
|
on_udp_packet(from, fromlen, buf, nb_buf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf)
|
|
|
|
|
{
|
|
|
|
@ -460,334 +465,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from,
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//SrsGb28181TcpPsRtpProcessor
|
|
|
|
|
SrsGb28181TcpPsRtpProcessor::SrsGb28181TcpPsRtpProcessor(SrsGb28181Config* c, std::string id)
|
|
|
|
|
{
|
|
|
|
|
config = c;
|
|
|
|
|
pprint = SrsPithyPrint::create_caster();
|
|
|
|
|
channel_id = id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsGb28181TcpPsRtpProcessor::~SrsGb28181TcpPsRtpProcessor()
|
|
|
|
|
{
|
|
|
|
|
dispose();
|
|
|
|
|
srs_freep(pprint);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsGb28181TcpPsRtpProcessor::dispose()
|
|
|
|
|
{
|
|
|
|
|
map<std::string, SrsPsRtpPacket*>::iterator it2;
|
|
|
|
|
for (it2 = cache_ps_rtp_packet.begin(); it2 != cache_ps_rtp_packet.end(); ++it2) {
|
|
|
|
|
srs_freep(it2->second);
|
|
|
|
|
}
|
|
|
|
|
cache_ps_rtp_packet.clear();
|
|
|
|
|
|
|
|
|
|
clear_pre_packet();
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SrsGb28181TcpPsRtpProcessor::clear_pre_packet()
|
|
|
|
|
{
|
|
|
|
|
map<std::string, SrsPsRtpPacket*>::iterator it;
|
|
|
|
|
for (it = pre_packet.begin(); it != pre_packet.end(); ++it) {
|
|
|
|
|
srs_freep(it->second);
|
|
|
|
|
}
|
|
|
|
|
pre_packet.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp(char* buf, int nb_buf, std::string ip, int port)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
if (config->jitterbuffer_enable) {
|
|
|
|
|
err = on_rtp_packet_jitter(buf, nb_buf, ip, port);
|
|
|
|
|
if (err != srs_success) {
|
|
|
|
|
srs_warn("SrsGb28181TcpPsRtpProcessor::on_rtp on_rtp_packet_jitter err");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
return on_rtp_packet(buf, nb_buf, ip, port);
|
|
|
|
|
}
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet(char* buf, int nb_buf, std::string ip, int port)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
bool completed = false;
|
|
|
|
|
|
|
|
|
|
pprint->elapse();
|
|
|
|
|
|
|
|
|
|
char address_string[64] = {0};
|
|
|
|
|
char port_string[16] = {0};
|
|
|
|
|
/*if (getnameinfo(from, fromlen,
|
|
|
|
|
(char*)&address_string, sizeof(address_string),
|
|
|
|
|
(char*)&port_string, sizeof(port_string),
|
|
|
|
|
NI_NUMERICHOST | NI_NUMERICSERV)) {
|
|
|
|
|
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address");
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
//itoa(port, port_string, 10);
|
|
|
|
|
int peer_port = port;// atoi(port_string);
|
|
|
|
|
|
|
|
|
|
if (true) {
|
|
|
|
|
SrsBuffer stream(buf, nb_buf);
|
|
|
|
|
SrsPsRtpPacket pkt;
|
|
|
|
|
|
|
|
|
|
if ((err = pkt.decode(&stream)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "ps rtp decode error");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//TODO: fixme: the same device uses the same SSRC to send with different local ports
|
|
|
|
|
std::stringstream ss;
|
|
|
|
|
ss << pkt.ssrc << ":" << pkt.timestamp << ":" << port;// port_string;
|
|
|
|
|
std::string pkt_key = ss.str();
|
|
|
|
|
|
|
|
|
|
std::stringstream ss2;
|
|
|
|
|
ss2 << pkt.ssrc << ":" << port_string;
|
|
|
|
|
std::string pre_pkt_key = ss2.str();
|
|
|
|
|
|
|
|
|
|
if (pre_packet.find(pre_pkt_key) == pre_packet.end()) {
|
|
|
|
|
pre_packet[pre_pkt_key] = new SrsPsRtpPacket();
|
|
|
|
|
pre_packet[pre_pkt_key]->copy(&pkt);
|
|
|
|
|
}
|
|
|
|
|
//cache pkt by ssrc and timestamp
|
|
|
|
|
if (cache_ps_rtp_packet.find(pkt_key) == cache_ps_rtp_packet.end()) {
|
|
|
|
|
cache_ps_rtp_packet[pkt_key] = new SrsPsRtpPacket();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//get previous timestamp by ssrc
|
|
|
|
|
uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp;
|
|
|
|
|
uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number;
|
|
|
|
|
|
|
|
|
|
//TODO: check sequence number out of order
|
|
|
|
|
//it may be out of order, or multiple streaming ssrc are the same
|
|
|
|
|
if (((pre_sequence_number + 1) % 65536) != pkt.sequence_number &&
|
|
|
|
|
pre_sequence_number != pkt.sequence_number) {
|
|
|
|
|
srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)",
|
|
|
|
|
pkt.ssrc, pre_sequence_number, pkt.sequence_number, ip.c_str(), port_string);
|
|
|
|
|
//return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//copy header to cache
|
|
|
|
|
cache_ps_rtp_packet[pkt_key]->copy(&pkt);
|
|
|
|
|
//accumulate one frame of data, to payload cache
|
|
|
|
|
cache_ps_rtp_packet[pkt_key]->payload->append(pkt.payload);
|
|
|
|
|
|
|
|
|
|
//detect whether it is a completed frame
|
|
|
|
|
if (pkt.marker) {// rtp maker is true, is a completed frame
|
|
|
|
|
completed = true;
|
|
|
|
|
}
|
|
|
|
|
else if (pre_timestamp != pkt.timestamp) {
|
|
|
|
|
//current timestamp is different from previous timestamp
|
|
|
|
|
//previous timestamp, is a completed frame
|
|
|
|
|
std::stringstream ss;
|
|
|
|
|
ss << pkt.ssrc << ":" << pre_timestamp << ":" << port_string;
|
|
|
|
|
pkt_key = ss.str();
|
|
|
|
|
if (cache_ps_rtp_packet.find(pkt_key) != cache_ps_rtp_packet.end()) {
|
|
|
|
|
completed = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pprint->can_print()) {
|
|
|
|
|
srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
|
|
|
|
|
channel_id.c_str(), ip.c_str(), peer_port, nb_buf, pprint->age(), pkt.version,
|
|
|
|
|
pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc,
|
|
|
|
|
pkt.payload->length()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//current packet becomes previous packet
|
|
|
|
|
srs_freep(pre_packet[pre_pkt_key]);
|
|
|
|
|
pre_packet[pre_pkt_key] = new SrsPsRtpPacket();
|
|
|
|
|
pre_packet[pre_pkt_key]->copy(&pkt);;
|
|
|
|
|
|
|
|
|
|
if (!completed) {
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
//process completed frame data
|
|
|
|
|
//clear processed one ps frame
|
|
|
|
|
//on completed frame data rtp packet in muxer enqueue
|
|
|
|
|
map<std::string, SrsPsRtpPacket*>::iterator key = cache_ps_rtp_packet.find(pkt_key);
|
|
|
|
|
if (key != cache_ps_rtp_packet.end())
|
|
|
|
|
{
|
|
|
|
|
SrsGb28181RtmpMuxer* muxer = NULL;
|
|
|
|
|
//First, search according to the channel_id. Otherwise, search according to the SSRC.
|
|
|
|
|
//Some channel_id are created by RTP pool, which are different ports.
|
|
|
|
|
//No channel_id are created by multiplexing ports, which are the same port
|
|
|
|
|
if (!channel_id.empty()) {
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(pkt.ssrc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//auto crate channel
|
|
|
|
|
if (!muxer && config->auto_create_channel) {
|
|
|
|
|
//auto create channel generated id
|
|
|
|
|
std::stringstream ss, ss1;
|
|
|
|
|
ss << "chid" << pkt.ssrc;
|
|
|
|
|
std::string tmp_id = ss.str();
|
|
|
|
|
|
|
|
|
|
SrsGb28181StreamChannel channel;
|
|
|
|
|
channel.set_channel_id(tmp_id);
|
|
|
|
|
channel.set_port_mode(RTP_PORT_MODE_FIXED);
|
|
|
|
|
channel.set_ssrc(pkt.ssrc);
|
|
|
|
|
|
|
|
|
|
srs_error_t err2 = srs_success;
|
|
|
|
|
if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) {
|
|
|
|
|
srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str());
|
|
|
|
|
srs_error_reset(err2);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (muxer) {
|
|
|
|
|
//TODO: fixme: the same device uses the same SSRC to send with different local ports
|
|
|
|
|
//record the first peer port
|
|
|
|
|
muxer->set_channel_peer_port(peer_port);
|
|
|
|
|
muxer->set_channel_peer_ip(address_string);
|
|
|
|
|
//not the first peer port's non processing
|
|
|
|
|
if (muxer->channel_peer_port() != peer_port) {
|
|
|
|
|
srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d",
|
|
|
|
|
muxer->get_channel_id().c_str(), pkt.ssrc, muxer->channel_peer_port(), peer_port);
|
|
|
|
|
srs_freep(key->second);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
//put it in queue, wait for consumer to process, and then free
|
|
|
|
|
muxer->ps_packet_enqueue(key->second);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
//no consumer process it, discarded
|
|
|
|
|
srs_freep(key->second);
|
|
|
|
|
}
|
|
|
|
|
cache_ps_rtp_packet.erase(pkt_key);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsGb28181RtmpMuxer* SrsGb28181TcpPsRtpProcessor::create_rtmpmuxer(std::string channel_id, uint32_t ssrc)
|
|
|
|
|
{
|
|
|
|
|
if (true) {
|
|
|
|
|
SrsGb28181RtmpMuxer* muxer = NULL;
|
|
|
|
|
//First, search according to the channel_id. Otherwise, search according to the SSRC.
|
|
|
|
|
//Some channel_id are created by RTP pool, which are different ports.
|
|
|
|
|
//No channel_id are created by multiplexing ports, which are the same port
|
|
|
|
|
if (!channel_id.empty()) {
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer(channel_id);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer_by_ssrc(ssrc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//auto crate channel
|
|
|
|
|
if (!muxer && config->auto_create_channel) {
|
|
|
|
|
//auto create channel generated id
|
|
|
|
|
std::stringstream ss, ss1;
|
|
|
|
|
ss << "chid" << ssrc;
|
|
|
|
|
std::string tmp_id = ss.str();
|
|
|
|
|
|
|
|
|
|
SrsGb28181StreamChannel channel;
|
|
|
|
|
channel.set_channel_id(tmp_id);
|
|
|
|
|
channel.set_port_mode(RTP_PORT_MODE_FIXED);
|
|
|
|
|
channel.set_ssrc(ssrc);
|
|
|
|
|
|
|
|
|
|
srs_error_t err2 = srs_success;
|
|
|
|
|
if ((err2 = _srs_gb28181->create_stream_channel(&channel)) != srs_success) {
|
|
|
|
|
srs_warn("gb28181: RtpProcessor create stream channel error %s", srs_error_desc(err2).c_str());
|
|
|
|
|
srs_error_reset(err2);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
muxer = _srs_gb28181->fetch_rtmpmuxer(tmp_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return muxer;
|
|
|
|
|
}//end if FoundFrame
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181TcpPsRtpProcessor::rtmpmuxer_enqueue_data(SrsGb28181RtmpMuxer *muxer, uint32_t ssrc,
|
|
|
|
|
int peer_port, std::string address_string, SrsPsRtpPacket *pkt)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
if (!muxer)
|
|
|
|
|
return err;
|
|
|
|
|
|
|
|
|
|
if (muxer) {
|
|
|
|
|
//TODO: fixme: the same device uses the same SSRC to send with different local ports
|
|
|
|
|
//record the first peer port
|
|
|
|
|
muxer->set_channel_peer_port(peer_port);
|
|
|
|
|
muxer->set_channel_peer_ip(address_string);
|
|
|
|
|
//not the first peer port's non processing
|
|
|
|
|
if (muxer->channel_peer_port() != peer_port) {
|
|
|
|
|
srs_warn("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d",
|
|
|
|
|
muxer->get_channel_id().c_str(), ssrc, muxer->channel_peer_port(), peer_port);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
//muxer->ps_packet_enqueue(pkt);
|
|
|
|
|
muxer->insert_jitterbuffer(pkt);
|
|
|
|
|
}//end if (muxer->channel_peer_port() != peer_port)
|
|
|
|
|
}//end if (muxer)
|
|
|
|
|
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter(char* buf, int nb_buf, std::string ip, int port)
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
pprint->elapse();
|
|
|
|
|
|
|
|
|
|
char address_string[64] = {0};
|
|
|
|
|
/*char port_string[16] = {0};
|
|
|
|
|
if (getnameinfo(from, fromlen,
|
|
|
|
|
(char*)&address_string, sizeof(address_string),
|
|
|
|
|
(char*)&port_string, sizeof(port_string),
|
|
|
|
|
NI_NUMERICHOST | NI_NUMERICSERV)) {
|
|
|
|
|
return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address");
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
//itoa(port, port_string, 10);
|
|
|
|
|
int peer_port = port;// atoi(port_string);
|
|
|
|
|
|
|
|
|
|
if (true) {
|
|
|
|
|
SrsBuffer stream(buf, nb_buf);
|
|
|
|
|
SrsPsRtpPacket *pkt = new SrsPsRtpPacket();;
|
|
|
|
|
|
|
|
|
|
if ((err = pkt->decode(&stream)) != srs_success) {
|
|
|
|
|
srs_freep(pkt);
|
|
|
|
|
return srs_error_wrap(err, "ps rtp decode error");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::stringstream ss3;
|
|
|
|
|
ss3 << pkt->ssrc << ":" << port;// port_string;
|
|
|
|
|
std::string jitter_key = ss3.str();
|
|
|
|
|
|
|
|
|
|
pkt->completed = pkt->marker;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pprint->can_print()) {
|
|
|
|
|
srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
|
|
|
|
|
channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt->version,
|
|
|
|
|
pkt->payload_type, pkt->sequence_number, pkt->timestamp, pkt->ssrc,
|
|
|
|
|
pkt->payload->length()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsGb28181RtmpMuxer *muxer = create_rtmpmuxer(channel_id, pkt->ssrc);
|
|
|
|
|
if (muxer) {
|
|
|
|
|
rtmpmuxer_enqueue_data(muxer, pkt->ssrc, peer_port, ip, pkt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsAutoFree(SrsPsRtpPacket, pkt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//ISrsPsStreamHander ps stream raw video/audio hander interface
|
|
|
|
|
ISrsPsStreamHander::ISrsPsStreamHander()
|
|
|
|
@ -2830,9 +2507,10 @@ srs_error_t SrsGb28181Manger::query_device_list(std::string id, SrsJsonArray* ar
|
|
|
|
|
|
|
|
|
|
return sip_service->query_device_list(id, arr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#define SRS_RTSP_BUFFER 262144
|
|
|
|
|
SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181TcpPsRtpProcessor *rtp_processor)
|
|
|
|
|
#define SRS_RTSP_BUFFER 8192
|
|
|
|
|
#define RTP_TCP_HEADER 2
|
|
|
|
|
#define MAX_PACKAGE_SIZE 1024 * 10
|
|
|
|
|
SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, srs_netfd_t fd, SrsGb28181PsRtpProcessor *rtp_processor)
|
|
|
|
|
{
|
|
|
|
|
caster = c;
|
|
|
|
|
stfd = fd;
|
|
|
|
@ -2877,90 +2555,64 @@ srs_error_t SrsGb28181Conn::do_cycle()
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
// retrieve ip of client.
|
|
|
|
|
int fd = srs_netfd_fileno(stfd);
|
|
|
|
|
std::string ip = srs_get_peer_ip(fd);
|
|
|
|
|
int port = srs_get_peer_port(fd);
|
|
|
|
|
// retrieve ip of client.
|
|
|
|
|
int fd = srs_netfd_fileno(stfd);
|
|
|
|
|
std::string ip = srs_get_peer_ip(fd);
|
|
|
|
|
int port = srs_get_peer_port(fd);
|
|
|
|
|
int addr_len = sizeof(sockaddr_in);
|
|
|
|
|
sockaddr_in *peer_sockaddr = (sockaddr_in*)malloc(addr_len);
|
|
|
|
|
peer_sockaddr->sin_family = AF_INET; //设置地址家族
|
|
|
|
|
peer_sockaddr->sin_port = htons(port); //设置端口
|
|
|
|
|
peer_sockaddr->sin_addr.s_addr = inet_addr(ip.c_str());
|
|
|
|
|
|
|
|
|
|
if (ip.empty() && !_srs_config->empty_ip_ok()) {
|
|
|
|
|
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
|
|
|
|
|
}
|
|
|
|
|
srs_trace("rtsp: serve %s:%d", ip.c_str(), port);
|
|
|
|
|
|
|
|
|
|
char* leftData = (char*)malloc(SRS_RTSP_BUFFER);;
|
|
|
|
|
uint32_t leftDataLength = 0;
|
|
|
|
|
int16_t length = 0;
|
|
|
|
|
char* pp = (char*)&length;
|
|
|
|
|
char* p = &(mbuffer[0]);
|
|
|
|
|
ssize_t nb_read = 0;
|
|
|
|
|
int16_t length2;
|
|
|
|
|
|
|
|
|
|
// consume all rtp data.
|
|
|
|
|
while (true) {
|
|
|
|
|
if ((err = trd->pull()) != srs_success) {
|
|
|
|
|
free(leftData);
|
|
|
|
|
return srs_error_wrap(err, "rtsp cycle");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//memset(buffer, 0, SRS_RTSP_BUFFER);
|
|
|
|
|
nb_read = 0;
|
|
|
|
|
if ((err = skt->read(mbuffer + leftDataLength, SRS_RTSP_BUFFER - leftDataLength, &nb_read)) != srs_success) {
|
|
|
|
|
free(leftData);
|
|
|
|
|
return srs_error_wrap(err, "recv data");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nb_read = nb_read + leftDataLength;
|
|
|
|
|
|
|
|
|
|
pp = (char*)&length;
|
|
|
|
|
p = &(mbuffer[0]);
|
|
|
|
|
pp[1] = *p++;
|
|
|
|
|
pp[0] = *p++;
|
|
|
|
|
if (ip.empty() && !_srs_config->empty_ip_ok()) {
|
|
|
|
|
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
|
|
|
|
|
}
|
|
|
|
|
srs_trace("gb28181 new connect by rtp-tcp from: %s:%d", ip.c_str(), port);
|
|
|
|
|
|
|
|
|
|
if (nb_read < (length + 2)) {//Not enough one packet.
|
|
|
|
|
leftDataLength = leftDataLength + nb_read;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
uint32_t left_data_len = 0; //缓存剩余数据
|
|
|
|
|
ssize_t nb_read = 0;
|
|
|
|
|
uint16_t packet_len = 0; //rtp包长度
|
|
|
|
|
|
|
|
|
|
memset(leftData, 0, SRS_RTSP_BUFFER);
|
|
|
|
|
// consume all rtp data.
|
|
|
|
|
while (true) {
|
|
|
|
|
if ((err = trd->pull()) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "rtsp cycle");
|
|
|
|
|
}
|
|
|
|
|
nb_read = 0;
|
|
|
|
|
if ((err = skt->read(mbuffer + left_data_len, SRS_RTSP_BUFFER - left_data_len, &nb_read)) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "recv data");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (length > 0) {
|
|
|
|
|
if ((length + 2) == nb_read) {//Only one packet.
|
|
|
|
|
nb_read = nb_read - 2;
|
|
|
|
|
processor->on_rtp(mbuffer + 2, nb_read, ip, port);
|
|
|
|
|
leftDataLength = 0;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
else { //multi packets.
|
|
|
|
|
pp = (char*)&length2;
|
|
|
|
|
p = &(mbuffer[length + 2]);
|
|
|
|
|
pp[1] = *p++;
|
|
|
|
|
pp[0] = *p++;
|
|
|
|
|
|
|
|
|
|
processor->on_rtp(mbuffer + 2, length, ip, port);
|
|
|
|
|
|
|
|
|
|
leftDataLength = nb_read - (length + 2);
|
|
|
|
|
nb_read = leftDataLength;
|
|
|
|
|
memcpy(leftData, mbuffer + length + 2, leftDataLength);
|
|
|
|
|
|
|
|
|
|
pp = (char*)&length;
|
|
|
|
|
p = &(mbuffer[length + 2]);
|
|
|
|
|
pp[1] = *p++;
|
|
|
|
|
pp[0] = *p++;
|
|
|
|
|
|
|
|
|
|
if (leftDataLength < (length + 2)) {//Not enough one packet.
|
|
|
|
|
memcpy(mbuffer, leftData, leftDataLength);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
memcpy(mbuffer, leftData, leftDataLength);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
left_data_len = nb_read + left_data_len;
|
|
|
|
|
char * buf = mbuffer;
|
|
|
|
|
|
|
|
|
|
free(leftData);
|
|
|
|
|
uint32_t index = 0;
|
|
|
|
|
for( ; index < left_data_len; ){
|
|
|
|
|
if (index + RTP_TCP_HEADER >= left_data_len){ //less rtp package
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
packet_len = (((uint8_t *) buf)[index] << 8) | ((uint8_t *) buf)[index + 1];
|
|
|
|
|
if (packet_len > MAX_PACKAGE_SIZE){
|
|
|
|
|
//FIXME 自动重新invite?
|
|
|
|
|
srs_error("abnormal RTP packet length:%d, close the tcp conn:%s", packet_len, remote_ip().c_str());
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
if (index + RTP_TCP_HEADER + packet_len >= left_data_len){
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
processor->on_tcp_packet((sockaddr*)peer_sockaddr, addr_len, buf + index + RTP_TCP_HEADER, packet_len);
|
|
|
|
|
index = index + RTP_TCP_HEADER + packet_len;
|
|
|
|
|
}
|
|
|
|
|
if (index != 0) { //update left data
|
|
|
|
|
left_data_len = left_data_len - index;
|
|
|
|
|
memmove(mbuffer, buf + index, left_data_len);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
free(peer_sockaddr);
|
|
|
|
|
return err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
srs_error_t SrsGb28181Conn::cycle()
|
|
|
|
@ -2996,7 +2648,7 @@ SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c)
|
|
|
|
|
// TODO: FIXME: support reload.
|
|
|
|
|
output = _srs_config->get_stream_caster_output(c);
|
|
|
|
|
config = new SrsGb28181Config(c);
|
|
|
|
|
rtp_processor = new SrsGb28181TcpPsRtpProcessor(config, "");
|
|
|
|
|
rtp_processor = new SrsGb28181PsRtpProcessor(config, "");
|
|
|
|
|
manager = new SrsResourceManager("GB28181TCP", true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|