[FLINK-36805][cdc-common] Add interface to support encryption of sensitive configuration items and provide base64 encoding implementation

pull/3829/head
jzjsnow 1 month ago
parent 8815f2b879
commit cce014f31d

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
@ -114,7 +115,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
@ -135,6 +135,14 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);
// Decrypt configurations if specified
pipelineDefJsonNode = ConfigShadeUtils.decryptConfig(pipelineDefJsonNode, pipelineConfig);
// Source is required
SourceDef sourceDef =
toSourceDef(
@ -165,11 +173,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);
return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
}

@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli.parser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@ -32,6 +33,7 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.net.URL;
@ -57,6 +59,10 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {
private static final String USERNAME = "flinkcdc";
private static final String PASSWORD = "flinkcdc_password";
@Test
void testParsingFullDefinition() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
@ -81,6 +87,25 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(minimizedDef);
}
@Test
public void testDecryptOptions() {
String encryptUsername = "ZmxpbmtjZGM=";
String encryptPassword = "ZmxpbmtjZGNfcGFzc3dvcmQ=";
String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername);
String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword);
Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
}
@Test
void testParsingBase64EncodedDefinition() throws Exception {
URL resource =
Resources.getResource("definitions/pipeline-definition-with-base64-encoded.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef).isEqualTo(decryptedDefWithBase64Encode);
}
@Test
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
@ -368,6 +393,41 @@ class YamlPipelineDefinitionParserTest {
.put("schema-operator.rpc-timeout", "1 h")
.build()));
private final PipelineDef decryptedDefWithBase64Encode =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "password1")
.put("tables", "replication.cluster")
.build())),
new SinkDef(
"doris",
"sink-queue",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("fenodes", "localhost:8035")
.put("username", "root")
.put("password", "password2")
.build())),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.put("shade.identifier", "base64")
.put("shade.sensitive.keywords", "password;username")
.build()));
@Test
void testParsingFullDefinitionFromString() throws Exception {
String pipelineDefText =

@ -0,0 +1,39 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: mysql
name: source-database
host: localhost
port: 3306
username: YWRtaW4=
password: cGFzc3dvcmQx
tables: replication.cluster
sink:
type: doris
name: sink-queue
fenodes: localhost:8035
username: cm9vdA==
password: cGFzc3dvcmQy
pipeline:
name: source-database-sync-pipe
parallelism: 4
schema.change.behavior: evolve
schema-operator.rpc-timeout: 1 h
shade.identifier: base64
shade.sensitive.keywords: password;username

@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.configuration.description.Description;
import org.apache.flink.cdc.common.configuration.description.ListElement;
import java.time.Duration;
import java.util.List;
import static org.apache.flink.cdc.common.configuration.description.TextElement.text;
@ -100,5 +101,20 @@ public class PipelineOptions {
.withDescription(
"The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes.");
public static final ConfigOption<String> SHADE_IDENTIFIER_OPTION =
ConfigOptions.key("shade.identifier")
.stringType()
.defaultValue("default")
.withDescription(
"The identifier of the encryption method for decryption. Defaults to \"default\", indicating no encryption");
public static final ConfigOption<List<String>> SHADE_SENSITIVE_KEYWORDS =
ConfigOptions.key("shade.sensitive.keywords")
.stringType()
.asList()
.defaultValues("password")
.withDescription(
"A semicolon-separated list of keywords of the configuration items to be decrypted.");
private PipelineOptions() {}
}

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.shade;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The interface that provides the ability to decrypt {@link
* org.apache.flink.cdc.composer.definition}.
*/
@PublicEvolving
public interface ConfigShade {
Logger LOG = LoggerFactory.getLogger(ConfigShade.class);
/**
* Initializes the custom instance using the pipeline configuration.
*
* <p>This method can be useful when decryption requires an external file (e.g. a key file)
* defined in the pipeline configs.
*/
default void initialize(Configuration pipelineConfig) throws Exception {}
/**
* The unique identifier of the current interface, used it to select the correct {@link
* ConfigShade}.
*/
String getIdentifier();
/**
* Decrypt the content.
*
* @param content The content to decrypt
*/
String decrypt(String content);
}

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.shade.impl;
import org.apache.flink.cdc.common.shade.ConfigShade;
import java.util.Base64;
/** Base64 ConfigShade. */
public class Base64ConfigShade implements ConfigShade {
private static final Base64.Decoder DECODER = Base64.getDecoder();
private static final String IDENTIFIER = "base64";
@Override
public String getIdentifier() {
return IDENTIFIER;
}
@Override
public String decrypt(String content) {
return new String(DECODER.decode(content));
}
}

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.shade.utils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.shade.ConfigShade;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.BiFunction;
/** Config shade utilities. */
public final class ConfigShadeUtils {
private static final Logger LOG = LoggerFactory.getLogger(ConfigShadeUtils.class);
private static final Map<String, ConfigShade> CONFIG_SHADES = new HashMap<>();
private static final ConfigShade DEFAULT_SHADE = new DefaultConfigShade();
private static final String SOURCE_KEY = "source";
private static final String SINK_KEY = "sink";
static {
ServiceLoader<ConfigShade> serviceLoader = ServiceLoader.load(ConfigShade.class);
Iterator<ConfigShade> it = serviceLoader.iterator();
it.forEachRemaining(
configShade -> {
CONFIG_SHADES.put(configShade.getIdentifier(), configShade);
});
LOG.info("Load config shade spi: {}", CONFIG_SHADES.keySet());
}
@VisibleForTesting
public static String decryptOption(String identifier, String content) {
ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
return configShade.decrypt(content);
}
public static JsonNode decryptConfig(JsonNode root, Configuration pipelineConfig)
throws Exception {
String identifier = pipelineConfig.get(PipelineOptions.SHADE_IDENTIFIER_OPTION);
List<String> sensitiveOptions =
pipelineConfig.get(PipelineOptions.SHADE_SENSITIVE_KEYWORDS);
return decryptConfig(identifier, root, sensitiveOptions, pipelineConfig);
}
@SuppressWarnings("unchecked")
public static JsonNode decryptConfig(
String identifier,
JsonNode root,
List<String> sensitiveOptions,
Configuration pipelineConfig)
throws Exception {
ConfigShade configShade = CONFIG_SHADES.get(identifier);
if (configShade == null) {
LOG.error("Can not find config shade: {}", identifier);
throw new IllegalStateException("Can not find config shade: " + identifier);
}
configShade.initialize(pipelineConfig);
if (DEFAULT_SHADE.getIdentifier().equals(configShade.getIdentifier())) {
return root;
}
LOG.info("Use config shade: {}", identifier);
BiFunction<String, Object, String> processFunction =
(key, value) -> configShade.decrypt(value.toString());
ObjectNode jsonNodes = (ObjectNode) root;
Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
Map<String, Object> source = (Map<String, Object>) configMap.get(SOURCE_KEY);
Map<String, Object> sink = (Map<String, Object>) configMap.get(SINK_KEY);
Preconditions.checkArgument(
!source.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sink.isEmpty(), "Miss <Sink> config! Please check the config file.");
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
sink.computeIfPresent(sensitiveOption, processFunction);
}
LOG.info("{} for source/sink has been decrypted and refreshed", sensitiveOptions);
configMap.put(SOURCE_KEY, source);
configMap.put(SINK_KEY, sink);
return JsonUtils.toJsonNode(configMap);
}
/** Default ConfigShade. */
public static class DefaultConfigShade implements ConfigShade {
private static final String IDENTIFIER = "default";
@Override
public String getIdentifier() {
return IDENTIFIER;
}
@Override
public String decrypt(String content) {
return content;
}
}
}

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.shade.utils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.util.Map;
import java.util.TimeZone;
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
/** {@link JsonNode} utility methods. */
public class JsonUtils {
/** can use static singleton, inject: just make sure to reuse! */
private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setTimeZone(TimeZone.getDefault())
// support java8 time api
.registerModule(new JavaTimeModule());
private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
public static Map<String, Object> toMap(JsonNode jsonNode) {
return DEFAULT_OBJECT_MAPPER.convertValue(
jsonNode, new TypeReference<Map<String, Object>>() {});
}
public static JsonNode toJsonNode(Object obj) {
return OBJECT_MAPPER.valueToTree(obj);
}
}

@ -0,0 +1,16 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils$DefaultConfigShade
org.apache.flink.cdc.common.shade.impl.Base64ConfigShade
Loading…
Cancel
Save