[FLINK-35264][cdc][runtime] Fix multiple transform rules do not take effect (#3280)

pull/3284/head
yux 9 months ago committed by GitHub
parent 23a67dcdb9
commit f61f0f44bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
 * E2e tests for the {@link TransformSchemaOperator}.
 *
 * <p>The test submits a pipeline definition that reads every table of a freshly created MySQL
 * database, applies one transform rule per source table, routes all tables into a single {@code
 * terminus} table and then looks for the expected event strings in the output captured by {@code
 * taskManagerConsumer}.
 *
 * <p>NOTE(review): the {@link Parameterized} parameters are not declared in this class; they are
 * presumably supplied by {@link PipelineTestEnvironment} - confirm against the parent class.
 */
@RunWith(Parameterized.class)
public class TransformE2eITCase extends PipelineTestEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(TransformE2eITCase.class);

    /** Maximum time, in milliseconds, to wait for a single expected event to show up. */
    private static final long EVENT_WAIT_TIMEOUT_MS = 6000L;

    /** Upper bound, in milliseconds, for the pause between two polls of the captured output. */
    private static final long EVENT_POLL_INTERVAL_MS = 1000L;

    // ------------------------------------------------------------------------------------------
    // MySQL Variables (we always use MySQL as the data source for easier verifying)
    // ------------------------------------------------------------------------------------------
    protected static final String MYSQL_TEST_USER = "mysqluser";
    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
    protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";

    /** MySQL container shared by all tests of this class; joined to the test network by alias. */
    @ClassRule
    public static final MySqlContainer MYSQL =
            (MySqlContainer)
                    new MySqlContainer(
                                    MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
                            .withConfigurationOverride("docker/mysql/my.cnf")
                            .withSetupSQL("docker/mysql/setup.sql")
                            .withDatabaseName("flink-test")
                            .withUsername("flinkuser")
                            .withPassword("flinkpw")
                            .withNetwork(NETWORK)
                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
                            .withLogConsumer(new Slf4jLogConsumer(LOG));

    /** Database created from the {@code transform_test} template before each test. */
    protected final UniqueDatabase transformRenameDatabase =
            new UniqueDatabase(MYSQL, "transform_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);

    /** Runs the parent set-up and then creates and initializes the test database. */
    @Before
    public void before() throws Exception {
        super.before();
        transformRenameDatabase.createAndInitialize();
    }

    /** Runs the parent tear-down and drops the test database. */
    @After
    public void after() {
        try {
            super.after();
        } finally {
            // Drop the database even when the parent tear-down fails, so that a broken
            // environment shutdown does not leave the test database behind.
            transformRenameDatabase.dropDatabase();
        }
    }

    /**
     * Verifies that several transform rules targeting tables with different schemas all take
     * effect: {@code TABLEALPHA} is projected to {@code ID, VERSION} and filtered by {@code ID >
     * 1008}, {@code TABLEBETA} is projected to {@code ID, VERSION}, and both are routed into the
     * same {@code terminus} table.
     *
     * @throws Exception if the job cannot be submitted, the database cannot be updated, or an
     *     expected event is not observed in time
     */
    @Test
    public void testHeteroSchemaTransform() throws Exception {
        String databaseName = transformRenameDatabase.getDatabaseName();

        // Indentation is significant in YAML: the keys following a "- source-table" entry must be
        // indented deeper than the dash, otherwise they are not part of that list item.
        String pipelineJob =
                String.format(
                        "source:\n"
                                + "  type: mysql\n"
                                + "  hostname: %s\n"
                                + "  port: 3306\n"
                                + "  username: %s\n"
                                + "  password: %s\n"
                                + "  tables: %s.\\.*\n"
                                + "  server-id: 5400-5404\n"
                                + "  server-time-zone: UTC\n"
                                + "\n"
                                + "sink:\n"
                                + "  type: values\n"
                                + "route:\n"
                                + "  - source-table: %s.\\.*\n"
                                + "    sink-table: %s.terminus\n"
                                + "transform:\n"
                                + "  - source-table: %s.TABLEALPHA\n"
                                + "    projection: ID, VERSION\n"
                                + "    filter: ID > 1008\n"
                                + "  - source-table: %s.TABLEBETA\n"
                                + "    projection: ID, VERSION\n"
                                + "\n"
                                + "pipeline:\n"
                                + "  parallelism: 1",
                        INTER_CONTAINER_MYSQL_ALIAS,
                        MYSQL_TEST_USER,
                        MYSQL_TEST_PASSWORD,
                        databaseName,
                        databaseName,
                        databaseName,
                        databaseName,
                        databaseName);
        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
        waitUntilJobRunning(Duration.ofSeconds(30));
        LOG.info("Pipeline job is running");

        // Wait for the last row of each source table before checking the full event list.
        waitUtilSpecificEvent(
                String.format(
                        "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
                        databaseName),
                EVENT_WAIT_TIMEOUT_MS);
        waitUtilSpecificEvent(
                String.format(
                        "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
                        databaseName),
                EVENT_WAIT_TIMEOUT_MS);

        List<String> expectedEvents =
                Arrays.asList(
                        String.format(
                                "CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` STRING}, primaryKeys=ID, options=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, 8.1], op=INSERT, meta=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[1010, 10], op=INSERT, meta=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[2011, 11], op=INSERT, meta=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[2012, 12], op=INSERT, meta=()}",
                                databaseName),
                        String.format(
                                "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
                                databaseName));
        validateResult(expectedEvents);

        LOG.info("Begin incremental reading stage.");
        // generate binlogs
        String mysqlJdbcUrl =
                String.format(
                        "jdbc:mysql://%s:%s/%s",
                        MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName);
        try (Connection conn =
                        DriverManager.getConnection(
                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
                Statement stat = conn.createStatement()) {
            stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
            stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79);");
            stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
        } catch (SQLException e) {
            LOG.error("Update table for CDC failed.", e);
            throw e;
        }

        waitUtilSpecificEvent(
                String.format(
                        "DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
                        databaseName),
                EVENT_WAIT_TIMEOUT_MS);
        waitUtilSpecificEvent(
                String.format(
                        "DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
                        databaseName),
                EVENT_WAIT_TIMEOUT_MS);
        waitUtilSpecificEvent(
                String.format(
                        "DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
                        databaseName),
                EVENT_WAIT_TIMEOUT_MS);

        // Keep the captured output available for troubleshooting, but route it through the logger.
        LOG.info("TaskManager output:\n{}", taskManagerConsumer.toUtf8String());
    }

    /**
     * Checks that every expected event string occurs in the output captured so far.
     *
     * @param expectedEvents event strings that must each be contained in the captured output
     * @throws RuntimeException naming the first expected event that is missing
     */
    private void validateResult(List<String> expectedEvents) {
        String stdout = taskManagerConsumer.toUtf8String();
        for (String event : expectedEvents) {
            if (!stdout.contains(event)) {
                throw new RuntimeException(
                        "failed to get specific event: " + event + " from stdout: " + stdout);
            }
        }
    }

    /**
     * Polls the captured output until it contains the given event string.
     *
     * <p>The output is always inspected at least once, and once more after the final pause, so an
     * event that arrives while this method is sleeping is still detected. The pause never extends
     * past the deadline.
     *
     * @param event the exact event string to look for
     * @param timeout maximum time to wait, in milliseconds
     * @throws TimeoutException if the event is still absent once the timeout has elapsed
     * @throws Exception if the waiting thread is interrupted
     */
    private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
        long deadline = System.currentTimeMillis() + timeout;
        while (true) {
            String stdout = taskManagerConsumer.toUtf8String();
            if (stdout.contains(event)) {
                return;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                // Report the very snapshot that was checked, so the message cannot contradict
                // itself by showing output that already contains the event.
                throw new TimeoutException(
                        "failed to get specific event: " + event + " from stdout: " + stdout);
            }
            Thread.sleep(Math.min(EVENT_POLL_INTERVAL_MS, remaining));
        }
    }
}

@ -1,15 +1,17 @@
-- Copyright 2023 Ververica Inc.
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: mysql_inventory

@ -0,0 +1,42 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- TABLEALPHA: integer primary key ID, a VERSION string and a price column
-- that exists only in this table.
DROP TABLE IF EXISTS TABLEALPHA;

CREATE TABLE TABLEALPHA (
    ID         INT NOT NULL,
    VERSION    VARCHAR(17),
    PRICEALPHA INT,
    PRIMARY KEY (ID)
);

-- Seed rows for TABLEALPHA
INSERT INTO TABLEALPHA (ID, VERSION, PRICEALPHA)
VALUES (1008, '8', 199),
       (1009, '8.1', 0),
       (1010, '10', 99),
       (1011, '11', 59);

-- TABLEBETA: shares ID and VERSION with TABLEALPHA but carries a code name
-- column instead of a price.
DROP TABLE IF EXISTS TABLEBETA;

CREATE TABLE TABLEBETA (
    ID            INT NOT NULL,
    VERSION       VARCHAR(17),
    CODENAMESBETA VARCHAR(17),
    PRIMARY KEY (ID)
);

-- Seed rows for TABLEBETA
INSERT INTO TABLEBETA (ID, VERSION, CODENAMESBETA)
VALUES (2011, '11', 'Big Sur'),
       (2012, '12', 'Monterey'),
       (2013, '13', 'Ventura'),
       (2014, '14', 'Sonoma');

@ -96,6 +96,7 @@ public class TransformSchemaOperator extends AbstractStreamOperator<Event>
@Override
public void open() throws Exception {
super.open();
transforms = new ArrayList<>();
for (Tuple5<String, String, String, String, String> transformRule : transformRules) {
String tableInclusions = transformRule.f0;
String projection = transformRule.f1;
@ -104,7 +105,6 @@ public class TransformSchemaOperator extends AbstractStreamOperator<Event>
String tableOptions = transformRule.f4;
Selectors selectors =
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
transforms = new ArrayList<>();
transforms.add(new Tuple2<>(selectors, TransformProjection.of(projection)));
schemaMetadataTransformers.add(
new Tuple2<>(

Loading…
Cancel
Save