Merge branch '2.0release' into develop

pull/556/head
winlin 10 years ago
commit f10e399991

@ -759,18 +759,6 @@ About the HLS overhead of SRS, we compare the overhead to FLV by remux the HLS t
The HLS overhead is calc by: (HLS - FLV) / FLV * 100%
### Bytes Api
The api provides bytes of vhost, stream and srs.
| Connections | tcpdump | srs | deviation |
| ----------- | ---------- | ---------- | --------- |
| 1 | 201687968 | 201687968 | 0% |
| 100 | 12456042662 | 12458284053 | 0.018% |
| 500 | 11623083989 | 11633158616 | 0.087% |
The tcpdump command is `tcpdump src 192.168.1.151 and port 1935 -i eth0 and tcp -ql`.
## Architecture
SRS always use the most simple architecture to support complex transaction.

@ -883,6 +883,12 @@ vhost stream.control.com {
# while the sequence header is not changed yet.
# default: off
reduce_sequence_header on;
# the 1st packet timeout in ms for encoder.
# default: 20000
publish_1stpkt_timeout 20000;
# the normal packet timeout in ms for encoder.
# default: 5000
publish_normal_timeout 7000;
}
# the vhost for antisuck.

@ -258,8 +258,8 @@ int main(int argc, char** argv)
int64_t nb_packets = 0;
u_int32_t pre_timestamp = 0;
int64_t pre_now = srs_utils_time_ms();
int64_t start_time = pre_now;
int64_t pre_now = -1;
int64_t start_time = -1;
for (;;) {
int size;
char type;
@ -271,6 +271,13 @@ int main(int argc, char** argv)
goto rtmp_destroy;
}
if (pre_now == -1) {
pre_now = srs_utils_time_ms();
}
if (start_time == -1) {
start_time = srs_utils_time_ms();
}
if (srs_human_print_rtmp_packet4(type, timestamp, data, size, pre_timestamp, pre_now, start_time, nb_packets++) != 0) {
srs_human_trace("print rtmp packet failed.");
goto rtmp_destroy;

@ -768,6 +768,28 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
}
srs_trace("vhost %s reload smi success.", vhost.c_str());
}
// publish_1stpkt_timeout, only one per vhost
if (!srs_directive_equals(new_vhost->get("publish_1stpkt_timeout"), old_vhost->get("publish_1stpkt_timeout"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_p1stpt(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes p1stpt failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload p1stpt success.", vhost.c_str());
}
// publish_normal_timeout, only one per vhost
if (!srs_directive_equals(new_vhost->get("publish_normal_timeout"), old_vhost->get("publish_normal_timeout"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_pnt(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes pnt failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload pnt success.", vhost.c_str());
}
// min_latency, only one per vhost
if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@ -1763,6 +1785,7 @@ int SrsConfig::check_config()
&& n != "debug_srs_upnode"
&& n != "mr" && n != "mw_latency" && n != "min_latency"
&& n != "tcp_nodelay" && n != "send_min_interval" && n != "reduce_sequence_header"
&& n != "publish_1stpkt_timeout" && n != "publish_normal_timeout"
&& n != "security" && n != "http_remux"
&& n != "http" && n != "http_static"
&& n != "hds"
@ -2552,6 +2575,44 @@ bool SrsConfig::get_reduce_sequence_header(string vhost)
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
int SrsConfig::get_publish_1stpkt_timeout(string vhost)
{
// when no msg recevied for publisher, use larger timeout.
static int DEFAULT = 20000;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("publish_1stpkt_timeout");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_publish_normal_timeout(string vhost)
{
// the timeout for publish recv.
// we must use more smaller timeout, for the recv never know the status
// of underlayer socket.
static int DEFAULT = 5000;
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}
conf = conf->get("publish_normal_timeout");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_global_chunk_size()
{
SrsConfDirective* conf = root->get("chunk_size");

@ -534,6 +534,14 @@ public:
* whether reduce the sequence header.
*/
virtual bool get_reduce_sequence_header(std::string vhost);
/**
* the 1st packet timeout in ms for encoder.
*/
virtual int get_publish_1stpkt_timeout(std::string vhost);
/**
* the normal packet timeout in ms for encoder.
*/
virtual int get_publish_normal_timeout(std::string vhost);
private:
/**
* get the global chunk size.

@ -180,6 +180,16 @@ int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/)
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_p1stpt(string /*vhost*/)
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_pnt(string /*vhost*/)
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/)
{
return ERROR_SUCCESS;

@ -75,6 +75,8 @@ public:
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_smi(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
virtual int on_reload_vhost_p1stpt(std::string vhost);
virtual int on_reload_vhost_pnt(std::string vhost);
virtual int on_reload_vhost_chunk_size(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);

@ -274,9 +274,45 @@ int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
}
bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost);
srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
realtime = realtime_enabled;
if (realtime_enabled != realtime) {
srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
realtime = realtime_enabled;
}
return ret;
}
int SrsRtmpConn::on_reload_vhost_p1stpt(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
int p1stpt = _srs_config->get_publish_1stpkt_timeout(req->vhost);
if (p1stpt != publish_1stpkt_timeout) {
srs_trace("p1stpt changed %d=>%d", publish_1stpkt_timeout, p1stpt);
publish_1stpkt_timeout = p1stpt;
}
return ret;
}
int SrsRtmpConn::on_reload_vhost_pnt(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
int pnt = _srs_config->get_publish_normal_timeout(req->vhost);
if (pnt != publish_normal_timeout) {
srs_trace("p1stpt changed %d=>%d", publish_normal_timeout, pnt);
publish_normal_timeout = pnt;
}
return ret;
}
@ -803,6 +839,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
// set the sock options.
set_sock_options();
if (true) {
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d", mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout);
}
int64_t nb_msgs = 0;
while (!disposed) {
@ -819,9 +863,9 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout.
// @see https://github.com/simple-rtmp-server/srs/issues/441
trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000);
trd->wait(publish_1stpkt_timeout);
} else {
trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000);
trd->wait(publish_normal_timeout);
}
// check the thread error code.
@ -835,8 +879,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
// when not got any messages, timeout.
if (trd->nb_msgs() <= nb_msgs) {
ret = ERROR_SOCKET_TIMEOUT;
srs_warn("publish timeout %"PRId64"us, nb_msgs=%"PRId64", ret=%d",
SRS_CONSTS_RTMP_RECV_TIMEOUT_US, nb_msgs, ret);
srs_warn("publish timeout %dms, nb_msgs=%"PRId64", ret=%d",
nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
break;
}
nb_msgs = trd->nb_msgs();
@ -847,10 +891,10 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pprint->age(),
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", pprint->age(),
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
mr, mr_sleep
mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout
);
}
}

@ -87,6 +87,10 @@ private:
bool realtime;
// the minimal interval in ms for delivery stream.
double send_min_interval;
// publish 1st packet timeout in ms
int publish_1stpkt_timeout;
// publish normal packet timeout in ms
int publish_normal_timeout;
public:
SrsRtmpConn(SrsServer* svr, st_netfd_t c);
virtual ~SrsRtmpConn();
@ -100,6 +104,8 @@ public:
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_smi(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
virtual int on_reload_vhost_p1stpt(std::string vhost);
virtual int on_reload_vhost_pnt(std::string vhost);
// interface IKbpsDelta
public:
virtual void resample();

@ -74,12 +74,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// the timeout to wait client data,
// if timeout, close the connection.
#define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL)
// the timeout for publish recv.
// we must use more smaller timeout, for the recv never know the status
// of underlayer socket.
#define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL)
// when no msg recevied for publisher, use larger timeout.
#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US
// the timeout to wait for client control message,
// if timeout, we generally ignore and send the data to client,

@ -2338,6 +2338,12 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int
pi = (pre_now - starttime) / (double)nb_packets;
}
// global fps(video and audio mixed fps).
double gfps = 0;
if (pi > 0) {
gfps = 1000 / pi;
}
int diff = 0;
if (pre_timestamp > 0) {
diff = (int)timestamp - (int)pre_timestamp;
@ -2350,22 +2356,22 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int
u_int32_t pts;
if (srs_utils_parse_timestamp(timestamp, type, data, size, &pts) != 0) {
srs_human_trace("Rtmp packet id=%"PRId64"/%.1f, type=%s, dts=%d, diff=%d, ndiff=%d, size=%d, DecodeError",
nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size
srs_human_trace("Rtmp packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, ndiff=%d, diff=%d, size=%d, DecodeError",
nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, ndiff, diff, size
);
return ret;
}
if (type == SRS_RTMP_TYPE_VIDEO) {
srs_human_trace("Video packet id=%"PRId64"/%.1f, type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s)",
nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
srs_human_trace("Video packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d, %s(%s,%s)",
nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, pts, ndiff, diff, size,
srs_human_flv_video_codec_id2string(srs_utils_flv_video_codec_id(data, size)),
srs_human_flv_video_avc_packet_type2string(srs_utils_flv_video_avc_packet_type(data, size)),
srs_human_flv_video_frame_type2string(srs_utils_flv_video_frame_type(data, size))
);
} else if (type == SRS_RTMP_TYPE_AUDIO) {
srs_human_trace("Audio packet id=%"PRId64"/%.1f, type=%s, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d, %s(%s,%s,%s,%s)",
nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, pts, diff, ndiff, size,
srs_human_trace("Audio packet id=%"PRId64"/%.1f/%.1f, type=%s, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d, %s(%s,%s,%s,%s)",
nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, pts, ndiff, diff, size,
srs_human_flv_audio_sound_format2string(srs_utils_flv_audio_sound_format(data, size)),
srs_human_flv_audio_sound_rate2string(srs_utils_flv_audio_sound_rate(data, size)),
srs_human_flv_audio_sound_size2string(srs_utils_flv_audio_sound_size(data, size)),
@ -2373,8 +2379,8 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int
srs_human_flv_audio_aac_packet_type2string(srs_utils_flv_audio_aac_packet_type(data, size))
);
} else if (type == SRS_RTMP_TYPE_SCRIPT) {
srs_human_verbose("Data packet id=%"PRId64"/%.1f, type=%s, time=%d, diff=%d, ndiff=%d, size=%d",
nb_packets, pi, srs_human_flv_tag_type2string(type), timestamp, diff, ndiff, size);
srs_human_verbose("Data packet id=%"PRId64"/%.1f/%.1f, type=%s, time=%d, ndiff=%d, diff=%d, size=%d",
nb_packets, pi, gfps, srs_human_flv_tag_type2string(type), timestamp, ndiff, diff, size);
int nparsed = 0;
while (nparsed < size) {
int nb_parsed_this = 0;
@ -2390,8 +2396,8 @@ int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* data, int
srs_freep(amf0_str);
}
} else {
srs_human_trace("Rtmp packet id=%"PRId64"/%.1f, type=%#x, dts=%d, pts=%d, diff=%d, ndiff=%d, size=%d",
nb_packets, pi, type, timestamp, pts, diff, ndiff, size);
srs_human_trace("Rtmp packet id=%"PRId64"/%.1f/%.1f, type=%#x, dts=%d, pts=%d, ndiff=%d, diff=%d, size=%d",
nb_packets, pi, gfps, type, timestamp, pts, ndiff, diff, size);
}
return ret;

Loading…
Cancel
Save