diff --git a/README.md b/README.md index d6a33c5fd..2d186565c 100755 --- a/README.md +++ b/README.md @@ -342,6 +342,7 @@ Remark: ## History +* v2.0, 2015-08-14, use send_min_interval for stream control. 2.0.183 * v2.0, 2015-08-12, enable the SRS_PERF_TCP_NODELAY and add config tcp_nodelay. 2.0.182 * v2.0, 2015-08-11, for [#442](https://github.com/simple-rtmp-server/srs/issues/442) support kickoff connected client. 2.0.181 * v2.0, 2015-07-21, for [#169](https://github.com/simple-rtmp-server/srs/issues/169) support default values for transcode. 2.0.180 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 6ebd97dca..2e878c065 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -858,6 +858,27 @@ vhost min.delay.com { tcp_nodelay on; } +# the vhost to control the stream delivery feature +vhost stream.control.com { + # @see vhost mrw.srs.com for detail. + min_latency on; + mr { + enabled off; + } + mw_latency 100; + # @see vhost min.delay.com + queue_length 10; + tcp_nodelay on; + # the minimal packets send interval in ms, + # used to control the ndiff of stream by srs_rtmp_dump, + # for example, some device can only accept some stream which + # delivery packets in constant interval(not cbr). + # @remark 0 to disable the minimal interval. + # @remark >0 to make the srs to send message one by one. + # default: 0 + send_min_interval 3; +} + # the vhost for antisuck. vhost refer.anti_suck.com { # the common refer for play and publish. diff --git a/trunk/research/librtmp/srs_rtmp_dump.c b/trunk/research/librtmp/srs_rtmp_dump.c index 1fbab4821..065df1a92 100644 --- a/trunk/research/librtmp/srs_rtmp_dump.c +++ b/trunk/research/librtmp/srs_rtmp_dump.c @@ -257,6 +257,7 @@ int main(int argc, char** argv) } u_int32_t pre_timestamp = 0; + int64_t pre_now = srs_utils_time_ms(); for (;;) { int size; char type; @@ -268,11 +269,12 @@ int main(int argc, char** argv) goto rtmp_destroy; } - if (srs_human_print_rtmp_packet2(type, timestamp, data, size, pre_timestamp) != 0) { + if (srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, pre_now) != 0) { srs_human_trace("print rtmp packet failed."); goto rtmp_destroy; } pre_timestamp = timestamp; + pre_now = srs_utils_time_ms(); // we only write some types of messages to flv file. int is_flv_msg = type == SRS_RTMP_TYPE_AUDIO diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index d14154a72..33c2db42d 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -757,6 +757,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload mw success.", vhost.c_str()); } + // smi(send_min_interval), only one per vhost + if (!srs_directive_equals(new_vhost->get("send_min_interval"), old_vhost->get("send_min_interval"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_smi(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes smi failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload smi success.", vhost.c_str()); + } // min_latency, only one per vhost if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1750,7 +1761,7 @@ int SrsConfig::check_config() && n != "time_jitter" && n != "mix_correct" && n != "atc" && n != "atc_auto" && n != "debug_srs_upnode" - && n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" + && n != "mr" && n != "mw_latency" && n != "min_latency" && n != "tcp_nodelay" && n != "send_min_interval" && n != "security" && n != "http_remux" && n != "http" && n != "http_static" && n != "hds" @@ -2506,6 +2517,23 @@ bool SrsConfig::get_tcp_nodelay(string vhost) return SRS_CONF_PERFER_FALSE(conf->arg0()); } +int SrsConfig::get_send_min_interval(string vhost) +{ + static int DEFAULT = 0; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("send_min_interval"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + int SrsConfig::get_global_chunk_size() { SrsConfDirective* conf = root->get("chunk_size"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index a1ca864e1..ad695d166 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -526,6 +526,10 @@ public: * whether enable tcp nodelay for all clients of vhost. */ virtual bool get_tcp_nodelay(std::string vhost); + /** + * the minimal send interval in ms. + */ + virtual int get_send_min_interval(std::string vhost); private: /** * get the global chunk size. diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index 905964462..ca9732988 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -170,6 +170,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_smi(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index f05370cff..2e2f8c765 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -73,6 +73,7 @@ public: virtual int on_reload_vhost_dvr(std::string vhost); virtual int on_reload_vhost_mr(std::string vhost); virtual int on_reload_vhost_mw(std::string vhost); + virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); virtual int on_reload_vhost_chunk_size(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 8b796b8ab..f63456abb 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -93,6 +93,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) mw_sleep = SRS_PERF_MW_SLEEP; mw_enabled = false; realtime = SRS_PERF_MIN_LATENCY_ENABLED; + send_min_interval = 0; _srs_config->subscribe(this); } @@ -247,6 +248,23 @@ int SrsRtmpConn::on_reload_vhost_mw(string vhost) return ret; } +int SrsRtmpConn::on_reload_vhost_smi(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + int smi = _srs_config->get_send_min_interval(vhost); + if (smi != send_min_interval) { + srs_trace("apply smi %d=>%d", send_min_interval, smi); + send_min_interval = smi; + } + + return ret; +} + int SrsRtmpConn::on_reload_vhost_realtime(string vhost) { int ret = ERROR_SUCCESS; @@ -589,10 +607,15 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // when mw_sleep changed, resize the socket send buffer. mw_enabled = true; change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); + // initialize the send_min_interval + send_min_interval = _srs_config->get_send_min_interval(req->vhost); // set the sock options. set_sock_options(); + srs_trace("start play smi=%d, mw_sleep=%d, mw_enabled=%d, realtime=%d", + send_min_interval, mw_sleep, mw_enabled, realtime); + while (!disposed) { // collect elapse for pithy print. pprint->elapse(); @@ -641,7 +664,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. - int count = 0; + // @remark when enable send_min_interval, only fetch one message a time. + int count = send_min_interval? 1 : 0; if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { srs_error("get messages from consumer failed. ret=%d", ret); return ret; @@ -716,6 +740,11 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe return ret; } } + + // apply the minimal interval for delivery stream in ms. + if (send_min_interval > 0) { + st_usleep(send_min_interval * 1000); + } } return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 683a23271..36f311a07 100755 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -85,6 +85,8 @@ private: // for realtime // @see https://github.com/simple-rtmp-server/srs/issues/257 bool realtime; + // the minimal interval in ms for delivery stream. + int send_min_interval; public: SrsRtmpConn(SrsServer* svr, st_netfd_t c); virtual ~SrsRtmpConn(); @@ -96,6 +98,7 @@ protected: public: virtual int on_reload_vhost_removed(std::string vhost); virtual int on_reload_vhost_mw(std::string vhost); + virtual int on_reload_vhost_smi(std::string vhost); virtual int on_reload_vhost_realtime(std::string vhost); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 72a8b4671..02a9457eb 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -301,7 +301,8 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in } srs_assert(max_count > 0); - count = srs_min(max_count, nb_msgs); + // when count is 0, dumps all; otherwise, dumps no more than count. + count = srs_min(max_count, count? count : nb_msgs); SrsSharedPtrMessage** omsgs = msgs.data(); for (int i = 0; i < count; i++) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 361901920..74d7f343f 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -173,11 +173,12 @@ public: */ virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); /** - * get packets in consumer queue. - * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. - * @count the count in array, output param. - * @max_count the max count to dequeue, must be positive. - */ + * get packets in consumer queue. + * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. + * @count the count in array, input and output param. + * @max_count the max count to dequeue, must be positive. + * @remark user can specifies the count to get specified msgs; 0 to get all if possible. + */ virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); /** * dumps packets to consumer, use specified args. @@ -256,10 +257,11 @@ public: */ virtual int enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag); /** - * get packets in consumer queue. - * @param msgs the msgs array to dump packets to send. - * @param count the count in array, output param. - */ + * get packets in consumer queue. + * @param msgs the msgs array to dump packets to send. + * @param count the count in array, intput and output param. + * @remark user can specifies the count to get specified msgs; 0 to get all if possible. + */ virtual int dump_packets(SrsMessageArray* msgs, int& count); #ifdef SRS_PERF_QUEUE_COND_WAIT /** diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 02c33b36b..d640f1deb 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 182 +#define VERSION_REVISION 183 // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index f9830039e..d1aab230e 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -1915,8 +1915,7 @@ void srs_amf0_strict_array_append(srs_amf0_t amf0, srs_amf0_t value) int64_t srs_utils_time_ms() { - srs_update_system_time_ms(); - return srs_get_system_time_ms(); + return srs_update_system_time_ms(); } int64_t srs_utils_send_bytes(srs_rtmp_t rtmp) @@ -2320,6 +2319,11 @@ int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int } int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp) +{ + return srs_human_print_rtmp_packet3(type, timestamp, data, size, pre_timestamp, 0); +} + +int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now) { int ret = ERROR_SUCCESS; @@ -2328,24 +2332,29 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int diff = (int)timestamp - (int)pre_timestamp; } + int ndiff = 0; + if (pre_now > 0) { + ndiff = (int)(srs_utils_time_ms() - pre_now); + } + u_int32_t pts; if (srs_utils_parse_timestamp(timestamp, type, data, size, &pts) != 0) { - srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, size=%d, DecodeError", - srs_human_flv_tag_type2string(type), timestamp, diff, size + srs_human_trace("Rtmp packet type=%s, dts=%d, diff=%d, ndiff=%d, size=%d, DecodeError", + srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size ); return ret; } if (type == SRS_RTMP_TYPE_VIDEO) { - srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s)", - srs_human_flv_tag_type2string(type), timestamp, pts, diff, size, + srs_human_trace("Video packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s)", + srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size, srs_human_flv_video_codec_id2string(srs_utils_flv_video_codec_id(data, size)), srs_human_flv_video_avc_packet_type2string(srs_utils_flv_video_avc_packet_type(data, size)), srs_human_flv_video_frame_type2string(srs_utils_flv_video_frame_type(data, size)) ); } else if (type == SRS_RTMP_TYPE_AUDIO) { - srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, size=%d, %s(%s,%s,%s,%s)", - srs_human_flv_tag_type2string(type), timestamp, pts, diff, size, + srs_human_trace("Audio packet type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s,%s,%s)", + srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size, srs_human_flv_audio_sound_format2string(srs_utils_flv_audio_sound_format(data, size)), srs_human_flv_audio_sound_rate2string(srs_utils_flv_audio_sound_rate(data, size)), srs_human_flv_audio_sound_size2string(srs_utils_flv_audio_sound_size(data, size)), @@ -2353,8 +2362,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int srs_human_flv_audio_aac_packet_type2string(srs_utils_flv_audio_aac_packet_type(data, size)) ); } else if (type == SRS_RTMP_TYPE_SCRIPT) { - srs_human_verbose("Data packet type=%s, time=%d, diff=%d, size=%d", - srs_human_flv_tag_type2string(type), timestamp, diff, size); + srs_human_verbose("Data packet type=%s, time=%d, diff=%d, ndiff=%d, size=%d", + srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size); int nparsed = 0; while (nparsed < size) { int nb_parsed_this = 0; @@ -2370,8 +2379,8 @@ int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int srs_freep(amf0_str); } } else { - srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, size=%d", - type, timestamp, pts, diff, size); + srs_human_trace("Rtmp packet type=%#x, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d", + type, timestamp, pts, diff, ndiff, size); } return ret; diff --git a/trunk/src/libs/srs_librtmp.hpp b/trunk/src/libs/srs_librtmp.hpp index 67eb77b82..b0fc49e52 100644 --- a/trunk/src/libs/srs_librtmp.hpp +++ b/trunk/src/libs/srs_librtmp.hpp @@ -904,6 +904,7 @@ extern const char* srs_human_flv_audio_aac_packet_type2string(char aac_packet_ty */ extern int srs_human_print_rtmp_packet(char type, u_int32_t timestamp, char* data, int size); extern int srs_human_print_rtmp_packet2(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp); +extern int srs_human_print_rtmp_packet3(char type, u_int32_t timestamp, char* data, int size, u_int32_t pre_timestamp, int64_t pre_now); // log to console, for use srs-librtmp application. extern const char* srs_human_format_time();