[FLINK-35647][route] Support symbol replacement to enrich routing rules

This closes #3428.

Co-authored-by: 张田 <zhang_tian@inspur.com>
Co-authored-by: yangshuaitong <duguhoney@gmail.com>
pull/3429/head
yuxiqian 7 months ago committed by GitHub
parent 302a691225
commit ad1f554c0e

@ -30,11 +30,12 @@ under the License.
# Parameters
To describe a route, the following parameters are required:
| parameter | meaning | optional/required |
|--------------|----------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| description | Routing rule description(a default value provided) | optional |
| parameter | meaning | optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports symbol replacement | required |
| replace-symbol | Special string in sink-table that will be replaced by the original table name                | optional          |
| description    | Routing rule description (a default value is provided)                                       | optional          |
A route module can contain a list of source-table/sink-table rules.
@ -71,4 +72,18 @@ route:
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
```
```
## Pattern Replacement in routing rules
If you'd like to route source tables and rename them to sink tables following a specific pattern, `replace-symbol` can be used as a placeholder for the source table name, like this:
```yaml
route:
- source-table: source_db.\.*
sink-table: sink_db.<>
replace-symbol: <>
description: route all tables in source_db to sink_db
```
Then, every table `source_db.XXX` will be routed to `sink_db.XXX`.

@ -30,11 +30,12 @@ under the License.
# Parameters
To describe a route, the following parameters are required:
| parameter | meaning | optional/required |
|--------------|----------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| description | Routing rule description(a default value provided) | optional |
| parameter | meaning | optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports symbol replacement | required |
| replace-symbol | Special string in sink-table that will be replaced by the original table name                | optional          |
| description    | Routing rule description (a default value is provided)                                       | optional          |
A route module can contain a list of source-table/sink-table rules.
@ -71,4 +72,18 @@ route:
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
```
```
## Pattern Replacement in routing rules
If you'd like to route source tables and rename them to sink tables following a specific pattern, `replace-symbol` can be used as a placeholder for the source table name, like this:
```yaml
route:
- source-table: source_db.\.*
sink-table: sink_db.<>
replace-symbol: <>
description: route all tables in source_db to sink_db
```
Then, every table `source_db.XXX` will be routed to `sink_db.XXX`.
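For reference, the replacement is a plain string substitution of `replace-symbol` with the original table name before the result is parsed as a table id. The following minimal sketch mirrors the `resolveReplacement` logic introduced further down in this change; the `ReplaceSymbolSketch` class and its `resolve` helper are illustrative only, not part of the Flink CDC API:
```java
import org.apache.flink.cdc.common.event.TableId;

/** Illustrative sketch (not part of this change) of how a routed sink table id is derived. */
class ReplaceSymbolSketch {

    // Substitute the replace-symbol in sink-table with the original table name,
    // then parse the result as a table id; without a symbol, sink-table is used as-is.
    static TableId resolve(String sinkTable, String replaceSymbol, TableId originalTable) {
        if (replaceSymbol != null) {
            return TableId.parse(sinkTable.replace(replaceSymbol, originalTable.getTableName()));
        }
        return TableId.parse(sinkTable);
    }

    public static void main(String[] args) {
        TableId source = TableId.parse("source_db.orders");
        // With sink-table "sink_db.<>" and replace-symbol "<>", the routed id is "sink_db.orders".
        System.out.println(resolve("sink_db.<>", "<>", source));
    }
}
```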

@ -55,6 +55,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Route keys
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
private static final String ROUTE_DESCRIPTION_KEY = "description";
// Transform keys
@ -164,11 +165,15 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
"Missing required field \"%s\" in route configuration",
ROUTE_SINK_TABLE_KEY)
.asText();
String replaceSymbol =
Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
.map(JsonNode::asText)
.orElse(null);
String description =
Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
return new RouteDef(sourceTable, sinkTable, description);
return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
}
private TransformDef toTransformDef(JsonNode transformNode) {

@ -166,6 +166,15 @@ class YamlPipelineDefinitionParserTest {
+ "Or use 'UTC' without time zone and daylight saving time.");
}
@Test
void testRouteWithReplacementSymbol() throws Exception {
URL resource =
Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
}
private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
@ -197,10 +206,12 @@ class YamlPipelineDefinitionParserTest {
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
null,
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
null,
"sync table to with given prefix ods_")),
Arrays.asList(
new TransformDef(
@ -258,10 +269,12 @@ class YamlPipelineDefinitionParserTest {
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
null,
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
null,
"sync table to with given prefix ods_")),
Arrays.asList(
new TransformDef(
@ -312,7 +325,10 @@ class YamlPipelineDefinitionParserTest {
.build())),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*", "odsdb.default.app_order", null)),
"mydb.default.app_order_.*",
"odsdb.default.app_order",
null,
null)),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@ -326,4 +342,67 @@ class YamlPipelineDefinitionParserTest {
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
private final PipelineDef fullDefWithRouteRepSym =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build())),
new SinkDef(
"kafka",
"sink-queue",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers", "localhost:9092")
.put("auto-create-table", "true")
.build())),
Arrays.asList(
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order_<>",
"<>",
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order_>_<",
">_<",
"sync table to with given prefix ods_")),
Arrays.asList(
new TransformDef(
"mydb.app_order_.*",
"id, order_id, TO_UPPER(product_name)",
"id > 10 AND order_id > 100",
"id",
"product_name",
"comment=app order",
"project fields from source table"),
new TransformDef(
"mydb.web_order_.*",
"CONCAT(id, order_id) as uniq_id, *",
"uniq_id > 10",
null,
null,
null,
"add new uniq_id for each row")),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.build()));
}

@ -0,0 +1,61 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: mysql
name: source-database
host: localhost
port: 3306
username: admin
password: pass
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
chunk-column: app_order_.*:id,web_order:product_id
capture-new-tables: true
sink:
type: kafka
name: sink-queue
bootstrap-servers: localhost:9092
auto-create-table: true
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order_<>
replace-symbol: "<>"
description: sync all sharding tables to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order_>_<
replace-symbol: ">_<"
description: sync table to with given prefix ods_
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
primary-keys: id
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
pipeline:
name: source-database-sync-pipe
parallelism: 4
schema.change.behavior: evolve
schema-operator.rpc-timeout: 1 h

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.route;
import java.io.Serializable;
/** Definition of a routing rule with replacement symbol. */
public class RouteRule implements Serializable {
private static final long serialVersionUID = 1L;
public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
this.replaceSymbol = replaceSymbol;
}
public String sourceTable;
public String sinkTable;
public String replaceSymbol;
}

@ -36,11 +36,17 @@ import java.util.Optional;
public class RouteDef {
private final String sourceTable;
private final String sinkTable;
private final String replaceSymbol;
@Nullable private final String description;
public RouteDef(String sourceTable, String sinkTable, @Nullable String description) {
public RouteDef(
String sourceTable,
String sinkTable,
@Nullable String replaceSymbol,
@Nullable String description) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
this.replaceSymbol = replaceSymbol;
this.description = description;
}
@ -52,6 +58,10 @@ public class RouteDef {
return sinkTable;
}
public Optional<String> getReplaceSymbol() {
return Optional.ofNullable(replaceSymbol);
}
public Optional<String> getDescription() {
return Optional.ofNullable(description);
}
@ -63,6 +73,8 @@ public class RouteDef {
+ sourceTable
+ ", sinkTable="
+ sinkTable
+ ", replaceSymbol="
+ replaceSymbol
+ ", description='"
+ description
+ '\''
@ -80,11 +92,12 @@ public class RouteDef {
RouteDef routeDef = (RouteDef) o;
return Objects.equals(sourceTable, routeDef.sourceTable)
&& Objects.equals(sinkTable, routeDef.sinkTable)
&& Objects.equals(replaceSymbol, routeDef.replaceSymbol)
&& Objects.equals(description, routeDef.description);
}
@Override
public int hashCode() {
return Objects.hash(sourceTable, sinkTable, description);
return Objects.hash(sourceTable, sinkTable, replaceSymbol, description);
}
}

@ -17,12 +17,11 @@
package org.apache.flink.cdc.composer.flink.translator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
@ -80,10 +79,13 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
List<RouteRule> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
routingRules.add(
Tuple2.of(route.getSourceTable(), TableId.parse(route.getSinkTable())));
new RouteRule(
route.getSourceTable(),
route.getSinkTable(),
route.getReplaceSymbol().orElse(null)));
}
SingleOutputStreamOperator<Event> stream =
input.transform(

@ -424,8 +424,8 @@ class FlinkPipelineComposerITCase {
TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2");
List<RouteDef> routeDef =
Arrays.asList(
new RouteDef(TABLE_1.toString(), routedTable1.toString(), null),
new RouteDef(TABLE_2.toString(), routedTable2.toString(), null));
new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null),
new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null));
// Setup pipeline
Configuration pipelineConfig = new Configuration();
@ -616,6 +616,7 @@ class FlinkPipelineComposerITCase {
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
null,
null));
// Setup pipeline
@ -657,4 +658,62 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
}
@ParameterizedTest
@EnumSource
void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.table[0-9]",
"replaced_namespace.replaced_schema.__$__",
"__$__",
null)),
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=replaced_namespace.replaced_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=replaced_namespace.replaced_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
}

@ -624,6 +624,102 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}");
}
@Test
public void testReplacementSymbol() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "route:\n"
+ " - source-table: %s.\\.*\n"
+ " sink-table: NEW_%s.NEW_<>\n"
+ " replace-symbol: <>\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
routeTestDatabase.getDatabaseName()));
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
routeTestDatabase.getDatabaseName()));
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
routeTestDatabase.getDatabaseName()));
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
routeTestDatabase.getDatabaseName()));
validateResult(
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1008, 8], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1009, 8.1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1010, 10], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1011, 11], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2011, 11], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2012, 12], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2013, 13], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2014, 14], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3015, Amber], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3016, Black], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3017, Cyan], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3018, Denim], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4019, Yosemite], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4020, El Capitan], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4021, Sierra], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4022, High Sierra], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4023, Mojave], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4024, Catalina], op=INSERT, meta=()}");
LOG.info("Begin incremental reading stage.");
generateIncrementalChanges();
validateResult(
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[3007, 7], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[2014, 14], after=[2014, 2014], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3019, Emerald], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[4024, Catalina], after=[], op=DELETE, meta=()}");
generateSchemaChanges();
validateResult(
"AddColumnEvent{tableId=NEW_%s.NEW_TABLEALPHA, addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}");
}
private void validateResult(String... expectedEvents) throws Exception {
for (String event : expectedEvents) {
waitUntilSpecificEvent(String.format(event, routeTestDatabase.getDatabaseName()));

@ -17,7 +17,7 @@
package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
@ -70,6 +71,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -88,22 +90,33 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
private final List<Tuple2<String, TableId>> routingRules;
private final List<RouteRule> routingRules;
/**
* Storing route source table selector, sink table name (before symbol replacement), and replace
* symbol in a tuple.
*/
private transient List<Tuple3<Selectors, String, String>> routes;
private transient List<Tuple2<Selectors, TableId>> routes;
private transient TaskOperatorEventGateway toCoordinator;
private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, Schema> cachedSchemas;
/**
* Storing mapping relations from an upstream tableId (source table) to its downstream
* tableIds (sink tables).
*/
private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
private final long rpcTimeOutInMillis;
public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
public SchemaOperator(List<RouteRule> routingRules) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
}
public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration rpcTimeOut) {
public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
@ -119,14 +132,14 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
routes =
routingRules.stream()
.map(
tuple2 -> {
String tableInclusions = tuple2.f0;
TableId replaceBy = tuple2.f1;
rule -> {
String tableInclusions = rule.sourceTable;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple2<>(selectors, replaceBy);
return new Tuple3<>(
selectors, rule.sinkTable, rule.replaceSymbol);
})
.collect(Collectors.toList());
schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID());
@ -140,6 +153,16 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
return getLatestSchema(tableId);
}
});
tableIdMappingCache =
CacheBuilder.newBuilder()
.expireAfterAccess(CACHE_EXPIRE_DURATION)
.build(
new CacheLoader<TableId, List<TableId>>() {
@Override
public List<TableId> load(TableId tableId) {
return getRoutedTables(tableId);
}
});
}
@Override
@ -158,7 +181,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
*/
@Override
public void processElement(StreamRecord<Event> streamRecord)
throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, ExecutionException {
Event event = streamRecord.getValue();
// Schema changes
if (event instanceof SchemaChangeEvent) {
@ -169,15 +192,15 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
// Update caches
cachedSchemas.put(tableId, getLatestSchema(tableId));
getRoutedTables(tableId)
tableIdMappingCache
.get(tableId)
.forEach(routed -> cachedSchemas.put(routed, getLatestSchema(routed)));
return;
}
// Data changes
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
TableId tableId = dataChangeEvent.tableId();
List<TableId> optionalRoutedTable = getRoutedTables(tableId);
List<TableId> optionalRoutedTable = tableIdMappingCache.get(dataChangeEvent.tableId());
if (optionalRoutedTable.isEmpty()) {
output.collect(streamRecord);
} else {
@ -270,10 +293,18 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private List<TableId> getRoutedTables(TableId originalTableId) {
return routes.stream()
.filter(route -> route.f0.isMatch(originalTableId))
.map(route -> route.f1)
.map(route -> resolveReplacement(originalTableId, route))
.collect(Collectors.toList());
}
private TableId resolveReplacement(
TableId originalTable, Tuple3<Selectors, String, String> route) {
if (route.f2 != null) {
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
}
return TableId.parse(route.f1);
}
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
throws InterruptedException, TimeoutException {
// The request will need to send a FlushEvent or block until flushing finished

@ -17,10 +17,9 @@
package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
@ -40,12 +39,10 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
private static final long serialVersionUID = 1L;
private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
private final List<RouteRule> routingRules;
public SchemaOperatorFactory(
MetadataApplier metadataApplier,
List<Tuple2<String, TableId>> routingRules,
Duration rpcTimeOut) {
MetadataApplier metadataApplier, List<RouteRule> routingRules, Duration rpcTimeOut) {
super(new SchemaOperator(routingRules, rpcTimeOut));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;

@ -17,7 +17,7 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
@ -48,19 +49,37 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** Derive schema changes based on the routing rules. */
public class SchemaDerivation {
private final SchemaManager schemaManager;
private final List<Tuple2<Selectors, TableId>> routes;
private final Map<TableId, Set<TableId>> derivationMapping;
/**
* Storing route source table selector, sink table name (before symbol replacement), and replace
* symbol in a tuple.
*/
private transient List<Tuple3<Selectors, String, String>> routes;
public SchemaDerivation(
SchemaManager schemaManager,
List<Tuple2<Selectors, TableId>> routes,
List<RouteRule> routeRules,
Map<TableId, Set<TableId>> derivationMapping) {
this.schemaManager = schemaManager;
this.routes = routes;
this.routes =
routeRules.stream()
.map(
rule -> {
String tableInclusions = rule.sourceTable;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple3<>(
selectors, rule.sinkTable, rule.replaceSymbol);
})
.collect(Collectors.toList());
this.derivationMapping = derivationMapping;
}
@ -69,7 +88,7 @@ public class SchemaDerivation {
TableId originalTable = schemaChangeEvent.tableId();
boolean noRouteMatched = true;
for (Tuple2<Selectors, TableId> route : routes) {
for (Tuple3<Selectors, String, String> route : routes) {
// Check routing table
if (!route.f0.isMatch(originalTable)) {
continue;
@ -78,7 +97,7 @@ public class SchemaDerivation {
noRouteMatched = false;
// Matched a routing rule
TableId derivedTable = route.f1;
TableId derivedTable = resolveReplacement(originalTable, route);
Set<TableId> originalTables =
derivationMapping.computeIfAbsent(derivedTable, t -> new HashSet<>());
originalTables.add(originalTable);
@ -134,6 +153,14 @@ public class SchemaDerivation {
}
}
private TableId resolveReplacement(
TableId originalTable, Tuple3<Selectors, String, String> route) {
if (route.f2 != null) {
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
}
return TableId.parse(route.f1);
}
public Map<TableId, Set<TableId>> getDerivationMapping() {
return derivationMapping;
}

@ -17,9 +17,8 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
@ -90,7 +89,7 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
/** Metadata applier for applying schema changes to external system. */
private final MetadataApplier metadataApplier;
private final List<Tuple2<Selectors, TableId>> routes;
private final List<RouteRule> routes;
/** The request handler that handle all requests and events. */
private SchemaRegistryRequestHandler requestHandler;
@ -104,7 +103,7 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
String operatorName,
OperatorCoordinator.Context context,
MetadataApplier metadataApplier,
List<Tuple2<Selectors, TableId>> routes) {
List<RouteRule> routes) {
this.context = context;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();

@ -17,16 +17,13 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import java.util.List;
import java.util.stream.Collectors;
/** Provider of {@link SchemaRegistry}. */
@Internal
@ -36,13 +33,13 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
private final OperatorID operatorID;
private final String operatorName;
private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
private final List<RouteRule> routingRules;
public SchemaRegistryProvider(
OperatorID operatorID,
String operatorName,
MetadataApplier metadataApplier,
List<Tuple2<String, TableId>> routingRules) {
List<RouteRule> routingRules) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
@ -56,19 +53,6 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
List<Tuple2<Selectors, TableId>> routes =
routingRules.stream()
.map(
tuple2 -> {
String tableInclusions = tuple2.f0;
TableId replaceBy = tuple2.f1;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple2<>(selectors, replaceBy);
})
.collect(Collectors.toList());
return new SchemaRegistry(operatorName, context, metadataApplier, routes);
return new SchemaRegistry(operatorName, context, metadataApplier, routingRules);
}
}

@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@ -25,10 +24,10 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
@ -81,13 +80,9 @@ class SchemaDerivationTest {
.column(Column.physicalColumn("gender", DataTypes.STRING()))
.build();
private static final List<Tuple2<Selectors, TableId>> ROUTES =
private static final List<RouteRule> ROUTES =
Collections.singletonList(
Tuple2.of(
new Selectors.SelectorsBuilder()
.includeTables("mydb.myschema.mytable[0-9]")
.build(),
MERGED_TABLE));
new RouteRule("mydb.myschema.mytable[0-9]", MERGED_TABLE.toString(), null));
@Test
void testOneToOneMapping() {
