diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 6e2d02b57..c455388fe 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -175,6 +175,56 @@ circuit_breaker { dying_pulse 5; } +# TencentCloud CLS config, logging to cloud. +# See https://cloud.tencent.com/document/product/614/11254 +tencentcloud_cls { + # Whether heartbeat is enabled. + # Overwrite by env SRS_TENCENTCLOUD_CLS_ENABLED + # default: off + enabled off; + # The logging label to category the cluster servers. + # Overwrite by env SRS_TENCENTCLOUD_CLS_LABEL + label cn-beijing; + # The logging tag to category the cluster servers. + # Overwrite by env SRS_TENCENTCLOUD_CLS_TAG + tag cn-edge; + # The SecretId to access CLS service, see https://console.cloud.tencent.com/cam/capi + # Overwrite by env SRS_TENCENTCLOUD_CLS_SECRET_ID + secret_id AKIDxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx; + # The SecretKey to access CLS service, see https://console.cloud.tencent.com/cam/capi + # Overwrite by env SRS_TENCENTCLOUD_CLS_SECRET_KEY + secret_key xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx; + # The endpoint of CLS, format as .cls.tencentcs.com. For example: + # ap-guangzhou.cls.tencentcs.com + # Note that tencentyun.com is for internal network, while tencentcs.com is for public internet. + # See https://cloud.tencent.com/document/product/614/18940 + # Overwrite by env SRS_TENCENTCLOUD_CLS_ENDPOINT + endpoint ap-guangzhou.cls.tencentcs.com; + # The topic ID of CLS, see https://cloud.tencent.com/document/product/614/41035 + # Overwrite by env SRS_TENCENTCLOUD_CLS_TOPIC_ID + topic_id xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx; + # Whether enable logging for each log sending. + # Overwrite by env SRS_TENCENTCLOUD_CLS_DEBUG_LOGGING + # Default: off + debug_logging off; + # Whether enable the heartbeat stat every (5 * heartbeat_ratio)s. + # Overwrite by env SRS_TENCENTCLOUD_CLS_STAT_HEARTBEAT + # Default: on + stat_heartbeat on; + # Setup the heartbeat interval ratio, 1 means 5s, 2 means 10s, etc. + # Overwrite by env SRS_TENCENTCLOUD_CLS_HEARTBEAT_RATIO + # Default: 1 + heartbeat_ratio 1; + # Whether enable the streams stat every (5 * streams_ratio)s. + # Overwrite by env SRS_TENCENTCLOUD_CLS_STAT_STREAMS + # Default: on + stat_streams on; + # Setup the streams interval ratio, 1 means 5s, 2 means 10s, etc. + # Overwrite by env SRS_TENCENTCLOUD_CLS_STREAMS_RATIO + # Default: 1 + streams_ratio 1; +} + ############################################################################################# # heartbeat/stats sections ############################################################################################# diff --git a/trunk/configure b/trunk/configure index 4ee6f64ee..5172a18ba 100755 --- a/trunk/configure +++ b/trunk/configure @@ -255,7 +255,7 @@ if [[ $SRS_FFMPEG_FIT == YES ]]; then fi MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream" - "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" + "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_tencentcloud" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index ea8f62a35..6b905189c 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-08-24, Log: Support write log to tencentcloud CLS. v5.0.44 * v5.0, 2022-08-22, Fix [#3114](https://github.com/ossrs/srs/issues/3114): Origin cluster config bug. v5.0.43 * v5.0, 2022-08-19, For [#2136](https://github.com/ossrs/srs/issues/2136): API: Cleanup no active streams for statistics. v5.0.42 * v5.0, 2022-08-14, Fix [#2747](https://github.com/ossrs/srs/issues/2747): Support Apple Silicon M1(aarch64). v5.0.41 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7ce6652d3..4603f220c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -59,6 +59,10 @@ const char* _srs_version = "XCORE-" RTMP_SIG_SRS_SERVER; #define SRS_CR (char)SRS_CONSTS_CR // Overwrite the config by env. +#define SRS_OVERWRITE_BY_ENV_STRING(key) if (getenv(key)) return getenv(key) +#define SRS_OVERWRITE_BY_ENV_BOOL(key) if (getenv(key)) return SRS_CONF_PERFER_FALSE(string(getenv(key))) +#define SRS_OVERWRITE_BY_ENV_BOOL2(key) if (getenv(key)) return SRS_CONF_PERFER_TRUE(string(getenv(key))) +#define SRS_OVERWRITE_BY_ENV_INT(key) if (getenv(key)) return ::atoi(getenv(key)) #define SRS_OVERWRITE_BY_ENV_SECONDS(key) if (getenv(key)) return ::atoi(getenv(key)) * SRS_UTIME_SECONDS /** @@ -2393,7 +2397,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads" - && n != "circuit_breaker" && n != "is_full" && n != "in_docker" + && n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls" ) { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); } @@ -3371,6 +3375,234 @@ int SrsConfig::get_dying_pulse() return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_tencentcloud_cls_enabled() +{ + SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_CLS_ENABLED"); + + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_tencentcloud_cls_stat_heartbeat() +{ + SRS_OVERWRITE_BY_ENV_BOOL2("SRS_TENCENTCLOUD_CLS_STAT_HEARTBEAT"); + + static bool DEFAULT = true; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("stat_heartbeat"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +bool SrsConfig::get_tencentcloud_cls_stat_streams() +{ + SRS_OVERWRITE_BY_ENV_BOOL2("SRS_TENCENTCLOUD_CLS_STAT_STREAMS"); + + static bool DEFAULT = true; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("stat_streams"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +bool SrsConfig::get_tencentcloud_cls_debug_logging() +{ + SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_CLS_DEBUG_LOGGING"); + + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("debug_logging"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +int SrsConfig::get_tencentcloud_cls_heartbeat_ratio() +{ + SRS_OVERWRITE_BY_ENV_INT("SRS_TENCENTCLOUD_CLS_HEARTBEAT_RATIO"); + + static int DEFAULT = 1; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("heartbeat_ratio"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_tencentcloud_cls_streams_ratio() +{ + SRS_OVERWRITE_BY_ENV_INT("SRS_TENCENTCLOUD_CLS_STREAMS_RATIO"); + + static int DEFAULT = 1; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("streams_ratio"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +string SrsConfig::get_tencentcloud_cls_label() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_LABEL"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("label"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_tencentcloud_cls_tag() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_TAG"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tag"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_tencentcloud_cls_secret_id() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_SECRET_ID"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("secret_id"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_tencentcloud_cls_secret_key() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_SECRET_KEY"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("secret_key"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_tencentcloud_cls_endpoint() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_ENDPOINT"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("endpoint"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_tencentcloud_cls_topic_id() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_TOPIC_ID"); + + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("tencentcloud_cls"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("topic_id"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 666ffcaf8..36a14a57c 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -460,6 +460,20 @@ public: virtual int get_critical_pulse(); virtual int get_dying_threshold(); virtual int get_dying_pulse(); +// TencentCloud service section. +public: + virtual bool get_tencentcloud_cls_enabled(); + virtual bool get_tencentcloud_cls_stat_heartbeat(); + virtual bool get_tencentcloud_cls_stat_streams(); + virtual bool get_tencentcloud_cls_debug_logging(); + virtual int get_tencentcloud_cls_heartbeat_ratio(); + virtual int get_tencentcloud_cls_streams_ratio(); + virtual std::string get_tencentcloud_cls_label(); + virtual std::string get_tencentcloud_cls_tag(); + virtual std::string get_tencentcloud_cls_secret_id(); + virtual std::string get_tencentcloud_cls_secret_key(); + virtual std::string get_tencentcloud_cls_endpoint(); + virtual std::string get_tencentcloud_cls_topic_id(); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 2d55af4f1..05a63f084 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -12,6 +12,7 @@ #include #include #include +#include using namespace std; @@ -180,6 +181,11 @@ srs_error_t SrsHybridServer::initialize() return srs_error_wrap(err, "dvr async"); } + // Initialize TencentCloud CLS object. + if ((err = _srs_cls->initialize()) != srs_success) { + return srs_error_wrap(err, "cls client"); + } + // Register some timers. timer20ms_->subscribe(clock_monitor_); timer5s_->subscribe(this); @@ -384,6 +390,12 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval) thread_desc.c_str(), free_desc.c_str(), objs_desc.c_str() ); + // Report logs to CLS if enabled. + if ((err = srs_cls_report()) != srs_success) { + srs_warn("ignore cls err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + return err; } diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 874e64a89..8aa9610d6 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -18,6 +18,8 @@ using namespace std; #include #include #include +#include +#include string srs_generate_stat_vid() { @@ -99,13 +101,14 @@ SrsStatisticStream::SrsStatisticStream() kbps->set_io(NULL, NULL); nb_clients = 0; - nb_frames = 0; + frames = new SrsPps(); } SrsStatisticStream::~SrsStatisticStream() { srs_freep(kbps); srs_freep(clk); + srs_freep(frames); } srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) @@ -118,7 +121,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) obj->set("app", SrsJsonAny::str(app.c_str())); obj->set("live_ms", SrsJsonAny::integer(srsu2ms(srs_get_system_time()))); obj->set("clients", SrsJsonAny::integer(nb_clients)); - obj->set("frames", SrsJsonAny::integer(nb_frames)); + obj->set("frames", SrsJsonAny::integer(frames->sugar)); obj->set("send_bytes", SrsJsonAny::integer(kbps->get_send_bytes())); obj->set("recv_bytes", SrsJsonAny::integer(kbps->get_recv_bytes())); @@ -369,7 +372,7 @@ srs_error_t SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames) SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); - stream->nb_frames += nb_frames; + stream->frames->sugar += nb_frames; return err; } @@ -508,6 +511,7 @@ SrsKbps* SrsStatistic::kbps_sample() for (it = streams.begin(); it != streams.end(); it++) { SrsStatisticStream* stream = it->second; stream->kbps->sample(); + stream->frames->update(); } } if (true) { @@ -610,6 +614,65 @@ void SrsStatistic::dumps_hints_kv(std::stringstream & ss) } } +void SrsStatistic::dumps_cls_summaries(SrsClsSugar* sugar) +{ + if (!vhosts.empty()) { + sugar->kvf("vhosts", "%d", (int) vhosts.size()); + } + if (!streams.empty()) { + sugar->kvf("streams", "%d", (int) streams.size()); + } + if (!clients.empty()) { + sugar->kvf("clients", "%d", (int) clients.size()); + } +} + +void SrsStatistic::dumps_cls_streams(SrsClsSugars* sugars) +{ + for (std::map::iterator it = streams.begin(); it != streams.end(); ++it) { + SrsStatisticStream* stream = it->second; + if (!stream->active || !stream->nb_clients) { + continue; + } + + SrsClsSugar* sugar = sugars->create(); + sugar->kv("hint", "stream"); + sugar->kv("version", RTMP_SIG_SRS_VERSION); + sugar->kvf("pid", "%d", getpid()); + + sugar->kv("sid", stream->id); + sugar->kv("url", stream->url); + + if (stream->frames->r30s()) { + sugar->kvf("fps", "%d", stream->frames->r30s()); + } + if (stream->width) { + sugar->kvf("width", "%d", stream->width); + } + if (stream->height) { + sugar->kvf("height", "%d", stream->height); + } + + SrsStatisticClient* pub = find_client(stream->publisher_id); + if (pub) { + if (pub->kbps->get_recv_kbps_30s()) { + sugar->kvf("recv", "%d", pub->kbps->get_recv_kbps_30s()); + } + if (pub->kbps->get_send_kbps_30s()) { + sugar->kvf("send", "%d", pub->kbps->get_send_kbps_30s()); + } + } + + sugar->kvf("clients", "%d", stream->nb_clients); + if (stream->kbps->get_recv_kbps_30s()) { + sugar->kvf("recv2", "%d", stream->kbps->get_recv_kbps_30s()); + } + if (stream->kbps->get_send_kbps_30s()) { + sugar->kvf("send2", "%d", stream->kbps->get_send_kbps_30s()); + } + } +} + SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req) { SrsStatisticVhost* vhost = NULL; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 37a1e22eb..a15539181 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -24,6 +24,9 @@ class ISrsExpire; class SrsJsonObject; class SrsJsonArray; class ISrsKbpsDelta; +class SrsClsSugar; +class SrsClsSugars; +class SrsPps; struct SrsStatisticVhost { @@ -55,11 +58,12 @@ public: // The publisher connection id. std::string publisher_id; int nb_clients; - uint64_t nb_frames; public: // The stream total kbps. SrsKbps* kbps; SrsWallClock* clk; + // The fps of stream. + SrsPps* frames; public: bool has_video; SrsVideoCodecId vcodec; @@ -200,6 +204,10 @@ public: virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count); // Dumps the hints about SRS server. void dumps_hints_kv(std::stringstream & ss); +public: + // Dumps the CLS summary. + void dumps_cls_summaries(SrsClsSugar* sugar); + void dumps_cls_streams(SrsClsSugars* sugars); private: virtual SrsStatisticVhost* create_vhost(SrsRequest* req); virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req); diff --git a/trunk/src/app/srs_app_tencentcloud.cpp b/trunk/src/app/srs_app_tencentcloud.cpp new file mode 100644 index 000000000..582e00e6a --- /dev/null +++ b/trunk/src/app/srs_app_tencentcloud.cpp @@ -0,0 +1,1135 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +// See https://cloud.tencent.com/document/product/614/12445 +namespace tencentcloud_api_sign { + std::string sha1(const void *data, size_t len) { + unsigned char digest[SHA_DIGEST_LENGTH]; + SHA_CTX ctx; + SHA1_Init(&ctx); + SHA1_Update(&ctx, data, len); + SHA1_Final(digest, &ctx); + char c_sha1[SHA_DIGEST_LENGTH*2+1]; + for (unsigned i = 0; i < SHA_DIGEST_LENGTH; ++i) { + sprintf(&c_sha1[i*2], "%02x", (unsigned int)digest[i]); + } + return c_sha1; + } + + std::string hmac_sha1(const char *key, const void *data, size_t len) { + unsigned char digest[EVP_MAX_MD_SIZE]; + unsigned digest_len; + char c_hmacsha1[EVP_MAX_MD_SIZE*2+1]; +#if !defined(OPENSSL_VERSION_NUMBER) || OPENSSL_VERSION_NUMBER < 0x10100000L + HMAC_CTX ctx; + HMAC_CTX_init(&ctx); + HMAC_Init_ex(&ctx, key, strlen(key), EVP_sha1(), NULL); + HMAC_Update(&ctx, (unsigned char*)data, len); + HMAC_Final(&ctx, digest, &digest_len); + HMAC_CTX_cleanup(&ctx); +#else + HMAC_CTX *ctx = HMAC_CTX_new(); + HMAC_CTX_reset(ctx); + HMAC_Init_ex(ctx, key, strlen(key), EVP_sha1(), NULL); + HMAC_Update(ctx, (unsigned char *)data, len); + HMAC_Final(ctx, digest, &digest_len); + HMAC_CTX_free(ctx); +#endif + for (unsigned i = 0; i != digest_len; ++i) { + sprintf(&c_hmacsha1[i*2], "%02x", (unsigned int)digest[i]); + } + return c_hmacsha1; + } + + std::string urlencode(const char *s) { + static unsigned char hexchars[] = "0123456789ABCDEF"; + size_t length = strlen(s), pos = 0; + unsigned char c_url[length*3+1]; + const unsigned char *p = (const unsigned char *)s; + for (; *p; ++p) { + if (isalnum((unsigned char)*p) || (*p == '-') || + (*p == '_') || (*p == '.') || (*p == '~')) { + c_url[pos++] = *p; + } else { + c_url[pos++] = '%'; + c_url[pos++] = hexchars[(*p)>>4]; + c_url[pos++] = hexchars[(*p)&15U]; + } + } + c_url[pos] = 0; + return (char*)c_url; + } + + std::string signature(const std::string &secret_id, + const std::string &secret_key, + std::string method, + const std::string &path, + const std::map ¶ms, + const std::map &headers, + long expire) { + + const size_t SIGNLEN = 1024; + std::string http_request_info, uri_parm_list, + header_list, str_to_sign, sign_key; + transform(method.begin(), method.end(), method.begin(), ::tolower); + http_request_info.reserve(SIGNLEN); + http_request_info.append(method).append("\n").append(path).append("\n"); + uri_parm_list.reserve(SIGNLEN); + std::map::const_iterator iter; + for (iter = params.begin(); + iter != params.end(); ) { + uri_parm_list.append(iter->first); + http_request_info.append(iter->first).append("=") + .append(urlencode(iter->second.c_str())); + if (++iter != params.end()) { + uri_parm_list.append(";"); + http_request_info.append("&"); + } + } + http_request_info.append("\n"); + header_list.reserve(SIGNLEN); + for (iter = headers.begin(); + iter != headers.end(); ++iter) { + sign_key = iter->first; + transform(sign_key.begin(), sign_key.end(), sign_key.begin(), ::tolower); + if (sign_key == "content-type" || sign_key == "content-md5" + || sign_key == "host" || sign_key[0] == 'x') { + header_list.append(sign_key); + http_request_info.append(sign_key).append("=") + .append(urlencode(iter->second.c_str())); + header_list.append(";"); + http_request_info.append("&"); + } + } + if (!header_list.empty()) { + header_list[header_list.size() - 1] = 0; + http_request_info[http_request_info.size() - 1] = '\n'; + } + //printf("%s\nEOF\n", http_request_info.c_str()); + char signed_time[SIGNLEN]; + int signed_time_len = snprintf(signed_time, SIGNLEN, + "%lu;%lu", time(0) - 60, time(0) + expire); + //snprintf(signed_time, SIGNLEN, "1510109254;1510109314"); + std::string signkey = hmac_sha1(secret_key.c_str(), + signed_time, signed_time_len); + str_to_sign.reserve(SIGNLEN); + str_to_sign.append("sha1").append("\n") + .append(signed_time).append("\n") + .append(sha1(http_request_info.c_str(), http_request_info.size())) + .append("\n"); + //printf("%s\nEOF\n", str_to_sign.c_str()); + char c_signature[SIGNLEN]; + snprintf(c_signature, SIGNLEN, + "q-sign-algorithm=sha1&q-ak=%s" + "&q-sign-time=%s&q-key-time=%s" + "&q-header-list=%s&q-url-param-list=%s&q-signature=%s", + secret_id.c_str(), signed_time, signed_time, + header_list.c_str(), uri_parm_list.c_str(), + hmac_sha1(signkey.c_str(), str_to_sign.c_str(), + str_to_sign.size()).c_str()); + return c_signature; + } +} + +// See https://developers.google.com/protocol-buffers/docs/encoding#varints +class SrsProtobufVarints +{ +private: + // See Go bits.Len64 of package math/bits. + static int bits_len64(uint64_t x) { + // See Go bits.len8tab of package math/bits. + static uint8_t bits_len8tab[256] = { + 0x00, 0x01, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, + 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, + 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, + 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, + 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, + 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, + 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, + 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, + }; + + int n = 0; + if (x >= (uint64_t)1<<32) { + x >>= 32; + n = 32; + } + if (x >= (uint64_t)1<<16) { + x >>= 16; + n += 16; + } + if (x >= (uint64_t)1<<8) { + x >>= 8; + n += 8; + } + return n + int(bits_len8tab[x]); + } +public: + // See Go protowire.SizeVarint of package google.golang.org/protobuf/encoding/protowire + static int sizeof_varint(uint64_t v) { + int n = bits_len64(v); + return int(9 * uint32_t(n) + 64) / 64; + } + // See Go protowire.AppendVarint of package google.golang.org/protobuf/encoding/protowire + static srs_error_t encode(SrsBuffer* b, uint64_t v) { + srs_error_t err = srs_success; + + if (!b->require(SrsProtobufVarints::sizeof_varint(v))) { + return srs_error_new(ERROR_PB_NO_SPACE, "require %d only %d bytes", v, b->left()); + } + + if (v < (uint64_t)1<<7) { + b->write_1bytes((uint8_t)v); + } else if (v < (uint64_t)1<<14) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>7)); + } else if (v < (uint64_t)1<<21) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>21)); + } else if (v < (uint64_t)1<<35) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>28)); + } else if (v < (uint64_t)1<<42) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>28)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>35)); + } else if (v < (uint64_t)1<<49) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>28)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>35)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>42)); + } else if(v < (uint64_t)1<<56) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>28)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>35)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>42)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>49)); + } else if (v < (uint64_t)1<<63) { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>28)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>35)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>42)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>49)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(v>>56)); + } else { + b->write_1bytes((uint8_t)(((v>>0)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>7)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>14)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>21)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>28)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>35)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>42)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>49)&0x7f)|0x80)); + b->write_1bytes((uint8_t)(((v>>56)&0x7f)|0x80)); + b->write_1bytes((uint8_t)1); + } + + return err; + } +}; + +class SrsProtobufString +{ +public: + // See Go protowire.SizeBytes of package google.golang.org/protobuf/encoding/protowire + static int sizeof_string(const std::string& v) { + uint64_t n = v.length(); + return SrsProtobufVarints::sizeof_varint(uint64_t(n)) + n; + } + // See Go protowire.AppendString of package google.golang.org/protobuf/encoding/protowire + static srs_error_t encode(SrsBuffer* b, const std::string& v) { + srs_error_t err = srs_success; + + uint64_t n = v.length(); + if ((err = SrsProtobufVarints::encode(b, n)) != srs_success) { + return srs_error_wrap(err, "string size %d", n); + } + + if (!b->require(n)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require %d only %d byte", n, b->left()); + } + b->write_string(v); + + return err; + } +}; + +// See https://cloud.tencent.com/document/api/614/16873 +class SrsClsLogContent +{ +private: + // required string key = 1; + std::string key_; + // required string value = 2; + std::string value_; +public: + SrsClsLogContent(); + virtual ~SrsClsLogContent(); +public: + SrsClsLogContent* set_key(std::string v); + SrsClsLogContent* set_value(std::string v); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +}; + +// See https://cloud.tencent.com/document/api/614/16873 +class SrsClsLog +{ +private: + // required int64 time = 1; + int64_t time_; + // repeated Content contents= 2; + std::vector contents_; +public: + SrsClsLog(); + virtual ~SrsClsLog(); +public: + SrsClsLogContent* add_content(); + SrsClsLog* set_time(int64_t v); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +}; + +// See https://cloud.tencent.com/document/api/614/16873 +class SrsClsLogGroup +{ +private: + // repeated Log logs= 1; + std::vector logs_; + // optional string source = 4; + std::string source_; +public: + SrsClsLogGroup(); + virtual ~SrsClsLogGroup(); +public: + SrsClsLogGroup* set_source(std::string v); + SrsClsLog* add_log(); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +}; + +// See https://cloud.tencent.com/document/api/614/16873 +class SrsClsLogGroupList +{ +private: + // repeated LogGroup logGroupList = 1; + std::vector groups_; +public: + SrsClsLogGroupList(); + virtual ~SrsClsLogGroupList(); +public: + SrsClsLogGroup* add_log_group(); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +}; + +SrsClsLogContent::SrsClsLogContent() +{ +} + +SrsClsLogContent::~SrsClsLogContent() +{ +} + +SrsClsLogContent* SrsClsLogContent::set_key(std::string v) +{ + key_ = v; + return this; +} + +SrsClsLogContent* SrsClsLogContent::set_value(std::string v) +{ + value_ = v; + return this; +} + +uint64_t SrsClsLogContent::nb_bytes() +{ + uint64_t nn = 1 + SrsProtobufString::sizeof_string(key_); + nn += 1 + SrsProtobufString::sizeof_string(value_); + return nn; +} + +srs_error_t SrsClsLogContent::encode(SrsBuffer* b) +{ + srs_error_t err = srs_success; + + // Encode the field key as [ID=1, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x0a); + + if ((err = SrsProtobufString::encode(b, key_)) != srs_success) { + return srs_error_wrap(err, "encode key=%s", key_.c_str()); + } + + // Encode the field value as [ID=2, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x12); + + if ((err = SrsProtobufString::encode(b, value_)) != srs_success) { + return srs_error_wrap(err, "encode value=%s", value_.c_str()); + } + + return err; +} + +SrsClsLog::SrsClsLog() +{ +} + +SrsClsLog::~SrsClsLog() +{ + for (std::vector::iterator it = contents_.begin(); it != contents_.end(); ++it) { + SrsClsLogContent* content = *it; + srs_freep(content); + } +} + +SrsClsLogContent* SrsClsLog::add_content() +{ + SrsClsLogContent* content = new SrsClsLogContent(); + contents_.push_back(content); + return content; +} + +SrsClsLog* SrsClsLog::set_time(int64_t v) +{ + time_ = v; + return this; +} + +uint64_t SrsClsLog::nb_bytes() +{ + uint64_t nn = 1 + SrsProtobufVarints::sizeof_varint(time_); + + for (std::vector::iterator it = contents_.begin(); it != contents_.end(); ++it) { + SrsClsLogContent* content = *it; + uint64_t size = content->nb_bytes(); + nn += 1 + SrsProtobufVarints::sizeof_varint(size) + size; + } + + return nn; +} + +srs_error_t SrsClsLog::encode(SrsBuffer* b) +{ + srs_error_t err = srs_success; + + // Encode the field time as [ID=1, TYPE=0(Varint)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x08); + + if ((err = SrsProtobufVarints::encode(b, time_)) != srs_success) { + return srs_error_wrap(err, "encode time"); + } + + // Encode each content. + for (std::vector::iterator it = contents_.begin(); it != contents_.end(); ++it) { + SrsClsLogContent* content = *it; + + // Encode the field contents as [ID=2, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x12); + + // Encode the varint size of children. + uint64_t size = content->nb_bytes(); + if ((err = SrsProtobufVarints::encode(b, size)) != srs_success) { + return srs_error_wrap(err, "encode size=%d", (int)size); + } + + // Encode the content itself. + if ((err = content->encode(b)) != srs_success) { + return srs_error_wrap(err, "encode content"); + } + } + + return err; +} + +SrsClsLogGroup::SrsClsLogGroup() +{ +} + +SrsClsLogGroup::~SrsClsLogGroup() +{ + for (std::vector::iterator it = logs_.begin(); it != logs_.end(); ++it) { + SrsClsLog* log = *it; + srs_freep(log); + } +} + +SrsClsLogGroup* SrsClsLogGroup::set_source(std::string v) +{ + source_ = v; + return this; +} + +SrsClsLog* SrsClsLogGroup::add_log() +{ + SrsClsLog* log = new SrsClsLog(); + logs_.push_back(log); + return log; +} + +uint64_t SrsClsLogGroup::nb_bytes() +{ + uint64_t nn = 0; + for (std::vector::iterator it = logs_.begin(); it != logs_.end(); ++it) { + SrsClsLog* log = *it; + uint64_t size = log->nb_bytes(); + nn += 1 + SrsProtobufVarints::sizeof_varint(size) + size; + } + + nn += 1 + SrsProtobufString::sizeof_string(source_); + return nn; +} + +srs_error_t SrsClsLogGroup::encode(SrsBuffer* b) +{ + srs_error_t err = srs_success; + + // Encode each log. + for (std::vector::iterator it = logs_.begin(); it != logs_.end(); ++it) { + SrsClsLog* log = *it; + + // Encode the field logs as [ID=1, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x0a); + + // Encode the varint size of children. + uint64_t size = log->nb_bytes(); + if ((err = SrsProtobufVarints::encode(b, size)) != srs_success) { + return srs_error_wrap(err, "encode size=%d", (int)size); + } + + // Encode the log itself. + if ((err = log->encode(b)) != srs_success) { + return srs_error_wrap(err, "encode log"); + } + } + + // Encode the field source as [ID=4, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x22); + + if ((err = SrsProtobufString::encode(b, source_)) != srs_success) { + return srs_error_wrap(err, "encode source=%s", source_.c_str()); + } + + return err; +} + +SrsClsLogGroupList::SrsClsLogGroupList() +{ +} + +SrsClsLogGroupList::~SrsClsLogGroupList() +{ + for (std::vector::iterator it = groups_.begin(); it != groups_.end(); ++it) { + SrsClsLogGroup* group = *it; + srs_freep(group); + } +} + +SrsClsLogGroup* SrsClsLogGroupList::add_log_group() +{ + SrsClsLogGroup* group = new SrsClsLogGroup(); + groups_.push_back(group); + return group; +} + +uint64_t SrsClsLogGroupList::nb_bytes() +{ + uint64_t nn = 0; + for (std::vector::iterator it = groups_.begin(); it != groups_.end(); ++it) { + SrsClsLogGroup* group = *it; + uint64_t size = group->nb_bytes(); + nn += 1 + SrsProtobufVarints::sizeof_varint(size) + size; + } + return nn; +} + +srs_error_t SrsClsLogGroupList::encode(SrsBuffer* b) +{ + srs_error_t err = srs_success; + + // Encode each group. + for (std::vector::iterator it = groups_.begin(); it != groups_.end(); ++it) { + SrsClsLogGroup* group = *it; + + // Encode the field groups as [ID=1, TYPE=2(Length delimited)] + if (!b->require(1)) { + return srs_error_new(ERROR_PB_NO_SPACE, "require 1 byte"); + } + b->write_1bytes(0x0a); + + // Encode the varint size of children. + uint64_t size = group->nb_bytes(); + if ((err = SrsProtobufVarints::encode(b, size)) != srs_success) { + return srs_error_wrap(err, "encode size=%d", (int)size); + } + + // Encode the log group itself. + if ((err = group->encode(b)) != srs_success) { + return srs_error_wrap(err, "encode group"); + } + } + + return err; +} + +SrsClsSugar::SrsClsSugar() +{ + log_groups_ = new SrsClsLogGroupList(); + log_group_ = log_groups_->add_log_group(); + log_ = log_group_->add_log(); + + log_group_->set_source(srs_get_public_internet_address(true)); + log_->set_time(srs_get_system_time() / SRS_UTIME_SECONDS); + kv("agent", RTMP_SIG_SRS_SERVER); + + string label = _srs_cls->label(); + if (!label.empty()) { + kv("label", label); + } + + string tag = _srs_cls->tag(); + if (!tag.empty()) { + kv("tag", tag); + } + + string server_id = SrsStatistic::instance()->server_id(); + if (!server_id.empty()) { + kv("id", server_id); + } +} + +SrsClsSugar::~SrsClsSugar() +{ + srs_freep(log_groups_); +} + +SrsClsSugar* SrsClsSugar::kv(std::string k, std::string v) +{ + log_->add_content()->set_key(k)->set_value(v); + return this; +} + +SrsClsSugar* SrsClsSugar::kvf(std::string k, const char* fmt, ...) +{ + static int LOG_MAX_SIZE = 4096; + static char* buf = new char[LOG_MAX_SIZE]; + + va_list ap; + va_start(ap, fmt); + int r0 = vsnprintf(buf, LOG_MAX_SIZE, fmt, ap); + va_end(ap); + + // Something not expected, drop the log. If error, it might be 0 or negative value. If greater or equals to the + // LOG_MAX_SIZE, means need more buffers to write the data and the last byte might be 0. If success, return the + // number of characters printed, not including the trailing 0. + if (r0 <= 0 || r0 >= LOG_MAX_SIZE) { + return this; + } + + string v = string(buf, r0); + return kv(k, v); +} + +uint64_t SrsClsSugar::nb_bytes() +{ + return log_groups_->nb_bytes(); +} + +srs_error_t SrsClsSugar::encode(SrsBuffer* b) +{ + return log_groups_->encode(b); +} + +SrsClsSugars::SrsClsSugars() +{ +} + +SrsClsSugars::~SrsClsSugars() +{ + for (vector::iterator it = sugars.begin(); it != sugars.end(); ++it) { + SrsClsSugar* sugar = *it; + srs_freep(sugar); + } +} + +uint64_t SrsClsSugars::nb_bytes() +{ + uint64_t size = 0; + for (vector::iterator it = sugars.begin(); it != sugars.end(); ++it) { + SrsClsSugar* sugar = *it; + size += sugar->nb_bytes(); + } + return size; +} + +srs_error_t SrsClsSugars::encode(SrsBuffer* b) +{ + srs_error_t err = srs_success; + + for (vector::iterator it = sugars.begin(); it != sugars.end(); ++it) { + SrsClsSugar* sugar = *it; + if ((err = sugar->encode(b)) != srs_success) { + return srs_error_wrap(err, "encode %d sugars", (int)sugars.size()); + } + } + + return err; +} + +SrsClsSugar* SrsClsSugars::create() +{ + SrsClsSugar* sugar = new SrsClsSugar(); + sugars.push_back(sugar); + return sugar; +} + +SrsClsSugars* SrsClsSugars::slice(int max_size) +{ + SrsClsSugars* v = new SrsClsSugars(); + + uint64_t v_size = 0; + for (vector::iterator it = sugars.begin(); it != sugars.end();) { + SrsClsSugar* sugar = *it; + + // Always push at least one elem. + it = sugars.erase(it); + v->sugars.push_back(sugar); + + // Util exceed the max size. + v_size += sugar->nb_bytes(); + if (v_size > max_size) { + break; + } + } + + return v; +} + +bool SrsClsSugars::empty() +{ + return sugars.empty(); +} + +int SrsClsSugars::size() +{ + return (int)sugars.size(); +} + +SrsClsClient* _srs_cls = NULL; + +SrsClsClient::SrsClsClient() +{ + enabled_ = false; + stat_heartbeat_ = false; + stat_streams_ = false; + debug_logging_ = false; + heartbeat_ratio_ = 0; + streams_ratio_ = 0; +} + +SrsClsClient::~SrsClsClient() +{ +} + +bool SrsClsClient::enabled() +{ + return enabled_; +} + +bool SrsClsClient::stat_heartbeat() +{ + return stat_heartbeat_; +} + +bool SrsClsClient::stat_streams() +{ + return stat_streams_; +} + +int SrsClsClient::heartbeat_ratio() +{ + return heartbeat_ratio_; +} + +int SrsClsClient::streams_ratio() +{ + return streams_ratio_; +} + +string SrsClsClient::label() +{ + return label_; +} + +string SrsClsClient::tag() +{ + return tag_; +} + +srs_error_t SrsClsClient::initialize() +{ + srs_error_t err = srs_success; + + enabled_ = _srs_config->get_tencentcloud_cls_enabled(); + if (!enabled_) { + srs_trace("TencentCloud CLS is disabled"); + return err; + } + + label_ = _srs_config->get_tencentcloud_cls_label(); + tag_ = _srs_config->get_tencentcloud_cls_tag(); + stat_heartbeat_ = _srs_config->get_tencentcloud_cls_stat_heartbeat(); + stat_streams_ = _srs_config->get_tencentcloud_cls_stat_streams(); + debug_logging_ = _srs_config->get_tencentcloud_cls_debug_logging(); + heartbeat_ratio_ = srs_max(1, _srs_config->get_tencentcloud_cls_heartbeat_ratio()); + streams_ratio_ = srs_max(1, _srs_config->get_tencentcloud_cls_streams_ratio()); + + secret_id_ = _srs_config->get_tencentcloud_cls_secret_id(); + if (secret_id_.empty()) { + return srs_error_new(ERROR_CLS_INVALID_CONFIG, "CLS no config for secret_id"); + } + + string secret_key = _srs_config->get_tencentcloud_cls_secret_key(); + if (secret_key.empty()) { + return srs_error_new(ERROR_CLS_INVALID_CONFIG, "CLS no config for secret_key"); + } + + endpoint_ = _srs_config->get_tencentcloud_cls_endpoint(); + if (endpoint_.empty()) { + return srs_error_new(ERROR_CLS_INVALID_CONFIG, "CLS no config for endpoint"); + } + + topic_ = _srs_config->get_tencentcloud_cls_topic_id(); + if (topic_.empty()) { + return srs_error_new(ERROR_CLS_INVALID_CONFIG, "CLS no config for topic_id"); + } + + srs_trace("Initialize TencentCloud CLS label=%s, tag=%s, secret_id=%dB, secret_key=%dB, endpoint=%s, topic=%s, heartbeat=%d/%d, streams=%d/%d debug_logging=%d", + label_.c_str(), tag_.c_str(), secret_id_.length(), secret_key.length(), endpoint_.c_str(), topic_.c_str(), stat_heartbeat_, heartbeat_ratio_, stat_streams_, streams_ratio_, debug_logging_); + + return err; +} + +srs_error_t SrsClsClient::send_log(ISrsEncoder* sugar, int count, int total) +{ + srs_error_t err = srs_success; + + uint64_t size = sugar->nb_bytes(); + // Max size is 5MB, error is 403:LogSizeExceed, see https://cloud.tencent.com/document/api/614/12402 + if (size >= 5 * 1024 * 1024) { + return srs_error_new(ERROR_CLS_EXCEED_SIZE, "exceed 5MB actual %d", size); + } + + char* buf = new char[size]; + SrsAutoFreeA(char, buf); + + memset(buf, 0, size); + SrsBuffer b(buf, size); + if ((err = sugar->encode(&b)) != srs_success) { + return srs_error_wrap(err, "encode log"); + } + + string body(buf, size); + + // Write a CLS log to service specified by url. + string url = "http://" + endpoint_ + ":80/structuredlog?topic_id=" + topic_; + + SrsHttpUri uri; + if ((err = uri.initialize(url)) != srs_success) { + return srs_error_wrap(err, "http: post failed. url=%s", url.c_str()); + } + + SrsHttpClient http; + if ((err = http.initialize(uri.get_schema(), uri.get_host(), uri.get_port())) != srs_success) { + return srs_error_wrap(err, "http: init client"); + } + + // Sign the request, see https://cloud.tencent.com/document/product/614/56475 + if (true) { + map params; + params["topic_id"] = topic_; + + map headers; + headers["Host"] = uri.get_host(); + headers["Content-Type"] = "application/x-protobuf"; + http.set_header("Content-Type", "application/x-protobuf"); + + string method = "POST"; + string secret_key = _srs_config->get_tencentcloud_cls_secret_key(); + std::string signature = tencentcloud_api_sign::signature( + secret_id_, secret_key, method, uri.get_path(), params, headers, 300 // Expire in seconds + ); + headers["Authorization"] = signature; + http.set_header("Authorization", signature); + } + + string path = uri.get_path(); + if (!uri.get_query().empty()) { + path += "?" + uri.get_query(); + } + + // Start request and parse response. + ISrsHttpMessage* msg = NULL; + if ((err = http.post(path, body, &msg)) != srs_success) { + return srs_error_wrap(err, "http: client post"); + } + SrsAutoFree(ISrsHttpMessage, msg); + + string res; + uint16_t code = msg->status_code(); + if ((err = msg->body_read_all(res)) != srs_success) { + return srs_error_wrap(err, "http: body read"); + } + + // ensure the http status is ok. + if (code != SRS_CONSTS_HTTP_OK && code != SRS_CONSTS_HTTP_Created) { + return srs_error_new(ERROR_HTTP_STATUS_INVALID, "http: status %d, body is %s", code, res.c_str()); + } + + string request_id = msg->header()->get("X-Cls-Requestid"); + if (request_id.empty() && !debug_logging_) { + srs_warn("no CLS requestId for log %dB", body.length()); + } + + if (debug_logging_) { + srs_trace("CLS write logs=%d/%d, size=%dB, request_id=%s", count, total, body.length(), request_id.c_str()); + } + + return err; +} + +// For each upload, never exceed 2MB, to avoid burst of CPU or network usage. +#define SRS_CLS_BATCH_MAX_LOG_SIZE 2 * 1024 * 1024 + +srs_error_t SrsClsClient::send_logs(SrsClsSugars* sugars) +{ + srs_error_t err = srs_success; + + // Never do infinite loop, limit to a max loop and drop logs if exceed. + int total = sugars->size(); + for (int i = 0; i < 128 && !sugars->empty(); ++i) { + SrsClsSugars* v = sugars->slice(SRS_CLS_BATCH_MAX_LOG_SIZE); + SrsAutoFree(SrsClsSugars, v); + + if ((err = send_log((ISrsEncoder*)v, v->size(), total)) != srs_success) { + return srs_error_wrap(err, "send %d/%d/%d logs", v->size(), i, total); + } + } + + return err; +} + +srs_error_t srs_cls_dump_summaries(SrsClsSugars* sugars) +{ + srs_error_t err = srs_success; + + // Ignore if disabled. + if (!_srs_cls->enabled() || !_srs_cls->stat_heartbeat()) { + return err; + } + + // Whether it's time to report heartbeat. + static int nn_heartbeat = -1; + bool interval_ok = nn_heartbeat == -1 || ++nn_heartbeat >= _srs_cls->heartbeat_ratio(); + if (interval_ok) { + nn_heartbeat = 0; + } + if (!interval_ok) { + return err; + } + + SrsClsSugar* sugar = sugars->create(); + sugar->kv("hint", "summary"); + sugar->kv("version", RTMP_SIG_SRS_VERSION); + sugar->kvf("pid", "%d", getpid()); + + // Server ID to identify logs from a set of servers' logs. + SrsStatistic::instance()->dumps_cls_summaries(sugar); + + SrsProcSelfStat* u = srs_get_self_proc_stat(); + if (u->ok) { + // The cpu usage of SRS, 1 means 1/1000 + if (u->percent > 0) { + sugar->kvf("cpu", "%d", (int) (u->percent * 1000)); + } + } + + SrsPlatformInfo* p = srs_get_platform_info(); + if (p->ok) { + // The uptime of SRS, in seconds. + if (p->srs_startup_time > 0) { + sugar->kvf("uptime", "%d", (int) ((srs_get_system_time() - p->srs_startup_time) / SRS_UTIME_SECONDS)); + } + // The load of system, load every 1 minute, 1 means 1/1000. + if (p->load_one_minutes > 0) { + sugar->kvf("load", "%d", (int) (p->load_one_minutes * 1000)); + } + } + + SrsRusage* r = srs_get_system_rusage(); + SrsMemInfo* m = srs_get_meminfo(); + if (r->ok && m->ok) { + float self_mem_percent = 0; + if (m->MemTotal > 0) { + self_mem_percent = (float)(r->r.ru_maxrss / (double)m->MemTotal); + } + + // The memory of SRS, 1 means 1/1000 + if (self_mem_percent > 0) { + sugar->kvf("mem", "%d", (int) (self_mem_percent * 1000)); + } + } + + SrsProcSystemStat* s = srs_get_system_proc_stat(); + if (s->ok) { + // The cpu usage of system, 1 means 1/1000 + if (s->percent > 0) { + sugar->kvf("cpu2", "%d", (int) (s->percent * 1000)); + } + } + + SrsNetworkRtmpServer* nrs = srs_get_network_rtmp_server(); + if (nrs->ok) { + // The number of connections of SRS. + if (nrs->nb_conn_srs > 0) { + sugar->kvf("conn", "%d", nrs->nb_conn_srs); + } + // The number of connections of system. + if (nrs->nb_conn_sys > 0) { + sugar->kvf("conn2", "%d", nrs->nb_conn_sys); + } + // The received kbps in 30s of SRS. + if (nrs->rkbps_30s > 0) { + sugar->kvf("recv", "%d", nrs->rkbps_30s); + } + // The sending out kbps in 30s of SRS. + if (nrs->skbps_30s > 0) { + sugar->kvf("send", "%d", nrs->skbps_30s); + } + } + + return err; +} + +srs_error_t srs_cls_dump_streams(SrsClsSugars* sugars) +{ + srs_error_t err = srs_success; + + // Ignore if disabled. + if (!_srs_cls->enabled() || !_srs_cls->stat_streams()) { + return err; + } + + // Whether it's time to report streams. + static int nn_streams = -1; + bool interval_ok = nn_streams == -1 || ++nn_streams >= _srs_cls->streams_ratio(); + if (interval_ok) { + nn_streams = 0; + } + if (!interval_ok) { + return err; + } + + // Dumps all streams as sugars. + SrsStatistic::instance()->dumps_cls_streams(sugars); + + return err; +} + +srs_error_t srs_cls_report() +{ + srs_error_t err = srs_success; + + SrsClsSugars sugars; + + if ((err = srs_cls_dump_summaries(&sugars)) != srs_success) { + return srs_error_wrap(err, "dump summary"); + } + + if ((err = srs_cls_dump_streams(&sugars)) != srs_success) { + return srs_error_wrap(err, "dump streams"); + } + + if ((err = _srs_cls->send_logs(&sugars)) != srs_success) { + return srs_error_wrap(err, "cls"); + } + + return err; +} + + diff --git a/trunk/src/app/srs_app_tencentcloud.hpp b/trunk/src/app/srs_app_tencentcloud.hpp new file mode 100644 index 000000000..47c1eaa9f --- /dev/null +++ b/trunk/src/app/srs_app_tencentcloud.hpp @@ -0,0 +1,95 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#ifndef SRS_APP_TENCENTCLOUD_HPP +#define SRS_APP_TENCENTCLOUD_HPP + +#include + +#include + +#include +#include + +class SrsBuffer; +class SrsClsLogGroupList; +class SrsClsLogGroup; +class SrsClsLog; + +class SrsClsSugar : public ISrsEncoder +{ +private: + SrsClsLog* log_; + SrsClsLogGroup* log_group_; + SrsClsLogGroupList* log_groups_; +public: + SrsClsSugar(); + virtual ~SrsClsSugar(); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +public: + SrsClsSugar* kv(std::string k, std::string v); + SrsClsSugar* kvf(std::string k, const char* fmt, ...); +}; + +class SrsClsSugars : public ISrsEncoder +{ +private: + std::vector sugars; +public: + SrsClsSugars(); + virtual ~SrsClsSugars(); +public: + virtual uint64_t nb_bytes(); + srs_error_t encode(SrsBuffer* b); +public: + SrsClsSugar* create(); + SrsClsSugars* slice(int max_size); + bool empty(); + int size(); +}; + +class SrsClsClient +{ +private: + bool enabled_; + bool stat_heartbeat_; + bool stat_streams_; + bool debug_logging_; + int heartbeat_ratio_; + int streams_ratio_; + std::string label_; + std::string tag_; +private: + std::string secret_id_; + std::string endpoint_; + std::string topic_; +public: + SrsClsClient(); + virtual ~SrsClsClient(); +public: + bool enabled(); + bool stat_heartbeat(); + bool stat_streams(); + int heartbeat_ratio(); + int streams_ratio(); + std::string label(); + std::string tag(); +public: + srs_error_t initialize(); +private: + srs_error_t send_log(ISrsEncoder* sugar, int count, int total); +public: + srs_error_t send_logs(SrsClsSugars* sugars); +}; + +extern SrsClsClient* _srs_cls; + +srs_error_t srs_cls_report(); + +#endif + diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index fb38684e7..01450ffa0 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #ifdef SRS_RTC #include @@ -438,6 +439,9 @@ srs_error_t srs_global_initialize() // Create global async worker for DVR. _srs_dvr_async = new SrsAsyncCallWorker(); + // Initialize global TencentCloud CLS object. + _srs_cls = new SrsClsClient(); + return err; } diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index e9f2fa87c..d6f69f2ef 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 43 +#define VERSION_REVISION 44 #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index a296d4c47..602818465 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -99,6 +99,9 @@ #define ERROR_SOCKET_ACCEPT 1081 #define ERROR_THREAD_CREATE 1082 #define ERROR_THREAD_FINISHED 1083 +#define ERROR_PB_NO_SPACE 1084 +#define ERROR_CLS_INVALID_CONFIG 1085 +#define ERROR_CLS_EXCEED_SIZE 1086 /////////////////////////////////////////////////////// // RTMP protocol error. diff --git a/trunk/src/kernel/srs_kernel_kbps.cpp b/trunk/src/kernel/srs_kernel_kbps.cpp index 322e96458..30dcb3f4e 100644 --- a/trunk/src/kernel/srs_kernel_kbps.cpp +++ b/trunk/src/kernel/srs_kernel_kbps.cpp @@ -91,6 +91,11 @@ int SrsPps::r10s() return sample_10s_.rate; } +int SrsPps::r30s() +{ + return sample_30s_.rate; +} + SrsWallClock::SrsWallClock() { } diff --git a/trunk/src/kernel/srs_kernel_kbps.hpp b/trunk/src/kernel/srs_kernel_kbps.hpp index fac456d14..6e6c96c4d 100644 --- a/trunk/src/kernel/srs_kernel_kbps.hpp +++ b/trunk/src/kernel/srs_kernel_kbps.hpp @@ -53,6 +53,8 @@ public: void update(int64_t nn); // Get the 10s average stat. int r10s(); + // Get the 30s average stat. + int r30s(); }; /**