diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index 54ae076fc..ed166767c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -46,6 +46,49 @@ limitations under the License.
flink-doris-connector-${flink.major.version}
1.6.0
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ junit
+ junit
+
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ test
+
+
+ org.testcontainers
+ jdbc
+ 1.18.3
+ test
+
+
+ mysql
+ mysql-connector-java
+ 8.0.26
+ test
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
new file mode 100644
index 000000000..0d3c0d99c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
+
+/** IT tests for {@link DorisMetadataApplier}. */
+@RunWith(Parameterized.class)
+public class DorisMetadataApplierITCase extends DorisSinkTestBase {
+ private static final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+ private final boolean batchMode;
+
+ public DorisMetadataApplierITCase(boolean batchMode) {
+ this.batchMode = batchMode;
+ }
+
+ @Parameters(name = "batchMode: {0}")
+ public static Iterable> data() {
+ return Arrays.asList(true, false);
+ }
+
+ @BeforeClass
+ public static void before() {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(3000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ @Before
+ public void initializeDatabase() {
+ createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+ // waiting for table to be created
+ DORIS_CONTAINER.waitForLog(
+ String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
+ }
+
+ @After
+ public void destroyDatabase() {
+ dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
+ // waiting for database to be created
+ DORIS_CONTAINER.waitForLog(
+ String.format(
+ ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
+ }
+
+ private List generateAddColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn(
+ "extra_bool", DataTypes.BOOLEAN(), null)))),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn(
+ "extra_decimal",
+ DataTypes.DECIMAL(17, 0),
+ null)))));
+ }
+
+ private List generateDropColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new DropColumnEvent(tableId, Collections.singletonList("number")));
+ }
+
+ private List generateRenameColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
+ new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
+ }
+
+ private List generateAlterColumnTypeEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
+ }
+
+ private List generateNarrowingAlterColumnTypeEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ // Double -> Float is a narrowing cast, should fail
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
+ }
+
+ @Test
+ public void testDorisDataTypes() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
+ // Doris sink doesn't support BINARY type yet.
+ // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
+ // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
+ // Binary"))
+ .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
+ .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
+ .column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
+ .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
+ .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
+ .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
+ .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
+ .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
+ .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
+ .column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
+ .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
+ .column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
+ // Doris sink doesn't support TIME type yet.
+ // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
+ // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
+ // Precision"))
+ .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
+ .column(
+ new PhysicalColumn(
+ "timestamp_3",
+ DataTypes.TIMESTAMP(3),
+ "Timestamp With Precision"))
+ .column(
+ new PhysicalColumn(
+ "timestamptz", DataTypes.TIMESTAMP_TZ(), "TimestampTZ"))
+ .column(
+ new PhysicalColumn(
+ "timestamptz_3",
+ DataTypes.TIMESTAMP_TZ(3),
+ "TimestampTZ With Precision"))
+ .column(
+ new PhysicalColumn(
+ "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
+ .column(
+ new PhysicalColumn(
+ "timestampltz_3",
+ DataTypes.TIMESTAMP_LTZ(3),
+ "TimestampLTZ With Precision"))
+ .column(
+ new PhysicalColumn(
+ "arrayofint",
+ DataTypes.ARRAY(DataTypes.INT()),
+ "Array of Int"))
+ .column(
+ new PhysicalColumn(
+ "arrayofstr",
+ DataTypes.ARRAY(DataTypes.STRING()),
+ "Array of String"))
+ .column(
+ new PhysicalColumn(
+ "mapint2str",
+ DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+ "Map Int to String"))
+ .primaryKey("id")
+ .build();
+
+ runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
+
+ List actual = inspectTableSchema(tableId);
+ List expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "bytes | TEXT | Yes | false | null",
+ "boolean | BOOLEAN | Yes | false | null",
+ "int | INT | Yes | false | null",
+ "tinyint | TINYINT | Yes | false | null",
+ "smallint | SMALLINT | Yes | false | null",
+ "float | FLOAT | Yes | false | null",
+ "double | DOUBLE | Yes | false | null",
+ "char | CHAR(51) | Yes | false | null",
+ "varchar | VARCHAR(51) | Yes | false | null",
+ "string | TEXT | Yes | false | null",
+ "decimal | DECIMAL(17, 7) | Yes | false | null",
+ "date | DATE | Yes | false | null",
+ "timestamp | DATETIME(6) | Yes | false | null",
+ "timestamp_3 | DATETIME(3) | Yes | false | null",
+ "timestamptz | DATETIME(6) | Yes | false | null",
+ "timestamptz_3 | DATETIME(3) | Yes | false | null",
+ "timestampltz | DATETIME(6) | Yes | false | null",
+ "timestampltz_3 | DATETIME(3) | Yes | false | null",
+ "arrayofint | TEXT | Yes | false | null",
+ "arrayofstr | TEXT | Yes | false | null",
+ "mapint2str | TEXT | Yes | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ public void testDorisAddColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ runJobWithEvents(generateAddColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "number | DOUBLE | Yes | false | null",
+ "name | VARCHAR(51) | Yes | false | null",
+ "extra_date | DATE | Yes | false | null",
+ "extra_bool | BOOLEAN | Yes | false | null",
+ "extra_decimal | DECIMAL(17, 0) | Yes | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ public void testDorisDropColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ runJobWithEvents(generateDropColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null", "name | VARCHAR(51) | Yes | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ public void testDorisRenameColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ runJobWithEvents(generateRenameColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "kazu | DOUBLE | Yes | false | null",
+ "namae | VARCHAR(51) | Yes | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ @Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.")
+ public void testDorisAlterColumnType() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ runJobWithEvents(generateAlterColumnTypeEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | INT | Yes | true | null",
+ "number | DOUBLE | Yes | false | null",
+ "name | VARCHAR(57) | Yes | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test(expected = JobExecutionException.class)
+ public void testDorisNarrowingAlterColumnType() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
+ }
+
+ private void runJobWithEvents(List events) throws Exception {
+ DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class));
+
+ Configuration config =
+ new Configuration()
+ .set(FENODES, DORIS_CONTAINER.getFeNodes())
+ .set(BENODES, DORIS_CONTAINER.getBeNodes())
+ .set(USERNAME, DorisContainer.DORIS_USERNAME)
+ .set(PASSWORD, DorisContainer.DORIS_PASSWORD)
+ .set(SINK_ENABLE_BATCH_MODE, batchMode)
+ .set(SINK_ENABLE_DELETE, true);
+
+ config.addAll(
+ Configuration.fromMap(
+ Collections.singletonMap("table.create.properties.replication_num", "1")));
+
+ DataSink dorisSink = createDorisDataSink(config);
+
+ SchemaOperatorTranslator schemaOperatorTranslator =
+ new SchemaOperatorTranslator(
+ SchemaChangeBehavior.EVOLVE,
+ "$$_schema_operator_$$",
+ DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+
+ OperatorIDGenerator schemaOperatorIDGenerator =
+ new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
+
+ stream =
+ schemaOperatorTranslator.translate(
+ stream,
+ DEFAULT_PARALLELISM,
+ dorisSink.getMetadataApplier(),
+ new ArrayList<>());
+
+ DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+ sinkTranslator.translate(
+ new SinkDef("doris", "Dummy Doris Sink", config),
+ stream,
+ dorisSink,
+ schemaOperatorIDGenerator.generate());
+
+ env.execute("Doris Schema Evolution Test");
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java
new file mode 100644
index 000000000..bb209a793
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
+
+/** IT tests for {@link DorisDataSink}. */
+public class DorisPipelineITCase extends DorisSinkTestBase {
+
+ private static final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+ @BeforeClass
+ public static void before() {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(3000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ @Before
+ public void initializeDatabaseAndTable() {
+ createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+ // waiting for table to be created
+ DORIS_CONTAINER.waitForLog(
+ String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
+
+ createTable(
+ DorisContainer.DORIS_DATABASE_NAME,
+ DorisContainer.DORIS_TABLE_NAME,
+ "id",
+ Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
+
+ // waiting for table to be created
+ DORIS_CONTAINER.waitForLog(
+ String.format(
+ ".*successfully create table\\[%s;.*\\s", DorisContainer.DORIS_TABLE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Table {} created.", DorisContainer.DORIS_TABLE_NAME);
+ }
+
+ @After
+ public void destroyDatabaseAndTable() {
+ dropTable(DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+ // waiting for table to be dropped
+ DORIS_CONTAINER.waitForLog(
+ String.format(
+ ".*finished dropping table: %s.*\\s", DorisContainer.DORIS_TABLE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Table {} destroyed.", DorisContainer.DORIS_TABLE_NAME);
+
+ dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
+ // waiting for database to be created
+ DORIS_CONTAINER.waitForLog(
+ String.format(
+ ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
+ 1,
+ DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+ LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
+ }
+
+ @Test
+ public void testDorisSinkStreamJob() throws Exception {
+ runValuesToDorisJob(false);
+ }
+
+ @Test
+ public void testDorisSinkBatchJob() throws Exception {
+ runValuesToDorisJob(true);
+ }
+
+ private List generateEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+ RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+ })),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 21, 1.732, BinaryStringData.fromString("Disenchanted")
+ })),
+ DataChangeEvent.updateEvent(
+ tableId,
+ generator.generate(
+ new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
+ generator.generate(
+ new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
+ DataChangeEvent.deleteEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+ })));
+ }
+
+ private void runValuesToDorisJob(boolean batchMode) throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+ DataStream stream =
+ env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
+
+ Configuration config =
+ new Configuration()
+ .set(FENODES, DORIS_CONTAINER.getFeNodes())
+ .set(BENODES, DORIS_CONTAINER.getBeNodes())
+ .set(USERNAME, DorisContainer.DORIS_USERNAME)
+ .set(PASSWORD, DorisContainer.DORIS_PASSWORD)
+ .set(SINK_ENABLE_BATCH_MODE, batchMode)
+ .set(SINK_ENABLE_DELETE, true);
+
+ config.addAll(
+ Configuration.fromMap(
+ Collections.singletonMap("table.create.properties.replication_num", "1")));
+
+ Sink dorisSink =
+ ((FlinkSinkProvider) createDorisDataSink(config).getEventSinkProvider()).getSink();
+
+ stream.sinkTo(dorisSink);
+
+ env.execute("Values to Doris Sink");
+
+ List actual = fetchTableContent(tableId, 3);
+
+ List expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
+
+ assertEqualsInAnyOrder(expected, actual);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java
new file mode 100644
index 000000000..aa63c336b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink.utils;
+
+import org.junit.ClassRule;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+
+/** Docker container for Doris. */
+public class DorisContainer extends JdbcDatabaseContainer {
+
+ private static final String DOCKER_IMAGE_NAME = "apache/doris:doris-all-in-one-2.1.0";
+
+ public static final int FE_INNER_PORT = 8030;
+ public static final int BE_INNER_PORT = 8040;
+ public static final int DB_INNER_PORT = 9030;
+
+ @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+ public String getFeNodes() {
+ return String.format("%s:%d", getHost(), getMappedPort(FE_INNER_PORT));
+ }
+
+ public String getBeNodes() {
+ return String.format("%s:%d", getHost(), getMappedPort(BE_INNER_PORT));
+ }
+
+ public String getTableIdentifier() {
+ return String.format("%s.%s", DORIS_DATABASE_NAME, DORIS_TABLE_NAME);
+ }
+
+ public static final String DORIS_DATABASE_NAME = "doris_database";
+ public static final String DORIS_TABLE_NAME = "fallen_angel";
+ public static final String DORIS_USERNAME = "root";
+ public static final String DORIS_PASSWORD = "";
+
+ public DorisContainer() {
+ super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+ setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT));
+ setNetwork(NETWORK);
+ }
+
+ public DorisContainer(Network network) {
+ super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+ setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT));
+ setNetwork(network);
+ }
+
+ public void waitForLog(String regex, int count, int timeoutSeconds) {
+ new LogMessageWaitStrategy()
+ .withRegEx(regex)
+ .withTimes(count)
+ .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
+ .waitUntilReady(this);
+ }
+
+ @Override
+ public String getDriverClassName() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ return "com.mysql.cj.jdbc.Driver";
+ } catch (ClassNotFoundException e) {
+ return "com.mysql.jdbc.Driver";
+ }
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl("");
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getMappedPort(DB_INNER_PORT)
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ public String getJdbcUrl(String databaseName, String username) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + username
+ + "@"
+ + getHost()
+ + ":"
+ + getMappedPort(DB_INNER_PORT)
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ public String getJdbcUrl(String databaseName, String username, String password) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + username
+ + ":"
+ + password
+ + "@"
+ + getHost()
+ + ":"
+ + getMappedPort(DB_INNER_PORT)
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ @Override
+ public String getUsername() {
+ return DORIS_USERNAME;
+ }
+
+ @Override
+ public String getPassword() {
+ return DORIS_PASSWORD;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java
new file mode 100644
index 000000000..bad46a8df
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink.utils;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.connectors.doris.factory.DorisDataSinkFactory;
+import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Basic class for testing {@link DorisDataSink}. */
+public class DorisSinkTestBase extends TestLogger {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DorisSinkTestBase.class);
+
+ protected static final int DEFAULT_PARALLELISM = 1;
+ protected static final DorisContainer DORIS_CONTAINER = createDorisContainer();
+
+ public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+
+ private static DorisContainer createDorisContainer() {
+ return new DorisContainer();
+ }
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ @BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+ LOG.info("Waiting for backends to be available");
+ long startWaitingTimestamp = System.currentTimeMillis();
+
+ new LogMessageWaitStrategy()
+ .withRegEx(".*get heartbeat from FE.*\\s")
+ .withTimes(1)
+ .withStartupTimeout(
+ Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+ .waitUntilReady(DORIS_CONTAINER);
+
+ while (!checkBackendAvailability()) {
+ try {
+ if (System.currentTimeMillis() - startWaitingTimestamp
+ > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+ throw new RuntimeException("Doris backend startup timed out.");
+ }
+ LOG.info("Waiting for backends to be available");
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ // ignore and check next round
+ }
+ }
+
+ LOG.info("Containers are started.");
+ }
+
+ @AfterClass
+ public static void stopContainers() {
+ LOG.info("Stopping containers...");
+ DORIS_CONTAINER.stop();
+ LOG.info("Containers are stopped.");
+ }
+
+ static class MockContext implements Factory.Context {
+
+ Configuration factoryConfiguration;
+
+ public MockContext(Configuration factoryConfiguration) {
+ this.factoryConfiguration = factoryConfiguration;
+ }
+
+ @Override
+ public Configuration getFactoryConfiguration() {
+ return factoryConfiguration;
+ }
+
+ @Override
+ public Configuration getPipelineConfiguration() {
+ return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return this.getClassLoader();
+ }
+ }
+
+ public static DataSink createDorisDataSink(Configuration factoryConfiguration) {
+ DorisDataSinkFactory factory = new DorisDataSinkFactory();
+ return factory.createDataSink(new MockContext(factoryConfiguration));
+ }
+
+ public static boolean checkBackendAvailability() {
+ try {
+ Container.ExecResult rs =
+ DORIS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ "-e SHOW BACKENDS\\G");
+
+ if (rs.getExitCode() != 0) {
+ return false;
+ }
+ String output = rs.getStdout();
+ LOG.info("Doris backend status:\n{}", output);
+ return output.contains("*************************** 1. row ***************************")
+ && !output.contains("AvailCapacity: 1.000 B");
+ } catch (Exception e) {
+ LOG.info("Failed to check backend status.", e);
+ return false;
+ }
+ }
+
+ public static void createDatabase(String databaseName) {
+ try {
+ Container.ExecResult rs =
+ DORIS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to create database." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create database.", e);
+ }
+ }
+
+ public static void createTable(
+ String databaseName, String tableName, String primaryKey, List schema) {
+ try {
+ Container.ExecResult rs =
+ DORIS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format(
+ "-e CREATE TABLE `%s`.`%s` (%s) UNIQUE KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
+ databaseName,
+ tableName,
+ String.join(", ", schema),
+ primaryKey,
+ primaryKey));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to create table." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create table.", e);
+ }
+ }
+
+ public static void dropDatabase(String databaseName) {
+ try {
+ Container.ExecResult rs =
+ DORIS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to drop database." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to drop database.", e);
+ }
+ }
+
+ public static void dropTable(String databaseName, String tableName) {
+ try {
+ Container.ExecResult rs =
+ DORIS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format(
+ "-e DROP TABLE IF EXISTS %s.%s;", databaseName, tableName));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to drop table." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to drop table.", e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ public List inspectTableSchema(TableId tableId) throws SQLException {
+ List results = new ArrayList<>();
+ ResultSet rs =
+ DORIS_CONTAINER
+ .createConnection("")
+ .createStatement()
+ .executeQuery(
+ String.format(
+ "DESCRIBE `%s`.`%s`",
+ tableId.getSchemaName(), tableId.getTableName()));
+
+ while (rs.next()) {
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ columns.add(rs.getString(i));
+ }
+ results.add(String.join(" | ", columns));
+ }
+ return results;
+ }
+
+ public List fetchTableContent(TableId tableId, int columnCount) throws SQLException {
+ List results = new ArrayList<>();
+ ResultSet rs =
+ DORIS_CONTAINER
+ .createConnection("")
+ .createStatement()
+ .executeQuery(
+ String.format(
+ "SELECT * FROM %s.%s",
+ tableId.getSchemaName(), tableId.getTableName()));
+
+ while (rs.next()) {
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ columns.add(rs.getString(i));
+ }
+ results.add(String.join(" | ", columns));
+ }
+ return results;
+ }
+
+ public static void assertEqualsInAnyOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+ }
+
+ public static void assertMapEquals(Map expected, Map actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ for (String key : expected.keySet()) {
+ assertEquals(expected.get(key), actual.get(key));
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..f0d32fb59
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
new file mode 100644
index 000000000..39997cdb6
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** End-to-end tests for mysql cdc to Doris pipeline job. */
+@RunWith(Parameterized.class)
+public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);
+
+ // ------------------------------------------------------------------------------------------
+ // MySQL Variables (we always use MySQL as the data source for easier verifying)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+ protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+ public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @ClassRule
+ public static final DorisContainer DORIS =
+ new DorisContainer(NETWORK)
+ .withNetworkAliases("doris")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ protected final UniqueDatabase mysqlInventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @BeforeClass
+ public static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL)).join();
+ Startables.deepStart(Stream.of(DORIS)).join();
+ LOG.info("Waiting for backends to be available");
+ long startWaitingTimestamp = System.currentTimeMillis();
+
+ new LogMessageWaitStrategy()
+ .withRegEx(".*get heartbeat from FE.*")
+ .withTimes(1)
+ .withStartupTimeout(
+ Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+ .waitUntilReady(DORIS);
+
+ while (!checkBackendAvailability()) {
+ try {
+ if (System.currentTimeMillis() - startWaitingTimestamp
+ > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+ throw new RuntimeException("Doris backend startup timed out.");
+ }
+ LOG.info("Waiting for backends to be available");
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ // ignore and check next round
+ }
+ }
+ LOG.info("Containers are started.");
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ mysqlInventoryDatabase.createAndInitialize();
+ createDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
+ }
+
+ private static boolean checkBackendAvailability() {
+ try {
+ Container.ExecResult rs =
+ DORIS.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ "-e SHOW BACKENDS\\G");
+
+ if (rs.getExitCode() != 0) {
+ return false;
+ }
+ String output = rs.getStdout();
+ LOG.info("Doris backend status:\n{}", output);
+ return output.contains("*************************** 1. row ***************************")
+ && !output.contains("AvailCapacity: 1.000 B");
+ } catch (Exception e) {
+ LOG.info("Failed to check backend status.", e);
+ return false;
+ }
+ }
+
+ @After
+ public void after() {
+ super.after();
+ mysqlInventoryDatabase.dropDatabase();
+ dropDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
+ }
+
+ @Test
+ public void testSyncWholeDatabase() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: mysql\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: doris\n"
+ + " fenodes: doris:8030\n"
+ + " benodes: doris:8040\n"
+ + " username: %s\n"
+ + " password: \"%s\"\n"
+ + " table.create.properties.replication_num: 1\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 1",
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ DORIS.getUsername(),
+ DORIS.getPassword());
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ validateSinkResult(
+ mysqlInventoryDatabase.getDatabaseName(),
+ "products",
+ 7,
+ Arrays.asList(
+ "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+ "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+ "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+ "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+ "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+ "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
+ "107 | rocks | box of assorted rocks | 5.3 | null | null | null",
+ "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
+ "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null"));
+
+ validateSinkResult(
+ mysqlInventoryDatabase.getDatabaseName(),
+ "customers",
+ 4,
+ Arrays.asList(
+ "101 | user_1 | Shanghai | 123567891234",
+ "102 | user_2 | Shanghai | 123567891234",
+ "103 | user_3 | Shanghai | 123567891234",
+ "104 | user_4 | Shanghai | 123567891234"));
+
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110
+
+ validateSinkResult(
+ mysqlInventoryDatabase.getDatabaseName(),
+ "products",
+ 7,
+ Arrays.asList(
+ "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+ "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+ "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+ "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+ "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+ "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
+ "107 | rocks | box of assorted rocks | 5.3 | null | null | null",
+ "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
+ "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
+ "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null"));
+
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+ stat.execute("DELETE FROM products WHERE id=101;");
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111
+ stat.execute(
+ "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+ validateSinkResult(
+ mysqlInventoryDatabase.getDatabaseName(),
+ "products",
+ 7,
+ Arrays.asList(
+ "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null",
+ "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null",
+ "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null",
+ "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null",
+ "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null",
+ "107 | rocks | box of assorted rocks | 5.1 | null | null | null",
+ "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
+ "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
+ "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null",
+ "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null",
+ "112 | finally | null | 2.14 | null | null | null"));
+ }
+
+ public static void createDorisDatabase(String databaseName) {
+ try {
+ Container.ExecResult rs =
+ DORIS.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to create database." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create database.", e);
+ }
+ }
+
+ public static void dropDorisDatabase(String databaseName) {
+ try {
+ Container.ExecResult rs =
+ DORIS.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to drop database." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to drop database.", e);
+ }
+ }
+
+ private void validateSinkResult(
+ String databaseName, String tableName, int columnCount, List expected)
+ throws Exception {
+ long startWaitingTimestamp = System.currentTimeMillis();
+ while (true) {
+ if (System.currentTimeMillis() - startWaitingTimestamp
+ > TESTCASE_TIMEOUT_SECONDS * 1000) {
+ throw new RuntimeException("Doris backend startup timed out.");
+ }
+ List results = new ArrayList<>();
+ try (Connection conn =
+ DriverManager.getConnection(
+ DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
+ Statement stat = conn.createStatement()) {
+ ResultSet rs =
+ stat.executeQuery(
+ String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName));
+
+ while (rs.next()) {
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ try {
+ columns.add(rs.getString(i));
+ } catch (SQLException ignored) {
+ // Column count could change after schema evolution
+ columns.add(null);
+ }
+ }
+ results.add(String.join(" | ", columns));
+ }
+
+ if (expected.size() == results.size()) {
+ assertEqualsInAnyOrder(expected, results);
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (SQLException e) {
+ LOG.info("Validate sink result failure, waiting for next turn...", e);
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+ public static void assertEqualsInAnyOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+ }
+}