Merge branch 3.0release into develop

min
winlin 6 years ago
commit cf00bb7606

@ -154,6 +154,8 @@ Please select according to languages:
- [ ] Support H.265 by pushing H.265 over RTMP, deliverying in HLS, read [#465][bug #465].
- [ ] Support HLS+, the HLS edge server, please read [#466][bug #466] and [#468][bug #468].
- [ ] Support UDP protocol such as QUIC or KCP in cluster.
- [ ] Support H.264+Opus codec for WebRTC.
- [ ] Support publishing stream by WebRTC.
> Remark: About the milestone and product plan, please read ([CN][v1_CN_Product], [EN][v1_EN_Product]) wiki.
@ -162,6 +164,7 @@ Please select according to languages:
### V3 changes
* v3.0, 2019-04-22, Refine in time unit. 3.0.49
* v3.0, 2019-04-07, Cover ST Coroutine and time unit. 3.0.48
* v3.0, 2019-04-06, Merge [#1304][bug #1304], Fix ST coroutine pull error. 3.0.47
* v3.0, 2019-04-05, Merge [#1339][bug #1339], Support HTTP-FLV params. 3.0.46

@ -180,8 +180,8 @@ srs_error_t SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit)
SrsBandwidthSample publish_sample;
// timeout for a packet.
_rtmp->set_send_timeout(play_sample.duration_ms * 2);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 2);
_rtmp->set_send_timeout(play_sample.duration_ms * 2 * SRS_UTIME_MILLISECONDS);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 2 * SRS_UTIME_MILLISECONDS);
// start test.
srs_utime_t start_time = srs_update_system_time();

@ -192,12 +192,12 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
srs_freep(sdk);
int64_t cto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_PULSE);
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
sdk = new SrsSimpleRtmpClient(output, cto, sto);
if ((err = sdk->connect()) != srs_success) {
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", output.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {

@ -4459,9 +4459,9 @@ bool SrsConfig::get_mix_correct(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
double SrsConfig::get_queue_length(string vhost)
srs_utime_t SrsConfig::get_queue_length(string vhost)
{
static double DEFAULT = SRS_PERF_PLAY_QUEUE;
static srs_utime_t DEFAULT = SRS_PERF_PLAY_QUEUE;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -4478,7 +4478,7 @@ double SrsConfig::get_queue_length(string vhost)
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
return srs_utime_t(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
bool SrsConfig::get_refer_enabled(string vhost)
@ -4730,9 +4730,9 @@ bool SrsConfig::get_tcp_nodelay(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
double SrsConfig::get_send_min_interval(string vhost)
srs_utime_t SrsConfig::get_send_min_interval(string vhost)
{
static double DEFAULT = 0.0;
static srs_utime_t DEFAULT = 0;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -4749,7 +4749,7 @@ double SrsConfig::get_send_min_interval(string vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
}
bool SrsConfig::get_reduce_sequence_header(string vhost)
@ -6093,9 +6093,9 @@ bool SrsConfig::get_hls_ts_floor(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
double SrsConfig::get_hls_fragment(string vhost)
srs_utime_t SrsConfig::get_hls_fragment(string vhost)
{
static double DEFAULT = 10;
static srs_utime_t DEFAULT = 10 * SRS_UTIME_SECONDS;
SrsConfDirective* conf = get_hls(vhost);
if (!conf) {
@ -6107,7 +6107,7 @@ double SrsConfig::get_hls_fragment(string vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
double SrsConfig::get_hls_td_ratio(string vhost)
@ -6144,9 +6144,9 @@ double SrsConfig::get_hls_aof_ratio(string vhost)
return ::atof(conf->arg0().c_str());
}
double SrsConfig::get_hls_window(string vhost)
srs_utime_t SrsConfig::get_hls_window(string vhost)
{
static double DEFAULT = 60;
static srs_utime_t DEFAULT = (60 * SRS_UTIME_SECONDS);
SrsConfDirective* conf = get_hls(vhost);
if (!conf) {
@ -6158,7 +6158,7 @@ double SrsConfig::get_hls_window(string vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
string SrsConfig::get_hls_on_error(string vhost)
@ -6413,9 +6413,9 @@ string SrsConfig::get_hds_path(const string &vhost)
return conf->arg0();
}
double SrsConfig::get_hds_fragment(const string &vhost)
srs_utime_t SrsConfig::get_hds_fragment(const string &vhost)
{
static double DEFAULT = 10;
static srs_utime_t DEFAULT = (10 * SRS_UTIME_SECONDS);
SrsConfDirective* conf = get_hds(vhost);
if (!conf) {
@ -6427,12 +6427,12 @@ double SrsConfig::get_hds_fragment(const string &vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
double SrsConfig::get_hds_window(const string &vhost)
srs_utime_t SrsConfig::get_hds_window(const string &vhost)
{
static double DEFAULT = 60;
static srs_utime_t DEFAULT = (60 * SRS_UTIME_SECONDS);
SrsConfDirective* conf = get_hds(vhost);
if (!conf) {
@ -6444,7 +6444,7 @@ double SrsConfig::get_hds_window(const string &vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
SrsConfDirective* SrsConfig::get_dvr(string vhost)
@ -6882,9 +6882,9 @@ bool SrsConfig::get_vhost_http_remux_enabled(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
double SrsConfig::get_vhost_http_remux_fast_cache(string vhost)
srs_utime_t SrsConfig::get_vhost_http_remux_fast_cache(string vhost)
{
static double DEFAULT = 0;
static srs_utime_t DEFAULT = 0;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -6901,7 +6901,7 @@ double SrsConfig::get_vhost_http_remux_fast_cache(string vhost)
return DEFAULT;
}
return ::atof(conf->arg0().c_str());
return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}
string SrsConfig::get_vhost_http_remux_mount(string vhost)

@ -729,11 +729,11 @@ public:
*/
virtual bool get_mix_correct(std::string vhost);
/**
* get the cache queue length, in seconds.
* get the cache queue length, in srs_utime_t.
* when exceed the queue length, drop packet util I frame.
* @remark, default 10.
* @remark, default 10s.
*/
virtual double get_queue_length(std::string vhost);
virtual srs_utime_t get_queue_length(std::string vhost);
/**
* whether the refer hotlink-denial enabled.
*/
@ -796,9 +796,9 @@ public:
*/
virtual bool get_tcp_nodelay(std::string vhost);
/**
* the minimal send interval in ms.
* the minimal send interval in srs_utime_t.
*/
virtual double get_send_min_interval(std::string vhost);
virtual srs_utime_t get_send_min_interval(std::string vhost);
/**
* whether reduce the sequence header.
*/
@ -1192,9 +1192,9 @@ public:
*/
virtual bool get_hls_ts_floor(std::string vhost);
/**
* get the hls fragment time, in seconds.
* get the hls fragment time, in srs_utime_t.
*/
virtual double get_hls_fragment(std::string vhost);
virtual srs_utime_t get_hls_fragment(std::string vhost);
/**
* get the hls td(target duration) ratio.
*/
@ -1204,11 +1204,11 @@ public:
*/
virtual double get_hls_aof_ratio(std::string vhost);
/**
* get the hls window time, in seconds.
* get the hls window time, in srs_utime_t.
* a window is a set of ts, the ts collection in m3u8.
* @remark SRS will delete the ts exceed the window.
*/
virtual double get_hls_window(std::string vhost);
virtual srs_utime_t get_hls_window(std::string vhost);
/**
* get the hls hls_on_error config.
* the ignore will ignore error and disable hls.
@ -1277,15 +1277,14 @@ public:
*/
virtual std::string get_hds_path(const std::string &vhost);
/**
* get the hds fragment time, in seconds.
* get the hds fragment time, in srs_utime_t.
*/
// TODO: FIXME: Refine to time unit.
virtual double get_hds_fragment(const std::string &vhost);
virtual srs_utime_t get_hds_fragment(const std::string &vhost);
/**
* get the hds window time, in seconds.
* get the hds window time, in srs_utime_t.
* a window is a set of hds fragments.
*/
virtual double get_hds_window(const std::string &vhost);
virtual srs_utime_t get_hds_window(const std::string &vhost);
// dvr section
private:
/**
@ -1405,7 +1404,7 @@ public:
/**
* get the fast cache duration for http audio live stream.
*/
virtual double get_vhost_http_remux_fast_cache(std::string vhost);
virtual srs_utime_t get_vhost_http_remux_fast_cache(std::string vhost);
/**
* get the http flv live stream mount point for vhost.
* used to generate the flv stream mount path.

@ -102,7 +102,7 @@ public:
virtual srs_error_t start();
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in ms.
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// interface ISrsOneCycleThreadHandler
public:

@ -88,7 +88,7 @@ srs_error_t SrsFragmentedMp4::initialize(SrsRequest* r, bool video, SrsMpdWriter
string file_home;
string file_name;
int64_t sequence_number;
uint64_t basetime;
srs_utime_t basetime;
if ((err = mpd->get_fragment(video, file_home, file_name, sequence_number, basetime)) != srs_success) {
return srs_error_wrap(err, "get fragment");
}
@ -172,19 +172,31 @@ SrsMpdWriter::~SrsMpdWriter()
srs_error_t SrsMpdWriter::initialize(SrsRequest* r)
{
req = r;
return srs_success;
}
srs_error_t SrsMpdWriter::on_publish()
{
SrsRequest* r = req;
fragment = _srs_config->get_dash_fragment(r->vhost);
update_period = _srs_config->get_dash_update_period(r->vhost);
timeshit = _srs_config->get_dash_timeshift(r->vhost);
home = _srs_config->get_dash_path(r->vhost);
mpd_file = _srs_config->get_dash_mpd_file(r->vhost);
string mpd_path = srs_path_build_stream(mpd_file, req->vhost, req->app, req->stream);
fragment_home = srs_path_dirname(mpd_path) + "/" + req->stream;
srs_trace("DASH: Config fragment=%" PRId64 ", period=%" PRId64, fragment, update_period);
return srs_success;
}
void SrsMpdWriter::on_unpublish()
{
}
srs_error_t SrsMpdWriter::write(SrsFormat* format)
{
srs_error_t err = srs_success;
@ -259,14 +271,14 @@ srs_error_t SrsMpdWriter::write(SrsFormat* format)
return err;
}
srs_error_t SrsMpdWriter::get_fragment(bool video, std::string& home, std::string& file_name, int64_t& sn, uint64_t& basetime)
srs_error_t SrsMpdWriter::get_fragment(bool video, std::string& home, std::string& file_name, int64_t& sn, srs_utime_t& basetime)
{
srs_error_t err = srs_success;
home = fragment_home;
sn = srs_update_system_time() / fragment;
basetime = sn * srsu2ms(fragment);
basetime = sn * fragment;
if (video) {
file_name = "video-" + srs_int2str(sn) + ".m4s";
@ -303,30 +315,50 @@ srs_error_t SrsDashController::initialize(SrsRequest* r)
srs_error_t err = srs_success;
req = r;
fragment = _srs_config->get_dash_fragment(r->vhost);
home = _srs_config->get_dash_path(r->vhost);
if ((err = mpd->initialize(r)) != srs_success) {
return srs_error_wrap(err, "mpd");
}
string home, path;
return err;
}
srs_error_t SrsDashController::on_publish()
{
srs_error_t err = srs_success;
SrsRequest* r = req;
fragment = _srs_config->get_dash_fragment(r->vhost);
home = _srs_config->get_dash_path(r->vhost);
srs_freep(vcurrent);
vcurrent = new SrsFragmentedMp4();
if ((err = vcurrent->initialize(req, true, mpd, video_tack_id)) != srs_success) {
return srs_error_wrap(err, "video fragment");
}
srs_freep(acurrent);
acurrent = new SrsFragmentedMp4();
if ((err = acurrent->initialize(req, false, mpd, audio_track_id)) != srs_success) {
return srs_error_wrap(err, "audio fragment");
}
if ((err = mpd->on_publish()) != srs_success) {
return srs_error_wrap(err, "mpd");
}
return err;
}
void SrsDashController::on_unpublish()
{
mpd->on_unpublish();
srs_freep(vcurrent);
srs_freep(acurrent);
}
srs_error_t SrsDashController::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format)
{
srs_error_t err = srs_success;
@ -490,6 +522,10 @@ srs_error_t SrsDash::on_publish()
return err;
}
enabled = true;
if ((err = controller->on_publish()) != srs_success) {
return srs_error_wrap(err, "controller");
}
return err;
}
@ -532,5 +568,7 @@ void SrsDash::on_unpublish()
}
enabled = false;
controller->on_unpublish();
}

@ -103,12 +103,14 @@ public:
virtual ~SrsMpdWriter();
public:
virtual srs_error_t initialize(SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
// Write MPD according to parsed format of stream.
virtual srs_error_t write(SrsFormat* format);
public:
// Get the fragment relative home and filename.
// The basetime is the absolute time in ms, while the sn(sequence number) is basetime/fragment.
virtual srs_error_t get_fragment(bool video, std::string& home, std::string& filename, int64_t& sn, uint64_t& basetime);
// The basetime is the absolute time in srs_utime_t, while the sn(sequence number) is basetime/fragment.
virtual srs_error_t get_fragment(bool video, std::string& home, std::string& filename, int64_t& sn, srs_utime_t& basetime);
};
/**
@ -127,7 +129,7 @@ private:
uint64_t audio_dts;
uint64_t video_dts;
private:
// The fragment duration in ms to reap it.
// The fragment duration in srs_utime_t to reap it.
srs_utime_t fragment;
private:
std::string home;
@ -138,6 +140,8 @@ public:
virtual ~SrsDashController();
public:
virtual srs_error_t initialize(SrsRequest* r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format);
virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
private:

@ -48,10 +48,10 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>
// when edge timeout, retry next.
#define SRS_EDGE_INGESTER_TMMS (5 * SRS_UTIME_MILLISECONDS)
#define SRS_EDGE_INGESTER_TIMEOUT (5 * SRS_UTIME_SECONDS)
// when edge error, wait for quit
#define SRS_EDGE_FORWARDER_TMMS (150 * SRS_UTIME_MILLISECONDS)
#define SRS_EDGE_FORWARDER_TIMEOUT (150 * SRS_UTIME_MILLISECONDS)
SrsEdgeUpstream::SrsEdgeUpstream()
{
@ -114,12 +114,12 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
}
srs_freep(sdk);
int64_t cto = srsu2ms(SRS_EDGE_INGESTER_TMMS);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_PULSE);
srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((err = sdk->connect()) != srs_success) {
return srs_error_wrap(err, "edge pull %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
@ -144,7 +144,7 @@ void SrsEdgeRtmpUpstream::close()
srs_freep(sdk);
}
void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t tm)
void SrsEdgeRtmpUpstream::set_recv_timeout(srs_utime_t tm)
{
sdk->set_recv_timeout(tm);
}
@ -218,7 +218,7 @@ string SrsEdgeIngester::get_curr_origin()
}
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_CIMS (3 * SRS_UTIME_MILLISECONDS)
#define SRS_EDGE_INGESTER_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsEdgeIngester::cycle()
{
@ -294,7 +294,7 @@ srs_error_t SrsEdgeIngester::ingest()
SrsAutoFree(SrsPithyPrint, pprint);
// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(srsu2ms(SRS_EDGE_INGESTER_TMMS));
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
while (true) {
srs_error_t err = srs_success;
@ -433,7 +433,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
srs_freep(queue);
}
void SrsEdgeForwarder::set_queue_size(double queue_size)
void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
{
return queue->set_queue_size(queue_size);
}
@ -474,13 +474,12 @@ srs_error_t SrsEdgeForwarder::start()
// open socket.
srs_freep(sdk);
// TODO: FIXME: Should switch cto with sto?
int64_t cto = srsu2ms(SRS_EDGE_FORWARDER_TMMS);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((err = sdk->connect()) != srs_success) {
return srs_error_wrap(err, "sdk connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
@ -488,7 +487,7 @@ srs_error_t SrsEdgeForwarder::start()
}
srs_freep(trd);
trd = new SrsSTCoroutine("edge-fwr", this);
trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
@ -506,7 +505,7 @@ void SrsEdgeForwarder::stop()
}
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_FORWARDER_CIMS (3 * SRS_UTIME_MILLISECONDS)
#define SRS_EDGE_FORWARDER_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsEdgeForwarder::cycle()
{
@ -533,7 +532,7 @@ srs_error_t SrsEdgeForwarder::do_cycle()
{
srs_error_t err = srs_success;
sdk->set_recv_timeout(srsu2ms(SRS_CONSTS_RTMP_PULSE));
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
@ -546,7 +545,7 @@ srs_error_t SrsEdgeForwarder::do_cycle()
}
if (send_error_code != ERROR_SUCCESS) {
srs_usleep(SRS_EDGE_FORWARDER_TMMS);
srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
continue;
}
@ -710,7 +709,7 @@ SrsPublishEdge::~SrsPublishEdge()
srs_freep(forwarder);
}
void SrsPublishEdge::set_queue_size(double queue_size)
void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
{
return forwarder->set_queue_size(queue_size);
}

@ -86,7 +86,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
virtual void close() = 0;
public:
virtual void set_recv_timeout(int64_t tm) = 0;
virtual void set_recv_timeout(srs_utime_t tm) = 0;
virtual void kbps_sample(const char* label, int64_t age) = 0;
};
@ -107,7 +107,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual void close();
public:
virtual void set_recv_timeout(int64_t tm);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char* label, int64_t age);
};
@ -170,7 +170,7 @@ public:
SrsEdgeForwarder();
virtual ~SrsEdgeForwarder();
public:
virtual void set_queue_size(double queue_size);
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r);
virtual srs_error_t start();
@ -232,7 +232,7 @@ public:
SrsPublishEdge();
virtual ~SrsPublishEdge();
public:
virtual void set_queue_size(double queue_size);
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual bool can_publish();

@ -87,7 +87,7 @@ void SrsEncoder::on_unpublish()
}
// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_CIMS (3000 * SRS_UTIME_MILLISECONDS)
#define SRS_RTMP_ENCODER_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsEncoder::cycle()
{

@ -85,7 +85,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
return err;
}
void SrsForwarder::set_queue_size(double queue_size)
void SrsForwarder::set_queue_size(srs_utime_t queue_size)
{
queue->set_queue_size(queue_size);
}
@ -174,7 +174,7 @@ srs_error_t SrsForwarder::on_video(SrsSharedPtrMessage* shared_video)
}
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_CIMS (3000 * SRS_UTIME_MILLISECONDS)
#define SRS_FORWARDER_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsForwarder::cycle()
{
@ -213,13 +213,12 @@ srs_error_t SrsForwarder::do_cycle()
}
srs_freep(sdk);
// TODO: FIXME: Should switch cto with sto?
int64_t cto = srsu2ms(SRS_FORWARDER_CIMS);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
srs_utime_t cto = SRS_FORWARDER_CIMS;
srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((err = sdk->connect()) != srs_success) {
return srs_error_wrap(err, "sdk connect url=%s, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
return srs_error_wrap(err, "sdk connect url=%s, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
@ -242,7 +241,7 @@ srs_error_t SrsForwarder::forward()
{
srs_error_t err = srs_success;
sdk->set_recv_timeout(srsu2ms(SRS_CONSTS_RTMP_PULSE));
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);
SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
SrsAutoFree(SrsPithyPrint, pprint);

@ -71,7 +71,7 @@ public:
virtual ~SrsForwarder();
public:
virtual srs_error_t initialize(SrsRequest* r, std::string ep);
virtual void set_queue_size(double queue_size);
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t on_publish();
virtual void on_unpublish();

@ -52,13 +52,15 @@ void SrsFragment::append(int64_t dts)
dts = 0;
}
srs_utime_t dts_in_tbn = dts * SRS_UTIME_MILLISECONDS;
if (start_dts == -1) {
start_dts = dts;
start_dts = dts_in_tbn;
}
// TODO: FIXME: Use cumulus dts.
start_dts = srs_min(start_dts, dts);
dur = srs_utime_t(dts - start_dts) * SRS_UTIME_MILLISECONDS;
start_dts = srs_min(start_dts, dts_in_tbn);
dur = dts_in_tbn - start_dts;
}
srs_utime_t SrsFragment::duration()
@ -141,9 +143,12 @@ srs_error_t SrsFragment::rename()
ss << tempdur;
full_path = srs_string_replace(full_path, "[duration]", ss.str());
}
if (::rename(tmp_file.c_str(), full_path.c_str()) < 0) {
int r0 = ::rename(tmp_file.c_str(), full_path.c_str());
if (r0 < 0) {
return srs_error_new(ERROR_SYSTEM_FRAGMENT_RENAME, "rename %s to %s", tmp_file.c_str(), full_path.c_str());
}
filepath = full_path;
return err;
}
@ -201,7 +206,7 @@ void SrsFragmentWindow::append(SrsFragment* fragment)
fragments.push_back(fragment);
}
void SrsFragmentWindow::shrink(int64_t window)
void SrsFragmentWindow::shrink(srs_utime_t window)
{
srs_utime_t duration = 0;
@ -211,7 +216,7 @@ void SrsFragmentWindow::shrink(int64_t window)
SrsFragment* fragment = fragments[i];
duration += fragment->duration();
if (srsu2ms(duration) > window) {
if (duration > window) {
remove_index = i;
break;
}
@ -242,15 +247,15 @@ void SrsFragmentWindow::clear_expired(bool delete_files)
expired_fragments.clear();
}
int64_t SrsFragmentWindow::max_duration()
srs_utime_t SrsFragmentWindow::max_duration()
{
int64_t v = 0;
srs_utime_t v = 0;
std::vector<SrsFragment*>::iterator it;
for (it = fragments.begin(); it != fragments.end(); ++it) {
SrsFragment* fragment = *it;
v = srs_max(v, srsu2ms(fragment->duration()));
v = srs_max(v, fragment->duration());
}
return v;

@ -40,8 +40,8 @@ private:
srs_utime_t dur;
// The full file path of fragment.
std::string filepath;
// The start DTS in ms of segment.
int64_t start_dts;
// The start DTS in srs_utime_t of segment.
srs_utime_t start_dts;
// Whether current segement contains sequence header.
bool sequence_header;
public:
@ -93,11 +93,11 @@ public:
// Append a new fragment, which is ready to delivery to client.
virtual void append(SrsFragment* fragment);
// Shrink the window, push the expired fragment to a queue.
virtual void shrink(int64_t window);
virtual void shrink(srs_utime_t window);
// Clear the expired fragments.
virtual void clear_expired(bool delete_files);
// Get the max duration in ms of all fragments.
virtual int64_t max_duration();
// Get the max duration in srs_utime_t of all fragments.
virtual srs_utime_t max_duration();
public:
virtual bool empty();
virtual SrsFragment* first();

@ -348,7 +348,7 @@ srs_error_t SrsHds::on_video(SrsSharedPtrMessage* msg)
currentSegment->on_video(msg);
double fragment_duration = _srs_config->get_hds_fragment(hds_req->vhost) * 1000;
double fragment_duration = srsu2ms(_srs_config->get_hds_fragment(hds_req->vhost));
if (currentSegment->duration() >= fragment_duration) {
// flush segment
if ((err = currentSegment->flush()) != srs_success) {
@ -398,7 +398,7 @@ srs_error_t SrsHds::on_audio(SrsSharedPtrMessage* msg)
currentSegment->on_audio(msg);
double fragment_duration = _srs_config->get_hds_fragment(hds_req->vhost) * 1000;
double fragment_duration = srsu2ms(_srs_config->get_hds_fragment(hds_req->vhost));
if (currentSegment->duration() >= fragment_duration) {
// flush segment
if ((err = currentSegment->flush()) != srs_success) {
@ -718,7 +718,7 @@ void SrsHds::adjust_windows()
windows_size += fragment->duration();
}
double windows_size_limit = _srs_config->get_hds_window(hds_req->vhost) * 1000;
double windows_size_limit = srsu2ms(_srs_config->get_hds_window(hds_req->vhost));
if (windows_size > windows_size_limit ) {
SrsHdsFragment *fragment = fragments.front();
unlink(fragment->fragment_path().c_str());

@ -55,7 +55,7 @@ using namespace std;
// drop the segment when duration of ts too small.
// TODO: FIXME: Refine to time unit.
#define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100
#define SRS_AUTO_HLS_SEGMENT_MIN_DURATION (100 * SRS_UTIME_MILLISECONDS)
// fragment plus the deviation percent.
#define SRS_HLS_FLOOR_REAP_PERCENT 0.3
@ -82,7 +82,7 @@ void SrsHlsSegment::config_cipher(unsigned char* key,unsigned char* iv)
fw->config_cipher(key, iv);
}
SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(int c, SrsRequest* r, string p, string t, string m, string mu, int s, double d)
SrsDvrAsyncCallOnHls::SrsDvrAsyncCallOnHls(int c, SrsRequest* r, string p, string t, string m, string mu, int s, srs_utime_t d)
{
req = r->copy();
cid = c;
@ -252,9 +252,9 @@ string SrsHlsMuxer::ts_url()
return current? current->uri:"";
}
double SrsHlsMuxer::duration()
srs_utime_t SrsHlsMuxer::duration()
{
return current? srsu2ms(current->duration())/1000.0:0;
return current? current->duration():0;
}
int SrsHlsMuxer::deviation()
@ -279,7 +279,7 @@ srs_error_t SrsHlsMuxer::initialize()
}
srs_error_t SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
string path, string m3u8_file, string ts_file, double fragment, double window,
string path, string m3u8_file, string ts_file, srs_utime_t fragment, srs_utime_t window,
bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe, bool keys,
int fragments_per_key, string key_file ,string key_file_path, string key_url)
{
@ -312,7 +312,7 @@ srs_error_t SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
m3u8 = path + "/" + m3u8_url;
// when update config, reset the history target duration.
max_td = (int)(fragment * _srs_config->get_hls_td_ratio(r->vhost));
max_td = fragment * _srs_config->get_hls_td_ratio(r->vhost);
// create m3u8 dir once.
m3u8_dir = srs_path_dirname(m3u8);
@ -391,7 +391,7 @@ srs_error_t SrsHlsMuxer::segment_open()
ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream);
if (hls_ts_floor) {
// accept the floor ts for the first piece.
int64_t current_floor_ts = (int64_t)(srsu2ms(srs_update_system_time()) / (1000 * hls_fragment));
int64_t current_floor_ts = srs_update_system_time() / hls_fragment;
if (!accept_floor_ts) {
accept_floor_ts = current_floor_ts - 1;
} else {
@ -484,14 +484,13 @@ bool SrsHlsMuxer::is_segment_overflow()
srs_assert(current);
// to prevent very small segment.
if (srsu2msi(current->duration()) < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
if (current->duration() < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION) {
return false;
}
// use N% deviation, to smoother.
double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0.0;
return srsu2msi(current->duration()) >= (hls_fragment + deviation) * 1000;
srs_utime_t deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0;
return current->duration() >= hls_fragment + deviation;
}
bool SrsHlsMuxer::wait_keyframe()
@ -505,14 +504,13 @@ bool SrsHlsMuxer::is_segment_absolutely_overflow()
srs_assert(current);
// to prevent very small segment.
if (srsu2msi(current->duration()) < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS) {
if (current->duration() < 2 * SRS_AUTO_HLS_SEGMENT_MIN_DURATION) {
return false;
}
// use N% deviation, to smoother.
double deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0.0;
return srsu2msi(current->duration()) >= (hls_aof_ratio * hls_fragment + deviation) * 1000;
srs_utime_t deviation = hls_ts_floor? SRS_HLS_FLOOR_REAP_PERCENT * deviation_ts * hls_fragment : 0;
return current->duration() >= hls_aof_ratio * hls_fragment + deviation;
}
bool SrsHlsMuxer::pure_audio()
@ -577,6 +575,16 @@ srs_error_t SrsHlsMuxer::flush_video(SrsTsMessageCache* cache)
}
srs_error_t SrsHlsMuxer::segment_close()
{
srs_error_t err = do_segment_close();
// We always cleanup current segment.
srs_freep(current);
return err;
}
srs_error_t SrsHlsMuxer::do_segment_close()
{
srs_error_t err = srs_success;
@ -587,15 +595,22 @@ srs_error_t SrsHlsMuxer::segment_close()
// when close current segment, the current segment must not be NULL.
srs_assert(current);
// We should always close the underlayer writer.
if (current && current->writer) {
current->writer->close();
}
// valid, add to segments if segment duration is ok
// when too small, it maybe not enough data to play.
// when too large, it maybe timestamp corrupt.
// make the segment more acceptable, when in [min, max_td * 2], it's ok.
if (srsu2msi(current->duration()) >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS && (int)srsu2msi(current->duration()) <= max_td * 2 * 1000) {
bool matchMinDuration = current->duration() >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION;
bool matchMaxDuration = current->duration() <= max_td * 2 * 1000;
if (matchMinDuration && matchMaxDuration) {
// use async to call the http hooks, for it will cause thread switch.
if ((err = async->execute(new SrsDvrAsyncCallOnHls(_srs_context->get_id(), req, current->fullpath(),
current->uri, m3u8, m3u8_url, current->sequence_no, srsu2msi(current->duration()) / 1000.0))) != srs_success) {
current->uri, m3u8, m3u8_url, current->sequence_no, current->duration()))) != srs_success) {
return srs_error_wrap(err, "segment close");
}
@ -625,11 +640,10 @@ srs_error_t SrsHlsMuxer::segment_close()
if ((err = current->unlink_tmpfile()) != srs_success) {
return srs_error_wrap(err, "rename");
}
srs_freep(current);
}
// shrink the segments.
segments->shrink(hls_window * 1000);
segments->shrink(hls_window);
// refresh the m3u8, donot contains the removed ts
err = refresh_m3u8();
@ -746,8 +760,8 @@ srs_error_t SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
* typical target duration is 10 seconds.
*/
// @see https://github.com/ossrs/srs/issues/304#issuecomment-74000081
int target_duration = (int)ceil(segments->max_duration() / 1000.0);
target_duration = srs_max(target_duration, max_td);
srs_utime_t max_duration = segments->max_duration();
int target_duration = (int)ceil(srsu2msi(srs_max(max_duration, max_td)) / 1000.0);
ss << "#EXT-X-TARGETDURATION:" << target_duration << SRS_CONSTS_LF;
@ -838,7 +852,7 @@ string SrsHlsController::ts_url()
return muxer->ts_url();
}
double SrsHlsController::duration()
srs_utime_t SrsHlsController::duration()
{
return muxer->duration();
}
@ -856,8 +870,8 @@ srs_error_t SrsHlsController::on_publish(SrsRequest* req)
std::string stream = req->stream;
std::string app = req->app;
double hls_fragment = _srs_config->get_hls_fragment(vhost);
double hls_window = _srs_config->get_hls_window(vhost);
srs_utime_t hls_fragment = _srs_config->get_hls_fragment(vhost);
srs_utime_t hls_window = _srs_config->get_hls_window(vhost);
// get the hls m3u8 ts list entry prefix config
std::string entry_prefix = _srs_config->get_hls_entry_prefix(vhost);
@ -893,8 +907,8 @@ srs_error_t SrsHlsController::on_publish(SrsRequest* req)
if ((err = muxer->segment_open()) != srs_success) {
return srs_error_wrap(err, "hls: segment open");
}
srs_trace("hls: win=%.2f, frag=%.2f, prefix=%s, path=%s, m3u8=%s, ts=%s, aof=%.2f, floor=%d, clean=%d, waitk=%d, dispose=%dms",
hls_window, hls_fragment, entry_prefix.c_str(), path.c_str(), m3u8_file.c_str(),
srs_trace("hls: win=%dms, frag=%dms, prefix=%s, path=%s, m3u8=%s, ts=%s, aof=%.2f, floor=%d, clean=%d, waitk=%d, dispose=%dms",
srsu2msi(hls_window), srsu2msi(hls_fragment), entry_prefix.c_str(), path.c_str(), m3u8_file.c_str(),
ts_file.c_str(), hls_aof_ratio, ts_floor, cleanup, wait_keyframe, srsu2msi(hls_dispose));
return err;
@ -1007,6 +1021,13 @@ srs_error_t SrsHlsController::reap_segment()
// close current ts.
if ((err = muxer->segment_close()) != srs_success) {
// When close segment error, we must reopen it for next packet to write.
srs_error_t r0 = muxer->segment_open();
if (r0 != srs_success) {
srs_warn("close segment err %s", srs_error_desc(r0).c_str());
srs_freep(r0);
}
return srs_error_wrap(err, "hls: segment close");
}
@ -1278,9 +1299,9 @@ void SrsHls::hls_show_mux_log()
// the run time is not equals to stream time,
// @see: https://github.com/ossrs/srs/issues/81#issuecomment-48100994
// it's ok.
srs_trace("-> " SRS_CONSTS_LOG_HLS " time=%" PRId64 ", sno=%d, ts=%s, dur=%.2f, dva=%dp",
srs_trace("-> " SRS_CONSTS_LOG_HLS " time=%dms, sno=%d, ts=%s, dur=%.2f, dva=%dp",
pprint->age(), controller->sequence_no(), controller->ts_url().c_str(),
controller->duration(), controller->deviation());
srsu2msi(controller->duration()), controller->deviation());
}

@ -91,10 +91,10 @@ private:
std::string m3u8_url;
int seq_no;
SrsRequest* req;
double duration;
srs_utime_t duration;
public:
// TODO: FIXME: Use TBN 1000.
SrsDvrAsyncCallOnHls(int c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, double d);
SrsDvrAsyncCallOnHls(int c, SrsRequest* r, std::string p, std::string t, std::string m, std::string mu, int s, srs_utime_t d);
virtual ~SrsDvrAsyncCallOnHls();
public:
virtual srs_error_t call();
@ -139,8 +139,8 @@ private:
std::string m3u8_dir;
double hls_aof_ratio;
// TODO: FIXME: Use TBN 1000.
double hls_fragment;
double hls_window;
srs_utime_t hls_fragment;
srs_utime_t hls_window;
SrsAsyncCallWorker* async;
private:
// whether use floor algorithm for timestamp.
@ -168,7 +168,7 @@ private:
SrsFileWriter *writer;
private:
int _sequence_no;
int max_td;
srs_utime_t max_td;
std::string m3u8;
std::string m3u8_url;
private:
@ -189,7 +189,7 @@ public:
public:
virtual int sequence_no();
virtual std::string ts_url();
virtual double duration();
virtual srs_utime_t duration();
virtual int deviation();
public:
/**
@ -201,7 +201,7 @@ public:
*/
virtual srs_error_t update_config(SrsRequest* r, std::string entry_prefix,
std::string path, std::string m3u8_file, std::string ts_file,
double fragment, double window, bool ts_floor, double aof_ratio,
srs_utime_t fragment, srs_utime_t window, bool ts_floor, double aof_ratio,
bool cleanup, bool wait_keyframe, bool keys, int fragments_per_key,
std::string key_file, std::string key_file_path, std::string key_url);
/**
@ -236,6 +236,7 @@ public:
*/
virtual srs_error_t segment_close();
private:
virtual srs_error_t do_segment_close();
virtual srs_error_t write_hls_key();
virtual srs_error_t refresh_m3u8();
virtual srs_error_t _refresh_m3u8(std::string m3u8_file);
@ -274,7 +275,7 @@ public:
virtual void dispose();
virtual int sequence_no();
virtual std::string ts_url();
virtual double duration();
virtual srs_utime_t duration();
virtual int deviation();
public:
/**

@ -37,10 +37,10 @@ ISrsHourGlass::~ISrsHourGlass()
{
}
SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, int resolution_ms)
SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution)
{
handler = h;
resolution = resolution_ms;
_resolution = resolution;
total_elapse = 0;
}
@ -48,12 +48,13 @@ SrsHourGlass::~SrsHourGlass()
{
}
srs_error_t SrsHourGlass::tick(int type, int interval)
srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
{
srs_error_t err = srs_success;
if (resolution > 0 && (interval % resolution) != 0) {
return srs_error_new(ERROR_SYSTEM_HOURGLASS_RESOLUTION, "hourglass interval=%d invalid, resolution=%d", interval, resolution);
if (_resolution > 0 && (interval % _resolution) != 0) {
return srs_error_new(ERROR_SYSTEM_HOURGLASS_RESOLUTION,
"invalid interval=%dms, resolution=%dms", srsu2msi(interval), srsu2msi(_resolution));
}
ticks[type] = interval;
@ -65,10 +66,10 @@ srs_error_t SrsHourGlass::cycle()
{
srs_error_t err = srs_success;
map<int, int>::iterator it;
map<int, srs_utime_t>::iterator it;
for (it = ticks.begin(); it != ticks.end(); ++it) {
int type = it->first;
int interval = it->second;
srs_utime_t interval = it->second;
if (interval == 0 || (total_elapse % interval) == 0) {
if ((err = handler->notify(type, interval, total_elapse)) != srs_success) {
@ -76,9 +77,10 @@ srs_error_t SrsHourGlass::cycle()
}
}
}
total_elapse += resolution;
srs_usleep(resolution * 1000);
// TODO: FIXME: Maybe we should use wallclock.
total_elapse += _resolution;
srs_usleep(_resolution);
return err;
}

@ -40,7 +40,7 @@ public:
/**
* notify the handler, the type and tick.
*/
virtual srs_error_t notify(int type, int interval, int64_t tick) = 0;
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick) = 0;
};
/**
@ -60,10 +60,10 @@ public:
* this is used for server and bocar server and other manager.
*
* Usage:
* SrsHourGlass* hg = new SrsHourGlass(handler, 1000);
* hg->tick(1, 3000);
* hg->tick(2, 5000);
* hg->tick(3, 7000);
* SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS);
* hg->tick(1, 3 * SRS_UTIME_MILLISECONDS);
* hg->tick(2, 5 * SRS_UTIME_MILLISECONDS);
* hg->tick(3, 7 * SRS_UTIME_MILLISECONDS);
* // create a thread to cycle, which will call handerl when ticked.
* while (true) {
* hg->cycle();
@ -73,21 +73,21 @@ class SrsHourGlass
{
private:
ISrsHourGlass* handler;
int resolution;
srs_utime_t _resolution;
// key: the type of tick.
// value: the interval of tick.
std::map<int, int> ticks;
std::map<int, srs_utime_t> ticks;
// the total elapsed time,
// for each cycle, we increase it with a resolution.
int64_t total_elapse;
srs_utime_t total_elapse;
public:
SrsHourGlass(ISrsHourGlass* h, int resolution_ms);
SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution);
virtual ~SrsHourGlass();
public:
// add a pair of tick(type, interval).
// @param type the type of tick.
// @param interval the interval in ms of tick.
virtual srs_error_t tick(int type, int interval);
// @param interval the interval in srs_utime_t of tick.
virtual srs_error_t tick(int type, srs_utime_t interval);
public:
// cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.

@ -1354,7 +1354,7 @@ srs_error_t SrsHttpApi::do_cycle()
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt->set_recv_timeout(SRS_HTTP_RECV_TMMS);
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT);
// initialize the cors, which will proxy to mux.
bool crossdomain_enabled = _srs_config->get_http_api_crossdomain();

@ -90,7 +90,7 @@ srs_error_t SrsHttpConn::do_cycle()
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt->set_recv_timeout(SRS_HTTP_RECV_TMMS);
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT);
SrsRequest* last_req = NULL;
SrsAutoFree(SrsRequest, last_req);

@ -45,8 +45,8 @@ using namespace std;
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER (32 * 1024)
// the timeout for hls notify, in ms.
#define SRS_HLS_NOTIFY_TMMS (10 * SRS_UTIME_MILLISECONDS)
// the timeout for hls notify, in srs_utime_t.
#define SRS_HLS_NOTIFY_TIMEOUT (10 * SRS_UTIME_SECONDS)
SrsHttpHooks::SrsHttpHooks()
{
@ -301,7 +301,7 @@ srs_error_t SrsHttpHooks::on_dvr(int cid, string url, SrsRequest* req, string fi
return err;
}
srs_error_t SrsHttpHooks::on_hls(int cid, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, double duration)
srs_error_t SrsHttpHooks::on_hls(int cid, string url, SrsRequest* req, string file, string ts_url, string m3u8, string m3u8_url, int sn, srs_utime_t duration)
{
srs_error_t err = srs_success;
@ -324,7 +324,7 @@ srs_error_t SrsHttpHooks::on_hls(int cid, string url, SrsRequest* req, string fi
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("duration", SrsJsonAny::number(duration));
obj->set("duration", SrsJsonAny::number(srsu2ms(duration)/1000.0));
obj->set("cwd", SrsJsonAny::str(cwd.c_str()));
obj->set("file", SrsJsonAny::str(file.c_str()));
obj->set("url", SrsJsonAny::str(ts_url.c_str()));
@ -371,7 +371,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re
}
SrsHttpClient http;
if ((err = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TMMS)) != srs_success) {
if ((err = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT)) != srs_success) {
return srs_error_wrap(err, "http: init client for %s", url.c_str());
}

@ -99,11 +99,11 @@ public:
* @param m3u8 the m3u8 file path, can be relative or absolute path.
* @param m3u8_url the m3u8 url, which is used for the http mount path.
* @param sn the seq_no, the sequence number of ts in hls/m3u8.
* @param duration the segment duration in seconds.
* @param duration the segment duration in srs_utime_t.
* @param cid the source connection cid, for the on_dvr is async call.
*/
static srs_error_t on_hls(int cid, std::string url, SrsRequest* req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, double duration);
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration);
/**
* when hls reap segment, callback.
* @param url the api server url, to process the event.

@ -108,7 +108,8 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor
return srs_error_wrap(err, "dump packets");
}
srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), srsu2msi(queue->duration()), fast_cache);
srs_trace("http: dump cache %d msgs, duration=%dms, cache=%dms",
queue->size(), srsu2msi(queue->duration()), srsu2msi(fast_cache));
return err;
}

@ -39,7 +39,7 @@ class SrsTsTransmuxer;
class SrsBufferCache : public ISrsCoroutineHandler
{
private:
double fast_cache;
srs_utime_t fast_cache;
private:
SrsMessageQueue* queue;
SrsSource* source;

@ -62,9 +62,9 @@ string SrsIngesterFFMPEG::uri()
return vhost + "/" + id;
}
int SrsIngesterFFMPEG::alive()
srs_utime_t SrsIngesterFFMPEG::alive()
{
return srsu2msi(srs_get_system_time() - starttime);
return srs_get_system_time() - starttime;
}
bool SrsIngesterFFMPEG::equals(string v)
@ -168,7 +168,7 @@ void SrsIngester::fast_stop()
// when error, ingester sleep for a while and retry.
// ingest never sleep a long time, for we must start the stream ASAP.
#define SRS_AUTO_INGESTER_CIMS (3000 * SRS_UTIME_MILLISECONDS)
#define SRS_AUTO_INGESTER_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsIngester::cycle()
{
@ -454,8 +454,8 @@ void SrsIngester::show_ingest_log_message()
// reportable
if (pprint->can_print()) {
srs_trace("-> " SRS_CONSTS_LOG_INGESTER " time=%" PRId64 ", ingesters=%d, #%d(alive=%ds, %s)",
pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str());
srs_trace("-> " SRS_CONSTS_LOG_INGESTER " time=%dms, ingesters=%d, #%d(alive=%dms, %s)",
srsu2msi(pprint->age()), (int)ingesters.size(), index, srsu2msi(ingester->alive()), ingester->uri().c_str());
}
}

@ -52,8 +52,8 @@ public:
virtual srs_error_t initialize(SrsFFMPEG* ff, std::string v, std::string i);
// the ingest uri, [vhost]/[ingest id]
virtual std::string uri();
// the alive in ms.
virtual int alive();
// the alive in srs_utime_t.
virtual srs_utime_t alive();
virtual bool equals(std::string v, std::string i);
virtual bool equals(std::string v);
public:

@ -40,7 +40,7 @@ using namespace std;
#ifdef SRS_AUTO_KAFKA
#define SRS_KAFKA_PRODUCER_TIMEOUT 30000
#define SRS_KAFKA_PRODUCER_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
@ -485,7 +485,7 @@ srs_error_t SrsKafkaProducer::on_close(int key)
return worker->execute(new SrsKafkaMessage(this, key, obj));
}
#define SRS_KAKFA_CIMS (3000 * SRS_UTIME_MILLISECONDS)
#define SRS_KAKFA_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsKafkaProducer::cycle()
{
@ -577,7 +577,7 @@ srs_error_t SrsKafkaProducer::request_metadata()
senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
}
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TMMS);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT);
SrsAutoFree(SrsTcpClient, transport);
SrsKafkaClient* kafka = new SrsKafkaClient(transport);

@ -43,8 +43,8 @@ using namespace std;
// set the max packet size.
#define SRS_UDP_MAX_PACKET_SIZE 65535
// sleep in ms for udp recv packet.
#define SrsUdpPacketRecvCycleMS 0
// sleep in srs_utime_t for udp recv packet.
#define SrsUdpPacketRecvCycleInterval 0
// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512
@ -173,8 +173,8 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_wrap(err, "handle packet %d bytes", nread);
}
if (SrsUdpPacketRecvCycleMS > 0) {
srs_usleep(SrsUdpPacketRecvCycleMS * SRS_UTIME_MILLISECONDS);
if (SrsUdpPacketRecvCycleInterval > 0) {
srs_usleep(SrsUdpPacketRecvCycleInterval);
}
}

@ -615,13 +615,13 @@ srs_error_t SrsMpegtsOverUdp::connect()
return err;
}
int64_t cto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_PULSE);
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
sdk = new SrsSimpleRtmpClient(output, cto, sto);
if ((err = sdk->connect()) != srs_success) {
close();
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", output.c_str(), srsu2msi(cto), srsu2msi(sto));
}
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {

@ -76,7 +76,7 @@ void SrsNgExec::on_unpublish()
}
// when error, ng-exec sleep for a while and retry.
#define SRS_RTMP_EXEC_CIMS (3000 * SRS_UTIME_MILLISECONDS)
#define SRS_RTMP_EXEC_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsNgExec::cycle()
{
srs_error_t err = srs_success;

@ -224,9 +224,9 @@ bool SrsPithyPrint::can_print()
return stage->can_print();
}
int64_t SrsPithyPrint::age()
srs_utime_t SrsPithyPrint::age()
{
return srsu2ms(_age);
return _age;
}

@ -111,9 +111,9 @@ public:
*/
virtual bool can_print();
/**
* get the elapsed time in ms.
* get the elapsed time in srs_utime_t.
*/
virtual int64_t age();
virtual srs_utime_t age();
};
#endif

@ -56,11 +56,12 @@ ISrsMessagePumper::~ISrsMessagePumper()
{
}
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid)
{
rtmp = r;
pumper = p;
timeout = tm;
_parent_cid = parent_cid;
trd = new SrsDummyCoroutine();
}
@ -79,7 +80,7 @@ srs_error_t SrsRecvThread::start()
srs_error_t err = srs_success;
srs_freep(trd);
trd = new SrsSTCoroutine("recv", this);
trd = new SrsSTCoroutine("recv", this, _parent_cid);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "recv thread");
@ -116,7 +117,7 @@ srs_error_t SrsRecvThread::cycle()
}
// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);
rtmp->set_recv_timeout(timeout);
pumper->on_stop();
@ -134,7 +135,7 @@ srs_error_t SrsRecvThread::do_cycle()
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
srs_usleep(timeout * 1000);
srs_usleep(timeout);
continue;
}
@ -159,8 +160,8 @@ srs_error_t SrsRecvThread::do_cycle()
return err;
}
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
: trd(this, rtmp_sdk, timeout_ms)
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
_consumer = consumer;
rtmp = rtmp_sdk;
@ -272,8 +273,9 @@ void SrsQueueRecvThread::on_stop()
rtmp->set_auto_response(true);
}
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source)
: trd(this, rtmp_sdk, timeout_ms)
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid)
: trd(this, rtmp_sdk, tm, parent_cid)
{
rtmp = rtmp_sdk;
@ -308,14 +310,14 @@ SrsPublishRecvThread::~SrsPublishRecvThread()
srs_freep(recv_error);
}
srs_error_t SrsPublishRecvThread::wait(uint64_t timeout_ms)
srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm)
{
if (recv_error != srs_success) {
return srs_error_copy(recv_error);
}
// ignore any return of cond wait.
srs_cond_timedwait(error, timeout_ms * 1000);
srs_cond_timedwait(error, tm);
return srs_success;
}

@ -96,12 +96,13 @@ protected:
SrsCoroutine* trd;
ISrsMessagePumper* pumper;
SrsRtmpServer* rtmp;
// The recv timeout in ms.
int timeout;
int _parent_cid;
// The recv timeout in srs_utime_t.
srs_utime_t timeout;
public:
// Constructor.
// @param tm The receive timeout in ms.
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm);
// @param tm The receive timeout in srs_utime_t.
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid);
virtual ~SrsRecvThread();
public:
virtual int cid();
@ -133,7 +134,7 @@ private:
SrsConsumer* _consumer;
public:
// TODO: FIXME: Refine timeout in time unit.
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms);
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid);
virtual ~SrsQueueRecvThread();
public:
virtual srs_error_t start();
@ -189,13 +190,14 @@ private:
int cid;
int ncid;
public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source);
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid);
virtual ~SrsPublishRecvThread();
public:
/**
* wait for error for some timeout.
*/
virtual srs_error_t wait(uint64_t timeout_ms);
virtual srs_error_t wait(srs_utime_t tm);
virtual int64_t nb_msgs();
virtual uint64_t nb_video_frames();
virtual srs_error_t error_code();

@ -57,27 +57,22 @@ using namespace std;
#include <srs_protocol_json.hpp>
#include <srs_app_kafka.hpp>
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
// sleep a while and close the connection.
#define SRS_STREAM_BUSY_CIMS (3000)
// the timeout in ms to wait encoder to republish
// the timeout in srs_utime_t to wait encoder to republish
// if timeout, close the connection.
#define SRS_REPUBLISH_SEND_TMMS (3 * SRS_UTIME_MINUTES)
#define SRS_REPUBLISH_SEND_TIMEOUT (3 * SRS_UTIME_MINUTES)
// if timeout, close the connection.
#define SRS_REPUBLISH_RECV_TMMS (3 * SRS_UTIME_MINUTES)
#define SRS_REPUBLISH_RECV_TIMEOUT (3 * SRS_UTIME_MINUTES)
// the timeout in ms to wait client data, when client paused
// the timeout in srs_utime_t to wait client data, when client paused
// if timeout, close the connection.
#define SRS_PAUSED_SEND_TMMS (3 * SRS_UTIME_MINUTES)
#define SRS_PAUSED_SEND_TIMEOUT (3 * SRS_UTIME_MINUTES)
// if timeout, close the connection.
#define SRS_PAUSED_RECV_TMMS (3 * SRS_UTIME_MINUTES)
#define SRS_PAUSED_RECV_TIMEOUT (3 * SRS_UTIME_MINUTES)
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TMMS (3000)
#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT (3 * SRS_UTIME_SECONDS)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, int64_t ctm, int64_t stm) : SrsBasicRtmpClient(u, ctm, stm)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm) : SrsBasicRtmpClient(u, ctm, stm)
{
}
@ -166,8 +161,8 @@ srs_error_t SrsRtmpConn::do_cycle()
}
#endif
rtmp->set_recv_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
rtmp->set_send_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
if ((err = rtmp->handshake()) != srs_success) {
return srs_error_wrap(err, "rtmp handshake");
@ -265,9 +260,9 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost)
// send_min_interval
if (true) {
double v = _srs_config->get_send_min_interval(vhost);
srs_utime_t v = _srs_config->get_send_min_interval(vhost);
if (v != send_min_interval) {
srs_trace("apply smi %.2f=>%.2f", send_min_interval, v);
srs_trace("apply smi %d=>%d ms", srsu2msi(send_min_interval), srsu2msi(v));
send_min_interval = v;
}
}
@ -410,8 +405,8 @@ srs_error_t SrsRtmpConn::service_cycle()
// for republish, continue service
if (srs_error_code(err) == ERROR_CONTROL_REPUBLISH) {
// set timeout to a larger value, wait for encoder to republish.
rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TMMS);
rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TMMS);
rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT);
rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT);
srs_trace("rtmp: retry for republish");
srs_freep(err);
@ -424,8 +419,8 @@ srs_error_t SrsRtmpConn::service_cycle()
// TODO: FIXME: use ping message to anti-death of socket.
// @see: https://github.com/ossrs/srs/issues/39
// set timeout to a larger value, for user paused.
rtmp->set_recv_timeout(SRS_PAUSED_RECV_TMMS);
rtmp->set_send_timeout(SRS_PAUSED_SEND_TMMS);
rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT);
rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT);
srs_trace("rtmp: retry for close");
srs_freep(err);
@ -451,8 +446,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
req->strip();
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%.2f",
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), req->duration);
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
@ -498,8 +493,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
}
// client is identified, set the timeout to service timeout.
rtmp->set_recv_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
rtmp->set_send_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// find a source to serve.
SrsSource* source = NULL;
@ -646,7 +641,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP));
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
@ -692,8 +687,8 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
while (true) {
// collect elapse for pithy print.
@ -767,7 +762,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
if (starttime < 0 || starttime > msg->timestamp) {
starttime = msg->timestamp;
}
duration += msg->timestamp - starttime;
duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
starttime = msg->timestamp;
}
}
@ -781,14 +776,14 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
// if duration specified, and exceed it, stop play live.
// @see: https://github.com/ossrs/srs/issues/45
if (user_specified_duration_to_stop) {
if (duration >= (int64_t)req->duration) {
return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", (int)duration, (int)req->duration);
if (duration >= req->duration) {
return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
}
}
// apply the minimal interval for delivery stream in ms.
// apply the minimal interval for delivery stream in srs_utime_t.
if (send_min_interval > 0) {
srs_usleep((int64_t)(send_min_interval * 1000));
srs_usleep(send_min_interval);
}
}
@ -815,7 +810,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source)
if ((err = acquire_publish(source)) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source);
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd);
rtrd.stop();
}
@ -847,11 +842,6 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
return srs_error_wrap(err, "rtmp: receive thread");
}
// change the isolate recv thread context id,
// merge its log to current thread.
int receive_thread_cid = rtrd->get_cid();
rtrd->set_cid(_srs_context->get_id());
// initialize the publish timeout.
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
@ -862,9 +852,8 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (true) {
bool mr = _srs_config->get_mr_enabled(req->vhost);
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d",
mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout),
tcp_nodelay, receive_thread_cid);
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",
mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
}
int64_t nb_msgs = 0;
@ -880,9 +869,9 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout.
// @see https://github.com/ossrs/srs/issues/441
rtrd->wait(srsu2msi(publish_1stpkt_timeout));
rtrd->wait(publish_1stpkt_timeout);
} else {
rtrd->wait(srsu2msi(publish_normal_timeout));
rtrd->wait(publish_normal_timeout);
}
// check the thread error code.
@ -1159,7 +1148,7 @@ srs_error_t SrsRtmpConn::check_edge_token_traverse_auth()
int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
srs_parse_hostport(hostport, server, port);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TMMS);
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT);
SrsAutoFree(SrsTcpClient, transport);
if ((err = transport->connect()) != srs_success) {
@ -1183,8 +1172,8 @@ srs_error_t SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
SrsRequest* req = info->req;
srs_assert(client);
client->set_recv_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
client->set_send_timeout(srsu2ms(SRS_CONSTS_RTMP_TIMEOUT));
client->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
client->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
if ((err = client->handshake()) != srs_success) {
return srs_error_wrap(err, "rtmp: handshake");

@ -64,7 +64,7 @@ class ISrsKafkaCluster;
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
{
public:
SrsSimpleRtmpClient(std::string u, int64_t ctm, int64_t stm);
SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm);
virtual ~SrsSimpleRtmpClient();
protected:
virtual srs_error_t connect_app();
@ -105,19 +105,19 @@ private:
// the wakable handler, maybe NULL.
// TODO: FIXME: Should refine the state for receiving thread.
ISrsWakable* wakable;
// elapse duration in ms
// elapse duration in srs_utime_t
// for live play duration, for instance, rtmpdump to record.
// @see https://github.com/ossrs/srs/issues/47
int64_t duration;
// the MR(merged-write) sleep time in ms.
srs_utime_t duration;
// the MR(merged-write) sleep time in srs_utime_t.
srs_utime_t mw_sleep;
// the MR(merged-write) only enabled for play.
int mw_enabled;
// for realtime
// @see https://github.com/ossrs/srs/issues/257
bool realtime;
// the minimal interval in ms for delivery stream.
double send_min_interval;
// the minimal interval in srs_utime_t for delivery stream.
srs_utime_t send_min_interval;
// publish 1st packet timeout in srs_utime_t
srs_utime_t publish_1stpkt_timeout;
// publish normal packet timeout in srs_utime_t

@ -649,13 +649,13 @@ srs_error_t SrsRtspConn::connect()
}
// connect host.
int64_t cto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_PULSE);
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
if ((err = sdk->connect()) != srs_success) {
close();
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
}
// publish.

@ -53,7 +53,7 @@ using namespace std;
#include <srs_app_thread.hpp>
#include <srs_app_coworkers.hpp>
// system interval in ms,
// system interval in srs_utime_t,
// all resolution times should be times togother,
// for example, system-interval is x=1s(1000ms),
// then rusage can be 3*x, for instance, 3*1=3s,

@ -260,9 +260,9 @@ srs_utime_t SrsMessageQueue::duration()
return (av_end_time - av_start_time);
}
void SrsMessageQueue::set_queue_size(double queue_size)
void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
{
max_queue_size = srs_utime_t(queue_size * SRS_UTIME_SECONDS);
max_queue_size = queue_size;
}
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
@ -442,7 +442,7 @@ SrsConsumer::~SrsConsumer()
#endif
}
void SrsConsumer::set_queue_size(double queue_size)
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
{
queue->set_queue_size(queue_size);
}
@ -698,7 +698,7 @@ bool SrsGopCache::empty()
return gop_cache.empty();
}
int64_t SrsGopCache::start_time()
srs_utime_t SrsGopCache::start_time()
{
if (empty()) {
return 0;
@ -707,7 +707,7 @@ int64_t SrsGopCache::start_time()
SrsSharedPtrMessage* msg = gop_cache[0];
srs_assert(msg);
return msg->timestamp;
return srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
bool SrsGopCache::pure_audio()
@ -1468,9 +1468,8 @@ srs_error_t SrsOriginHub::create_forwarders()
if ((err = forwarder->initialize(req, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}
// TODO: FIXME: support queue size.
double queue_size = _srs_config->get_queue_length(req->vhost);
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size);
if ((err = forwarder->on_publish()) != srs_success) {
@ -1864,7 +1863,7 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
return srs_error_wrap(err, "edge(publish)");
}
double queue_size = _srs_config->get_queue_length(req->vhost);
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
publish_edge->set_queue_size(queue_size);
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost);
@ -1919,7 +1918,7 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost)
// queue length
if (true) {
double v = _srs_config->get_queue_length(req->vhost);
srs_utime_t v = _srs_config->get_queue_length(req->vhost);
if (true) {
std::vector<SrsConsumer*>::iterator it;
@ -2443,19 +2442,19 @@ srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consum
consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer);
double queue_size = _srs_config->get_queue_length(req->vhost);
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
consumer->set_queue_size(queue_size);
// if atc, update the sequence header to gop cache time.
if (atc && !gop_cache->empty()) {
if (meta->data()) {
meta->data()->timestamp = gop_cache->start_time();
meta->data()->timestamp = srsu2ms(gop_cache->start_time());
}
if (meta->vsh()) {
meta->vsh()->timestamp = gop_cache->start_time();
meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
}
if (meta->ash()) {
meta->ash()->timestamp = gop_cache->start_time();
meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
}
}
@ -2516,6 +2515,7 @@ srs_error_t SrsSource::on_edge_start_publish()
return publish_edge->on_client_publish();
}
// TODO: FIXME: Use edge strategy pattern.
srs_error_t SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{
return publish_edge->on_proxy_publish(msg);

@ -161,9 +161,9 @@ public:
virtual srs_utime_t duration();
/**
* set the queue size
* @param queue_size the queue size in seconds.
* @param queue_size the queue size in srs_utime_t.
*/
virtual void set_queue_size(double queue_size);
virtual void set_queue_size(srs_utime_t queue_size);
public:
/**
* enqueue the message, the timestamp always monotonically.
@ -243,7 +243,7 @@ public:
/**
* set the size of queue.
*/
virtual void set_queue_size(double queue_size);
virtual void set_queue_size(srs_utime_t queue_size);
/**
* when source id changed, notice client to print.
*/
@ -358,10 +358,10 @@ public:
*/
virtual bool empty();
/**
* get the start time of gop cache, in ms.
* get the start time of gop cache, in srs_utime_t.
* @return 0 if no packets.
*/
virtual int64_t start_time();
virtual srs_utime_t start_time();
/**
* whether current stream is pure audio,
* when no video in gop cache, the stream is pure audio right now.

@ -90,7 +90,7 @@ srs_error_t SrsStatisticVhost::dumps(SrsJsonObject* obj)
hls->set("enabled", SrsJsonAny::boolean(hls_enabled));
if (hls_enabled) {
hls->set("fragment", SrsJsonAny::number(_srs_config->get_hls_fragment(vhost)));
hls->set("fragment", SrsJsonAny::number(srsu2msi(_srs_config->get_hls_fragment(vhost))/1000.0));
}
return err;

@ -27,7 +27,7 @@
// current release version
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
#define VERSION_REVISION 48
#define VERSION_REVISION 49
// generated by configure, only macros.
#include <srs_auto_headers.hpp>

@ -59,7 +59,7 @@
#define SRS_PERF_MR_SLEEP (350 * SRS_UTIME_MILLISECONDS)
/**
* the MW(merged-write) send cache time in ms.
* the MW(merged-write) send cache time in srs_utime_t.
* the default value, user can override it in config.
* to improve send performance, cache msgs and send in a time.
* for example, cache 500ms videos and audios, then convert all these
@ -148,8 +148,8 @@
*/
// whether gop cache is on.
#define SRS_PERF_GOP_CACHE true
// in seconds, the live queue length.
#define SRS_PERF_PLAY_QUEUE 30
// in srs_utime_t, the live queue length.
#define SRS_PERF_PLAY_QUEUE (30 * SRS_UTIME_SECONDS)
/**
* whether always use complex send algorithm.

@ -32,12 +32,12 @@ typedef int64_t srs_utime_t;
// The time unit in ms, for example 100 * SRS_UTIME_MILLISECONDS means 100ms.
#define SRS_UTIME_MILLISECONDS 1000
// Convert srs_utime_t in ms unit.
#define srsu2ms(us) (us / SRS_UTIME_MILLISECONDS)
#define srsu2msi(us) int(us / SRS_UTIME_MILLISECONDS)
// Convert srs_utime_t as ms.
#define srsu2ms(us) ((us) / SRS_UTIME_MILLISECONDS)
#define srsu2msi(us) int((us) / SRS_UTIME_MILLISECONDS)
// The time unit in ms, for example 120 * SRS_UTIME_SECONDS means 120s.
#define SRS_UTIME_SECONDS 1000000
#define SRS_UTIME_SECONDS 1000000LL
// The time unit in minutes, for example 3 * SRS_UTIME_MINUTES means 3m.
#define SRS_UTIME_MINUTES 60000000LL

@ -221,7 +221,7 @@
#define SRS_CONSTS_HTTP_QUERY_SEP '?'
// the default recv timeout.
#define SRS_HTTP_RECV_TMMS (60 * SRS_UTIME_SECONDS)
#define SRS_HTTP_RECV_TIMEOUT (60 * SRS_UTIME_SECONDS)
// 6.1.1 Status Code and Reason Phrase
#define SRS_CONSTS_HTTP_Continue 100
@ -406,7 +406,7 @@
#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
// the common io timeout, for both recv and send.
#define SRS_CONSTS_KAFKA_TMMS (30*1000)
#define SRS_CONSTS_KAFKA_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
#endif

@ -6144,7 +6144,7 @@ SrsMp4M2tsSegmentEncoder::~SrsMp4M2tsSegmentEncoder()
srs_freep(buffer);
}
srs_error_t SrsMp4M2tsSegmentEncoder::initialize(ISrsWriter* w, uint32_t sequence, uint64_t basetime, uint32_t tid)
srs_error_t SrsMp4M2tsSegmentEncoder::initialize(ISrsWriter* w, uint32_t sequence, srs_utime_t basetime, uint32_t tid)
{
srs_error_t err = srs_success;
@ -6252,7 +6252,7 @@ srs_error_t SrsMp4M2tsSegmentEncoder::flush(uint64_t& dts)
traf->set_tfdt(tfdt);
tfdt->version = 1;
tfdt->base_media_decode_time = decode_basetime;
tfdt->base_media_decode_time = srsu2ms(decode_basetime);
SrsMp4TrackFragmentRunBox* trun = new SrsMp4TrackFragmentRunBox();
traf->set_trun(trun);

@ -469,6 +469,7 @@ public:
class SrsMp4TrackFragmentDecodeTimeBox : public SrsMp4FullBox
{
public:
// It's in ms.
uint64_t base_media_decode_time;
public:
SrsMp4TrackFragmentDecodeTimeBox();
@ -2198,7 +2199,7 @@ private:
ISrsWriter* writer;
SrsBuffer* buffer;
uint32_t sequence_number;
uint64_t decode_basetime;
srs_utime_t decode_basetime;
uint32_t track_id;
private:
uint32_t nb_audios;
@ -2212,7 +2213,7 @@ public:
virtual ~SrsMp4M2tsSegmentEncoder();
public:
// Initialize the encoder with a writer w.
virtual srs_error_t initialize(ISrsWriter* w, uint32_t sequence, uint64_t basetime, uint32_t tid);
virtual srs_error_t initialize(ISrsWriter* w, uint32_t sequence, srs_utime_t basetime, uint32_t tid);
// Cache a sample.
// @param ht, The sample handler type, audio/soun or video/vide.
// @param ft, The frame type. For video, it's SrsVideoAvcFrameType.

@ -46,14 +46,6 @@ using namespace std;
#define HLS_AES_ENCRYPT_BLOCK_LENGTH SRS_TS_PACKET_SIZE * 4
// in ms, for HLS aac sync time.
#define SRS_CONF_DEFAULT_AAC_SYNC 100
// @see: ngx_rtmp_hls_audio
/* We assume here AAC frame size is 1024
* Need to handle AAC frames with frame size of 960 */
#define _SRS_AAC_SAMPLE_SIZE 1024
// the mpegts header specifed the video/audio pid.
#define TS_PMT_NUMBER 1
#define TS_PMT_PID 0x1001

@ -377,16 +377,16 @@ srs_error_t SimpleSocketStream::read(void* buf, size_t size, ssize_t* nread)
}
// ISrsProtocolReader
void SimpleSocketStream::set_recv_timeout(int64_t tm)
void SimpleSocketStream::set_recv_timeout(srs_utime_t tm)
{
srs_assert(io);
srs_hijack_io_set_recv_timeout(io, tm);
srs_hijack_io_set_recv_timeout(io, srsu2ms(tm));
}
int64_t SimpleSocketStream::get_recv_timeout()
srs_utime_t SimpleSocketStream::get_recv_timeout()
{
srs_assert(io);
return srs_hijack_io_get_recv_timeout(io);
return srs_hijack_io_get_recv_timeout(io) * SRS_UTIME_MILLISECONDS;
}
int64_t SimpleSocketStream::get_recv_bytes()
@ -396,16 +396,16 @@ int64_t SimpleSocketStream::get_recv_bytes()
}
// ISrsProtocolWriter
void SimpleSocketStream::set_send_timeout(int64_t tm)
void SimpleSocketStream::set_send_timeout(srs_utime_t tm)
{
srs_assert(io);
srs_hijack_io_set_send_timeout(io, tm);
srs_hijack_io_set_send_timeout(io, srsu2ms(tm));
}
int64_t SimpleSocketStream::get_send_timeout()
srs_utime_t SimpleSocketStream::get_send_timeout()
{
srs_assert(io);
return srs_hijack_io_get_send_timeout(io);
return srs_hijack_io_get_send_timeout(io) * SRS_UTIME_MILLISECONDS;
}
int64_t SimpleSocketStream::get_send_bytes()
@ -425,7 +425,7 @@ srs_error_t SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t*
}
// ISrsProtocolReadWriter
bool SimpleSocketStream::is_never_timeout(int64_t tm)
bool SimpleSocketStream::is_never_timeout(srs_utime_t tm)
{
srs_assert(io);
return srs_hijack_io_is_never_timeout(io, tm);

@ -54,18 +54,18 @@ public:
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
// ISrsProtocolReader
public:
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
// ISrsProtocolWriter
public:
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_send_bytes();
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
// ISrsProtocolReadWriter
public:
virtual bool is_never_timeout(int64_t tm);
virtual bool is_never_timeout(srs_utime_t tm);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
};

@ -572,8 +572,8 @@ int srs_rtmp_set_timeout(srs_rtmp_t rtmp, int recv_timeout_ms, int send_timeout_
context->stimeout = send_timeout_ms;
context->rtimeout = recv_timeout_ms;
context->skt->set_recv_timeout(context->rtimeout);
context->skt->set_send_timeout(context->stimeout);
context->skt->set_recv_timeout(context->rtimeout * SRS_UTIME_MILLISECONDS);
context->skt->set_send_timeout(context->stimeout * SRS_UTIME_MILLISECONDS);
return ret;
}
@ -637,11 +637,11 @@ int srs_rtmp_connect_server(srs_rtmp_t rtmp)
// set timeout if user not set.
if (context->stimeout == SRS_UTIME_NO_TIMEOUT) {
context->stimeout = SRS_SOCKET_DEFAULT_TMMS;
context->skt->set_send_timeout(context->stimeout);
context->skt->set_send_timeout(context->stimeout * SRS_UTIME_MILLISECONDS);
}
if (context->rtimeout == SRS_UTIME_NO_TIMEOUT) {
context->rtimeout = SRS_SOCKET_DEFAULT_TMMS;
context->skt->set_recv_timeout(context->rtimeout);
context->skt->set_recv_timeout(context->rtimeout * SRS_UTIME_MILLISECONDS);
}
if ((ret = srs_librtmp_context_connect(context)) != ERROR_SUCCESS) {

@ -1275,8 +1275,8 @@ int SrsIngestHlsOutput::connect()
srs_trace("connect output=%s", url.c_str());
// connect host.
int64_t cto = srsu2ms(SRS_CONSTS_RTMP_TIMEOUT);
int64_t sto = srsu2ms(SRS_CONSTS_RTMP_PULSE);
srs_utime_t cto =SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto =SRS_CONSTS_RTMP_PULSE;
sdk = new SrsBasicRtmpClient(url, cto, sto);
if ((err = sdk->connect()) != srs_success) {
@ -1284,7 +1284,7 @@ int SrsIngestHlsOutput::connect()
ret = srs_error_code(err);
srs_freep(err);
close();
srs_error("mpegts: connect %s failed, cto=%" PRId64 ", sto=%" PRId64 ". ret=%d", url.c_str(), cto, sto, ret);
srs_error("mpegts: connect %s failed, cto=%dms, sto=%dms. ret=%d", url.c_str(), srsu2msi(cto), srsu2msi(sto), ret);
return ret;
}

@ -306,7 +306,7 @@ void show_macro_features()
// gc(gop-cache)
ss << "gc:" << srs_bool2switch(SRS_PERF_GOP_CACHE);
// pq(play-queue)
ss << ", pq:" << SRS_PERF_PLAY_QUEUE << "s";
ss << ", pq:" << srsu2msi(SRS_PERF_PLAY_QUEUE) << "ms";
// cscc(chunk stream cache cid)
ss << ", cscc:[0," << SRS_PERF_CHUNK_STREAM_CACHE << ")";
// csa(complex send algorithm)
@ -341,8 +341,8 @@ void show_macro_features()
#ifdef SRS_PERF_MERGED_READ
possible_mr_latency = srsu2msi(SRS_PERF_MR_SLEEP);
#endif
srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)",
srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000);
srs_trace("system default latency(ms): mw(0-%d) + mr(0-%d) + play-queue(0-%d)",
srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, srsu2msi(SRS_PERF_PLAY_QUEUE));
#ifdef SRS_AUTO_MEM_WATCH
#warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script."

@ -29,7 +29,7 @@
#include <srs_kernel_io.hpp>
/**
* the system io reader/writer architecture:
* The system io reader/writer architecture:
* +---------------+ +---------------+
* | IStreamWriter | | IVectorWriter |
* +---------------+ +---------------+
@ -59,22 +59,18 @@
*/
/**
* get the statistic of channel.
* Get the statistic of channel.
*/
class ISrsProtocolStatistic
{
public:
ISrsProtocolStatistic();
virtual ~ISrsProtocolStatistic();
// for protocol
// For protocol
public:
/**
* get the total recv bytes over underlay fd.
*/
// Get the total recv bytes over underlay fd.
virtual int64_t get_recv_bytes() = 0;
/**
* get the total send bytes over underlay fd.
*/
// Get the total send bytes over underlay fd.
virtual int64_t get_send_bytes() = 0;
};
@ -88,21 +84,15 @@ public:
virtual ~ISrsProtocolReader();
// for protocol
public:
/**
* Set the timeout tm in ms for recv bytes from peer.
* @remark Use SRS_UTIME_NO_TIMEOUT to never timeout.
*/
virtual void set_recv_timeout(int64_t tm) = 0;
/**
* Get the timeout in ms for recv bytes from peer.
*/
virtual int64_t get_recv_timeout() = 0;
// for handshake.
// Set the timeout tm in srs_utime_t for recv bytes from peer.
// @remark Use SRS_UTIME_NO_TIMEOUT to never timeout.
virtual void set_recv_timeout(srs_utime_t tm) = 0;
// Get the timeout in srs_utime_t for recv bytes from peer.
virtual srs_utime_t get_recv_timeout() = 0;
// For handshake.
public:
/**
* read specified size bytes of data
* @param nread, the actually read size, NULL to ignore.
*/
// Read specified size bytes of data
// @param nread, the actually read size, NULL to ignore.
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread) = 0;
};
@ -114,33 +104,27 @@ class ISrsProtocolWriter : virtual public ISrsWriter, virtual public ISrsProtoco
public:
ISrsProtocolWriter();
virtual ~ISrsProtocolWriter();
// for protocol
// For protocol
public:
/**
* Set the timeout tm in ms for send bytes to peer.
* @remark Use SRS_UTIME_NO_TIMEOUT to never timeout.
*/
virtual void set_send_timeout(int64_t tm) = 0;
/**
* Get the timeout in ms for send bytes to peer.
*/
virtual int64_t get_send_timeout() = 0;
// Set the timeout tm in srs_utime_t for send bytes to peer.
// @remark Use SRS_UTIME_NO_TIMEOUT to never timeout.
virtual void set_send_timeout(srs_utime_t tm) = 0;
// Get the timeout in srs_utime_t for send bytes to peer.
virtual srs_utime_t get_send_timeout() = 0;
};
/**
* the reader and writer.
* The reader and writer.
*/
class ISrsProtocolReadWriter : virtual public ISrsProtocolReader, virtual public ISrsProtocolWriter
{
public:
ISrsProtocolReadWriter();
virtual ~ISrsProtocolReadWriter();
// for protocol
// For protocol
public:
/**
* Whether the specified tm in ms is never timeout.
*/
virtual bool is_never_timeout(int64_t tm) = 0;
// Whether the specified tm in srs_utime_t is never timeout.
virtual bool is_never_timeout(srs_utime_t tm) = 0;
};
#endif

@ -35,7 +35,7 @@ SrsKbpsSample::~SrsKbpsSample()
{
}
SrsKbpsSample* SrsKbpsSample::update(int64_t b, int64_t t, int k)
SrsKbpsSample* SrsKbpsSample::update(int64_t b, srs_utime_t t, int k)
{
bytes = b;
time = t;
@ -61,7 +61,7 @@ int64_t SrsKbpsSlice::get_total_bytes()
void SrsKbpsSlice::sample()
{
int64_t now = clk->time_ms();
srs_utime_t now = clk->now();
int64_t total_bytes = get_total_bytes();
if (sample_30s.time < 0) {
@ -77,20 +77,20 @@ void SrsKbpsSlice::sample()
sample_60m.update(total_bytes, now, 0);
}
if (now - sample_30s.time >= 30 * 1000) {
int kbps = (int)((total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time));
if (now - sample_30s.time >= 30 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_30s.bytes) * 8 / srsu2ms(now - sample_30s.time));
sample_30s.update(total_bytes, now, kbps);
}
if (now - sample_1m.time >= 60 * 1000) {
int kbps = (int)((total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time));
if (now - sample_1m.time >= 60 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_1m.bytes) * 8 / srsu2ms(now - sample_1m.time));
sample_1m.update(total_bytes, now, kbps);
}
if (now - sample_5m.time >= 300 * 1000) {
int kbps = (int)((total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time));
if (now - sample_5m.time >= 300 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_5m.bytes) * 8 / srsu2ms(now - sample_5m.time));
sample_5m.update(total_bytes, now, kbps);
}
if (now - sample_60m.time >= 3600 * 1000) {
int kbps = (int)((total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time));
if (now - sample_60m.time >= 3600 * SRS_UTIME_SECONDS) {
int kbps = (int)((total_bytes - sample_60m.bytes) * 8 / srsu2ms(now - sample_60m.time));
sample_60m.update(total_bytes, now, kbps);
}
}
@ -111,9 +111,9 @@ SrsWallClock::~SrsWallClock()
{
}
int64_t SrsWallClock::time_ms()
srs_utime_t SrsWallClock::now()
{
return srsu2ms(srs_get_system_time());
return srs_get_system_time();
}
SrsKbps::SrsKbps(SrsWallClock* c) : is(c), os(c)
@ -130,7 +130,7 @@ void SrsKbps::set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out)
// set input stream
// now, set start time.
if (is.starttime == 0) {
is.starttime = clk->time_ms();
is.starttime = clk->now();
}
// save the old in bytes.
if (is.io) {
@ -148,7 +148,7 @@ void SrsKbps::set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out)
// set output stream
// now, set start time.
if (os.starttime == 0) {
os.starttime = clk->time_ms();
os.starttime = clk->now();
}
// save the old in bytes.
if (os.io) {
@ -166,22 +166,22 @@ void SrsKbps::set_io(ISrsProtocolStatistic* in, ISrsProtocolStatistic* out)
int SrsKbps::get_send_kbps()
{
int64_t duration = clk->time_ms() - is.starttime;
srs_utime_t duration = clk->now() - is.starttime;
if (duration <= 0) {
return 0;
}
int64_t bytes = get_send_bytes();
return (int)(bytes * 8 / duration);
return (int)(bytes * 8 / srsu2ms(duration));
}
int SrsKbps::get_recv_kbps()
{
int64_t duration = clk->time_ms() - os.starttime;
srs_utime_t duration = clk->now() - os.starttime;
if (duration <= 0) {
return 0;
}
int64_t bytes = get_recv_bytes();
return (int)(bytes * 8 / duration);
return (int)(bytes * 8 / srsu2ms(duration));
}
int SrsKbps::get_send_kbps_30s()

@ -38,13 +38,13 @@ class SrsKbpsSample
{
public:
int64_t bytes;
int64_t time;
srs_utime_t time;
int kbps;
public:
SrsKbpsSample();
virtual ~SrsKbpsSample();
public:
virtual SrsKbpsSample* update(int64_t b, int64_t t, int k);
virtual SrsKbpsSample* update(int64_t b, srs_utime_t t, int k);
};
/**
@ -74,7 +74,7 @@ public:
// @remark, use total_bytes() to get the total bytes of slice.
int64_t bytes;
// slice starttime, the first time to record bytes.
int64_t starttime;
srs_utime_t starttime;
// session startup bytes number for io when set it,
// the base offset of bytes for io.
int64_t io_bytes_base;
@ -129,9 +129,9 @@ public:
virtual ~SrsWallClock();
public:
/**
* Current time in ms.
* Current time in srs_utime_t.
*/
virtual int64_t time_ms();
virtual srs_utime_t now();
};
/**

@ -39,7 +39,7 @@ SrsMessageArray::~SrsMessageArray()
{
// we just free the msgs itself,
// both delete and delete[] is ok,
// for each msg in msgs is already freed by send_and_free_messages.
// for all msgs is already freed by send_and_free_messages.
srs_freepa(msgs);
}

@ -41,7 +41,7 @@ class SrsMessageArray
{
public:
/**
* when user already send the msg in msgs, please set to NULL,
* when user already send all msgs, please set to NULL,
* for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg),
* where send(msg) will always send and free it.
*/

@ -304,22 +304,22 @@ void SrsProtocol::set_recv_buffer(int buffer_size)
}
#endif
void SrsProtocol::set_recv_timeout(int64_t tm)
void SrsProtocol::set_recv_timeout(srs_utime_t tm)
{
return skt->set_recv_timeout(tm);
}
int64_t SrsProtocol::get_recv_timeout()
srs_utime_t SrsProtocol::get_recv_timeout()
{
return skt->get_recv_timeout();
}
void SrsProtocol::set_send_timeout(int64_t tm)
void SrsProtocol::set_send_timeout(srs_utime_t tm)
{
return skt->set_send_timeout(tm);
}
int64_t SrsProtocol::get_send_timeout()
srs_utime_t SrsProtocol::get_send_timeout()
{
return skt->get_send_timeout();
}
@ -1815,12 +1815,12 @@ SrsRtmpClient::~SrsRtmpClient()
srs_freep(hs_bytes);
}
void SrsRtmpClient::set_recv_timeout(int64_t tm)
void SrsRtmpClient::set_recv_timeout(srs_utime_t tm)
{
protocol->set_recv_timeout(tm);
}
void SrsRtmpClient::set_send_timeout(int64_t tm)
void SrsRtmpClient::set_send_timeout(srs_utime_t tm)
{
protocol->set_send_timeout(tm);
}
@ -2210,22 +2210,22 @@ void SrsRtmpServer::set_recv_buffer(int buffer_size)
}
#endif
void SrsRtmpServer::set_recv_timeout(int64_t tm)
void SrsRtmpServer::set_recv_timeout(srs_utime_t tm)
{
protocol->set_recv_timeout(tm);
}
int64_t SrsRtmpServer::get_recv_timeout()
srs_utime_t SrsRtmpServer::get_recv_timeout()
{
return protocol->get_recv_timeout();
}
void SrsRtmpServer::set_send_timeout(int64_t tm)
void SrsRtmpServer::set_send_timeout(srs_utime_t tm)
{
protocol->set_send_timeout(tm);
}
int64_t SrsRtmpServer::get_send_timeout()
srs_utime_t SrsRtmpServer::get_send_timeout()
{
return protocol->get_send_timeout();
}
@ -2408,7 +2408,7 @@ srs_error_t SrsRtmpServer::response_connect_app(SrsRequest *req, const char* ser
return err;
}
#define SRS_RTMP_REDIRECT_TMMS 3000
#define SRS_RTMP_REDIRECT_TIMEOUT (3 * SRS_UTIME_SECONDS)
srs_error_t SrsRtmpServer::redirect(SrsRequest* r, string host, int port, bool& accepted)
{
srs_error_t err = srs_success;
@ -2434,7 +2434,7 @@ srs_error_t SrsRtmpServer::redirect(SrsRequest* r, string host, int port, bool&
// client must response a call message.
// or we never know whether the client is ok to redirect.
protocol->set_recv_timeout(SRS_RTMP_REDIRECT_TMMS);
protocol->set_recv_timeout(SRS_RTMP_REDIRECT_TIMEOUT);
if (true) {
SrsCommonMessage* msg = NULL;
SrsCallPacket* pkt = NULL;
@ -2485,7 +2485,7 @@ srs_error_t SrsRtmpServer::on_bw_done()
return err;
}
srs_error_t SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration)
srs_error_t SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, srs_utime_t& duration)
{
type = SrsRtmpConnUnknown;
srs_error_t err = srs_success;
@ -2887,7 +2887,7 @@ srs_error_t SrsRtmpServer::start_flash_publish(int stream_id)
return err;
}
srs_error_t SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration)
srs_error_t SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, string& stream_name, srs_utime_t& duration)
{
srs_error_t err = srs_success;
@ -2986,11 +2986,11 @@ srs_error_t SrsRtmpServer::identify_flash_publish_client(SrsPublishPacket* req,
return srs_success;
}
srs_error_t SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, string& stream_name, double& duration)
srs_error_t SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, string& stream_name, srs_utime_t& duration)
{
type = SrsRtmpConnPlay;
stream_name = req->stream_name;
duration = req->duration;
duration = srs_utime_t(req->duration) * SRS_UTIME_MILLISECONDS;
return srs_success;
}

@ -304,18 +304,17 @@ public:
#endif
public:
/**
* set/get the recv timeout in ms.
* set/get the recv timeout in srs_utime_t.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
// TODO: FIXME: Refine tm in time unit.
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
/**
* set/get the send timeout in ms.
* set/get the send timeout in srs_utime_t.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
/**
* get recv/send bytes.
*/
@ -567,8 +566,8 @@ public:
// for play live stream,
// used to specified the stop when exceed the duration.
// @see https://github.com/ossrs/srs/issues/45
// in ms.
double duration;
// in srs_utime_t.
srs_utime_t duration;
// the token in the connect request,
// used for edge traverse to origin authentication,
// @see https://github.com/ossrs/srs/issues/104
@ -688,8 +687,8 @@ public:
virtual ~SrsRtmpClient();
// protocol methods proxy
public:
virtual void set_recv_timeout(int64_t tm);
virtual void set_send_timeout(int64_t tm);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
@ -805,17 +804,17 @@ public:
virtual void set_recv_buffer(int buffer_size);
#endif
/**
* set/get the recv timeout in ms.
* set/get the recv timeout in srs_utime_t.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
/**
* set/get the send timeout in ms.
* set/get the send timeout in srs_utime_t.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
/**
* get recv/send bytes.
*/
@ -912,7 +911,7 @@ public:
* @stream_name, output the client publish/play stream name. @see: SrsRequest.stream
* @duration, output the play client duration. @see: SrsRequest.duration
*/
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration);
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType& type, std::string& stream_name, srs_utime_t& duration);
/**
* set the chunk size when client type identified.
*/
@ -984,12 +983,12 @@ public:
return protocol->expect_message<T>(pmsg, ppacket);
}
private:
virtual srs_error_t identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, std::string& stream_name, double& duration);
virtual srs_error_t identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsRtmpConnType& type, std::string& stream_name, srs_utime_t& duration);
virtual srs_error_t identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name);
virtual srs_error_t identify_haivision_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, std::string& stream_name);
virtual srs_error_t identify_flash_publish_client(SrsPublishPacket* req, SrsRtmpConnType& type, std::string& stream_name);
private:
virtual srs_error_t identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, std::string& stream_name, double& duration);
virtual srs_error_t identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type, std::string& stream_name, srs_utime_t& duration);
};
/**

@ -54,8 +54,7 @@ SrsHttpClient::~SrsHttpClient()
srs_freep(parser);
}
// TODO: FIXME: use ms for timeout.
srs_error_t SrsHttpClient::initialize(string h, int p, int64_t tm)
srs_error_t SrsHttpClient::initialize(string h, int p, srs_utime_t tm)
{
srs_error_t err = srs_success;
@ -186,7 +185,7 @@ srs_error_t SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
return err;
}
void SrsHttpClient::set_recv_timeout(int64_t tm)
void SrsHttpClient::set_recv_timeout(srs_utime_t tm)
{
transport->set_recv_timeout(tm);
}
@ -223,10 +222,10 @@ srs_error_t SrsHttpClient::connect()
transport = new SrsTcpClient(host, port, timeout);
if ((err = transport->connect()) != srs_success) {
disconnect();
return srs_error_wrap(err, "http: tcp connect %s:%d to=%d", host.c_str(), port, (int)timeout);
return srs_error_wrap(err, "http: tcp connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
// Set the recv/send timeout in ms.
// Set the recv/send timeout in srs_utime_t.
transport->set_recv_timeout(timeout);
transport->set_send_timeout(timeout);

@ -41,7 +41,7 @@ class SrsWallClock;
class SrsTcpClient;
// the default timeout for http client.
#define SRS_HTTP_CLIENT_TMMS (30*1000)
#define SRS_HTTP_CLIENT_TIMEOUT (30 * SRS_UTIME_SECONDS)
/**
* The client to GET/POST/PUT/DELETE over HTTP.
@ -63,8 +63,8 @@ private:
SrsKbps* kbps;
SrsWallClock* clk;
private:
// The timeout in ms.
int64_t timeout;
// The timeout in srs_utime_t.
srs_utime_t timeout;
// The host name or ip.
std::string host;
int port;
@ -74,10 +74,10 @@ public:
public:
/**
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @param tm The underlayer TCP transport timeout in ms.
* @param tm The underlayer TCP transport timeout in srs_utime_t.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual srs_error_t initialize(std::string h, int p, int64_t tm = SRS_HTTP_CLIENT_TMMS);
virtual srs_error_t initialize(std::string h, int p, srs_utime_t tm = SRS_HTTP_CLIENT_TIMEOUT);
/**
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
@ -101,7 +101,7 @@ public:
*/
virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(int64_t tm);
virtual void set_recv_timeout(srs_utime_t tm);
public:
virtual void kbps_sample(const char* label, int64_t age);
private:

@ -33,7 +33,7 @@ using namespace std;
#include <srs_protocol_utility.hpp>
#include <srs_service_utility.hpp>
SrsBasicRtmpClient::SrsBasicRtmpClient(string u, int64_t ctm, int64_t stm)
SrsBasicRtmpClient::SrsBasicRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm)
{
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
@ -65,7 +65,7 @@ srs_error_t SrsBasicRtmpClient::connect()
close();
transport = new SrsTcpClient(req->host, req->port, connect_timeout);
transport = new SrsTcpClient(req->host, req->port, srs_utime_t(connect_timeout));
client = new SrsRtmpClient(transport);
kbps->set_io(transport, transport);
@ -234,7 +234,7 @@ srs_error_t SrsBasicRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg)
return client->send_and_free_message(msg, stream_id);
}
void SrsBasicRtmpClient::set_recv_timeout(int64_t timeout)
void SrsBasicRtmpClient::set_recv_timeout(srs_utime_t timeout)
{
transport->set_recv_timeout(timeout);
}

@ -50,8 +50,8 @@ class SrsBasicRtmpClient
{
private:
std::string url;
int64_t connect_timeout;
int64_t stream_timeout;
srs_utime_t connect_timeout;
srs_utime_t stream_timeout;
protected:
SrsRequest* req;
private:
@ -63,9 +63,9 @@ private:
public:
// Constructor.
// @param u The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
// @param ctm The timeout in ms to connect to server.
// @param stm The timeout in ms to delivery A/V stream.
SrsBasicRtmpClient(std::string u, int64_t ctm, int64_t stm);
// @param ctm The timeout in srs_utime_t to connect to server.
// @param stm The timeout in srs_utime_t to delivery A/V stream.
SrsBasicRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm);
virtual ~SrsBasicRtmpClient();
public:
// Connect, handshake and connect app to RTMP server.
@ -87,7 +87,7 @@ public:
virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg);
public:
virtual void set_recv_timeout(int64_t timeout);
virtual void set_recv_timeout(srs_utime_t timeout);
};
#endif

@ -104,12 +104,11 @@ srs_thread_t srs_thread_self()
return (srs_thread_t)st_thread_self();
}
// TODO: FXIME: Refine tm in time unit.
srs_error_t srs_socket_connect(string server, int port, int64_t tm, srs_netfd_t* pstfd)
srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_UTIME_NO_TIMEOUT) {
timeout = (st_utime_t)(tm * 1000);
timeout = tm;
}
*pstfd = NULL;
@ -247,27 +246,27 @@ srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
return srs_success;
}
bool SrsStSocket::is_never_timeout(int64_t tm)
bool SrsStSocket::is_never_timeout(srs_utime_t tm)
{
return tm == SRS_UTIME_NO_TIMEOUT;
}
void SrsStSocket::set_recv_timeout(int64_t tm)
void SrsStSocket::set_recv_timeout(srs_utime_t tm)
{
rtm = tm;
}
int64_t SrsStSocket::get_recv_timeout()
srs_utime_t SrsStSocket::get_recv_timeout()
{
return rtm;
}
void SrsStSocket::set_send_timeout(int64_t tm)
void SrsStSocket::set_send_timeout(srs_utime_t tm)
{
stm = tm;
}
int64_t SrsStSocket::get_send_timeout()
srs_utime_t SrsStSocket::get_send_timeout()
{
return stm;
}
@ -290,7 +289,7 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);
}
if (nread) {
@ -303,7 +302,7 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
if (nb_read <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
if (nb_read == 0) {
@ -326,7 +325,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
if (rtm == SRS_UTIME_NO_TIMEOUT) {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);
}
if (nread) {
@ -339,7 +338,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
if (nb_read != (ssize_t)size) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_read < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));
}
if (nb_read >= 0) {
@ -362,7 +361,7 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
nb_write = st_write((st_netfd_t)stfd, buf, size, stm);
}
if (nwrite) {
@ -374,7 +373,7 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", stm);
return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", srsu2msi(stm));
}
return srs_error_new(ERROR_SOCKET_WRITE, "write");
@ -393,7 +392,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
if (stm == SRS_UTIME_NO_TIMEOUT) {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
} else {
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm);
}
if (nwrite) {
@ -405,7 +404,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
if (nb_write <= 0) {
// @see https://github.com/ossrs/srs/issues/200
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", stm);
return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm));
}
return srs_error_new(ERROR_SOCKET_WRITE, "writev");
@ -416,7 +415,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
return err;
}
SrsTcpClient::SrsTcpClient(string h, int p, int64_t tm)
SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
{
stfd = NULL;
io = new SrsStSocket();
@ -441,7 +440,7 @@ srs_error_t SrsTcpClient::connect()
srs_assert(stfd == NULL);
if ((err = srs_socket_connect(host, port, timeout, &stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: connect %s:%d to=%d", host.c_str(), port, (int)timeout);
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
if ((err = io->initialize(stfd)) != srs_success) {
@ -461,27 +460,27 @@ void SrsTcpClient::close()
srs_close_stfd(stfd);
}
bool SrsTcpClient::is_never_timeout(int64_t tm)
bool SrsTcpClient::is_never_timeout(srs_utime_t tm)
{
return io->is_never_timeout(tm);
}
void SrsTcpClient::set_recv_timeout(int64_t tm)
void SrsTcpClient::set_recv_timeout(srs_utime_t tm)
{
io->set_recv_timeout(tm);
}
int64_t SrsTcpClient::get_recv_timeout()
srs_utime_t SrsTcpClient::get_recv_timeout()
{
return io->get_recv_timeout();
}
void SrsTcpClient::set_send_timeout(int64_t tm)
void SrsTcpClient::set_send_timeout(srs_utime_t tm)
{
io->set_send_timeout(tm);
}
int64_t SrsTcpClient::get_send_timeout()
srs_utime_t SrsTcpClient::get_send_timeout()
{
return io->get_send_timeout();
}

@ -53,8 +53,8 @@ extern void srs_socket_reuse_addr(int fd);
extern srs_thread_t srs_thread_self();
// client open socket and connect to server.
// @param tm The timeout in ms.
extern srs_error_t srs_socket_connect(std::string server, int port, int64_t tm, srs_netfd_t* pstfd);
// @param tm The timeout in srs_utime_t.
extern srs_error_t srs_socket_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
@ -109,10 +109,10 @@ public:
class SrsStSocket : public ISrsProtocolReadWriter
{
private:
// The recv/send timeout in ms.
// @remark Use SRS_UTIME_NO_TIMEOUT for never timeout in ms.
int64_t rtm;
int64_t stm;
// The recv/send timeout in srs_utime_t.
// @remark Use SRS_UTIME_NO_TIMEOUT for never timeout.
srs_utime_t rtm;
srs_utime_t stm;
// The recv/send data in bytes
int64_t rbytes;
int64_t sbytes;
@ -125,11 +125,11 @@ public:
// Initialize the socket with stfd, user must manage it.
virtual srs_error_t initialize(srs_netfd_t fd);
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual bool is_never_timeout(srs_utime_t tm);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
@ -149,7 +149,7 @@ public:
* The client to connect to server over TCP.
* User must never reuse the client when close it.
* Usage:
* SrsTcpClient client("127.0.0.1", 1935,9000);
* SrsTcpClient client("127.0.0.1", 1935, 9 * SRS_UTIME_SECONDS);
* client.connect();
* client.write("Hello world!", 12, NULL);
* client.read(buf, 4096, NULL);
@ -163,16 +163,16 @@ private:
private:
std::string host;
int port;
// The timeout in ms.
int64_t timeout;
// The timeout in srs_utime_t.
srs_utime_t timeout;
public:
/**
* Constructor.
* @param h the ip or hostname of server.
* @param p the port to connect to.
* @param tm the timeout in ms.
* @param tm the timeout in srs_utime_t.
*/
SrsTcpClient(std::string h, int p, int64_t tm);
SrsTcpClient(std::string h, int p, srs_utime_t tm);
virtual ~SrsTcpClient();
public:
/**
@ -188,11 +188,11 @@ private:
virtual void close();
// interface ISrsProtocolReadWriter
public:
virtual bool is_never_timeout(int64_t tm);
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual bool is_never_timeout(srs_utime_t tm);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);

@ -1806,7 +1806,7 @@ VOID TEST(ConfigMainTest, CheckConf_vhost_ingest_id)
EXPECT_TRUE(ERROR_SUCCESS != conf.parse(_MIN_OK_CONF"vhost v{ingest{} ingest{}}"));
}
VOID TEST(ConfigUnitTest, CheckDefaultValues)
VOID TEST(ConfigUnitTest, CheckDefaultValuesVhost)
{
MockSrsConfig conf;
@ -1870,11 +1870,46 @@ VOID TEST(ConfigUnitTest, CheckDefaultValues)
if (true) {
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(0, conf.get_hls_dispose(""));
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hls_fragment(""));
EXPECT_EQ(60 * SRS_UTIME_SECONDS, conf.get_hls_window(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{hls{hls_dispose 10;}}"));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{hls{hls_dispose 10;hls_fragment 20;hls_window 30;}}"));
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hls_dispose("v"));
EXPECT_EQ(20 * SRS_UTIME_SECONDS, conf.get_hls_fragment("v"));
EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_hls_window("v"));
}
if (true) {
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hds_fragment(""));
EXPECT_EQ(60 * SRS_UTIME_SECONDS, conf.get_hds_window(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{hds{hds_fragment 20;hds_window 30;}}"));
EXPECT_EQ(20 * SRS_UTIME_SECONDS, conf.get_hds_fragment("v"));
EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_hds_window("v"));
}
if (true) {
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_queue_length(""));
EXPECT_EQ(0, conf.get_send_min_interval(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{play{queue_length 100;send_min_interval 10;}}"));
EXPECT_EQ(100 * SRS_UTIME_SECONDS, conf.get_queue_length("v"));
EXPECT_EQ(10 * SRS_UTIME_MILLISECONDS, conf.get_send_min_interval("v"));
}
if (true) {
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF));
EXPECT_EQ(0, conf.get_vhost_http_remux_fast_cache(""));
EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{http_remux{fast_cache 10;}}"));
EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_vhost_http_remux_fast_cache("v"));
}
}
VOID TEST(ConfigUnitTest, CheckDefaultValuesGlobal)
{
if (true) {
srs_utime_t t0 = srs_update_system_time();
srs_usleep(10 * SRS_UTIME_MILLISECONDS);

@ -43,7 +43,7 @@ MockEmptyIO::~MockEmptyIO()
{
}
bool MockEmptyIO::is_never_timeout(int64_t /*tm*/)
bool MockEmptyIO::is_never_timeout(srs_utime_t /*tm*/)
{
return true;
}
@ -58,11 +58,11 @@ srs_error_t MockEmptyIO::write(void* /*buf*/, size_t /*size*/, ssize_t* /*nwrite
return srs_success;
}
void MockEmptyIO::set_recv_timeout(int64_t /*tm*/)
void MockEmptyIO::set_recv_timeout(srs_utime_t /*tm*/)
{
}
int64_t MockEmptyIO::get_recv_timeout()
srs_utime_t MockEmptyIO::get_recv_timeout()
{
return -1;
}
@ -72,11 +72,11 @@ int64_t MockEmptyIO::get_recv_bytes()
return -1;
}
void MockEmptyIO::set_send_timeout(int64_t /*tm*/)
void MockEmptyIO::set_send_timeout(srs_utime_t /*tm*/)
{
}
int64_t MockEmptyIO::get_send_timeout()
srs_utime_t MockEmptyIO::get_send_timeout()
{
return 0;
}
@ -112,7 +112,7 @@ MockBufferIO* MockBufferIO::append(string data)
return this;
}
bool MockBufferIO::is_never_timeout(int64_t tm)
bool MockBufferIO::is_never_timeout(srs_utime_t tm)
{
return tm == SRS_UTIME_NO_TIMEOUT;
}
@ -142,12 +142,12 @@ srs_error_t MockBufferIO::write(void* buf, size_t size, ssize_t* nwrite)
return srs_success;
}
void MockBufferIO::set_recv_timeout(int64_t tm)
void MockBufferIO::set_recv_timeout(srs_utime_t tm)
{
rtm = tm;
}
int64_t MockBufferIO::get_recv_timeout()
srs_utime_t MockBufferIO::get_recv_timeout()
{
return rtm;
}
@ -157,12 +157,12 @@ int64_t MockBufferIO::get_recv_bytes()
return rbytes;
}
void MockBufferIO::set_send_timeout(int64_t tm)
void MockBufferIO::set_send_timeout(srs_utime_t tm)
{
stm = tm;
}
int64_t MockBufferIO::get_send_timeout()
srs_utime_t MockBufferIO::get_send_timeout()
{
return stm;
}
@ -264,14 +264,14 @@ MockWallClock::~MockWallClock()
{
}
int64_t MockWallClock::time_ms()
srs_utime_t MockWallClock::now()
{
return clock;
}
MockWallClock* MockWallClock::set_clock(int64_t ms)
MockWallClock* MockWallClock::set_clock(srs_utime_t v)
{
clock = ms;
clock = v;
return this;
}
@ -693,11 +693,11 @@ VOID TEST(ProtocolStackTest, ProtocolTimeout)
EXPECT_TRUE(SRS_UTIME_NO_TIMEOUT == proto.get_recv_timeout());
EXPECT_TRUE(SRS_UTIME_NO_TIMEOUT == proto.get_send_timeout());
proto.set_recv_timeout(10);
EXPECT_TRUE(10 == proto.get_recv_timeout());
proto.set_recv_timeout(10 * SRS_UTIME_MILLISECONDS);
EXPECT_TRUE(10 * SRS_UTIME_MILLISECONDS == proto.get_recv_timeout());
proto.set_send_timeout(10);
EXPECT_TRUE(10 == proto.get_send_timeout());
proto.set_send_timeout(10 * SRS_UTIME_MILLISECONDS);
EXPECT_TRUE(10 * SRS_UTIME_MILLISECONDS == proto.get_send_timeout());
}
/**
@ -5707,7 +5707,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_in(30 * 100 * 1000)->set_out(30 * 100 * 1000);
kbps->sample();
@ -5720,7 +5720,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 300s.
clock->set_clock(330 * 1000);
clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_in(330 * 100 * 1000)->set_out(330 * 100 * 1000);
kbps->sample();
@ -5755,7 +5755,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_in(30 * 100 * 1000);
kbps->sample();
@ -5768,7 +5768,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 300s.
clock->set_clock(330 * 1000);
clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_in(330 * 100 * 1000);
kbps->sample();
@ -5803,7 +5803,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_out(30 * 100 * 1000);
kbps->sample();
@ -5816,7 +5816,7 @@ VOID TEST(ProtocolKbpsTest, Connections)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 300s.
clock->set_clock(330 * 1000);
clock->set_clock(330 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_out(330 * 100 * 1000);
kbps->sample();
@ -5901,7 +5901,7 @@ VOID TEST(ProtocolKbpsTest, Delta)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS);
kbps->add_delta(30 * in, 30 * out);
kbps->sample();
@ -5943,7 +5943,7 @@ VOID TEST(ProtocolKbpsTest, RAWStatistic)
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
clock->set_clock(30 * 1000 * SRS_UTIME_MILLISECONDS);
io->set_out(30 * 100 * 1000);
kbps->sample();

@ -48,20 +48,20 @@ public:
virtual ~MockEmptyIO();
// for protocol
public:
virtual bool is_never_timeout(int64_t tm);
virtual bool is_never_timeout(srs_utime_t tm);
// for handshake.
public:
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
// for protocol
public:
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
// for protocol
public:
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_send_bytes();
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
// for protocol/amf0/msg-codec
@ -72,9 +72,9 @@ public:
class MockBufferIO : public ISrsProtocolReadWriter
{
public:
// The send/recv timeout in ms.
int64_t rtm;
int64_t stm;
// The send/recv timeout in srs_utime_t.
srs_utime_t rtm;
srs_utime_t stm;
// The send/recv data in bytes.
int64_t rbytes;
int64_t sbytes;
@ -89,20 +89,20 @@ public:
virtual MockBufferIO* append(std::string data);
// for protocol
public:
virtual bool is_never_timeout(int64_t tm);
virtual bool is_never_timeout(srs_utime_t tm);
// for handshake.
public:
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
// for protocol
public:
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
// for protocol
public:
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual int64_t get_send_bytes();
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
// for protocol/amf0/msg-codec
@ -136,9 +136,9 @@ public:
MockWallClock();
virtual ~MockWallClock();
public:
virtual int64_t time_ms();
virtual srs_utime_t now();
public:
virtual MockWallClock* set_clock(int64_t ms);
virtual MockWallClock* set_clock(srs_utime_t v);
};
#endif

Loading…
Cancel
Save