fix rtmp send timeout, first key frame wait

pull/1748/head
kyxlx550 5 years ago
parent f74a398c1b
commit 28bde1d448

@ -268,7 +268,7 @@ srs_error_t SrsGb28181PsRtpProcessor::on_udp_packet(const sockaddr* from, const
} }
if (pprint->can_print()) { if (pprint->can_print()) {
srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", srs_trace("<- " SRS_CONSTS_LOG_GB28181_CASTER " gb28181: client_id %s, peer(%s, %d) ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB",
channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version, channel_id.c_str(), address_string, peer_port, nb_buf, pprint->age(), pkt.version,
pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc,
pkt.payload->length() pkt.payload->length()
@ -357,6 +357,7 @@ SrsPsStreamDemixer::SrsPsStreamDemixer(ISrsPsStreamHander *h, std::string id, bo
audio_enable = a; audio_enable = a;
wait_first_keyframe = k; wait_first_keyframe = k;
channel_id = id; channel_id = id;
first_keyframe_flag = false;
} }
SrsPsStreamDemixer::~SrsPsStreamDemixer() SrsPsStreamDemixer::~SrsPsStreamDemixer()
@ -404,7 +405,6 @@ int64_t SrsPsStreamDemixer::parse_ps_timestamp(const uint8_t* p)
srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp, uint32_t ssrc) srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp, uint32_t ssrc)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
int complete_len = 0; int complete_len = 0;
int incomplete_len = ps_size; int incomplete_len = ps_size;
char *next_ps_pack = ps_data; char *next_ps_pack = ps_data;
@ -589,8 +589,10 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_
//ts=1000 seq=4 mark=true payload= audio //ts=1000 seq=4 mark=true payload= audio
incomplete_len = ps_size - complete_len; incomplete_len = ps_size - complete_len;
complete_len = complete_len + incomplete_len; complete_len = complete_len + incomplete_len;
} }
first_keyframe_flag = false;
srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n", srs_trace("gb28181: client_id %s, unkonw ps data (%#x/%u) %02x %02x %02x %02x\n",
channel_id.c_str(), ssrc, timestamp, channel_id.c_str(), ssrc, timestamp,
next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]);
@ -599,7 +601,7 @@ srs_error_t SrsPsStreamDemixer::on_ps_stream(char* ps_data, int ps_size, uint32_
} }
if (complete_len != ps_size){ if (complete_len != ps_size){
srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n", srs_trace("gb28181: client_id %s decode ps packet error (%#x/%u)! ps_size=%d complete=%d \n",
channel_id.c_str(), ssrc, timestamp, ps_size, complete_len); channel_id.c_str(), ssrc, timestamp, ps_size, complete_len);
}else if (hander && video_stream.length() && can_send_ps_av_packet()) { }else if (hander && video_stream.length() && can_send_ps_av_packet()) {
if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) { if ((err = hander->on_rtp_video(&video_stream, video_pts)) != srs_success) {
@ -646,6 +648,7 @@ SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c)
sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c); sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c);
sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c); sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c);
sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c); sip_invite_port_fixed = _srs_config->get_stream_caster_gb28181_sip_invite_port_fixed(c);
sip_query_catalog_interval = _srs_config->get_stream_caster_gb28181_sip_query_catalog_interval(c);
} }
SrsGb28181Config::~SrsGb28181Config() SrsGb28181Config::~SrsGb28181Config()
@ -653,7 +656,6 @@ SrsGb28181Config::~SrsGb28181Config()
} }
//SrsGb28181RtmpMuxer gb28181 rtmp muxer, process ps stream to rtmp //SrsGb28181RtmpMuxer gb28181 rtmp muxer, process ps stream to rtmp
SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bool a, bool k) SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bool a, bool k)
{ {
@ -675,7 +677,8 @@ SrsGb28181RtmpMuxer::SrsGb28181RtmpMuxer(SrsGb28181Manger* c, std::string id, bo
wait_ps_queue = srs_cond_new(); wait_ps_queue = srs_cond_new();
stream_idle_timeout = -1; stream_idle_timeout = -1;
recv_stream_time = 0; recv_rtp_stream_time = 0;
send_rtmp_stream_time = 0;
_rtmp_url = ""; _rtmp_url = "";
@ -768,7 +771,7 @@ std::string SrsGb28181RtmpMuxer::rtmp_url()
srs_utime_t SrsGb28181RtmpMuxer::get_recv_stream_time() srs_utime_t SrsGb28181RtmpMuxer::get_recv_stream_time()
{ {
return recv_stream_time; return recv_rtp_stream_time;
} }
@ -785,7 +788,8 @@ void SrsGb28181RtmpMuxer::destroy()
srs_error_t SrsGb28181RtmpMuxer::do_cycle() srs_error_t SrsGb28181RtmpMuxer::do_cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
recv_stream_time = srs_get_system_time(); recv_rtp_stream_time = srs_get_system_time();
send_rtmp_stream_time = srs_get_system_time();
//consume ps stream, and check status //consume ps stream, and check status
while (true) { while (true) {
@ -819,7 +823,7 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle()
} }
srs_utime_t now = srs_get_system_time(); srs_utime_t now = srs_get_system_time();
srs_utime_t duration = now - recv_stream_time; srs_utime_t duration = now - recv_rtp_stream_time;
//if no RTP data is received within 2 seconds, //if no RTP data is received within 2 seconds,
//the peer-port and peer-ip will be cleared and //the peer-port and peer-ip will be cleared and
@ -838,6 +842,21 @@ srs_error_t SrsGb28181RtmpMuxer::do_cycle()
break; break;
} }
//RTMP connection is about to timeout without receiving any data.,
//waiting for the next time there is data automatically connected
//it is related to the following two parameter settings of the rtmp server
//the publish 1st packet timeout in srs_utime_t
//publish_1stpkt_timeout default 20000ms
//the publish normal packet timeout in srs_utime_t
//publish_normal_timeout default 5000ms
duration = now - send_rtmp_stream_time;
bool will_timeout = duration > (5 * SRS_UTIME_SECONDS);
if (will_timeout && sdk){
srs_warn("gb28181: client id=%s RTMP connection is about to time out without receiving any data",
channel_id.c_str());
rtmp_close();
}
if (ps_queue.empty()){ if (ps_queue.empty()){
srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS); srs_cond_timedwait(wait_ps_queue, 200 * SRS_UTIME_MILLISECONDS);
}else { }else {
@ -862,7 +881,7 @@ void SrsGb28181RtmpMuxer::ps_packet_enqueue(SrsPsRtpPacket *pkt)
{ {
srs_assert(pkt); srs_assert(pkt);
recv_stream_time = srs_get_system_time(); recv_rtp_stream_time = srs_get_system_time();
//prevent consumers from being unable to process data //prevent consumers from being unable to process data
//and accumulating in the queue //and accumulating in the queue
@ -923,7 +942,7 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_video(SrsSimpleStream *stream, int64_t f
uint32_t pts = (uint32_t)(fpts / 90); uint32_t pts = (uint32_t)(fpts / 90);
srs_info("gb28181rtmpmuxer: on_rtp_video dts=%u", dts); srs_info("gb28181rtmpmuxer: on_rtp_video dts=%u", dts);
recv_stream_time = srs_get_system_time();
SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length());
SrsAutoFree(SrsBuffer, avs); SrsAutoFree(SrsBuffer, avs);
@ -1010,8 +1029,6 @@ srs_error_t SrsGb28181RtmpMuxer::on_rtp_audio(SrsSimpleStream* stream, int64_t f
return srs_error_wrap(err, "jitter"); return srs_error_wrap(err, "jitter");
} }
recv_stream_time = srs_get_system_time();
uint32_t dts = (uint32_t)(fdts / 90); uint32_t dts = (uint32_t)(fdts / 90);
// send each frame. // send each frame.
@ -1167,6 +1184,8 @@ srs_error_t SrsGb28181RtmpMuxer::rtmp_write_packet(char type, uint32_t timestamp
SrsSharedPtrMessage* msg = NULL; SrsSharedPtrMessage* msg = NULL;
send_rtmp_stream_time = srs_get_system_time();
if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) {
return srs_error_wrap(err, "create message"); return srs_error_wrap(err, "create message");
} }
@ -1199,6 +1218,7 @@ srs_error_t SrsGb28181RtmpMuxer::connect()
sdk = new SrsSimpleRtmpClient(url, cto, sto); sdk = new SrsSimpleRtmpClient(url, cto, sto);
srs_trace("gb28181: rtmp connect url=%s", url.c_str()); srs_trace("gb28181: rtmp connect url=%s", url.c_str());
if ((err = sdk->connect()) != srs_success) { if ((err = sdk->connect()) != srs_success) {
close(); close();
return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
@ -1368,8 +1388,8 @@ uint32_t SrsGb28181Manger::generate_ssrc(std::string id)
{ {
srand(uint(time(0))); srand(uint(time(0)));
// TODO: SSRC rules can be customized, // TODO: SSRC rules can be customized,
//gb28281 live ssrc max value 0999999999(3B9AC9FF) //gb28181 live ssrc max value 0999999999(3B9AC9FF)
//gb28281 vod ssrc max value 1999999999(773593FF) //gb28181 vod ssrc max value 1999999999(773593FF)
uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01); uint8_t index = uint8_t(rand() % (0x0F - 0x01 + 1) + 0x01);
uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index; uint32_t ssrc = 0x2FFFF00 & (hash_code(id) << 8) | index;
//uint32_t ssrc = 0x00FFFFFF & (hash_code(id)); //uint32_t ssrc = 0x00FFFFFF & (hash_code(id));
@ -1660,7 +1680,7 @@ uint32_t SrsGb28181Manger::delete_stream_channel(std::string id)
{ {
//notify the device to stop streaming //notify the device to stop streaming
//if an internal sip service controlled channel //if an internal sip service controlled channel
notify_sip_bye(id); notify_sip_bye(id, id);
SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id);
if (muxer){ if (muxer){
@ -1673,7 +1693,7 @@ uint32_t SrsGb28181Manger::delete_stream_channel(std::string id)
} }
uint32_t SrsGb28181Manger::queue_stream_channel(std::string id, SrsJsonArray* arr) uint32_t SrsGb28181Manger::query_stream_channel(std::string id, SrsJsonArray* arr)
{ {
if (!id.empty()){ if (!id.empty()){
SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id);
@ -1696,21 +1716,22 @@ uint32_t SrsGb28181Manger::queue_stream_channel(std::string id, SrsJsonArray* ar
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc) uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc, std::string chid)
{ {
if (!sip_service){ if (!sip_service){
return ERROR_GB28181_SIP_NOT_RUN; return ERROR_GB28181_SIP_NOT_RUN;
} }
//if RTMP Muxer does not exist, you need to create //if RTMP Muxer does not exist, you need to create
SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id); std::string key = id+"@"+chid;
SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(key);
if (!muxer){ if (!muxer){
//if there is an invalid parameter, the channel will be created automatically //if there is an invalid parameter, the channel will be created automatically
if (ip.empty() || port == 0 || ssrc == 0){ if (ip.empty() || port == 0 || ssrc == 0){
//channel not exist //channel not exist
SrsGb28181StreamChannel channel; SrsGb28181StreamChannel channel;
channel.set_channel_id(id); channel.set_channel_id(key);
int code = create_stream_channel(&channel); int code = create_stream_channel(&channel);
if (code != ERROR_SUCCESS){ if (code != ERROR_SUCCESS){
return code; return code;
@ -1730,24 +1751,18 @@ uint32_t SrsGb28181Manger::notify_sip_invite(std::string id, std::string ip, int
SrsSipRequest req; SrsSipRequest req;
req.sip_auth_id = id; req.sip_auth_id = id;
return sip_service->send_invite(&req, ip, port, ssrc); return sip_service->send_invite(&req, ip, port, ssrc, chid);
} }
uint32_t SrsGb28181Manger::notify_sip_bye(std::string id) uint32_t SrsGb28181Manger::notify_sip_bye(std::string id, std::string chid)
{ {
if (!sip_service){ if (!sip_service){
return ERROR_GB28181_SIP_NOT_RUN; return ERROR_GB28181_SIP_NOT_RUN;
} }
SrsGb28181RtmpMuxer *muxer = fetch_rtmpmuxer(id);
if (muxer){
muxer->rtmp_close();
}
SrsSipRequest req; SrsSipRequest req;
req.sip_auth_id = id; req.sip_auth_id = id;
return sip_service->send_bye(&req); return sip_service->send_bye(&req, chid);
} }
uint32_t SrsGb28181Manger::notify_sip_raw_data(std::string id, std::string data) uint32_t SrsGb28181Manger::notify_sip_raw_data(std::string id, std::string data)
@ -1772,3 +1787,23 @@ uint32_t SrsGb28181Manger::notify_sip_unregister(std::string id)
sip_service->remove_session(id); sip_service->remove_session(id);
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
uint32_t SrsGb28181Manger::notify_sip_query_catalog(std::string id)
{
if (!sip_service){
return ERROR_GB28181_SIP_NOT_RUN;
}
SrsSipRequest req;
req.sip_auth_id = id;
return sip_service->send_query_catalog(&req);
}
uint32_t SrsGb28181Manger::query_sip_session(std::string id, SrsJsonArray* arr)
{
if (!sip_service){
return ERROR_GB28181_SIP_NOT_RUN;
}
return sip_service->query_sip_session(id, arr);
}

@ -218,7 +218,8 @@ private:
SrsPithyPrint* pprint; SrsPithyPrint* pprint;
SrsGb28181StreamChannel *channel; SrsGb28181StreamChannel *channel;
int stream_idle_timeout; int stream_idle_timeout;
srs_utime_t recv_stream_time; srs_utime_t recv_rtp_stream_time;
srs_utime_t send_rtmp_stream_time;
private: private:
std::string channel_id; std::string channel_id;
std::string _rtmp_url; std::string _rtmp_url;
@ -313,6 +314,7 @@ public:
srs_utime_t sip_keepalive_timeout; srs_utime_t sip_keepalive_timeout;
bool sip_auto_play; bool sip_auto_play;
bool sip_invite_port_fixed; bool sip_invite_port_fixed;
srs_utime_t sip_query_catalog_interval;
public: public:
SrsGb28181Config(SrsConfDirective* c); SrsGb28181Config(SrsConfDirective* c);
@ -392,9 +394,7 @@ private:
std::map<uint32_t, SrsGb28181RtmpMuxer*> rtmpmuxers_ssrc; std::map<uint32_t, SrsGb28181RtmpMuxer*> rtmpmuxers_ssrc;
std::map<std::string, SrsGb28181RtmpMuxer*> rtmpmuxers; std::map<std::string, SrsGb28181RtmpMuxer*> rtmpmuxers;
SrsCoroutineManager* manager; SrsCoroutineManager* manager;
SrsGb28181SipService* sip_service; SrsGb28181SipService* sip_service;
public: public:
SrsGb28181Manger(SrsConfDirective* c); SrsGb28181Manger(SrsConfDirective* c);
virtual ~SrsGb28181Manger(); virtual ~SrsGb28181Manger();
@ -415,12 +415,14 @@ public:
//stream channel api //stream channel api
uint32_t create_stream_channel(SrsGb28181StreamChannel *channel); uint32_t create_stream_channel(SrsGb28181StreamChannel *channel);
uint32_t delete_stream_channel(std::string id); uint32_t delete_stream_channel(std::string id);
uint32_t queue_stream_channel(std::string id, SrsJsonArray* arr); uint32_t query_stream_channel(std::string id, SrsJsonArray* arr);
//sip api //sip api
uint32_t notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc); uint32_t notify_sip_invite(std::string id, std::string ip, int port, uint32_t ssrc, std::string chid);
uint32_t notify_sip_bye(std::string id); uint32_t notify_sip_bye(std::string id, std::string chid);
uint32_t notify_sip_raw_data(std::string id, std::string data); uint32_t notify_sip_raw_data(std::string id, std::string data);
uint32_t notify_sip_unregister(std::string id); uint32_t notify_sip_unregister(std::string id);
uint32_t notify_sip_query_catalog(std::string id);
uint32_t query_sip_session(std::string id, SrsJsonArray* arr);
private: private:
void destroy(); void destroy();

Loading…
Cancel
Save