|
|
|
@ -101,9 +101,11 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport)
|
|
|
|
|
ip = cip;
|
|
|
|
|
port = cport;
|
|
|
|
|
create_time = srsu2ms(srs_get_system_time());
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
span_main_ = _srs_apm->dummy();
|
|
|
|
|
span_connect_ = _srs_apm->dummy();
|
|
|
|
|
span_client_ = _srs_apm->dummy();
|
|
|
|
|
#endif
|
|
|
|
|
trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id());
|
|
|
|
|
|
|
|
|
|
kbps = new SrsNetworkKbps();
|
|
|
|
@ -149,9 +151,11 @@ SrsRtmpConn::~SrsRtmpConn()
|
|
|
|
|
srs_freep(rtmp);
|
|
|
|
|
srs_freep(refer);
|
|
|
|
|
srs_freep(security);
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
srs_freep(span_main_);
|
|
|
|
|
srs_freep(span_connect_);
|
|
|
|
|
srs_freep(span_client_);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::string SrsRtmpConn::desc()
|
|
|
|
@ -169,16 +173,22 @@ srs_error_t SrsRtmpConn::do_cycle()
|
|
|
|
|
{
|
|
|
|
|
srs_error_t err = srs_success;
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// We should keep the root span to alive util connection closed.
|
|
|
|
|
// Note that we use producer and consumer span because RTMP connection is long polling connection.
|
|
|
|
|
// Note that we also store this span in coroutine context, so that edge could load it.
|
|
|
|
|
srs_freep(span_main_);
|
|
|
|
|
span_main_ = _srs_apm->span("rtmp")->set_kind(SrsApmKindServer)->attr("cip", ip)
|
|
|
|
|
->attr("cid", _srs_context->get_id().c_str());
|
|
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd),
|
|
|
|
|
span_main_->format_trace_id(), span_main_->format_span_id()
|
|
|
|
|
);
|
|
|
|
|
#else
|
|
|
|
|
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
|
|
|
|
|
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
|
|
|
|
@ -193,12 +203,14 @@ srs_error_t SrsRtmpConn::do_cycle()
|
|
|
|
|
srs_trace("RTMP proxy real client ip=%s", rips.c_str());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Update the real IP of client, also set the HTTP fields.
|
|
|
|
|
span_main_->attr("rip", rip ? rips : ip)->attr("http.client_ip", rip ? rips : ip);
|
|
|
|
|
|
|
|
|
|
// The span for RTMP connecting to application.
|
|
|
|
|
srs_freep(span_connect_);
|
|
|
|
|
span_connect_ = _srs_apm->span("connect")->as_child(span_main_);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
SrsRequest* req = info->req;
|
|
|
|
|
if ((err = rtmp->connect_app(req)) != srs_success) {
|
|
|
|
@ -239,9 +251,11 @@ srs_error_t SrsRtmpConn::do_cycle()
|
|
|
|
|
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Load the span from the AMF0 object propagator.
|
|
|
|
|
// Note that we will update the trace id, so please make sure no spans are ended before this.
|
|
|
|
|
_srs_apm->extract(span_main_, req->args);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((err = service_cycle()) != srs_success) {
|
|
|
|
@ -414,8 +428,10 @@ srs_error_t SrsRtmpConn::service_cycle()
|
|
|
|
|
return srs_error_wrap(err, "rtmp: response connect app");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Must be a connecting application span.
|
|
|
|
|
span_connect_->end();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if ((err = rtmp->on_bw_done()) != srs_success) {
|
|
|
|
|
return srs_error_wrap(err, "rtmp: on bw down");
|
|
|
|
@ -493,20 +509,24 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|
|
|
|
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
|
|
|
|
|
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Start APM only when client is identified, because it might republish.
|
|
|
|
|
srs_freep(span_client_);
|
|
|
|
|
span_client_ = _srs_apm->span("client")->as_child(span_connect_)->attr("type", srs_client_type_string(info->type))
|
|
|
|
|
->attr("url", req->get_stream_url())->attr("http.url", req->get_stream_url());
|
|
|
|
|
// We store the span to coroutine context, for edge to load it.
|
|
|
|
|
_srs_apm->store(span_client_);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
// discovery vhost, resolve the vhost from config
|
|
|
|
|
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
|
|
|
|
|
if (parsed_vhost) {
|
|
|
|
|
req->vhost = parsed_vhost->arg0();
|
|
|
|
|
}
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
span_client_->attr("vhost", req->vhost)->attr("http.host", req->host)->attr("http.server_name", req->vhost)
|
|
|
|
|
->attr("http.target", srs_fmt("/%s/%s", req->app.c_str(), req->stream.c_str()));
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) {
|
|
|
|
|
return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s",
|
|
|
|
@ -582,10 +602,12 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|
|
|
|
return srs_error_wrap(err, "rtmp: callback on play");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Must be a client span.
|
|
|
|
|
span_client_->set_name("play")->end();
|
|
|
|
|
// We end the connection span because it's a producer and only trace the established.
|
|
|
|
|
span_main_->end();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
err = playing(source);
|
|
|
|
|
http_hooks_on_stop();
|
|
|
|
@ -597,10 +619,12 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|
|
|
|
return srs_error_wrap(err, "rtmp: start FMLE publish");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Must be a client span.
|
|
|
|
|
span_client_->set_name("publish")->end();
|
|
|
|
|
// We end the connection span because it's a producer and only trace the established.
|
|
|
|
|
span_main_->end();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return publishing(source);
|
|
|
|
|
}
|
|
|
|
@ -609,10 +633,12 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|
|
|
|
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Must be a client span.
|
|
|
|
|
span_client_->set_name("publish")->end();
|
|
|
|
|
// We end the connection span because it's a producer and only trace the established.
|
|
|
|
|
span_main_->end();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return publishing(source);
|
|
|
|
|
}
|
|
|
|
@ -621,10 +647,12 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|
|
|
|
return srs_error_wrap(err, "rtmp: start FLASH publish");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Must be a client span.
|
|
|
|
|
span_client_->set_name("publish")->end();
|
|
|
|
|
// We end the connection span because it's a producer and only trace the established.
|
|
|
|
|
span_main_->end();
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return publishing(source);
|
|
|
|
|
}
|
|
|
|
@ -786,9 +814,11 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
|
|
|
|
|
srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
|
|
|
|
|
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
ISrsApmSpan* span = _srs_apm->span("play-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
|
|
|
|
|
->attr("realtime", srs_fmt("%d", realtime))->end();
|
|
|
|
|
SrsAutoFree(ISrsApmSpan, span);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
// when source is set to expired, disconnect it.
|
|
|
|
@ -833,10 +863,12 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
|
|
|
|
|
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
|
|
|
|
|
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// TODO: Do not use pithy print for frame span.
|
|
|
|
|
ISrsApmSpan* sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
|
|
|
|
|
->attr("msgs", srs_fmt("%d", count))->attr("kbps", srs_fmt("%d", kbps->get_send_kbps_30s()));
|
|
|
|
|
srs_freep(sample);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (count <= 0) {
|
|
|
|
@ -963,9 +995,11 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
|
|
|
|
|
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
ISrsApmSpan* span = _srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
|
|
|
|
|
->attr("timeout", srs_fmt("%d", srsu2msi(publish_normal_timeout)))->end();
|
|
|
|
|
SrsAutoFree(ISrsApmSpan, span);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
int64_t nb_msgs = 0;
|
|
|
|
|
uint64_t nb_frames = 0;
|
|
|
|
@ -1015,10 +1049,12 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
|
|
|
|
|
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
|
|
|
|
|
srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// TODO: Do not use pithy print for frame span.
|
|
|
|
|
ISrsApmSpan* sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
|
|
|
|
|
->attr("msgs", srs_fmt("%" PRId64, nb_frames))->attr("kbps", srs_fmt("%d", kbps->get_recv_kbps_30s()));
|
|
|
|
|
srs_freep(sample);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1543,6 +1579,7 @@ srs_error_t SrsRtmpConn::cycle()
|
|
|
|
|
// Serve the client.
|
|
|
|
|
err = do_cycle();
|
|
|
|
|
|
|
|
|
|
#ifdef SRS_APM
|
|
|
|
|
// Final APM span, parent is the last span, not the root span. Note that only client or server kind will be filtered
|
|
|
|
|
// for error or exception report.
|
|
|
|
|
ISrsApmSpan* span_final = _srs_apm->span("final")->set_kind(SrsApmKindServer)->as_child(span_client_);
|
|
|
|
@ -1550,6 +1587,7 @@ srs_error_t SrsRtmpConn::cycle()
|
|
|
|
|
if (srs_error_code(err) != 0) {
|
|
|
|
|
span_final->record_error(err)->set_status(SrsApmStatusError, srs_fmt("fail code=%d", srs_error_code(err)));
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
// Update statistic when done.
|
|
|
|
|
SrsStatistic* stat = SrsStatistic::instance();
|
|
|
|
|