From 06fc9395693a78318bc3b42b76088e6d3201b79f Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Fri, 30 Aug 2024 11:41:05 +0800
Subject: [PATCH] [FLINK-36184][transform] Fix transform operator swallows
 schema changes from tables not present in transform rules (#3589)

---
 .../pipeline/tests/SchemaEvolveE2eITCase.java | 96 +++++++++++++++++++
 .../transform/PostTransformOperator.java      | 32 +++++--
 .../transform/PreTransformOperator.java       | 29 ++++--
 3 files changed, 137 insertions(+), 20 deletions(-)

diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 7551add08..43ff5305b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -255,6 +255,102 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
                 () -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar));
     }
 
+    @Test
+    public void testByDefaultTransform() throws Exception {
+        String dbName = schemaEvolveDatabase.getDatabaseName();
+
+        // The dummy transform block below targets "another.irrelevant", so it matches nothing.
+        // It ensures TransformOperator exists, so we can verify that TransformOperator
+        // correctly handles such "bypass" tables with schema changes.
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.members\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: another.irrelevant\n"
+                                + "    projection: \"'irrelevant' AS tag\"\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: evolve\n"
+                                + "  parallelism: %d",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        dbName,
+                        parallelism);
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        validateSnapshotData(dbName, "members");
+
+        LOG.info("Starting schema evolution");
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stmt = conn.createStatement()) {
+
+            waitForIncrementalStage(dbName, "members", stmt);
+
+            // triggers AddColumnEvent (new column `gender`, placed after `age`)
+            stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
+            stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
+
+            // triggers AlterColumnTypeEvent and RenameColumnEvent (age INT -> precise_age DOUBLE)
+            stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
+
+            // triggers RenameColumnEvent (gender -> biological_sex)
+            stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
+
+            // triggers DropColumnEvent; the next INSERT therefore supplies only three values
+            stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
+            stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+
+            // triggers TruncateTableEvent; the row inserted afterwards is still expected downstream
+            stmt.execute("TRUNCATE TABLE members;");
+            stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
+
+            // triggers DropTableEvent, which is the last event expected below
+            stmt.execute("DROP TABLE members;");
+        }
+
+        List<String> expectedTaskManagerEvents =
+                Arrays.asList(
+                        "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
+                        "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
+                        "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
+                        "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
+                        "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
+                        "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
+                        "TruncateTableEvent{tableId=%s.members}",
+                        "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}",
+                        "DropTableEvent{tableId=%s.members}");
+
+        List<String> expectedTmEvents =
+                expectedTaskManagerEvents.stream()
+                        .map(s -> String.format(s, dbName, dbName))
+                        .collect(Collectors.toList());
+
+        validateResult(expectedTmEvents, taskManagerConsumer);
+    }
+
     private void testGenericSchemaEvolution(
             String behavior,
             boolean mergeTable,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 30c78d202..83dd07c53 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -258,17 +258,29 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                                                     .stream())
                             .map(ProjectionColumn::getColumnName)
                             .collect(Collectors.toSet());
-            boolean hasAsterisk =
-                    transforms.stream()
-                            .filter(t -> t.getSelectors().isMatch(tableId))
-                            .anyMatch(
-                                    t ->
-                                            TransformParser.hasAsterisk(
-                                                    t.getProjection()
-                                                            .map(TransformProjection::getProjection)
-                                                            .orElse(null)));
 
-            hasAsteriskMap.put(tableId, hasAsterisk);
+            boolean notTransformed =
+                    transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
+
+            if (notTransformed) {
+                // If this TableId isn't matched by any transform block, it should behave like a
+                // "*" projection and should be regarded as asterisk-ful.
+                hasAsteriskMap.put(tableId, true);
+            } else {
+                boolean hasAsterisk =
+                        transforms.stream()
+                                .filter(t -> t.getSelectors().isMatch(tableId))
+                                .anyMatch(
+                                        t ->
+                                                TransformParser.hasAsterisk(
+                                                        t.getProjection()
+                                                                .map(
+                                                                        TransformProjection
+                                                                                ::getProjection)
+                                                                .orElse(null)));
+
+                hasAsteriskMap.put(tableId, hasAsterisk);
+            }
             projectedColumnsMap.put(
                     tableId,
                     createTableEvent.getSchema().getColumnNames().stream()
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 5d8798068..3b050f993 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -300,17 +300,26 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                         .map(Column::getName)
                         .collect(Collectors.toSet());
 
-        boolean hasAsterisk =
-                transforms.stream()
-                        .filter(t -> t.getSelectors().isMatch(tableId))
-                        .anyMatch(
-                                t ->
-                                        TransformParser.hasAsterisk(
-                                                t.getProjection()
-                                                        .map(TransformProjection::getProjection)
-                                                        .orElse(null)));
+        boolean notTransformed =
+                transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
 
-        hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+        if (notTransformed) {
+            // If this TableId isn't matched by any transform block, it should behave like a "*"
+            // projection and should be regarded as asterisk-ful.
+            hasAsteriskMap.put(tableId, true);
+        } else {
+            boolean hasAsterisk =
+                    transforms.stream()
+                            .filter(t -> t.getSelectors().isMatch(tableId))
+                            .anyMatch(
+                                    t ->
+                                            TransformParser.hasAsterisk(
+                                                    t.getProjection()
+                                                            .map(TransformProjection::getProjection)
+                                                            .orElse(null)));
+
+            hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
+        }
         referencedColumnsMap.put(
                 createTableEvent.tableId(),
                 createTableEvent.getSchema().getColumnNames().stream()