diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index b594aa35b..acfd5b861 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.composer.definition.ModelDef;
@@ -114,7 +115,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
             throws Exception {
-
         // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
         // it's not of plain data types and must be removed before calling toPipelineConfig.
         List<UdfDef> udfDefs = new ArrayList<>();
@@ -135,6 +135,14 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         SchemaChangeBehavior schemaChangeBehavior =
                 userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
+        // Merge user config into global config
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.addAll(globalPipelineConfig);
+        pipelineConfig.addAll(userPipelineConfig);
+
+        // Decrypt configurations if specified
+        pipelineDefJsonNode = ConfigShadeUtils.decryptConfig(pipelineDefJsonNode, pipelineConfig);
+
         // Source is required
         SourceDef sourceDef =
                 toSourceDef(
@@ -165,11 +173,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
                 .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
 
-        // Merge user config into global config
-        Configuration pipelineConfig = new Configuration();
-        pipelineConfig.addAll(globalPipelineConfig);
-        pipelineConfig.addAll(userPipelineConfig);
-
         return new PipelineDef(
                 sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
     }
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 27cbbb1de..90ffaf159 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli.parser;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
 import org.apache.flink.cdc.composer.definition.ModelDef;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -32,6 +33,7 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
 import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.net.URL;
@@ -57,6 +59,10 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 /** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */
 class YamlPipelineDefinitionParserTest {
 
+    private static final String USERNAME = "flinkcdc";
+
+    private static final String PASSWORD = "flinkcdc_password";
+
     @Test
     void testParsingFullDefinition() throws Exception {
         URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
@@ -81,6 +87,25 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef).isEqualTo(minimizedDef);
     }
 
+    @Test
+    void testDecryptOptions() {
+        String encryptUsername = "ZmxpbmtjZGM=";
+        String encryptPassword = "ZmxpbmtjZGNfcGFzc3dvcmQ=";
+        String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername);
+        String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword);
+        Assertions.assertEquals(USERNAME, decryptUsername);
+        Assertions.assertEquals(PASSWORD, decryptPassword);
+    }
+
+    @Test
+    void testParsingBase64EncodedDefinition() throws Exception {
+        URL resource =
+                Resources.getResource("definitions/pipeline-definition-with-base64-encoded.yaml");
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        assertThat(pipelineDef).isEqualTo(decryptedDefWithBase64Encode);
+    }
+
     @Test
     void testOverridingGlobalConfig() throws Exception {
         URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
@@ -368,6 +393,41 @@ class YamlPipelineDefinitionParserTest {
                                     .put("schema-operator.rpc-timeout", "1 h")
                                     .build()));
 
+    private final PipelineDef decryptedDefWithBase64Encode =
+            new PipelineDef(
+                    new SourceDef(
+                            "mysql",
+                            "source-database",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("host", "localhost")
+                                            .put("port", "3306")
+                                            .put("username", "admin")
+                                            .put("password", "password1")
+                                            .put("tables", "replication.cluster")
+                                            .build())),
+                    new SinkDef(
+                            "doris",
+                            "sink-queue",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("fenodes", "localhost:8035")
+                                            .put("username", "root")
+                                            .put("password", "password2")
+                                            .build())),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("name", "source-database-sync-pipe")
+                                    .put("parallelism", "4")
+                                    .put("schema.change.behavior", "evolve")
+                                    .put("schema-operator.rpc-timeout", "1 h")
+                                    .put("shade.identifier", "base64")
+                                    .put("shade.sensitive.keywords", "password;username")
+                                    .build()));
+
     @Test
     void testParsingFullDefinitionFromString() throws Exception {
         String pipelineDefText =
diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-base64-encoded.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-base64-encoded.yaml
new file mode 100644
index 000000000..9713dae26
--- /dev/null
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-base64-encoded.yaml
@@ -0,0 +1,39 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+  type: mysql
+  name: source-database
+  host: localhost
+  port: 3306
+  username: YWRtaW4=
+  password: cGFzc3dvcmQx
+  tables: replication.cluster
+
+sink:
+  type: doris
+  name: sink-queue
+  fenodes: localhost:8035
+  username: cm9vdA==
+  password: cGFzc3dvcmQy
+
+pipeline:
+  name: source-database-sync-pipe
+  parallelism: 4
+  schema.change.behavior: evolve
+  schema-operator.rpc-timeout: 1 h
+  shade.identifier: base64
+  shade.sensitive.keywords: password;username
\ No newline at end of file
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index 343e99270..2b8e935ff 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.configuration.description.Description;
 import org.apache.flink.cdc.common.configuration.description.ListElement;
 
 import java.time.Duration;
+import java.util.List;
 
 import static org.apache.flink.cdc.common.configuration.description.TextElement.text;
 
@@ -100,5 +101,20 @@ public class PipelineOptions {
                     .withDescription(
                             "The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes.");
 
+    public static final ConfigOption<String> SHADE_IDENTIFIER_OPTION =
+            ConfigOptions.key("shade.identifier")
+                    .stringType()
+                    .defaultValue("default")
+                    .withDescription(
+                            "The identifier of the encryption method for decryption. Defaults to \"default\", indicating no encryption");
+
+    public static final ConfigOption<List<String>> SHADE_SENSITIVE_KEYWORDS =
+            ConfigOptions.key("shade.sensitive.keywords")
+                    .stringType()
+                    .asList()
+                    .defaultValues("password")
+                    .withDescription(
+                            "A semicolon-separated list of keywords of the configuration items to be decrypted.");
+
     private PipelineOptions() {}
 }
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/ConfigShade.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/ConfigShade.java
new file mode 100644
index 000000000..78b5032ac
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/ConfigShade.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.shade;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The interface that provides the ability to decrypt the sensitive options of a pipeline
+ * definition (see {@code org.apache.flink.cdc.composer.definition}).
+ */
+@PublicEvolving
+public interface ConfigShade {
+    Logger LOG = LoggerFactory.getLogger(ConfigShade.class);
+
+    /**
+     * Initializes the custom instance using the pipeline configuration.
+     *
+     * <p>This method can be useful when decryption requires an external file (e.g. a key file)
+     * defined in the pipeline configs.
+     *
+     * @param pipelineConfig the pipeline configuration
+     */
+    default void initialize(Configuration pipelineConfig) throws Exception {}
+
+    /** Returns the unique identifier used to select this {@link ConfigShade} implementation. */
+    String getIdentifier();
+
+    /**
+     * Decrypts the content.
+     *
+     * @param content The content to decrypt
+     * @return the decrypted content
+     */
+    String decrypt(String content);
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/impl/Base64ConfigShade.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/impl/Base64ConfigShade.java
new file mode 100644
index 000000000..9704f54a2
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/impl/Base64ConfigShade.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.shade.impl;
+
+import org.apache.flink.cdc.common.shade.ConfigShade;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/** {@link ConfigShade} that decodes Base64-encoded content. */
+public class Base64ConfigShade implements ConfigShade {
+
+    private static final Base64.Decoder DECODER = Base64.getDecoder();
+
+    private static final String IDENTIFIER = "base64";
+
+    @Override
+    public String getIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public String decrypt(String content) {
+        // Decode with an explicit charset so the result does not depend on the JVM default.
+        return new String(DECODER.decode(content), StandardCharsets.UTF_8);
+    }
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/ConfigShadeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/ConfigShadeUtils.java
new file mode 100644
index 000000000..223ab0b91
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/ConfigShadeUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.shade.utils;
+
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.shade.ConfigShade;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.BiFunction;
+
+/** Utilities that look up {@link ConfigShade}s via SPI and apply them to pipeline definitions. */
+public final class ConfigShadeUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigShadeUtils.class);
+
+    private static final Map<String, ConfigShade> CONFIG_SHADES = new HashMap<>();
+
+    private static final ConfigShade DEFAULT_SHADE = new DefaultConfigShade();
+    private static final String SOURCE_KEY = "source";
+    private static final String SINK_KEY = "sink";
+
+    static {
+        for (ConfigShade configShade : ServiceLoader.load(ConfigShade.class)) {
+            CONFIG_SHADES.put(configShade.getIdentifier(), configShade);
+        }
+        LOG.info("Load config shade spi: {}", CONFIG_SHADES.keySet());
+    }
+
+    private ConfigShadeUtils() {}
+
+    @VisibleForTesting
+    public static String decryptOption(String identifier, String content) {
+        ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+        return configShade.decrypt(content);
+    }
+
+    /** Decrypts {@code root} using the shade identifier and sensitive keywords of the config. */
+    public static JsonNode decryptConfig(JsonNode root, Configuration pipelineConfig)
+            throws Exception {
+        String identifier = pipelineConfig.get(PipelineOptions.SHADE_IDENTIFIER_OPTION);
+        List<String> sensitiveOptions =
+                pipelineConfig.get(PipelineOptions.SHADE_SENSITIVE_KEYWORDS);
+        return decryptConfig(identifier, root, sensitiveOptions, pipelineConfig);
+    }
+
+    /** Decrypts the sensitive options of the source and sink definitions found in {@code root}. */
+    @SuppressWarnings("unchecked")
+    public static JsonNode decryptConfig(
+            String identifier,
+            JsonNode root,
+            List<String> sensitiveOptions,
+            Configuration pipelineConfig)
+            throws Exception {
+        ConfigShade configShade = CONFIG_SHADES.get(identifier);
+        if (configShade == null) {
+            LOG.error("Can not find config shade: {}", identifier);
+            throw new IllegalStateException("Can not find config shade: " + identifier);
+        }
+        configShade.initialize(pipelineConfig);
+
+        if (DEFAULT_SHADE.getIdentifier().equals(configShade.getIdentifier())) {
+            return root;
+        }
+
+        LOG.info("Use config shade: {}", identifier);
+        BiFunction<String, Object, Object> processFunction =
+                (key, value) -> configShade.decrypt(value.toString());
+        ObjectNode jsonNodes = (ObjectNode) root;
+        Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
+        Map<String, Object> source = (Map<String, Object>) configMap.get(SOURCE_KEY);
+        Map<String, Object> sink = (Map<String, Object>) configMap.get(SINK_KEY);
+        Preconditions.checkArgument(
+                source != null && !source.isEmpty(),
+                "Miss config! Please check the config file.");
+        Preconditions.checkArgument(
+                sink != null && !sink.isEmpty(),
+                "Miss config! Please check the config file.");
+
+        for (String sensitiveOption : sensitiveOptions) {
+            source.computeIfPresent(sensitiveOption, processFunction);
+            sink.computeIfPresent(sensitiveOption, processFunction);
+        }
+        LOG.info("{} for source/sink has been decrypted and refreshed", sensitiveOptions);
+        // source and sink are the map instances held by configMap, so it is already up to date.
+        return JsonUtils.toJsonNode(configMap);
+    }
+
+    /** Default {@link ConfigShade} that returns the content unchanged. */
+    public static class DefaultConfigShade implements ConfigShade {
+        private static final String IDENTIFIER = "default";
+
+        @Override
+        public String getIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public String decrypt(String content) {
+            return content;
+        }
+    }
+}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/JsonUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/JsonUtils.java
new file mode 100644
index 000000000..959243b6a
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/shade/utils/JsonUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.shade.utils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+/** Utilities to convert between {@link JsonNode} trees and plain {@link Map}s. */
+public final class JsonUtils {
+    /** Mapper used by {@link #toJsonNode}; kept as a static singleton so that it is reused. */
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper()
+                    .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+                    .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+                    .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+                    .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+                    .setTimeZone(TimeZone.getDefault())
+                    // support java8 time api
+                    .registerModule(new JavaTimeModule());
+
+    private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
+
+    public static Map<String, Object> toMap(JsonNode jsonNode) {
+        return DEFAULT_OBJECT_MAPPER.convertValue(
+                jsonNode, new TypeReference<Map<String, Object>>() {});
+    }
+
+    public static JsonNode toJsonNode(Object obj) {
+        return OBJECT_MAPPER.valueToTree(obj);
+    }
+}
diff --git a/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.shade.ConfigShade b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.shade.ConfigShade
new file mode 100644
index 000000000..ff1c7b1ab
--- /dev/null
+++ b/flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.shade.ConfigShade
@@ -0,0 +1,16 @@
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils$DefaultConfigShade
+org.apache.flink.cdc.common.shade.impl.Base64ConfigShade
\ No newline at end of file