merge from srs2.

pull/556/head
winlin 10 years ago
commit 807299589e

@ -342,6 +342,7 @@ Remark:
## History
* v2.0, 2015-08-14, use send_min_interval for stream control. 2.0.183
* v2.0, 2015-08-12, enable the SRS_PERF_TCP_NODELAY and add config tcp_nodelay. 2.0.182
* v2.0, 2015-08-11, for [#442](https://github.com/simple-rtmp-server/srs/issues/442) support kickoff connected client. 2.0.181
* v2.0, 2015-07-21, for [#169](https://github.com/simple-rtmp-server/srs/issues/169) support default values for transcode. 2.0.180

@ -858,6 +858,27 @@ vhost min.delay.com {
tcp_nodelay on;
}
# the vhost to control the stream delivery feature
vhost stream.control.com {
# @see vhost mrw.srs.com for detail.
min_latency on;
mr {
enabled off;
}
mw_latency 100;
# @see vhost min.delay.com
queue_length 10;
tcp_nodelay on;
# the minimal packets send interval in ms,
# used to control the ndiff of stream by srs_rtmp_dump,
# for example, some device can only accept some stream which
# delivery packets in constant interval(not cbr).
# @remark 0 to disable the minimal interval.
# @remark >0 to make the srs to send message one by one.
# default: 0
send_min_interval 3;
}
# the vhost for antisuck.
vhost refer.anti_suck.com {
# the common refer for play and publish.

@ -257,6 +257,7 @@ int main(int argc, char** argv)
}
u_int32_t pre_timestamp = 0;
int64_t pre_now = srs_utils_time_ms();
for (;;) {
int size;
char type;
@ -268,11 +269,12 @@ int main(int argc, char** argv)
goto rtmp_destroy;
}
if (srs_human_print_rtmp_packet2(type, timestamp, data, size, pre_timestamp) != 0) {
if (srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, pre_now) != 0) {
srs_human_trace("print rtmp packet failed.");
goto rtmp_destroy;
}
pre_timestamp = timestamp;
pre_now = srs_utils_time_ms();
// we only write some types of messages to flv file.
int is_flv_msg = type == SRS_RTMP_TYPE_AUDIO

@ -757,6 +757,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
}
srs_trace("vhost %s reload mw success.", vhost.c_str());
}
// smi(send_min_interval), only one per vhost
if (!srs_directive_equals(new_vhost->get("send_min_interval"), old_vhost->get("send_min_interval"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_smi(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes smi failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload smi success.", vhost.c_str());
}
// min_latency, only one per vhost
if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@ -1750,7 +1761,7 @@ int SrsConfig::check_config()
&& n != "time_jitter" && n != "mix_correct"
&& n != "atc" && n != "atc_auto"
&& n != "debug_srs_upnode"
&& n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay"
&& n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" && n != "send_min_interval"
&& n != "security" && n != "http_remux"
&& n != "http" && n != "http_static"
&& n != "hds"
@ -2506,6 +2517,23 @@ bool SrsConfig::get_tcp_nodelay(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
int SrsConfig::get_send_min_interval(string vhost)
{
static int DEFAULT = 0;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("send_min_interval");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_global_chunk_size()
{
SrsConfDirective* conf = root->get("chunk_size");

@ -526,6 +526,10 @@ public:
* whether enable tcp nodelay for all clients of vhost.
*/
virtual bool get_tcp_nodelay(std::string vhost);
/**
* the minimal send interval in ms.
*/
virtual int get_send_min_interval(std::string vhost);
private:
/**
* get the global chunk size.

@ -170,6 +170,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/)
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_smi(string /*vhost*/)
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/)
{
return ERROR_SUCCESS;

@ -73,6 +73,7 @@ public:
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_mr(std::string vhost);
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_smi(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
virtual int on_reload_vhost_chunk_size(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);

@ -93,6 +93,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
mw_sleep = SRS_PERF_MW_SLEEP;
mw_enabled = false;
realtime = SRS_PERF_MIN_LATENCY_ENABLED;
send_min_interval = 0;
_srs_config->subscribe(this);
}
@ -247,6 +248,23 @@ int SrsRtmpConn::on_reload_vhost_mw(string vhost)
return ret;
}
int SrsRtmpConn::on_reload_vhost_smi(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
int smi = _srs_config->get_send_min_interval(vhost);
if (smi != send_min_interval) {
srs_trace("apply smi %d=>%d", send_min_interval, smi);
send_min_interval = smi;
}
return ret;
}
int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
{
int ret = ERROR_SUCCESS;
@ -589,10 +607,15 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
// set the sock options.
set_sock_options();
srs_trace("start play smi=%d, mw_sleep=%d, mw_enabled=%d, realtime=%d",
send_min_interval, mw_sleep, mw_enabled, realtime);
while (!disposed) {
// collect elapse for pithy print.
pprint->elapse();
@ -641,7 +664,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
// @remark when enable send_min_interval, only fetch one message a time.
int count = send_min_interval? 1 : 0;
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
@ -716,6 +740,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
return ret;
}
}
// apply the minimal interval for delivery stream in ms.
if (send_min_interval > 0) {
st_usleep(send_min_interval * 1000);
}
}
return ret;

@ -85,6 +85,8 @@ private:
// for realtime
// @see https://github.com/simple-rtmp-server/srs/issues/257
bool realtime;
// the minimal interval in ms for delivery stream.
int send_min_interval;
public:
SrsRtmpConn(SrsServer* svr, st_netfd_t c);
virtual ~SrsRtmpConn();
@ -96,6 +98,7 @@ protected:
public:
virtual int on_reload_vhost_removed(std::string vhost);
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_smi(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
// interface IKbpsDelta
public:

@ -301,7 +301,8 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in
}
srs_assert(max_count > 0);
count = srs_min(max_count, nb_msgs);
// when count is 0, dumps all; otherwise, dumps no more than count.
count = srs_min(max_count, count? count : nb_msgs);
SrsSharedPtrMessage** omsgs = msgs.data();
for (int i = 0; i < count; i++) {

@ -173,11 +173,12 @@ public:
*/
virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
/**
* get packets in consumer queue.
* @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
* @count the count in array, output param.
* @max_count the max count to dequeue, must be positive.
*/
* get packets in consumer queue.
* @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it.
* @count the count in array, input and output param.
* @max_count the max count to dequeue, must be positive.
* @remark user can specifies the count to get specified msgs; 0 to get all if possible.
*/
virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count);
/**
* dumps packets to consumer, use specified args.
@ -256,10 +257,11 @@ public:
*/
virtual int enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag);
/**
* get packets in consumer queue.
* @param msgs the msgs array to dump packets to send.
* @param count the count in array, output param.
*/
* get packets in consumer queue.
* @param msgs the msgs array to dump packets to send.
* @param count the count in array, intput and output param.
* @remark user can specifies the count to get specified msgs; 0 to get all if possible.
*/
virtual int dump_packets(SrsMessageArray* msgs, int& count);
#ifdef SRS_PERF_QUEUE_COND_WAIT
/**

@ -1915,8 +1915,7 @@ void srs_amf0_strict_array_append(srs_amf0_t amf0, srs_amf0_t value)
int64_t srs_utils_time_ms()
{
srs_update_system_time_ms();
return srs_get_system_time_ms();
return srs_update_system_time_ms();
}
int64_t srs_utils_send_bytes(srs_rtmp_t rtmp)
@ -2320,6 +2319,11 @@ int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int
}
int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp)
{
return srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, 0);
}
int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now)
{
int ret = ERROR_SUCCESS;
@ -2328,24 +2332,29 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
diff = (int)timestamp - (int)pre_timestamp;
}
int ndiff = 0;
if (pre_now > 0) {
ndiff = (int)(srs_utils_time_ms() - pre_now);
}
u_int32_t pts;
if (srs_utils_parse_timestamp(timestamp, type, data, size, &pts) != 0) {
srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, size=%d, DecodeError",
srs_human_flv_tag_type2string(type), timestamp, diff, size
srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, ndiff=%d, size=%d, DecodeError",
srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size
);
return ret;
}
if (type == SRS_RTMP_TYPE_VIDEO) {
srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s)",
srs_human_flv_tag_type2string(type), timestamp, pts, diff, size,
srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s)",
srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
srs_human_flv_video_codec_id2string(srs_utils_flv_video_codec_id(data, size)),
srs_human_flv_video_avc_packet_type2string(srs_utils_flv_video_avc_packet_type(data, size)),
srs_human_flv_video_frame_type2string(srs_utils_flv_video_frame_type(data, size))
);
} else if (type == SRS_RTMP_TYPE_AUDIO) {
srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s,%s,%s)",
srs_human_flv_tag_type2string(type), timestamp, pts, diff, size,
srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s,%s,%s)",
srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
srs_human_flv_audio_sound_format2string(srs_utils_flv_audio_sound_format(data, size)),
srs_human_flv_audio_sound_rate2string(srs_utils_flv_audio_sound_rate(data, size)),
srs_human_flv_audio_sound_size2string(srs_utils_flv_audio_sound_size(data, size)),
@ -2353,8 +2362,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
srs_human_flv_audio_aac_packet_type2string(srs_utils_flv_audio_aac_packet_type(data, size))
);
} else if (type == SRS_RTMP_TYPE_SCRIPT) {
srs_human_verbose("Data packet type=%s, time=%d, diff=%d, size=%d",
srs_human_flv_tag_type2string(type), timestamp, diff, size);
srs_human_verbose("Data packet type=%s, time=%d, diff=%d, ndiff=%d, size=%d",
srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size);
int nparsed = 0;
while (nparsed < size) {
int nb_parsed_this = 0;
@ -2370,8 +2379,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int
srs_freep(amf0_str);
}
} else {
srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, size=%d",
type, timestamp, pts, diff, size);
srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d",
type, timestamp, pts, diff, ndiff, size);
}
return ret;

@ -904,6 +904,7 @@ extern const char* srs_human_flv_audio_aac_packet_type2string(char aac_packet_ty
*/
extern int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int size);
extern int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp);
extern int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now);
// log to console, for use srs-librtmp application.
extern const char* srs_human_format_time();

Loading…
Cancel
Save