|
|
|
@ -255,6 +255,102 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
|
|
|
|
|
() -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testByDefaultTransform() throws Exception {
|
|
|
|
|
String dbName = schemaEvolveDatabase.getDatabaseName();
|
|
|
|
|
|
|
|
|
|
// We put a dummy transform block that matches nothing
|
|
|
|
|
// to ensure TransformOperator exists, so we could verify if TransformOperator could
|
|
|
|
|
// correctly handle such "bypass" tables with schema changes.
|
|
|
|
|
String pipelineJob =
|
|
|
|
|
String.format(
|
|
|
|
|
"source:\n"
|
|
|
|
|
+ " type: mysql\n"
|
|
|
|
|
+ " hostname: %s\n"
|
|
|
|
|
+ " port: 3306\n"
|
|
|
|
|
+ " username: %s\n"
|
|
|
|
|
+ " password: %s\n"
|
|
|
|
|
+ " tables: %s.members\n"
|
|
|
|
|
+ " server-id: 5400-5404\n"
|
|
|
|
|
+ " server-time-zone: UTC\n"
|
|
|
|
|
+ "\n"
|
|
|
|
|
+ "sink:\n"
|
|
|
|
|
+ " type: values\n"
|
|
|
|
|
+ "transform:\n"
|
|
|
|
|
+ " - source-table: another.irrelevant\n"
|
|
|
|
|
+ " projection: \"'irrelevant' AS tag\"\n"
|
|
|
|
|
+ "\n"
|
|
|
|
|
+ "pipeline:\n"
|
|
|
|
|
+ " schema.change.behavior: evolve\n"
|
|
|
|
|
+ " parallelism: %d",
|
|
|
|
|
INTER_CONTAINER_MYSQL_ALIAS,
|
|
|
|
|
MYSQL_TEST_USER,
|
|
|
|
|
MYSQL_TEST_PASSWORD,
|
|
|
|
|
dbName,
|
|
|
|
|
parallelism);
|
|
|
|
|
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
|
|
|
|
|
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
|
|
|
|
|
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
|
|
|
|
|
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
|
|
|
|
|
waitUntilJobRunning(Duration.ofSeconds(30));
|
|
|
|
|
LOG.info("Pipeline job is running");
|
|
|
|
|
validateSnapshotData(dbName, "members");
|
|
|
|
|
|
|
|
|
|
LOG.info("Starting schema evolution");
|
|
|
|
|
String mysqlJdbcUrl =
|
|
|
|
|
String.format(
|
|
|
|
|
"jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName);
|
|
|
|
|
|
|
|
|
|
try (Connection conn =
|
|
|
|
|
DriverManager.getConnection(
|
|
|
|
|
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
|
|
|
|
|
Statement stmt = conn.createStatement()) {
|
|
|
|
|
|
|
|
|
|
waitForIncrementalStage(dbName, "members", stmt);
|
|
|
|
|
|
|
|
|
|
// triggers AddColumnEvent
|
|
|
|
|
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
|
|
|
|
|
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
|
|
|
|
|
|
|
|
|
|
// triggers AlterColumnTypeEvent and RenameColumnEvent
|
|
|
|
|
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
|
|
|
|
|
|
|
|
|
|
// triggers RenameColumnEvent
|
|
|
|
|
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
|
|
|
|
|
|
|
|
|
|
// triggers DropColumnEvent
|
|
|
|
|
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
|
|
|
|
|
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
|
|
|
|
|
|
|
|
|
|
// triggers TruncateTableEvent
|
|
|
|
|
stmt.execute("TRUNCATE TABLE members;");
|
|
|
|
|
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
|
|
|
|
|
|
|
|
|
|
// triggers DropTableEvent
|
|
|
|
|
stmt.execute("DROP TABLE members;");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<String> expectedTaskManagerEvents =
|
|
|
|
|
Arrays.asList(
|
|
|
|
|
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
|
|
|
|
|
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
|
|
|
|
|
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
|
|
|
|
|
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
|
|
|
|
|
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
|
|
|
|
|
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
|
|
|
|
|
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
|
|
|
|
|
"TruncateTableEvent{tableId=%s.members}",
|
|
|
|
|
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}",
|
|
|
|
|
"DropTableEvent{tableId=%s.members}");
|
|
|
|
|
|
|
|
|
|
List<String> expectedTmEvents =
|
|
|
|
|
expectedTaskManagerEvents.stream()
|
|
|
|
|
.map(s -> String.format(s, dbName, dbName))
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
|
|
|
|
validateResult(expectedTmEvents, taskManagerConsumer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testGenericSchemaEvolution(
|
|
|
|
|
String behavior,
|
|
|
|
|
boolean mergeTable,
|
|
|
|
|