[FLINK-37042][pipeline-connector/maxcompute] Rename MaxCompute pipeline connector options to follow Flink style

This closes #3852

@@ -94,7 +94,7 @@ pipeline:
<td>The name of the sink.</td>
</tr>
<tr>
<td>accessId</td>
<td>access-id</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -102,7 +102,7 @@ pipeline:
AccessKey management page</a> to obtain the AccessKey ID.</td>
</tr>
<tr>
<td>accessKey</td>
<td>access-key</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -126,14 +126,14 @@ pipeline:
MaxCompute console</a> and obtain the MaxCompute project name on the Workspace > Project Management page.</td>
</tr>
<tr>
<td>tunnelEndpoint</td>
<td>tunnel.endpoint</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments such as when using a proxy.</td>
</tr>
<tr>
<td>quotaName</td>
<td>quota.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -141,14 +141,14 @@ pipeline:
Using exclusive resource groups for MaxCompute</a></td>
</tr>
<tr>
<td>stsToken</td>
<td>sts-token</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified.</td>
</tr>
<tr>
<td>bucketsNum</td>
<td>buckets-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
@@ -156,35 +156,35 @@ pipeline:
Delta Table Overview</a></td>
</tr>
<tr>
<td>compressAlgorithm</td>
<td>compress.algorithm</td>
<td>optional</td>
<td style="word-wrap: break-word;">zlib</td>
<td>String</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, and <code>snappy</code>.</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, <code>lz4</code>, and <code>snappy</code>.</td>
</tr>
<tr>
<td>totalBatchSize</td>
<td>total.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">64MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by partition level (for non-partitioned tables, by table level). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>bucketBatchSize</td>
<td>bucket.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">4MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by bucket level. This is effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>numCommitThreads</td>
<td>commit.thread-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of partitions (tables) that can be processed simultaneously during the checkpoint stage.</td>
</tr>
<tr>
<td>numFlushConcurrent</td>
<td>flush.concurrent-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>

@@ -94,7 +94,7 @@ pipeline:
<td>The name of the sink.</td>
</tr>
<tr>
<td>accessId</td>
<td>access-id</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -102,7 +102,7 @@ pipeline:
AccessKey management page</a> to obtain the AccessKey ID.</td>
</tr>
<tr>
<td>accessKey</td>
<td>access-key</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -124,63 +124,63 @@ pipeline:
<td>The name of the MaxCompute project. You can log in to the <a href="https://maxcompute.console.aliyun.com/">MaxCompute console</a> and obtain the MaxCompute project name on the Workspace > Project Management page.</td>
</tr>
<tr>
<td>tunnelEndpoint</td>
<td>tunnel.endpoint</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments such as when using a proxy.</td>
</tr>
<tr>
<td>quotaName</td>
<td>quota.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/purchase-and-use-exclusive-resource-groups-for-dts">Using exclusive resource groups for MaxCompute</a></td>
</tr>
<tr>
<td>stsToken</td>
<td>sts-token</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified.</td>
</tr>
<tr>
<td>bucketsNum</td>
<td>buckets-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/transaction-table2-0-overview">Delta Table Overview</a></td>
</tr>
<tr>
<td>compressAlgorithm</td>
<td>compress.algorithm</td>
<td>optional</td>
<td style="word-wrap: break-word;">zlib</td>
<td>String</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, and <code>snappy</code>.</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, <code>lz4</code>, and <code>snappy</code>.</td>
</tr>
<tr>
<td>totalBatchSize</td>
<td>total.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">64MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by partition level (for non-partitioned tables, by table level). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>bucketBatchSize</td>
<td>bucket.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">4MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by bucket level. This is effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>numCommitThreads</td>
<td>commit.thread-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of partitions (tables) that can be processed simultaneously during the checkpoint stage.</td>
</tr>
<tr>
<td>numFlushConcurrent</td>
<td>flush.concurrent-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>
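To make the rename concrete, here is a minimal sketch of how the new keys are consumed on the Java side. It assumes `Configuration.fromMap` in flink-cdc's `common.configuration` package mirrors Flink's own `Configuration.fromMap`; the credential values are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.maxcompute.MaxComputeDataSinkOptions;

/** Sketch: the renamed sink keys, resolved through the option definitions. */
public class RenamedOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> sink = new HashMap<>();
        sink.put("access-id", "ak");   // was: accessId
        sink.put("access-key", "sk");  // was: accessKey
        sink.put("buckets-num", "8");  // was: bucketsNum

        // Assumes fromMap behaves like Flink's Configuration.fromMap.
        Configuration conf = Configuration.fromMap(sink);
        // Note: the old camelCase spellings are not declared as fallback
        // keys anywhere in this diff.
        int bucketsNum = conf.get(MaxComputeDataSinkOptions.BUCKETS_NUM);
        System.out.println(bucketsNum); // 8
    }
}
```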

@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
import org.apache.flink.configuration.MemorySize;
@@ -69,21 +70,20 @@ public class MaxComputeDataSinkFactory implements DataSinkFactory {
private MaxComputeWriteOptions extractMaxComputeWriteOptions(
Configuration factoryConfiguration) {
int numCommitThread =
factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
String compressAlgorithm =
int numCommitThread = factoryConfiguration.get(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM);
CompressAlgorithm compressAlgorithm =
factoryConfiguration.get(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
int flushConcurrent =
factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
factoryConfiguration.get(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
long maxBufferSize =
MemorySize.parse(
factoryConfiguration.get(
MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE))
MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE))
.getBytes();
long maxSlotSize =
MemorySize.parse(
factoryConfiguration.get(
MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE))
MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE))
.getBytes();
return MaxComputeWriteOptions.builder()
@@ -119,11 +119,11 @@ public class MaxComputeDataSinkFactory implements DataSinkFactory {
optionalOptions.add(MaxComputeDataSinkOptions.STS_TOKEN);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKETS_NUM);
// write options
optionalOptions.add(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
optionalOptions.add(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
optionalOptions.add(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE);
return optionalOptions;
}
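The two buffer-size options stay String-typed and are converted to bytes with Flink's `MemorySize`, as the factory above does. A small self-contained illustration; `MemorySize` is Flink's real utility, and the values match the option defaults:

```java
import org.apache.flink.configuration.MemorySize;

/** Sketch: how total.buffer-size / bucket.buffer-size strings become bytes. */
public class BufferSizeSketch {
    public static void main(String[] args) {
        // Defaults from MaxComputeDataSinkOptions; "MB" is parsed as mebibytes.
        long maxBufferSize = MemorySize.parse("64MB").getBytes(); // 67108864
        long maxSlotSize = MemorySize.parse("4MB").getBytes();    // 4194304
        System.out.println(maxBufferSize + " / " + maxSlotSize);
    }
}
```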

@@ -20,18 +20,19 @@ package org.apache.flink.cdc.connectors.maxcompute;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
/** Options for MaxCompute Data Sink. */
public class MaxComputeDataSinkOptions {
// basic options.
public static final ConfigOption<String> ACCESS_ID =
ConfigOptions.key("accessId")
ConfigOptions.key("access-id")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute user access id.");
public static final ConfigOption<String> ACCESS_KEY =
ConfigOptions.key("accessKey")
ConfigOptions.key("access-key")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute user access key.");
@@ -49,59 +50,60 @@ public class MaxComputeDataSinkOptions {
.withDescription("MaxCompute project.");
public static final ConfigOption<String> TUNNEL_ENDPOINT =
ConfigOptions.key("tunnelEndpoint")
ConfigOptions.key("tunnel.endpoint")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute tunnel end point.");
public static final ConfigOption<String> QUOTA_NAME =
ConfigOptions.key("quotaName")
ConfigOptions.key("quota.name")
.stringType()
.noDefaultValue()
.withDescription(
"MaxCompute tunnel quota name, note that not quota nick-name.");
public static final ConfigOption<String> STS_TOKEN =
ConfigOptions.key("stsToken")
ConfigOptions.key("sts-token")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute sts token.");
public static final ConfigOption<Integer> BUCKETS_NUM =
ConfigOptions.key("bucketsNum")
ConfigOptions.key("buckets-num")
.intType()
.defaultValue(16)
.withDescription(
"The batch size of MaxCompute table when automatically create table.");
// write options.
public static final ConfigOption<String> COMPRESS_ALGORITHM =
ConfigOptions.key("compressAlgorithm")
.stringType()
.defaultValue("zlib")
public static final ConfigOption<CompressAlgorithm> COMPRESS_ALGORITHM =
ConfigOptions.key("compress.algorithm")
.enumType(CompressAlgorithm.class)
.defaultValue(CompressAlgorithm.ZLIB)
.withDescription(
"The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'raw'.");
public static final ConfigOption<String> TOTAL_BATCH_SIZE =
ConfigOptions.key("totalBatchSize")
.stringType()
.defaultValue("64MB")
.withDescription("The max batch size of data upload to MaxCompute.");
"The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'lz4', 'raw'.");
public static final ConfigOption<String> BUCKET_BATCH_SIZE =
ConfigOptions.key("bucketBatchSize")
public static final ConfigOption<String> BUCKET_BUFFER_SIZE =
ConfigOptions.key("bucket.buffer-size")
.stringType()
.defaultValue("4MB")
.withDescription(
"The max batch size of data per bucket when upload to MaxCompute");
public static final ConfigOption<Integer> NUM_COMMIT_THREADS =
ConfigOptions.key("numCommitThreads")
public static final ConfigOption<String> TOTAL_BUFFER_SIZE =
ConfigOptions.key("total.buffer-size")
.stringType()
.defaultValue("64MB")
.withDescription("The max batch size of data upload to MaxCompute.");
public static final ConfigOption<Integer> COMMIT_THREAD_NUM =
ConfigOptions.key("commit.thread-num")
.intType()
.defaultValue(16)
.withDescription("The number of threads used to commit data to MaxCompute.");
public static final ConfigOption<Integer> NUM_FLUSH_CONCURRENT =
ConfigOptions.key("numFlushConcurrent")
public static final ConfigOption<Integer> FLUSH_CONCURRENT_NUM =
ConfigOptions.key("flush.concurrent-num")
.intType()
.defaultValue(4)
.withDescription("The number of concurrent with flush bucket data.");

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.maxcompute.options;

/** Compress algorithm for MaxCompute table. */
public enum CompressAlgorithm {
    /** No compress. */
    RAW("raw"),

    /** Zlib compress. */
    ZLIB("zlib"),

    /** LZ4 compress. */
    LZ4("lz4"),

    /** Snappy compress. */
    @Deprecated
    SNAPPY("snappy");

    private final String value;

    CompressAlgorithm(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return value;
    }
}

@@ -34,7 +34,7 @@ public class MaxComputeWriteOptions implements Serializable {
this.maxBufferSize = builder.maxBufferSize;
this.slotBufferSize = builder.slotBufferSize;
this.numCommitThread = builder.numCommitThread;
this.compressAlgorithm = builder.compressAlgorithm;
this.compressAlgorithm = builder.compressAlgorithm.getValue();
}
public static Builder builder() {
@@ -67,7 +67,7 @@ public class MaxComputeWriteOptions implements Serializable {
private long maxBufferSize = 64 * 1024 * 1024L;
private long slotBufferSize = 1024 * 1024L;
private int numCommitThread = 16;
private String compressAlgorithm = "zlib";
private CompressAlgorithm compressAlgorithm = CompressAlgorithm.ZLIB;
public Builder withFlushConcurrent(int flushConcurrent) {
this.flushConcurrent = flushConcurrent;
@@ -89,7 +89,7 @@ public class MaxComputeWriteOptions implements Serializable {
return this;
}
public Builder withCompressAlgorithm(String compressAlgorithm) {
public Builder withCompressAlgorithm(CompressAlgorithm compressAlgorithm) {
this.compressAlgorithm = compressAlgorithm;
return this;
}
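For completeness, a sketch of the typed setter in use: `builder()`, `withFlushConcurrent`, and `withCompressAlgorithm` appear in this diff, while `build()` is assumed from the builder convention. The constructor converts the enum back to its lowercase string with `getValue()` for the existing write path.

```java
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;

/** Sketch: configuring write options with the enum-typed setter. */
public class WriteOptionsSketch {
    public static void main(String[] args) {
        MaxComputeWriteOptions options =
                MaxComputeWriteOptions.builder()
                        .withFlushConcurrent(8)                       // flush.concurrent-num
                        .withCompressAlgorithm(CompressAlgorithm.LZ4) // compress.algorithm
                        .build(); // assumed terminal method of the builder
    }
}
```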

@@ -108,17 +108,17 @@ public class MaxComputeE2eITCase extends PipelineTestEnvironment {
+ "sink:\n"
+ " type: maxcompute\n"
+ " name: MaxComputeSink\n"
+ " accessId: ak\n"
+ " accessKey: sk\n"
+ " access-id: ak\n"
+ " access-key: sk\n"
+ " endpoint: "
+ getEndpoint()
+ "\n"
+ " tunnelEndpoint: "
+ " tunnel.endpoint: "
+ getEndpoint()
+ "\n"
+ " project: mocked_mc\n"
+ " bucketsNum: 8\n"
+ " compressAlgorithm: raw\n"
+ " buckets-num: 8\n"
+ " compress.algorithm: raw\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 4";
