[FLINK-35323][cdc-runtime] Fix transform failure when one rule matches multiple tables with incompatible schema

This closes #3312.
yux authored and GitHub committed f4045fb8e7 (parent e18e7a2523)
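What changed and why: TransformDataOperator cached one projection/filter processor per transform rule. When a single rule such as source-table: db.TABLE\.* matched several tables with incompatible schemas, the processor compiled against the first table's schema was silently reused for the others, and the transform failed. The patch keys the caches by (TableId, rule) instead, and adds an E2E test (testMultipleHittingTable) covering two differently-shaped tables. Below is a minimal, self-contained sketch of the failure mode; Rule and Processor are hypothetical simplifications standing in for TransformProjection and TransformProjectionProcessor:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedProcessorCacheDemo {

    // Hypothetical stand-in for a transform rule like "source-table: db.TABLE\.*".
    record Rule(String sourceTablePattern) {}

    // Hypothetical stand-in for a processor compiled against one table's schema.
    record Processor(String tableId, List<String> columns) {}

    // Pre-fix shape: one processor per rule, shared by every table the rule matches.
    static final Map<Rule, Processor> cacheByRule = new HashMap<>();

    static Processor getProcessor(Rule rule, String tableId, List<String> schema) {
        // The first matched table "wins"; later tables with different schemas
        // silently reuse its processor.
        return cacheByRule.computeIfAbsent(rule, r -> new Processor(tableId, schema));
    }

    public static void main(String[] args) {
        Rule rule = new Rule("db.TABLE\\.*");
        getProcessor(rule, "db.TABLEALPHA", List.of("ID", "VERSION", "PRICEALPHA"));
        Processor beta =
                getProcessor(rule, "db.TABLEBETA", List.of("ID", "VERSION", "CODENAMESBETA"));
        // beta was built for TABLEALPHA: wrong column list for TABLEBETA.
        System.out.println(beta.tableId()); // prints db.TABLEALPHA
    }
}

Keying the cache by the pair of table id and rule, as the hunks below do, gives each matched table its own processor.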

@@ -128,13 +128,13 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
         submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
         LOG.info("Pipeline job is running");
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
@@ -184,19 +184,19 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
             throw e;
         }
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
@@ -206,17 +206,122 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
         System.out.println(stdout);
     }
 
-    private void validateResult(List<String> expectedEvents) {
+    @Test
+    public void testMultipleHittingTable() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.TABLE\\.*\n"
+                                + "    projection: \\*, ID + 1000 as UID, VERSION AS NEWVERSION\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName());
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        String.format(
+                                "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`PRICEALPHA` INT,`UID` INT,`NEWVERSION` STRING}, primaryKeys=ID, options=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 2010, 10], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 2011, 11], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 2008, 8], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 2009, 8.1], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`CODENAMESBETA` STRING,`UID` INT,`NEWVERSION` STRING}, primaryKeys=ID, options=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 3014, 14], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 3012, 12], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 3013, 13], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 3011, 11], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()));
+        validateResult(expectedEvents);
+
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        transformRenameDatabase.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
+            stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79);");
+            stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 2009, 8.1], after=[1009, 100, 0, 2009, 100], op=UPDATE, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 4007, 7], op=INSERT, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 3011, 11], after=[], op=DELETE, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+
         String stdout = taskManagerConsumer.toUtf8String();
+        System.out.println(stdout);
+    }
+
+    private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
-            if (!stdout.contains(event)) {
-                throw new RuntimeException(
-                        "failed to get specific event: " + event + " from stdout: " + stdout);
-            }
+            waitUntilSpecificEvent(event, 6000L);
         }
     }
 
-    private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
+    private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
         boolean result = false;
         long endTimeout = System.currentTimeMillis() + timeout;
         while (System.currentTimeMillis() < endTimeout) {
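The hunk above cuts off inside waitUntilSpecificEvent. For context, here is a minimal sketch of how such a polling wait is typically completed; only the signature, the first three lines, and taskManagerConsumer.toUtf8String() come from the diff, the rest (loop body, poll interval, error message) is assumed:

// Sketch only; the body beyond the lines shown in the hunk is assumed.
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
    boolean result = false;
    long endTimeout = System.currentTimeMillis() + timeout;
    while (System.currentTimeMillis() < endTimeout) {
        // Re-read everything the task manager has printed so far.
        String stdout = taskManagerConsumer.toUtf8String();
        if (stdout.contains(event)) {
            result = true;
            break;
        }
        Thread.sleep(300); // assumed poll interval
    }
    if (!result) {
        throw new RuntimeException("failed to get specific event: " + event);
    }
}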

@@ -17,6 +17,7 @@
 package org.apache.flink.cdc.runtime.operators.transform;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -70,9 +71,10 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
     /** keep the relationship of TableId and table information. */
     private final Map<TableId, TableInfo> tableInfoMap;
 
-    private transient Map<TransformProjection, TransformProjectionProcessor>
+    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
             transformProjectionProcessorMap;
-    private transient Map<TransformFilter, TransformFilterProcessor> transformFilterProcessorMap;
+    private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>
+            transformFilterProcessorMap;
 
     public static TransformDataOperator.Builder newBuilder() {
         return new TransformDataOperator.Builder();
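These two field declarations are the heart of the fix: the processor caches are now keyed by the (TableId, rule) pair rather than by the rule alone, so each matched table gets a processor compiled against its own schema. The containsKey/put/get sequences in the hunks below could equivalently be collapsed with computeIfAbsent; a sketch of that lookup for the schema-change path (not what the patch does, shown only to make the keying explicit):

// Equivalent get-or-create with the composite key, for the path that
// builds a processor from the projection alone.
TransformProjectionProcessor transformProjectionProcessor =
        transformProjectionProcessorMap.computeIfAbsent(
                Tuple2.of(tableId, transformProjection),
                key -> TransformProjectionProcessor.of(transformProjection));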
@@ -228,13 +230,15 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
             if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
                 TransformProjection transformProjection = transform.f1.get();
                 if (transformProjection.isValid()) {
-                    if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
+                    if (!transformProjectionProcessorMap.containsKey(
+                            Tuple2.of(tableId, transformProjection))) {
                         transformProjectionProcessorMap.put(
-                                transformProjection,
+                                Tuple2.of(tableId, transformProjection),
                                 TransformProjectionProcessor.of(transformProjection));
                     }
                     TransformProjectionProcessor transformProjectionProcessor =
-                            transformProjectionProcessorMap.get(transformProjection);
+                            transformProjectionProcessorMap.get(
+                                    Tuple2.of(tableId, transformProjection));
                     // update the columns of projection and add the column of projection into Schema
                     transformProjectionProcessor.processSchemaChangeEvent(schema);
                 }
@@ -258,19 +262,21 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
                         && transformProjectionOptional.isPresent()
                         && transformProjectionOptional.get().isValid()) {
                     TransformProjection transformProjection = transformProjectionOptional.get();
-                    if (!transformProjectionProcessorMap.containsKey(transformProjection)
+                    if (!transformProjectionProcessorMap.containsKey(
+                                    Tuple2.of(tableId, transformProjection))
                             || !transformProjectionProcessorMap
-                                    .get(transformProjection)
+                                    .get(Tuple2.of(tableId, transformProjection))
                                     .hasTableInfo()) {
                         transformProjectionProcessorMap.put(
-                                transformProjection,
+                                Tuple2.of(tableId, transformProjection),
                                 TransformProjectionProcessor.of(
                                         getTableInfoFromSchemaEvolutionClient(tableId),
                                         transformProjection,
                                         timezone));
                     }
                     TransformProjectionProcessor transformProjectionProcessor =
-                            transformProjectionProcessorMap.get(transformProjection);
+                            transformProjectionProcessorMap.get(
+                                    Tuple2.of(tableId, transformProjection));
                     dataChangeEventOptional =
                             processProjection(
                                     transformProjectionProcessor,
@@ -281,16 +287,17 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
                 if (transformFilterOptional.isPresent()
                         && transformFilterOptional.get().isVaild()) {
                     TransformFilter transformFilter = transformFilterOptional.get();
-                    if (!transformFilterProcessorMap.containsKey(transformFilter)) {
+                    if (!transformFilterProcessorMap.containsKey(
+                            Tuple2.of(tableId, transformFilter))) {
                         transformFilterProcessorMap.put(
-                                transformFilter,
+                                Tuple2.of(tableId, transformFilter),
                                 TransformFilterProcessor.of(
                                         getTableInfoFromSchemaEvolutionClient(tableId),
                                         transformFilter,
                                         timezone));
                     }
                     TransformFilterProcessor transformFilterProcessor =
-                            transformFilterProcessorMap.get(transformFilter);
+                            transformFilterProcessorMap.get(Tuple2.of(tableId, transformFilter));
                     dataChangeEventOptional =
                             processFilter(
                                     transformFilterProcessor,
@@ -302,19 +309,21 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
                         && transformProjectionOptional.isPresent()
                         && transformProjectionOptional.get().isValid()) {
                     TransformProjection transformProjection = transformProjectionOptional.get();
-                    if (!transformProjectionProcessorMap.containsKey(transformProjection)
+                    if (!transformProjectionProcessorMap.containsKey(
+                                    Tuple2.of(tableId, transformProjection))
                             || !transformProjectionProcessorMap
-                                    .get(transformProjection)
+                                    .get(Tuple2.of(tableId, transformProjection))
                                     .hasTableInfo()) {
                         transformProjectionProcessorMap.put(
-                                transformProjection,
+                                Tuple2.of(tableId, transformProjection),
                                 TransformProjectionProcessor.of(
                                         getTableInfoFromSchemaEvolutionClient(tableId),
                                         transformProjection,
                                         timezone));
                     }
                     TransformProjectionProcessor transformProjectionProcessor =
-                            transformProjectionProcessorMap.get(transformProjection);
+                            transformProjectionProcessorMap.get(
+                                    Tuple2.of(tableId, transformProjection));
                     dataChangeEventOptional =
                             processProjection(
                                     transformProjectionProcessor,
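The same re-keying is applied uniformly to the schema-change, filter, and data-change paths above. A plain Tuple2 works as the composite key because Flink tuples define value-based equals and hashCode over their fields, so the same rule paired with different tables yields distinct cache entries. A tiny illustration (table ids shown as strings for brevity):

import org.apache.flink.api.java.tuple.Tuple2;

public class CompositeKeyDemo {
    public static void main(String[] args) {
        Tuple2<String, String> alpha = Tuple2.of("db.TABLEALPHA", "projection-rule");
        Tuple2<String, String> beta = Tuple2.of("db.TABLEBETA", "projection-rule");
        // Different table, same rule => different key => separate processors.
        System.out.println(alpha.equals(beta)); // false
        System.out.println(alpha.equals(Tuple2.of("db.TABLEALPHA", "projection-rule"))); // true
    }
}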
