Refine get_publish_1stpkt_timeout in time unit

pull/1651/head
winlin 6 years ago
parent d6828a3e58
commit 170872135d

@ -4774,10 +4774,10 @@ bool SrsConfig::get_reduce_sequence_header(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
int SrsConfig::get_publish_1stpkt_timeout(string vhost)
srs_utime_t SrsConfig::get_publish_1stpkt_timeout(string vhost)
{
// when no msg recevied for publisher, use larger timeout.
static int DEFAULT = 20000;
static srs_utime_t DEFAULT = 20 * SRS_UTIME_SECONDS;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -4794,7 +4794,7 @@ int SrsConfig::get_publish_1stpkt_timeout(string vhost)
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
}
int SrsConfig::get_publish_normal_timeout(string vhost)

@ -805,9 +805,9 @@ public:
*/
virtual bool get_reduce_sequence_header(std::string vhost);
/**
* the 1st packet timeout in ms for encoder.
* the 1st packet timeout in srs_utime_t for encoder.
*/
virtual int get_publish_1stpkt_timeout(std::string vhost);
virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost);
/**
* the normal packet timeout in ms for encoder.
*/

@ -143,7 +143,7 @@ srs_error_t SrsConnection::set_socket_buffer(srs_utime_t buffer_v)
// 2000*3000/8=750000B(about 732KB).
// 2000*5000/8=1250000B(about 1220KB).
int kbps = 4000;
int iv = (buffer_v / SRS_UTIME_MILLISECONDS) * kbps / 8;
int iv = srsu2ms(buffer_v) * kbps / 8;
// socket send buffer, system will double it.
iv = iv / 2;
@ -161,7 +161,7 @@ srs_error_t SrsConnection::set_socket_buffer(srs_utime_t buffer_v)
return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0);
}
srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, buffer_v / SRS_UTIME_MILLISECONDS);
srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, srsu2ms(buffer_v));
return err;
}

@ -190,7 +190,7 @@ srs_error_t SrsMpdWriter::write(SrsFormat* format)
srs_error_t err = srs_success;
// MPD is not expired?
if (last_update_mpd != -1 && srs_get_system_time_ms() - last_update_mpd < int64_t(update_period / SRS_UTIME_MILLISECONDS)) {
if (last_update_mpd != -1 && srs_get_system_time_ms() - last_update_mpd < int64_t(srsu2ms(update_period))) {
return err;
}
last_update_mpd = srs_get_system_time_ms();
@ -266,7 +266,7 @@ srs_error_t SrsMpdWriter::get_fragment(bool video, std::string& home, std::strin
home = fragment_home;
sn = srs_update_system_time_ms() * SRS_UTIME_MILLISECONDS / fragment;
basetime = sn * fragment / SRS_UTIME_MILLISECONDS;
basetime = sn * srsu2ms(fragment);
if (video) {
file_name = "video-" + srs_int2str(sn) + ".m4s";
@ -335,7 +335,7 @@ srs_error_t SrsDashController::on_audio(SrsSharedPtrMessage* shared_audio, SrsFo
return refresh_init_mp4(shared_audio, format);
}
if (acurrent->duration() >= int64_t(fragment / SRS_UTIME_MILLISECONDS)) {
if (acurrent->duration() >= int64_t(srsu2ms(fragment))) {
if ((err = acurrent->reap(audio_dts)) != srs_success) {
return srs_error_wrap(err, "reap current");
}
@ -367,7 +367,7 @@ srs_error_t SrsDashController::on_video(SrsSharedPtrMessage* shared_video, SrsFo
return refresh_init_mp4(shared_video, format);
}
bool reopen = format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame && vcurrent->duration() >= int64_t(fragment / SRS_UTIME_MILLISECONDS);
bool reopen = format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame && vcurrent->duration() >= int64_t(srsu2ms(fragment));
if (reopen) {
if ((err = vcurrent->reap(video_dts)) != srs_success) {
return srs_error_wrap(err, "reap current");

@ -114,7 +114,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
}
srs_freep(sdk);
int64_t cto = SRS_EDGE_INGESTER_TMMS / SRS_UTIME_MILLISECONDS;
int64_t cto = srsu2ms(SRS_EDGE_INGESTER_TMMS);
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
@ -294,7 +294,7 @@ srs_error_t SrsEdgeIngester::ingest()
SrsAutoFree(SrsPithyPrint, pprint);
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS / SRS_UTIME_MILLISECONDS);
upstream->set_recv_timeout(srsu2ms(SRS_EDGE_INGESTER_TMMS));
while (true) {
srs_error_t err = srs_success;
@ -474,7 +474,7 @@ srs_error_t SrsEdgeForwarder::start()
// open socket.
srs_freep(sdk);
int64_t cto = SRS_EDGE_FORWARDER_TMMS / SRS_UTIME_MILLISECONDS;
int64_t cto = srsu2ms(SRS_EDGE_FORWARDER_TMMS);
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);

@ -213,7 +213,7 @@ srs_error_t SrsForwarder::do_cycle()
}
srs_freep(sdk);
int64_t cto = SRS_FORWARDER_CIMS / SRS_UTIME_MILLISECONDS;
int64_t cto = srsu2ms(SRS_FORWARDER_CIMS);
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);

@ -602,7 +602,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, int(mw_sleep / SRS_UTIME_MILLISECONDS),
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep),
enc->has_cache(), msgs.max);
// TODO: free and erase the disabled entry after all related connections is closed.
@ -630,7 +630,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
if (pprint->can_print()) {
srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, int(mw_sleep / SRS_UTIME_MILLISECONDS));
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
}
// sendout all messages.

@ -539,7 +539,7 @@ void SrsPublishRecvThread::set_socket_buffer(srs_utime_t sleep_v)
// 2000*3000/8=750000B(about 732KB).
// 2000*5000/8=1250000B(about 1220KB).
int kbps = 5000;
int socket_buffer_size = (sleep_v / SRS_UTIME_MILLISECONDS) * kbps / 8;
int socket_buffer_size = srsu2msi(sleep_v) * kbps / 8;
int fd = mr_fd;
int onb_rbuf = 0;
@ -554,7 +554,7 @@ void SrsPublishRecvThread::set_socket_buffer(srs_utime_t sleep_v)
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
srs_trace("mr change sleep %d=>%d, erbuf=%d, rbuf %d=>%d, sbytes=%d, realtime=%d",
mr_sleep / SRS_UTIME_MILLISECONDS, sleep_v / SRS_UTIME_MILLISECONDS, socket_buffer_size, onb_rbuf, nb_rbuf,
srsu2msi(mr_sleep), srsu2msi(sleep_v), socket_buffer_size, onb_rbuf, nb_rbuf,
SRS_MR_SMALL_BYTES, realtime);
rtmp->set_recv_buffer(nb_rbuf);

@ -319,9 +319,9 @@ srs_error_t SrsRtmpConn::on_reload_vhost_publish(string vhost)
return err;
}
int p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost);
srs_utime_t p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost);
if (p1stpt != publish_1stpkt_timeout) {
srs_trace("p1stpt changed %d=>%d", publish_1stpkt_timeout, p1stpt);
srs_trace("p1stpt changed %d=>%d", srsu2msi(publish_1stpkt_timeout), srsu2msi(p1stpt));
publish_1stpkt_timeout = p1stpt;
}
@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, int(SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS));
SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP));
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
@ -693,7 +693,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, int(mw_sleep / SRS_UTIME_MILLISECONDS), mw_enabled, realtime, tcp_nodelay);
send_min_interval, srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
while (true) {
// collect elapse for pithy print.
@ -725,10 +725,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// @see https://github.com/ossrs/srs/issues/257
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(0, int(mw_sleep / SRS_UTIME_MILLISECONDS));
consumer->wait(0, srsu2msi(mw_sleep));
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS, int(mw_sleep / SRS_UTIME_MILLISECONDS));
consumer->wait(SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
}
#endif
@ -745,7 +745,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
kbps->sample();
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), int(mw_sleep / SRS_UTIME_MILLISECONDS));
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
}
if (count <= 0) {
@ -861,9 +861,9 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (true) {
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep(req->vhost) / SRS_UTIME_MILLISECONDS;
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d",
mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay, receive_thread_cid);
mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), publish_normal_timeout, tcp_nodelay, receive_thread_cid);
}
int64_t nb_msgs = 0;
@ -879,7 +879,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout.
// @see https://github.com/ossrs/srs/issues/441
rtrd->wait(publish_1stpkt_timeout);
rtrd->wait(srsu2msi(publish_1stpkt_timeout));
} else {
rtrd->wait(publish_normal_timeout);
}
@ -892,7 +892,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
// when not got any messages, timeout.
if (rtrd->nb_msgs() <= nb_msgs) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",
nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, (int)nb_msgs);
nb_msgs? publish_normal_timeout : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);
}
nb_msgs = rtrd->nb_msgs();
@ -908,10 +908,11 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (pprint->can_print()) {
kbps->sample();
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep(req->vhost) / SRS_UTIME_MILLISECONDS;
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",
(int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout);
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
srsu2msi(publish_1stpkt_timeout), publish_normal_timeout);
}
}

@ -118,8 +118,8 @@ private:
bool realtime;
// the minimal interval in ms for delivery stream.
double send_min_interval;
// publish 1st packet timeout in ms
int publish_1stpkt_timeout;
// publish 1st packet timeout in srs_utime_t
srs_utime_t publish_1stpkt_timeout;
// publish normal packet timeout in ms
int publish_normal_timeout;
// whether enable the tcp_nodelay.

@ -32,6 +32,10 @@ typedef uint64_t srs_utime_t;
// The time unit in ms, for example 100 * SRS_UTIME_MILLISECONDS means 100ms.
#define SRS_UTIME_MILLISECONDS 1000
// Convert srs_utime_t in ms unit.
#define srsu2ms(us) (us / SRS_UTIME_MILLISECONDS)
#define srsu2msi(us) int(us / SRS_UTIME_MILLISECONDS)
// The time unit in ms, for example 120 * SRS_UTIME_SECONDS means 120s.
#define SRS_UTIME_SECONDS 1000000

@ -287,7 +287,7 @@ void show_macro_features()
stringstream ss;
// mw(merged-write)
ss << "mw sleep:" << SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS << "ms";
ss << "mw sleep:" << srsu2msi(SRS_PERF_MW_SLEEP) << "ms";
// mr(merged-read)
ss << ". mr ";
@ -296,7 +296,7 @@ void show_macro_features()
#else
ss << "enabled:off";
#endif
ss << ", default:" << SRS_PERF_MR_ENABLED << ", sleep:" << SRS_PERF_MR_SLEEP / SRS_UTIME_MILLISECONDS << "ms";
ss << ", default:" << SRS_PERF_MR_ENABLED << ", sleep:" << srsu2msi(SRS_PERF_MR_SLEEP) << "ms";
srs_trace(ss.str().c_str());
}
@ -340,10 +340,10 @@ void show_macro_features()
// others
int possible_mr_latency = 0;
#ifdef SRS_PERF_MERGED_READ
possible_mr_latency = SRS_PERF_MR_SLEEP / SRS_UTIME_MILLISECONDS;
possible_mr_latency = srsu2msi(SRS_PERF_MR_SLEEP);
#endif
srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)",
SRS_PERF_MW_SLEEP / SRS_UTIME_MILLISECONDS, possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000);
srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000);
#ifdef SRS_AUTO_MEM_WATCH
#warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script."

@ -1848,10 +1848,11 @@ VOID TEST(ConfigUnitTest, CheckDefaultValues)
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(350 * SRS_UTIME_MILLISECONDS, conf.get_mr_sleep(""));
EXPECT_EQ(350 * SRS_UTIME_MILLISECONDS, conf.get_mw_sleep(""));
EXPECT_EQ(20 * SRS_UTIME_SECONDS, conf.get_publish_1stpkt_timeout(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{publish{mr_latency 1000;} play{mw_latency 1000;}}"));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{publish{mr_latency 1000; firstpkt_timeout 100;} play{mw_latency 1000;}}"));
EXPECT_EQ(1000 * SRS_UTIME_MILLISECONDS, conf.get_mr_sleep("v"));
EXPECT_EQ(1000 * SRS_UTIME_MILLISECONDS, conf.get_mw_sleep("v"));
EXPECT_EQ(100 * SRS_UTIME_MILLISECONDS, conf.get_publish_1stpkt_timeout("v"));
}
}

Loading…
Cancel
Save