@ -202,6 +202,11 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const
return on_rtp_packet ( from , fromlen , buf , nb_buf ) ;
}
}
srs_error_t SrsGb28181PsRtpProcessor : : on_tcp_packet ( const sockaddr * from , const int fromlen , char * buf , int nb_buf )
{
on_udp_packet ( from , fromlen , buf , nb_buf ) ;
}
srs_error_t SrsGb28181PsRtpProcessor : : on_rtp_packet ( const sockaddr * from , const int fromlen , char * buf , int nb_buf )
{
@ -460,334 +465,6 @@ srs_error_t SrsGb28181PsRtpProcessor::on_rtp_packet_jitter(const sockaddr* from,
return err ;
}
//SrsGb28181TcpPsRtpProcessor
SrsGb28181TcpPsRtpProcessor : : SrsGb28181TcpPsRtpProcessor ( SrsGb28181Config * c , std : : string id )
{
config = c ;
pprint = SrsPithyPrint : : create_caster ( ) ;
channel_id = id ;
}
SrsGb28181TcpPsRtpProcessor : : ~ SrsGb28181TcpPsRtpProcessor ( )
{
dispose ( ) ;
srs_freep ( pprint ) ;
}
void SrsGb28181TcpPsRtpProcessor : : dispose ( )
{
map < std : : string , SrsPsRtpPacket * > : : iterator it2 ;
for ( it2 = cache_ps_rtp_packet . begin ( ) ; it2 ! = cache_ps_rtp_packet . end ( ) ; + + it2 ) {
srs_freep ( it2 - > second ) ;
}
cache_ps_rtp_packet . clear ( ) ;
clear_pre_packet ( ) ;
return ;
}
void SrsGb28181TcpPsRtpProcessor : : clear_pre_packet ( )
{
map < std : : string , SrsPsRtpPacket * > : : iterator it ;
for ( it = pre_packet . begin ( ) ; it ! = pre_packet . end ( ) ; + + it ) {
srs_freep ( it - > second ) ;
}
pre_packet . clear ( ) ;
}
srs_error_t SrsGb28181TcpPsRtpProcessor : : on_rtp ( char * buf , int nb_buf , std : : string ip , int port )
{
srs_error_t err = srs_success ;
if ( config - > jitterbuffer_enable ) {
err = on_rtp_packet_jitter ( buf , nb_buf , ip , port ) ;
if ( err ! = srs_success ) {
srs_warn ( " SrsGb28181TcpPsRtpProcessor::on_rtp on_rtp_packet_jitter err " ) ;
}
}
else {
return on_rtp_packet ( buf , nb_buf , ip , port ) ;
}
return err ;
}
srs_error_t SrsGb28181TcpPsRtpProcessor : : on_rtp_packet ( char * buf , int nb_buf , std : : string ip , int port )
{
srs_error_t err = srs_success ;
bool completed = false ;
pprint - > elapse ( ) ;
char address_string [ 64 ] = { 0 } ;
char port_string [ 16 ] = { 0 } ;
/*if (getnameinfo(from, fromlen,
( char * ) & address_string , sizeof ( address_string ) ,
( char * ) & port_string , sizeof ( port_string ) ,
NI_NUMERICHOST | NI_NUMERICSERV ) ) {
return srs_error_new ( ERROR_SYSTEM_IP_INVALID , " bad address " ) ;
} */
//itoa(port, port_string, 10);
int peer_port = port ; // atoi(port_string);
if ( true ) {
SrsBuffer stream ( buf , nb_buf ) ;
SrsPsRtpPacket pkt ;
if ( ( err = pkt . decode ( & stream ) ) ! = srs_success ) {
return srs_error_wrap ( err , " ps rtp decode error " ) ;
}
//TODO: fixme: the same device uses the same SSRC to send with different local ports
std : : stringstream ss ;
ss < < pkt . ssrc < < " : " < < pkt . timestamp < < " : " < < port ; // port_string;
std : : string pkt_key = ss . str ( ) ;
std : : stringstream ss2 ;
ss2 < < pkt . ssrc < < " : " < < port_string ;
std : : string pre_pkt_key = ss2 . str ( ) ;
if ( pre_packet . find ( pre_pkt_key ) = = pre_packet . end ( ) ) {
pre_packet [ pre_pkt_key ] = new SrsPsRtpPacket ( ) ;
pre_packet [ pre_pkt_key ] - > copy ( & pkt ) ;
}
//cache pkt by ssrc and timestamp
if ( cache_ps_rtp_packet . find ( pkt_key ) = = cache_ps_rtp_packet . end ( ) ) {
cache_ps_rtp_packet [ pkt_key ] = new SrsPsRtpPacket ( ) ;
}
//get previous timestamp by ssrc
uint32_t pre_timestamp = pre_packet [ pre_pkt_key ] - > timestamp ;
uint32_t pre_sequence_number = pre_packet [ pre_pkt_key ] - > sequence_number ;
//TODO: check sequence number out of order
//it may be out of order, or multiple streaming ssrc are the same
if ( ( ( pre_sequence_number + 1 ) % 65536 ) ! = pkt . sequence_number & &
pre_sequence_number ! = pkt . sequence_number ) {
srs_warn ( " gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s) " ,
pkt . ssrc , pre_sequence_number , pkt . sequence_number , ip . c_str ( ) , port_string ) ;
//return err;
}
//copy header to cache
cache_ps_rtp_packet [ pkt_key ] - > copy ( & pkt ) ;
//accumulate one frame of data, to payload cache
cache_ps_rtp_packet [ pkt_key ] - > payload - > append ( pkt . payload ) ;
//detect whether it is a completed frame
if ( pkt . marker ) { // rtp maker is true, is a completed frame
completed = true ;
}
else if ( pre_timestamp ! = pkt . timestamp ) {
//current timestamp is different from previous timestamp
//previous timestamp, is a completed frame
std : : stringstream ss ;
ss < < pkt . ssrc < < " : " < < pre_timestamp < < " : " < < port_string ;
pkt_key = ss . str ( ) ;
if ( cache_ps_rtp_packet . find ( pkt_key ) ! = cache_ps_rtp_packet . end ( ) ) {
completed = true ;
}
}
if ( pprint - > can_print ( ) ) {
srs_trace ( " <- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB " ,
channel_id . c_str ( ) , ip . c_str ( ) , peer_port , nb_buf , pprint - > age ( ) , pkt . version ,
pkt . payload_type , pkt . sequence_number , pkt . timestamp , pkt . ssrc ,
pkt . payload - > length ( )
) ;
}
//current packet becomes previous packet
srs_freep ( pre_packet [ pre_pkt_key ] ) ;
pre_packet [ pre_pkt_key ] = new SrsPsRtpPacket ( ) ;
pre_packet [ pre_pkt_key ] - > copy ( & pkt ) ; ;
if ( ! completed ) {
return err ;
}
//process completed frame data
//clear processed one ps frame
//on completed frame data rtp packet in muxer enqueue
map < std : : string , SrsPsRtpPacket * > : : iterator key = cache_ps_rtp_packet . find ( pkt_key ) ;
if ( key ! = cache_ps_rtp_packet . end ( ) )
{
SrsGb28181RtmpMuxer * muxer = NULL ;
//First, search according to the channel_id. Otherwise, search according to the SSRC.
//Some channel_id are created by RTP pool, which are different ports.
//No channel_id are created by multiplexing ports, which are the same port
if ( ! channel_id . empty ( ) ) {
muxer = _srs_gb28181 - > fetch_rtmpmuxer ( channel_id ) ;
}
else {
muxer = _srs_gb28181 - > fetch_rtmpmuxer_by_ssrc ( pkt . ssrc ) ;
}
//auto crate channel
if ( ! muxer & & config - > auto_create_channel ) {
//auto create channel generated id
std : : stringstream ss , ss1 ;
ss < < " chid " < < pkt . ssrc ;
std : : string tmp_id = ss . str ( ) ;
SrsGb28181StreamChannel channel ;
channel . set_channel_id ( tmp_id ) ;
channel . set_port_mode ( RTP_PORT_MODE_FIXED ) ;
channel . set_ssrc ( pkt . ssrc ) ;
srs_error_t err2 = srs_success ;
if ( ( err2 = _srs_gb28181 - > create_stream_channel ( & channel ) ) ! = srs_success ) {
srs_warn ( " gb28181: RtpProcessor create stream channel error %s " , srs_error_desc ( err2 ) . c_str ( ) ) ;
srs_error_reset ( err2 ) ;
} ;
muxer = _srs_gb28181 - > fetch_rtmpmuxer ( tmp_id ) ;
}
if ( muxer ) {
//TODO: fixme: the same device uses the same SSRC to send with different local ports
//record the first peer port
muxer - > set_channel_peer_port ( peer_port ) ;
muxer - > set_channel_peer_ip ( address_string ) ;
//not the first peer port's non processing
if ( muxer - > channel_peer_port ( ) ! = peer_port ) {
srs_warn ( " <- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d " ,
muxer - > get_channel_id ( ) . c_str ( ) , pkt . ssrc , muxer - > channel_peer_port ( ) , peer_port ) ;
srs_freep ( key - > second ) ;
}
else {
//put it in queue, wait for consumer to process, and then free
muxer - > ps_packet_enqueue ( key - > second ) ;
}
}
else {
//no consumer process it, discarded
srs_freep ( key - > second ) ;
}
cache_ps_rtp_packet . erase ( pkt_key ) ;
}
}
return err ;
}
SrsGb28181RtmpMuxer * SrsGb28181TcpPsRtpProcessor : : create_rtmpmuxer ( std : : string channel_id , uint32_t ssrc )
{
if ( true ) {
SrsGb28181RtmpMuxer * muxer = NULL ;
//First, search according to the channel_id. Otherwise, search according to the SSRC.
//Some channel_id are created by RTP pool, which are different ports.
//No channel_id are created by multiplexing ports, which are the same port
if ( ! channel_id . empty ( ) ) {
muxer = _srs_gb28181 - > fetch_rtmpmuxer ( channel_id ) ;
}
else {
muxer = _srs_gb28181 - > fetch_rtmpmuxer_by_ssrc ( ssrc ) ;
}
//auto crate channel
if ( ! muxer & & config - > auto_create_channel ) {
//auto create channel generated id
std : : stringstream ss , ss1 ;
ss < < " chid " < < ssrc ;
std : : string tmp_id = ss . str ( ) ;
SrsGb28181StreamChannel channel ;
channel . set_channel_id ( tmp_id ) ;
channel . set_port_mode ( RTP_PORT_MODE_FIXED ) ;
channel . set_ssrc ( ssrc ) ;
srs_error_t err2 = srs_success ;
if ( ( err2 = _srs_gb28181 - > create_stream_channel ( & channel ) ) ! = srs_success ) {
srs_warn ( " gb28181: RtpProcessor create stream channel error %s " , srs_error_desc ( err2 ) . c_str ( ) ) ;
srs_error_reset ( err2 ) ;
} ;
muxer = _srs_gb28181 - > fetch_rtmpmuxer ( tmp_id ) ;
}
return muxer ;
} //end if FoundFrame
}
srs_error_t SrsGb28181TcpPsRtpProcessor : : rtmpmuxer_enqueue_data ( SrsGb28181RtmpMuxer * muxer , uint32_t ssrc ,
int peer_port , std : : string address_string , SrsPsRtpPacket * pkt )
{
srs_error_t err = srs_success ;
if ( ! muxer )
return err ;
if ( muxer ) {
//TODO: fixme: the same device uses the same SSRC to send with different local ports
//record the first peer port
muxer - > set_channel_peer_port ( peer_port ) ;
muxer - > set_channel_peer_ip ( address_string ) ;
//not the first peer port's non processing
if ( muxer - > channel_peer_port ( ) ! = peer_port ) {
srs_warn ( " <- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, ssrc=%#x, first peer_port=%d cur peer_port=%d " ,
muxer - > get_channel_id ( ) . c_str ( ) , ssrc , muxer - > channel_peer_port ( ) , peer_port ) ;
}
else {
//muxer->ps_packet_enqueue(pkt);
muxer - > insert_jitterbuffer ( pkt ) ;
} //end if (muxer->channel_peer_port() != peer_port)
} //end if (muxer)
return err ;
}
srs_error_t SrsGb28181TcpPsRtpProcessor : : on_rtp_packet_jitter ( char * buf , int nb_buf , std : : string ip , int port )
{
srs_error_t err = srs_success ;
pprint - > elapse ( ) ;
char address_string [ 64 ] = { 0 } ;
/*char port_string[16] = {0};
if ( getnameinfo ( from , fromlen ,
( char * ) & address_string , sizeof ( address_string ) ,
( char * ) & port_string , sizeof ( port_string ) ,
NI_NUMERICHOST | NI_NUMERICSERV ) ) {
return srs_error_new ( ERROR_SYSTEM_IP_INVALID , " bad address " ) ;
} */
//itoa(port, port_string, 10);
int peer_port = port ; // atoi(port_string);
if ( true ) {
SrsBuffer stream ( buf , nb_buf ) ;
SrsPsRtpPacket * pkt = new SrsPsRtpPacket ( ) ; ;
if ( ( err = pkt - > decode ( & stream ) ) ! = srs_success ) {
srs_freep ( pkt ) ;
return srs_error_wrap ( err , " ps rtp decode error " ) ;
}
std : : stringstream ss3 ;
ss3 < < pkt - > ssrc < < " : " < < port ; // port_string;
std : : string jitter_key = ss3 . str ( ) ;
pkt - > completed = pkt - > marker ;
if ( pprint - > can_print ( ) ) {
srs_trace ( " <- " SRS_CONSTS_LOG_GB28181_CASTER " SrsGb28181TcpPsRtpProcessor::on_rtp_packet_jitter gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB " ,
channel_id . c_str ( ) , address_string , peer_port , nb_buf , pprint - > age ( ) , pkt - > version ,
pkt - > payload_type , pkt - > sequence_number , pkt - > timestamp , pkt - > ssrc ,
pkt - > payload - > length ( )
) ;
}
SrsGb28181RtmpMuxer * muxer = create_rtmpmuxer ( channel_id , pkt - > ssrc ) ;
if ( muxer ) {
rtmpmuxer_enqueue_data ( muxer , pkt - > ssrc , peer_port , ip , pkt ) ;
}
SrsAutoFree ( SrsPsRtpPacket , pkt ) ;
}
return err ;
}
//ISrsPsStreamHander ps stream raw video/audio hander interface
ISrsPsStreamHander : : ISrsPsStreamHander ( )
@ -2830,9 +2507,10 @@ srs_error_t SrsGb28181Manger::query_device_list(std::string id, SrsJsonArray* ar
return sip_service - > query_device_list ( id , arr ) ;
}
# define SRS_RTSP_BUFFER 262144
SrsGb28181Conn : : SrsGb28181Conn ( SrsGb28181Caster * c , srs_netfd_t fd , SrsGb28181TcpPsRtpProcessor * rtp_processor )
# define SRS_RTSP_BUFFER 8192
# define RTP_TCP_HEADER 2
# define MAX_PACKAGE_SIZE 1024 * 10
SrsGb28181Conn : : SrsGb28181Conn ( SrsGb28181Caster * c , srs_netfd_t fd , SrsGb28181PsRtpProcessor * rtp_processor )
{
caster = c ;
stfd = fd ;
@ -2877,90 +2555,64 @@ srs_error_t SrsGb28181Conn::do_cycle()
{
srs_error_t err = srs_success ;
// retrieve ip of client.
int fd = srs_netfd_fileno ( stfd ) ;
std : : string ip = srs_get_peer_ip ( fd ) ;
int port = srs_get_peer_port ( fd ) ;
// retrieve ip of client.
int fd = srs_netfd_fileno ( stfd ) ;
std : : string ip = srs_get_peer_ip ( fd ) ;
int port = srs_get_peer_port ( fd ) ;
int addr_len = sizeof ( sockaddr_in ) ;
sockaddr_in * peer_sockaddr = ( sockaddr_in * ) malloc ( addr_len ) ;
peer_sockaddr - > sin_family = AF_INET ; //设置地址家族
peer_sockaddr - > sin_port = htons ( port ) ; //设置端口
peer_sockaddr - > sin_addr . s_addr = inet_addr ( ip . c_str ( ) ) ;
if ( ip . empty ( ) & & ! _srs_config - > empty_ip_ok ( ) ) {
srs_warn ( " empty ip for fd=%d " , srs_netfd_fileno ( stfd ) ) ;
}
srs_trace ( " rtsp: serve %s:%d " , ip . c_str ( ) , port ) ;
char * leftData = ( char * ) malloc ( SRS_RTSP_BUFFER ) ; ;
uint32_t leftDataLength = 0 ;
int16_t length = 0 ;
char * pp = ( char * ) & length ;
char * p = & ( mbuffer [ 0 ] ) ;
ssize_t nb_read = 0 ;
int16_t length2 ;
// consume all rtp data.
while ( true ) {
if ( ( err = trd - > pull ( ) ) ! = srs_success ) {
free ( leftData ) ;
return srs_error_wrap ( err , " rtsp cycle " ) ;
}
//memset(buffer, 0, SRS_RTSP_BUFFER);
nb_read = 0 ;
if ( ( err = skt - > read ( mbuffer + leftDataLength , SRS_RTSP_BUFFER - leftDataLength , & nb_read ) ) ! = srs_success ) {
free ( leftData ) ;
return srs_error_wrap ( err , " recv data " ) ;
}
nb_read = nb_read + leftDataLength ;
pp = ( char * ) & length ;
p = & ( mbuffer [ 0 ] ) ;
pp [ 1 ] = * p + + ;
pp [ 0 ] = * p + + ;
if ( ip . empty ( ) & & ! _srs_config - > empty_ip_ok ( ) ) {
srs_warn ( " empty ip for fd=%d " , srs_netfd_fileno ( stfd ) ) ;
}
srs_trace ( " gb28181 new connect by rtp-tcp from: %s:%d " , ip . c_str ( ) , port ) ;
if ( nb_read < ( length + 2 ) ) { //Not enough one packet.
leftDataLength = leftDataLength + nb_read ;
continue ;
}
uint32_t left_data_len = 0 ; //缓存剩余数据
ssize_t nb_read = 0 ;
uint16_t packet_len = 0 ; //rtp包长度
memset ( leftData , 0 , SRS_RTSP_BUFFER ) ;
// consume all rtp data.
while ( true ) {
if ( ( err = trd - > pull ( ) ) ! = srs_success ) {
return srs_error_wrap ( err , " rtsp cycle " ) ;
}
nb_read = 0 ;
if ( ( err = skt - > read ( mbuffer + left_data_len , SRS_RTSP_BUFFER - left_data_len , & nb_read ) ) ! = srs_success ) {
return srs_error_wrap ( err , " recv data " ) ;
}
while ( length > 0 ) {
if ( ( length + 2 ) = = nb_read ) { //Only one packet.
nb_read = nb_read - 2 ;
processor - > on_rtp ( mbuffer + 2 , nb_read , ip , port ) ;
leftDataLength = 0 ;
break ;
}
else { //multi packets.
pp = ( char * ) & length2 ;
p = & ( mbuffer [ length + 2 ] ) ;
pp [ 1 ] = * p + + ;
pp [ 0 ] = * p + + ;
processor - > on_rtp ( mbuffer + 2 , length , ip , port ) ;
leftDataLength = nb_read - ( length + 2 ) ;
nb_read = leftDataLength ;
memcpy ( leftData , mbuffer + length + 2 , leftDataLength ) ;
pp = ( char * ) & length ;
p = & ( mbuffer [ length + 2 ] ) ;
pp [ 1 ] = * p + + ;
pp [ 0 ] = * p + + ;
if ( leftDataLength < ( length + 2 ) ) { //Not enough one packet.
memcpy ( mbuffer , leftData , leftDataLength ) ;
break ;
}
else {
memcpy ( mbuffer , leftData , leftDataLength ) ;
}
}
}
}
left_data_len = nb_read + left_data_len ;
char * buf = mbuffer ;
free ( leftData ) ;
uint32_t index = 0 ;
for ( ; index < left_data_len ; ) {
if ( index + RTP_TCP_HEADER > = left_data_len ) { //less rtp package
break ;
}
packet_len = ( ( ( uint8_t * ) buf ) [ index ] < < 8 ) | ( ( uint8_t * ) buf ) [ index + 1 ] ;
if ( packet_len > MAX_PACKAGE_SIZE ) {
//FIXME 自动重新invite?
srs_error ( " abnormal RTP packet length:%d, close the tcp conn:%s " , packet_len , remote_ip ( ) . c_str ( ) ) ;
return err ;
}
if ( index + RTP_TCP_HEADER + packet_len > = left_data_len ) {
break ;
}
processor - > on_tcp_packet ( ( sockaddr * ) peer_sockaddr , addr_len , buf + index + RTP_TCP_HEADER , packet_len ) ;
index = index + RTP_TCP_HEADER + packet_len ;
}
if ( index ! = 0 ) { //update left data
left_data_len = left_data_len - index ;
memmove ( mbuffer , buf + index , left_data_len ) ;
}
return err ;
}
free ( peer_sockaddr ) ;
return err ;
}
srs_error_t SrsGb28181Conn : : cycle ( )
@ -2996,7 +2648,7 @@ SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c)
// TODO: FIXME: support reload.
output = _srs_config - > get_stream_caster_output ( c ) ;
config = new SrsGb28181Config ( c ) ;
rtp_processor = new SrsGb28181 Tcp PsRtpProcessor( config , " " ) ;
rtp_processor = new SrsGb28181 PsRtpProcessor( config , " " ) ;
manager = new SrsResourceManager ( " GB28181TCP " , true ) ;
}