[FLINK-35121][common] Adds validation for pipeline definition options


@ -275,6 +275,21 @@ pipeline:
- `specific-offset`: Skip the snapshot phase and start reading from a specific binlog offset. The offset can be specified with a binlog filename and position, or with a GTID set if GTID is enabled on the cluster.
- `timestamp`: Skip the snapshot phase and start reading binlog events from a specific timestamp.
For example, the startup mode can be specified in the YAML configuration file like this:
```yaml
source:
type: mysql
scan.startup.mode: earliest-offset # Start from earliest offset
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
# ...
```
## Data Type Mapping

@ -284,6 +284,21 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c
specified with a binlog filename and position, or with a GTID set if GTID is enabled on the server.
- `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp.
For example, in a YAML pipeline definition:
```yaml
source:
type: mysql
scan.startup.mode: earliest-offset # Start from earliest offset
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
# ...
```
## Data Type Mapping

@ -83,7 +83,8 @@ class CliFrontendTest {
"--global-config",
globalPipelineConfig());
assertThat(executor.getGlobalPipelineConfig().toMap().get("parallelism")).isEqualTo("1");
assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar");
assertThat(executor.getGlobalPipelineConfig().toMap().get("schema.change.behavior"))
.isEqualTo("ignore");
}
@Test

@ -77,7 +77,6 @@ class YamlPipelineDefinitionParserTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.put("foo", "bar")
.build()));
assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf);
}
@ -224,7 +223,7 @@ class YamlPipelineDefinitionParserTest {
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.build()));
@ -285,9 +284,8 @@ class YamlPipelineDefinitionParserTest {
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.put("foo", "bar")
.build()));
private final PipelineDef defWithOptional =
@ -327,5 +325,5 @@ class YamlPipelineDefinitionParserTest {
new SinkDef("kafka", null, new Configuration()),
Collections.emptyList(),
Collections.emptyList(),
new Configuration());
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
}

@ -55,5 +55,5 @@ transform:
pipeline:
name: source-database-sync-pipe
parallelism: 4
enable-schema-evolution: false
schema.change.behavior: evolve
schema-operator.rpc-timeout: 1 h

@ -19,3 +19,6 @@ source:
sink:
type: kafka
pipeline:
parallelism: 1

@ -15,4 +15,4 @@
# limitations under the License.
################################################################################
parallelism: 1
foo: bar
schema.change.behavior: ignore

@ -22,8 +22,10 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import static org.apache.flink.cdc.common.configuration.ConfigurationUtils.canBePrefixMap;
@ -131,6 +133,17 @@ public class Configuration implements java.io.Serializable, Cloneable {
return this;
}
/**
* Returns the keys of all key/value pairs stored inside this configuration object.
*
* @return the keys of all key/value pairs stored inside this configuration object
*/
public Set<String> keySet() {
synchronized (this.confData) {
return new HashSet<>(this.confData.keySet());
}
}
public Map<String, String> toMap() {
synchronized (this.confData) {
Map<String, String> ret = new HashMap<>(this.confData.size());
@ -247,6 +260,10 @@ public class Configuration implements java.io.Serializable, Cloneable {
return Optional.empty();
}
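/** Returns a view of all option keys currently stored in this configuration. */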
public Set<String> getKeys() {
return confData.keySet();
}
@Override
public int hashCode() {
int hash = 0;

@ -18,12 +18,133 @@
package org.apache.flink.cdc.common.factories;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.table.api.ValidationException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** A helper for working with {@link Factory}. */
@PublicEvolving
public class FactoryHelper {
private final Factory factory;
private final Factory.Context context;
private FactoryHelper(Factory factory, Factory.Context context) {
this.factory = factory;
this.context = context;
}
public static FactoryHelper createFactoryHelper(Factory factory, Factory.Context context) {
return new FactoryHelper(factory, context);
}
/**
* Validates the required and optional {@link ConfigOption}s of a factory.
*
* <p>Note: It does not check for left-over options.
*/
public static void validateFactoryOptions(Factory factory, Configuration configuration) {
validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), configuration);
}
/**
* Validates the required options and optional options.
*
* <p>Note: It does not check for left-over options.
*/
public static void validateFactoryOptions(
Set<ConfigOption<?>> requiredOptions,
Set<ConfigOption<?>> optionalOptions,
Configuration configuration) {
final List<String> missingRequiredOptions =
requiredOptions.stream()
.filter(option -> configuration.get(option) == null)
.map(ConfigOption::key)
.sorted()
.collect(Collectors.toList());
if (!missingRequiredOptions.isEmpty()) {
throw new ValidationException(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
String.join("\n", missingRequiredOptions)));
}
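// Accessing each optional option forces its value to be parsed, so a value with a
// mismatched type is rejected here (as an IllegalArgumentException) rather than later.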
optionalOptions.forEach(configuration::getOptional);
}
/** Validates unconsumed option keys. */
public static void validateUnconsumedKeys(
String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys) {
final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
remainingOptionKeys.removeAll(consumedOptionKeys);
if (!remainingOptionKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Unsupported options found for '%s'.\n\n"
+ "Unsupported options:\n\n"
+ "%s\n\n"
+ "Supported options:\n\n"
+ "%s",
factoryIdentifier,
remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")),
String.join("\n", consumedOptionKeys)));
}
}
/** Validates the options of the factory. It checks for unconsumed option keys. */
public void validate() {
Set<String> allOptionKeys =
Stream.concat(
factory.requiredOptions().stream().map(ConfigOption::key),
factory.optionalOptions().stream().map(ConfigOption::key))
.collect(Collectors.toSet());
validateFactoryOptions(factory, context.getFactoryConfiguration());
validateUnconsumedKeys(
factory.identifier(), context.getFactoryConfiguration().getKeys(), allOptionKeys);
}
/**
* Validates the options of the factory. It checks for unconsumed option keys while ignoring
* options whose keys start with any of the given prefixes.
*
* <p>Option keys that start with one of the prefixes in {@code prefixesToSkip} are skipped
* during validation.
*
* @param prefixesToSkip option key prefixes to skip during validation
*/
public void validateExcept(String... prefixesToSkip) {
Preconditions.checkArgument(
prefixesToSkip.length > 0, "Prefixes to skip cannot be empty.");
final List<String> prefixesList = Arrays.asList(prefixesToSkip);
Set<String> allOptionKeys =
Stream.concat(
factory.requiredOptions().stream().map(ConfigOption::key),
factory.optionalOptions().stream().map(ConfigOption::key))
.collect(Collectors.toSet());
Set<String> filteredOptionKeys =
context.getFactoryConfiguration().getKeys().stream()
.filter(key -> prefixesList.stream().noneMatch(key::startsWith))
.collect(Collectors.toSet());
validateFactoryOptions(factory, context.getFactoryConfiguration());
validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, allOptionKeys);
}
/** Default implementation of {@link Factory.Context}. */
public static class DefaultContext implements Factory.Context {

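For context (not part of this change's diff): the connector factories below call the helper at the top of their create methods, excluding pass-through prefixes from the unknown-key check. A minimal sketch of that pattern, where `demo.` and `DemoDataSink` are hypothetical names, looks like this:

```java
// Sketch only: the "demo." prefix and DemoDataSink are illustrative, not real CDC classes.
@Override
public DataSink createDataSink(Context context) {
    // Fail fast on missing required options, type mismatches, and unknown keys,
    // while letting keys under the "demo." prefix pass through untouched.
    FactoryHelper.createFactoryHelper(this, context).validateExcept("demo.");
    Configuration config = context.getFactoryConfiguration();
    return new DemoDataSink(config); // hypothetical sink construction
}
```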
@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.factories;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/** Tests for {@link FactoryHelper}. */
public class FactoryHelperTests {
private Factory getDummyFactory() {
return new Factory() {
@Override
public String identifier() {
return "dummy";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Sets.newHashSet(
ConfigOptions.key("id").intType().noDefaultValue(),
ConfigOptions.key("name").stringType().noDefaultValue(),
ConfigOptions.key("age").doubleType().noDefaultValue());
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return Sets.newHashSet(
ConfigOptions.key("hobby").stringType().noDefaultValue(),
ConfigOptions.key("location").stringType().defaultValue("Everywhere"),
ConfigOptions.key("misc")
.mapType()
.defaultValue(Collections.singletonMap("A", "Z")));
}
};
}
@Test
void testCorrectConfigValidation() {
// This is a valid configuration.
Map<String, String> configurations = new HashMap<>();
configurations.put("id", "1");
configurations.put("name", "Alice");
configurations.put("age", "17");
configurations.put("location", "Here");
FactoryHelper factoryHelper =
FactoryHelper.createFactoryHelper(
getDummyFactory(),
new FactoryHelper.DefaultContext(
Configuration.fromMap(configurations), null, null));
factoryHelper.validate();
}
@Test
void testMissingRequiredOptionConfigValidation() {
// This configuration doesn't provide all required options.
Map<String, String> configurations = new HashMap<>();
configurations.put("id", "1");
configurations.put("age", "17");
configurations.put("location", "Here");
FactoryHelper factoryHelper =
FactoryHelper.createFactoryHelper(
getDummyFactory(),
new FactoryHelper.DefaultContext(
Configuration.fromMap(configurations), null, null));
Assertions.assertThatThrownBy(factoryHelper::validate)
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining("One or more required options are missing.");
}
@Test
void testIncompatibleTypeValidation() {
// This configuration has an option with mismatched type.
Map<String, String> configurations = new HashMap<>();
configurations.put("id", "1");
configurations.put("name", "Alice");
configurations.put("age", "Not a number");
configurations.put("location", "Here");
FactoryHelper factoryHelper =
FactoryHelper.createFactoryHelper(
getDummyFactory(),
new FactoryHelper.DefaultContext(
Configuration.fromMap(configurations), null, null));
Assertions.assertThatThrownBy(factoryHelper::validate)
.isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Could not parse value 'Not a number' for key 'age'.");
}
@Test
void testRedundantConfigValidation() {
// This configuration has redundant config options.
Map<String, String> configurations = new HashMap<>();
configurations.put("id", "1");
configurations.put("name", "Alice");
configurations.put("age", "17");
configurations.put("what", "Not a valid configOption");
FactoryHelper factoryHelper =
FactoryHelper.createFactoryHelper(
getDummyFactory(),
new FactoryHelper.DefaultContext(
Configuration.fromMap(configurations), null, null));
Assertions.assertThatThrownBy(factoryHelper::validate)
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining("Unsupported options found for 'dummy'.");
}
@Test
void testAllowedPrefixConfigValidation() {
// This configuration has allowed prefix options.
Map<String, String> configurations = new HashMap<>();
configurations.put("id", "1");
configurations.put("name", "Alice");
configurations.put("age", "17");
configurations.put("debezium.foo", "Some debezium options");
configurations.put("debezium.bar", "Another debezium options");
configurations.put("canal.baz", "Yet another debezium options");
FactoryHelper factoryHelper =
FactoryHelper.createFactoryHelper(
getDummyFactory(),
new FactoryHelper.DefaultContext(
Configuration.fromMap(configurations), null, null));
Assertions.assertThatThrownBy(factoryHelper::validate)
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining("Unsupported options found for 'dummy'.");
Assertions.assertThatThrownBy(() -> factoryHelper.validateExcept("debezium."))
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining("Unsupported options found for 'dummy'.");
Assertions.assertThatThrownBy(() -> factoryHelper.validateExcept("canal."))
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining("Unsupported options found for 'dummy'.");
factoryHelper.validateExcept("debezium.", "canal.");
}
}

@ -18,15 +18,21 @@
package org.apache.flink.cdc.composer.definition;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.table.api.ValidationException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
@ -67,6 +73,8 @@ public class PipelineDef {
this.routes = routes;
this.transforms = transforms;
this.config = evaluatePipelineTimeZone(config);
validatePipelineDefinition(this.config);
}
public SourceDef getSource() {
@ -130,6 +138,43 @@ public class PipelineDef {
// Utilities
// ------------------------------------------------------------------------
@VisibleForTesting
public static void validatePipelineDefinition(Configuration configuration)
throws ValidationException {
List<ConfigOption<?>> options =
Arrays.asList(
PipelineOptions.PIPELINE_NAME,
PipelineOptions.PIPELINE_PARALLELISM,
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
PipelineOptions.PIPELINE_LOCAL_TIME_ZONE,
PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID,
PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT);
Set<String> optionKeys =
options.stream().map(ConfigOption::key).collect(Collectors.toSet());
configuration
.getKeys()
.forEach(
key -> {
if (!optionKeys.contains(key)) {
throw new ValidationException(
String.format("Unknown configuration key `%s`", key));
}
});
options.forEach(
option -> {
if (!configuration.getOptional(option).isPresent()
&& !option.hasDefaultValue()) {
throw new ValidationException(
String.format(
"Configuration key `%s` is not specified, and no default value available.",
option.key()));
}
});
}
/**
* Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP
* WITH LOCAL TIME ZONE}.

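For reference (outside this diff), a pipeline block that passes the new validation, using only option keys that appear in the parser tests in this change, might look like:

```yaml
pipeline:
  name: source-database-sync-pipe
  parallelism: 4
  schema.change.behavior: evolve
  schema-operator.rpc-timeout: 1 h
```

Any other key under `pipeline:`, such as the removed `foo: bar`, now fails with "Unknown configuration key `foo`".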
@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.composer.definition;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
/** Tests for {@link PipelineDef} validation. */
public class PipelineValidationTest {
@Test
void testNormalConfigValidation() {
// A typical valid configuration
Map<String, String> configurations = new HashMap<>();
configurations.put("parallelism", "1");
configurations.put("name", "Pipeline Job");
PipelineDef.validatePipelineDefinition(Configuration.fromMap(configurations));
}
@Test
void testTypeMismatchValidation() {
Map<String, String> configurations = new HashMap<>();
// An option value with a mismatched type.
configurations.put("parallelism", "Not a Number");
configurations.put("name", "Pipeline Job");
Assertions.assertThrowsExactly(
IllegalArgumentException.class,
() -> PipelineDef.validatePipelineDefinition(Configuration.fromMap(configurations)),
"Could not parse value 'Not a Number' for key 'parallelism'.");
}
@Test
void testEmptyConfigValidation() {
// An empty configuration should fail
Map<String, String> configurations = new HashMap<>();
Assertions.assertThrowsExactly(
ValidationException.class,
() ->
PipelineDef.validatePipelineDefinition(
Configuration.fromMap(configurations)));
}
@Test
void testUnknownConfigValidation() {
// A configuration with an unknown key should fail
Map<String, String> configurations = new HashMap<>();
configurations.put("parallelism", "1");
configurations.put("name", "Pipeline Job");
configurations.put("unknown", "optionValue");
Assertions.assertThrowsExactly(
ValidationException.class,
() ->
PipelineDef.validatePipelineDefinition(
Configuration.fromMap(configurations)));
}
}

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink;
@ -56,6 +57,7 @@ import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SI
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
/** A {@link DataSinkFactory} to create {@link DorisDataSink}. */
@ -63,6 +65,9 @@ import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.US
public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, STREAM_LOAD_PROP_PREFIX);
Configuration config = context.getFactoryConfiguration();
DorisOptions.Builder optionsBuilder = DorisOptions.builder();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
@ -134,11 +139,9 @@ public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(BENODES);
options.add(USERNAME);
options.add(PASSWORD);
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
options.add(SINK_CHECK_INTERVAL);

@ -21,6 +21,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.kafka.json.ChangeLogJsonFormatFactory;
@ -37,6 +38,10 @@ import java.util.Properties;
import java.util.Set;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
/** A {@link DataSinkFactory} to create {@link KafkaDataSink}. */
public class KafkaDataSinkFactory implements DataSinkFactory {
@ -45,6 +50,8 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context).validateExcept(PROPERTIES_PREFIX);
Configuration configuration =
Configuration.fromMap(context.getFactoryConfiguration().toMap());
DeliveryGuarantee deliveryGuarantee =
@ -97,15 +104,17 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
return null;
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(VALUE_FORMAT);
options.add(TOPIC);
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
options.add(SINK_CUSTOM_HEADER);
options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
options.add(KafkaDataSinkOptions.TOPIC);
options.add(KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED);
return options;
}
}

@ -22,26 +22,72 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
/** Tests for {@link KafkaDataSinkFactory}. */
public class KafkaDataSinkFactoryTest {
@Test
public void testCreateDataSink() {
void testCreateDataSink() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class);
Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory);
Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertTrue(dataSink instanceof KafkaDataSink);
Assertions.assertThat(dataSink).isInstanceOf(KafkaDataSink.class);
}
@Test
void testUnsupportedOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("unsupported_key", "unsupported_value")
.build());
Assertions.assertThatThrownBy(
() ->
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf,
conf,
Thread.currentThread().getContextClassLoader())))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Unsupported options found for 'kafka'.\n\n"
+ "Unsupported options:\n\n"
+ "unsupported_key");
}
@Test
void testPrefixRequireOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("properties.compression.type", "none")
.build());
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(KafkaDataSink.class);
}
}

@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
@ -75,7 +76,9 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.getJdbcProperties;
import static org.apache.flink.util.Preconditions.checkState;
@ -89,6 +92,9 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
@Override
public DataSource createDataSource(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PROPERTIES_PREFIX, DEBEZIUM_OPTIONS_PREFIX);
final Configuration config = context.getFactoryConfiguration();
String hostname = config.get(HOSTNAME);
int port = config.get(PORT);
@ -192,26 +198,28 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(TABLES_EXCLUDE);
options.add(SCHEMA_CHANGE_ENABLED);
options.add(SERVER_ID);
options.add(SERVER_TIME_ZONE);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
options.add(CHUNK_META_GROUP_SIZE);
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(CONNECT_TIMEOUT);
options.add(CONNECT_MAX_RETRIES);
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(HEARTBEAT_INTERVAL);
options.add(SCHEMA_CHANGE_ENABLED);
return options;
}

@ -17,10 +17,12 @@
package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.ValidationException;
import org.junit.Test;
@ -29,7 +31,9 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
@ -165,6 +169,76 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase {
+ inventoryDatabase.getDatabaseName()));
}
@Test
public void testLackRequireOption() {
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
List<String> requireKeys =
factory.requiredOptions().stream()
.map(ConfigOption::key)
.collect(Collectors.toList());
for (String requireKey : requireKeys) {
Map<String, String> remainingOptions = new HashMap<>(options);
remainingOptions.remove(requireKey);
Factory.Context context = new MockContext(Configuration.fromMap(remainingOptions));
assertThatThrownBy(() -> factory.createDataSource(context))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
requireKey));
}
}
@Test
public void testUnsupportedOption() {
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
options.put("unsupported_key", "unsupported_value");
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
Factory.Context context = new MockContext(Configuration.fromMap(options));
assertThatThrownBy(() -> factory.createDataSource(context))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Unsupported options found for 'mysql'.\n\n"
+ "Unsupported options:\n\n"
+ "unsupported_key");
}
@Test
public void testPrefixRequireOption() {
inventoryDatabase.createAndInitialize();
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
options.put("jdbc.properties.requireSSL", "true");
options.put("debezium.snapshot.mode", "initial");
Factory.Context context = new MockContext(Configuration.fromMap(options));
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
assertThat(dataSource.getSourceConfig().getTableList())
.isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products"));
}
class MockContext implements Factory.Context {
Configuration factoryConfiguration;

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
@ -40,6 +41,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES;
/** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */
public class PaimonDataSinkFactory implements DataSinkFactory {
@ -47,16 +51,16 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(
key.substring(
PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES.length()),
value);
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
} else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
catalogOptions.put(
key.substring(
@ -118,9 +122,10 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PaimonDataSinkOptions.URI);
options.add(PaimonDataSinkOptions.WAREHOUSE);
options.add(PaimonDataSinkOptions.URI);
options.add(PaimonDataSinkOptions.COMMIT_USER);
options.add(PaimonDataSinkOptions.PARTITION_KEY);
return options;
}
}

@ -47,7 +47,7 @@ public class PaimonDataSinkOptions {
public static final ConfigOption<String> METASTORE =
key("catalog.properties.metastore")
.stringType()
.defaultValue("filesystem")
.noDefaultValue()
.withDescription("Metastore of paimon catalog, supports filesystem and hive.");
public static final ConfigOption<String> URI =

@ -17,20 +17,26 @@
package org.apache.flink.cdc.connectors.paimon.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/** Tests for {@link PaimonDataSinkFactory}. */
public class PaimonDataSinkFactoryTest {
@ -38,10 +44,10 @@ public class PaimonDataSinkFactoryTest {
@TempDir public static java.nio.file.Path temporaryFolder;
@Test
public void testCreateDataSink() {
void testCreateDataSink() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
Assertions.assertInstanceOf(PaimonDataSinkFactory.class, sinkFactory);
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
@ -58,6 +64,104 @@ public class PaimonDataSinkFactoryTest {
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertInstanceOf(PaimonDataSink.class, dataSink);
Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
}
@Test
void testLackRequireOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
Map<String, String> options = new HashMap<>();
options.put(PaimonDataSinkOptions.METASTORE.key(), "filesystem");
options.put(
PaimonDataSinkOptions.WAREHOUSE.key(),
new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString());
List<String> requireKeys =
sinkFactory.requiredOptions().stream()
.map(ConfigOption::key)
.collect(Collectors.toList());
for (String requireKey : requireKeys) {
Map<String, String> remainingOptions = new HashMap<>(options);
remainingOptions.remove(requireKey);
Configuration conf = Configuration.fromMap(remainingOptions);
Assertions.assertThatThrownBy(
() ->
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf,
conf,
Thread.currentThread()
.getContextClassLoader())))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
requireKey));
}
}
@Test
void testUnsupportedOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PaimonDataSinkOptions.METASTORE.key(), "filesystem")
.put(
PaimonDataSinkOptions.WAREHOUSE.key(),
new File(
temporaryFolder.toFile(),
UUID.randomUUID().toString())
.toString())
.put("unsupported_key", "unsupported_value")
.build());
Assertions.assertThatThrownBy(
() ->
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf,
conf,
Thread.currentThread().getContextClassLoader())))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Unsupported options found for 'paimon'.\n\n"
+ "Unsupported options:\n\n"
+ "unsupported_key");
}
@Test
void testPrefixRequireOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PaimonDataSinkOptions.METASTORE.key(), "filesystem")
.put(
PaimonDataSinkOptions.WAREHOUSE.key(),
new File(
temporaryFolder.toFile(),
UUID.randomUUID().toString())
.toString())
.put("catalog.properties.uri", "")
.put("table.properties.bucket", "2")
.build());
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
}
}

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
@ -31,6 +32,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
/** A {@link DataSinkFactory} to create {@link StarRocksDataSink}. */
public class StarRocksDataSinkFactory implements DataSinkFactory {
@ -39,6 +42,9 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, SINK_PROPERTIES_PREFIX);
StarRocksSinkOptions sinkOptions =
buildSinkConnectorOptions(context.getFactoryConfiguration());
TableCreateConfig tableCreateConfig =
@ -124,8 +130,7 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
sinkConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once");
Map<String, String> streamProperties =
getPrefixConfigs(
cdcConfig.toMap(), StarRocksDataSinkOptions.SINK_PROPERTIES_PREFIX);
getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX);
// force to use json format for stream load to simplify the configuration,
// such as there is no need to reconfigure the "columns" property after
// schema change. csv format can be supported in the future if needed

@ -17,26 +17,32 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** Tests for {@link org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory}. */
public class StarRocksDataSinkFactoryTest {
@Test
public void testCreateDataSink() {
void testCreateDataSink() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
assertTrue(sinkFactory instanceof StarRocksDataSinkFactory);
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
@ -50,6 +56,98 @@ public class StarRocksDataSinkFactoryTest {
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
assertTrue(dataSink instanceof StarRocksDataSink);
Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
}
@Test
void testLackRequireOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
Map<String, String> options = new HashMap<>();
options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
options.put("load-url", "127.0.0.1:8030");
options.put("username", "root");
options.put("password", "");
List<String> requireKeys =
sinkFactory.requiredOptions().stream()
.map(ConfigOption::key)
.collect(Collectors.toList());
for (String requireKey : requireKeys) {
Map<String, String> remainingOptions = new HashMap<>(options);
remainingOptions.remove(requireKey);
Configuration conf = Configuration.fromMap(remainingOptions);
Assertions.assertThatThrownBy(
() ->
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf,
conf,
Thread.currentThread()
.getContextClassLoader())))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
requireKey));
}
}
@Test
void testUnsupportedOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
.put("load-url", "127.0.0.1:8030")
.put("username", "root")
.put("password", "")
.put("unsupported_key", "unsupported_value")
.build());
Assertions.assertThatThrownBy(
() ->
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf,
conf,
Thread.currentThread().getContextClassLoader())))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Unsupported options found for 'starrocks'.\n\n"
+ "Unsupported options:\n\n"
+ "unsupported_key");
}
@Test
void testPrefixRequireOption() {
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);
Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
.put("load-url", "127.0.0.1:8030")
.put("username", "root")
.put("password", "")
.put("table.create.properties.replication_num", "1")
.put("sink.properties.format", "json")
.build());
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class);
}
}

@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
@ -41,6 +42,7 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
@Override
public DataSource createDataSource(Context context) {
FactoryHelper.createFactoryHelper(this, context).validate();
ValuesDataSourceHelper.EventSetId eventType =
context.getFactoryConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID);
int failAtPos =
@ -51,6 +53,7 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context).validate();
return new ValuesDataSink(
context.getFactoryConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY),
context.getFactoryConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED),
@ -73,6 +76,8 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
options.add(ValuesDataSourceOptions.EVENT_SET_ID);
options.add(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
options.add(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY);
options.add(ValuesDataSinkOptions.PRINT_ENABLED);
options.add(ValuesDataSinkOptions.SINK_API);
return options;
}
}
