fix generate ssrc, rtmp muxer cycle sleep, ps steam parase etc..

pull/1691/head
xialixin 5 years ago
parent 9e9b5374d5
commit d2b8b937d6

@ -50,11 +50,10 @@ using namespace std;
#include <srs_protocol_format.hpp>
#include <srs_sip_stack.hpp>
//#define W_PS_FILE
//#define W_VIDEO_FILE
//#define W_AUDIO_FILE
//#define W_UNKONW_FILE
SrsPsRtpPacket::SrsPsRtpPacket()
{
@ -177,9 +176,20 @@ void SrsGb28181PsRtpProcessor::dispose()
}
cache_ps_rtp_packet.clear();
clear_pre_packet();
return;
}
void SrsGb28181PsRtpProcessor::clear_pre_packet()
{
map<std::string, SrsPsRtpPacket*>::iterator it;
for (it = pre_packet.begin(); it != pre_packet.end(); ++it) {
srs_freep(it->second);
}
pre_packet.clear();
}
srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf)
{
srs_error_t err = srs_success;
@ -227,15 +237,16 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const
//get previous timestamp by ssrc
uint32_t pre_timestamp = pre_packet[pre_pkt_key]->timestamp;
//uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number;
uint32_t pre_sequence_number = pre_packet[pre_pkt_key]->sequence_number;
//TODO: check sequence number out of order
//it may be out of order, or multiple streaming ssrc are the same
// if (pre_sequence_number > pkt.sequence_number){
// srs_info("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, addr=%s,port=%s",
// pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string);
// //return err;
// }
if (pre_sequence_number + 1 != pkt.sequence_number &&
pre_sequence_number != pkt.sequence_number){
srs_warn("gb28181: ps sequence_number out of order, ssrc=%#x, pre=%u, cur=%u, peer(%s, %s)",
pkt.ssrc, pre_sequence_number, pkt.sequence_number, address_string, port_string);
//return err;
}
//copy header to cache
cache_ps_rtp_packet[pkt_key]->copy(&pkt);
@ -257,7 +268,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const
}
if (pprint->can_print()) {
srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(ip=%s, port=%d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version,
pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc,
pkt.payload->length()
@ -556,15 +567,40 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_
}
else
{
srs_trace("gb28181: client_id %s, unkonw ps data %02x %02x %02x %02x\n",
channel_id.c_str(), next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]);
#ifdef W_UNKONW_FILE
if (!unknow_fw.is_open()) {
std::string filename = "test_unknow_" + channel_id + ".mpg";
unknow_fw.open(filename.c_str());
}
unknow_fw.write(next_ps_pack, incomplete_len, NULL);
#endif
//TODO: fixme unkonw ps data parse
if (next_ps_pack
&& next_ps_pack[0] == (char)0x00
&& next_ps_pack[1] == (char)0x00
&& next_ps_pack[2] == (char)0x00
&& next_ps_pack[3] == (char)0x01){
//dahua's PS header may lose packets. It is sent by an RTP packet of Dahua's PS header
//dahua rtp send format:
//ts=1000 seq=1 mark=false payload= ps header
//ts=1000 seq=2 mark=false payload= video
//ts=1000 seq=3 mark=true payload= video
//ts=1000 seq=4 mark=true payload= audio
incomplete_len = ps_size - complete_len;
complete_len = complete_len + incomplete_len;
}
srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n",
channel_id.c_str(), ssrc, timestamp,
next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]);
break;
}
}
if (complete_len != ps_size){
srs_trace("gb28181: client_id %s decode ps packet error! ps_size=%d complete=%d \n",
channel_id.c_str(), ps_size, complete_len);
srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n",
channel_id.c_str(), ssrc, timestamp, ps_size, complete_len);
}else if (hander && video_stream.length() && can_send_ps_av_packet()) {
if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) {
return srs_error_wrap(err, "process ps video packet");
@ -609,7 +645,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c)
sip_auto_play = _srs_config->get_stream_caster_gb28181_sip_auto_play(c);
sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c);
sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c);
print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c);
//print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c);
sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c);
}
@ -706,6 +742,8 @@ void SrsGb28181RtmpMuxer::set_channel_peer_ip(std::string ip)
void SrsGb28181RtmpMuxer::set_channel_peer_port(int port)
{
if (channel->get_rtp_peer_port() == 0){
channel->set_recv_time_str(srs_sip_get_utc_date());
channel->set_recv_time(srs_get_system_time());
channel->set_rtp_peer_port(port);
}
}
@ -775,7 +813,10 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle()
}
if (pprint->can_print()) {
srs_trace("gb28181: client id=%s, rtmp muxer is alive", channel_id.c_str());
srs_trace("gb28181: client id=%s, ssrc=%#x, peer(%s, %d), rtmp muxer is alive",
channel_id.c_str(), channel->get_ssrc(),
channel->get_rtp_peer_ip().c_str(),
channel->get_rtp_peer_port());
}
srs_utime_t now = srs_get_system_time();
@ -785,7 +826,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle()
//the peer-port and peer-ip will be cleared and
//other port data will be received again
if (duration > (2 * SRS_UTIME_SECONDS) && channel->get_rtp_peer_port() != 0){
srs_warn("gb28181: client id=%s ssrc=%#x, peer(ip=%s port=%d), no rtp data %d in seconds, clean it, wait other port!",
srs_warn("gb28181: client id=%s ssrc=%#x, peer(%s, %d), no rtp data %d in seconds, clean it, wait other port!",
channel_id.c_str(), channel->get_ssrc(), channel->get_rtp_peer_ip().c_str(),
channel->get_rtp_peer_port(), duration/SRS_UTIME_SECONDS);
channel->set_rtp_peer_port(0);
@ -798,8 +839,11 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle()
break;
}
srs_cond_timedwait(wait_ps_queue, 5 * SRS_UTIME_MILLISECONDS);
//srs_usleep(1000 * 5);
if (ps_queue.empty()){
srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS);
}else {
srs_cond_timedwait(wait_ps_queue, 10 * SRS_UTIME_MILLISECONDS);
}
}
return err;
@ -819,7 +863,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt)
{
srs_assert(pkt);
recv_stream_time = srs_update_system_time();
recv_stream_time = srs_get_system_time();
//prevent consumers from being unable to process data
//and accumulating in the queue
@ -1197,6 +1241,8 @@ SrsGb28181StreamChannel::SrsGb28181StreamChannel(){
rtp_peer_port = 0;
rtp_peer_ip = "";
rtmp_url = "";
recv_time = 0;
recv_time_str = "";
}
SrsGb28181StreamChannel::~SrsGb28181StreamChannel()
@ -1219,6 +1265,10 @@ void SrsGb28181StreamChannel::copy(const SrsGb28181StreamChannel *s){
rtp_peer_port = s->get_rtp_peer_port();
rtmp_url = s->get_rtmp_url();
recv_time_str = s->get_recv_time_str();
recv_time = s->get_recv_time();
}
void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj)
@ -1235,6 +1285,8 @@ void SrsGb28181StreamChannel::dumps(SrsJsonObject* obj)
obj->set("port_mode", SrsJsonAny::str(port_mode.c_str()));
obj->set("rtp_peer_port", SrsJsonAny::integer(rtp_peer_port));
obj->set("rtp_peer_ip", SrsJsonAny::str(rtp_peer_ip.c_str()));
obj->set("recv_time", SrsJsonAny::integer(recv_time/SRS_UTIME_SECONDS));
obj->set("recv_time_str", SrsJsonAny::str(recv_time_str.c_str()));
}
@ -1316,10 +1368,12 @@ uint32_t SrsGb28181Manger::hash_code(std::string str)
uint32_t SrsGb28181Manger::generate_ssrc(std::string id)
{
srand(uint(time(0)));
// TODO: SSRC rules can be customized,
//uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01);
//uint32_t ssrc = 0x00FFFFF0 & (hash_code(id) << 4) | index;
uint32_t ssrc = 0x00FFFFFF & (hash_code(id));
// TODO: SSRC rules can be customized,
//gb28281 live ssrc max value 0999999999(3B9AC9FF)
//gb28281 vod ssrc max value 1999999999(773593FF)
uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01);
uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index;
//uint32_t ssrc = 0x00FFFFFF & (hash_code(id));
srs_trace("gb28181: generate ssrc id=%s, ssrc=%u", id.c_str(), ssrc);
return ssrc;
}
@ -1417,6 +1471,10 @@ void SrsGb28181Manger::remove(SrsGb28181RtmpMuxer* muxer)
manager->remove(muxer);
}
void SrsGb28181Manger::remove_sip_session(SrsGb28181SipSession* sess)
{
manager->remove(sess);
}
srs_error_t SrsGb28181Manger::start_ps_rtp_listen(std::string id, int port)
{
@ -1654,8 +1712,6 @@ uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int
//channel not exist
SrsGb28181StreamChannel channel;
channel.set_channel_id(id);
channel.set_app("live");
channel.set_stream(id);
int code = create_stream_channel(&channel);
if (code != ERROR_SUCCESS){
return code;

@ -65,6 +65,7 @@ class SrsGb28181Config;
class SrsGb28181PsRtpProcessor;
class SrsGb28181SipService;
class SrsGb28181StreamChannel;
class SrsGb28181SipSession;
//ps rtp header packet parse
class SrsPsRtpPacket: public SrsRtpPacket
@ -127,6 +128,7 @@ public:
private:
bool can_send_ps_av_packet();
void dispose();
void clear_pre_packet();
// Interface ISrsUdpHandler
public:
virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf);
@ -188,6 +190,7 @@ private:
SrsFileWriter ps_fw;
SrsFileWriter video_fw;
SrsFileWriter audio_fw;
SrsFileWriter unknow_fw;
bool first_keyframe_flag;
bool wait_first_keyframe;
@ -330,6 +333,8 @@ private:
int rtp_port;
int rtmp_port;
uint32_t ssrc;
srs_utime_t recv_time;
std::string recv_time_str;
//send rtp stream client local port
int rtp_peer_port;
@ -351,6 +356,8 @@ public:
uint32_t get_rtp_peer_port() const { return rtp_peer_port; }
std::string get_rtp_peer_ip() const { return rtp_peer_ip; }
std::string get_rtmp_url() const { return rtmp_url; }
srs_utime_t get_recv_time() const { return recv_time; }
std::string get_recv_time_str() const { return recv_time_str; }
void set_channel_id(const std::string &i) { channel_id = i; }
void set_port_mode(const std::string &p) { port_mode = p; }
@ -363,6 +370,8 @@ public:
void set_rtp_peer_ip( const std::string &p) { rtp_peer_ip = p; }
void set_rtp_peer_port( const int &s) { rtp_peer_port = s;}
void set_rtmp_url( const std::string &u) { rtmp_url = u; }
void set_recv_time( const srs_utime_t &u) { recv_time = u; }
void set_recv_time_str( const std::string &u) { recv_time_str = u; }
void copy(const SrsGb28181StreamChannel *s);
void dumps(SrsJsonObject* obj);
@ -431,7 +440,7 @@ public:
public:
void remove(SrsGb28181RtmpMuxer* conn);
void remove_sip_session(SrsGb28181SipSession* sess);
};
#endif

Loading…
Cancel
Save