diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index ed166767c..9c3c92ecb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -114,6 +114,18 @@ limitations under the License.
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ test-jar
+
+ test-jar
+
+
+
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
index 342044827..e410ad227 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
@@ -52,6 +52,24 @@ limitations under the License.
flink-cdc-composer
${project.version}
+
+ org.testcontainers
+ jdbc
+ 1.18.3
+ test
+
+
+ org.apache.flink
+ flink-test-utils-junit
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
new file mode 100644
index 000000000..d029943e3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.shade.org.apache.commons.compress.utils;
+
+import java.util.ArrayList;
+
+/**
+ * Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but not package it.
+ * This package should be removed after upgrading to 1.2.10 which will not use commons-compress
+ * anymore.
+ */
+public class Lists {
+ public static ArrayList newArrayList() {
+ return new ArrayList<>();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
new file mode 100644
index 000000000..c294dd423
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksMetadataApplier}. */
+public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
+ private static final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ @BeforeClass
+ public static void before() {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(3000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ @Before
+ public void initializeDatabase() {
+ executeSql(
+ String.format(
+ "CREATE DATABASE IF NOT EXISTS `%s`;",
+ StarRocksContainer.STARROCKS_DATABASE_NAME));
+ LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+ }
+
+ @After
+ public void destroyDatabase() {
+ executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
+ LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+ }
+
+ private List generateAddColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn(
+ "extra_bool", DataTypes.BOOLEAN(), null)))),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ new PhysicalColumn(
+ "extra_decimal",
+ DataTypes.DECIMAL(17, 0),
+ null)))));
+ }
+
+ private List generateDropColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new DropColumnEvent(tableId, Collections.singletonList("number")));
+ }
+
+ private List generateRenameColumnEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
+ new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
+ }
+
+ private List generateAlterColumnTypeEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
+ }
+
+ private List generateNarrowingAlterColumnTypeEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ // Double -> Float is a narrowing cast, should fail
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
+ }
+
+ @Test
+ public void testStarRocksDataType() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
+ // StarRocks sink doesn't support BINARY and BYTES type yet.
+ // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
+ // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
+ // Binary"))
+ // .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
+ .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
+ .column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
+ .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
+ .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
+ .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
+ .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
+ .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
+ .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
+ .column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
+ .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
+ .column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
+ // StarRocks sink doesn't support TIME type yet.
+ // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
+ // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
+ // Precision"))
+ .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
+ .column(
+ new PhysicalColumn(
+ "timestamp_3",
+ DataTypes.TIMESTAMP(3),
+ "Timestamp With Precision"))
+ // StarRocks sink doesn't support TIMESTAMP with non-local TZ yet.
+ // .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(),
+ // "TimestampTZ"))
+ // .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3),
+ // "TimestampTZ With Precision"))
+ .column(
+ new PhysicalColumn(
+ "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
+ .column(
+ new PhysicalColumn(
+ "timestampltz_3",
+ DataTypes.TIMESTAMP_LTZ(3),
+ "TimestampLTZ With Precision"))
+ .primaryKey("id")
+ .build();
+
+ runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
+
+ List actual = inspectTableSchema(tableId);
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "boolean | boolean | YES | false | null",
+ "int | int | YES | false | null",
+ "tinyint | tinyint | YES | false | null",
+ "smallint | smallint | YES | false | null",
+ "float | float | YES | false | null",
+ "double | double | YES | false | null",
+ "char | char(51) | YES | false | null",
+ "varchar | varchar(51) | YES | false | null",
+ "string | varchar(1048576) | YES | false | null",
+ "decimal | decimal(17,7) | YES | false | null",
+ "date | date | YES | false | null",
+ "timestamp | datetime | YES | false | null",
+ "timestamp_3 | datetime | YES | false | null",
+ "timestampltz | datetime | YES | false | null",
+ "timestampltz_3 | datetime | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ public void testStarRocksAddColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ runJobWithEvents(generateAddColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "number | double | YES | false | null",
+ "name | varchar(51) | YES | false | null",
+ "extra_date | date | YES | false | null",
+ "extra_bool | boolean | YES | false | null",
+ "extra_decimal | decimal(17,0) | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ public void testStarRocksDropColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ runJobWithEvents(generateDropColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null", "name | varchar(51) | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ @Ignore("Rename column is not supported currently.")
+ public void testStarRocksRenameColumn() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ runJobWithEvents(generateRenameColumnEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "kazu | double | YES | false | null",
+ "namae | varchar(51) | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test
+ @Ignore("Alter column type is not supported currently.")
+ public void testStarRocksAlterColumnType() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ runJobWithEvents(generateAlterColumnTypeEvents(tableId));
+
+ List actual = inspectTableSchema(tableId);
+
+ List expected =
+ Arrays.asList(
+ "id | int | NO | true | null",
+ "number | double | YES | false | null",
+ "name | varchar(57) | YES | false | null");
+
+ assertEqualsInOrder(expected, actual);
+ }
+
+ @Test(expected = JobExecutionException.class)
+ @Ignore("Alter column type is not supported currently.")
+ public void testStarRocksNarrowingAlterColumnType() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
+ }
+
+ private void runJobWithEvents(List events) throws Exception {
+ DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class));
+
+ Configuration config =
+ new Configuration()
+ .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
+ .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
+ .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
+ .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
+
+ DataSink starRocksSink = createStarRocksDataSink(config);
+
+ SchemaOperatorTranslator schemaOperatorTranslator =
+ new SchemaOperatorTranslator(
+ SchemaChangeBehavior.EVOLVE,
+ "$$_schema_operator_$$",
+ DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+
+ OperatorIDGenerator schemaOperatorIDGenerator =
+ new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
+
+ stream =
+ schemaOperatorTranslator.translate(
+ stream,
+ DEFAULT_PARALLELISM,
+ starRocksSink.getMetadataApplier(),
+ new ArrayList<>());
+
+ DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+ sinkTranslator.translate(
+ new SinkDef("starrocks", "Dummy StarRocks Sink", config),
+ stream,
+ starRocksSink,
+ schemaOperatorIDGenerator.generate());
+
+ env.execute("StarRocks Schema Evolution Test");
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java
new file mode 100644
index 000000000..43c1faaac
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksPipelineITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksDataSink}. */
+public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
+ private static final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ @BeforeClass
+ public static void before() {
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(3000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ @Before
+ public void initializeDatabaseAndTable() {
+ executeSql(
+ String.format(
+ "CREATE DATABASE IF NOT EXISTS `%s`;",
+ StarRocksContainer.STARROCKS_DATABASE_NAME));
+
+ LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+
+ List schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
+
+ executeSql(
+ String.format(
+ "CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME,
+ String.join(", ", schema),
+ "id",
+ "id"));
+
+ LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
+ }
+
+ @After
+ public void destroyDatabaseAndTable() {
+
+ executeSql(
+ String.format(
+ "DROP TABLE %s.%s;",
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME));
+
+ LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME);
+
+ executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
+
+ LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+ }
+
+ private List generateEvents(TableId tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+ RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+
+ return Arrays.asList(
+ new CreateTableEvent(tableId, schema),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+ })),
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 21, 1.732, BinaryStringData.fromString("Disenchanted")
+ })),
+ DataChangeEvent.deleteEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+ })),
+ DataChangeEvent.updateEvent(
+ tableId,
+ generator.generate(
+ new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
+ generator.generate(
+ new Object[] {
+ 17, 6.28, BinaryStringData.fromString("StarRocks")
+ })));
+ }
+
+ @Test
+ public void testValuesToStarRocks() throws Exception {
+ TableId tableId =
+ TableId.tableId(
+ StarRocksContainer.STARROCKS_DATABASE_NAME,
+ StarRocksContainer.STARROCKS_TABLE_NAME);
+ DataStream stream =
+ env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
+
+ Configuration config =
+ new Configuration()
+ .set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
+ .set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
+ .set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
+ .set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
+
+ Sink starRocksSink =
+ ((FlinkSinkProvider) createStarRocksDataSink(config).getEventSinkProvider())
+ .getSink();
+ stream.sinkTo(starRocksSink);
+
+ env.execute("Values to StarRocks Sink");
+
+ List actual = fetchTableContent(tableId, 3);
+ List expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
+
+ assertEqualsInAnyOrder(expected, actual);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
new file mode 100644
index 000000000..8bba7053e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksContainer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink.utils;
+
+import org.junit.ClassRule;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Docker container for StarRocks. */
+public class StarRocksContainer extends JdbcDatabaseContainer {
+
+ private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
+
+ // exposed ports
+ public static final int FE_HTTP_SERVICE_PORT = 8080;
+ public static final int FE_QUERY_PORT = 9030;
+
+ public static final String STARROCKS_DATABASE_NAME = "starrocks_database";
+ public static final String STARROCKS_TABLE_NAME = "fallen_angel";
+ public static final String STARROCKS_USERNAME = "root";
+ public static final String STARROCKS_PASSWORD = "";
+
+ @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+ public StarRocksContainer() {
+ super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+ setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+ setNetwork(NETWORK);
+ }
+
+ public StarRocksContainer(Network network) {
+ super(DockerImageName.parse(DOCKER_IMAGE_NAME));
+ setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
+ setNetwork(network);
+ }
+
+ public List getLoadUrl() {
+ return Collections.singletonList(
+ String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT)));
+ }
+
+ public void waitForLog(String regex, int count, int timeoutSeconds) {
+ new LogMessageWaitStrategy()
+ .withRegEx(regex)
+ .withTimes(count)
+ .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
+ .waitUntilReady(this);
+ }
+
+ @Override
+ public String getDriverClassName() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ return "com.mysql.cj.jdbc.Driver";
+ } catch (ClassNotFoundException e) {
+ return "com.mysql.jdbc.Driver";
+ }
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl("");
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getMappedPort(FE_QUERY_PORT)
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ @Override
+ public String getUsername() {
+ return STARROCKS_USERNAME;
+ }
+
+ @Override
+ public String getPassword() {
+ return STARROCKS_PASSWORD;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
new file mode 100644
index 000000000..4980c603b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink.utils;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink;
+import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Basic class for testing {@link StarRocksDataSink}. */
+public class StarRocksSinkTestBase extends TestLogger {
+ protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class);
+
+ protected static final int DEFAULT_PARALLELISM = 1;
+
+ protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer();
+
+ public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+
+ private static StarRocksContainer createStarRocksContainer() {
+ return new StarRocksContainer();
+ }
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ @BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join();
+ LOG.info("Waiting for StarRocks to launch");
+
+ long startWaitingTimestamp = System.currentTimeMillis();
+
+ new LogMessageWaitStrategy()
+ .withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s")
+ .withTimes(1)
+ .withStartupTimeout(
+ Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+ .waitUntilReady(STARROCKS_CONTAINER);
+
+ while (!checkBackendAvailability()) {
+ try {
+ if (System.currentTimeMillis() - startWaitingTimestamp
+ > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+ throw new RuntimeException("StarRocks backend startup timed out.");
+ }
+ LOG.info("Waiting for backends to be available");
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ // ignore and check next round
+ }
+ }
+ LOG.info("Containers are started.");
+ }
+
+ @AfterClass
+ public static void stopContainers() {
+ LOG.info("Stopping containers...");
+ STARROCKS_CONTAINER.stop();
+ LOG.info("Containers are stopped.");
+ }
+
+ static class MockContext implements Factory.Context {
+
+ Configuration factoryConfiguration;
+
+ public MockContext(Configuration factoryConfiguration) {
+ this.factoryConfiguration = factoryConfiguration;
+ }
+
+ @Override
+ public Configuration getFactoryConfiguration() {
+ return factoryConfiguration;
+ }
+
+ @Override
+ public Configuration getPipelineConfiguration() {
+ return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return null;
+ }
+ }
+
+ public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) {
+ StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory();
+ return factory.createDataSink(new MockContext(factoryConfiguration));
+ }
+
+ public static void executeSql(String sql) {
+ try {
+ Container.ExecResult rs =
+ STARROCKS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ "-e " + sql);
+
+ if (rs.getExitCode() != 0) {
+ throw new RuntimeException("Failed to execute SQL." + rs.getStderr());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to execute SQL.", e);
+ }
+ }
+
+ public static boolean checkBackendAvailability() {
+ try {
+ Container.ExecResult rs =
+ STARROCKS_CONTAINER.execInContainer(
+ "mysql",
+ "--protocol=TCP",
+ "-uroot",
+ "-P9030",
+ "-h127.0.0.1",
+ "-e SHOW BACKENDS\\G");
+
+ if (rs.getExitCode() != 0) {
+ return false;
+ }
+ return rs.getStdout()
+ .contains("*************************** 1. row ***************************");
+ } catch (Exception e) {
+ LOG.info("Failed to check backend status.", e);
+ return false;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ public List inspectTableSchema(TableId tableId) throws SQLException {
+ List results = new ArrayList<>();
+ ResultSet rs =
+ STARROCKS_CONTAINER
+ .createConnection("")
+ .createStatement()
+ .executeQuery(
+ String.format(
+ "DESCRIBE `%s`.`%s`",
+ tableId.getSchemaName(), tableId.getTableName()));
+
+ while (rs.next()) {
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ columns.add(rs.getString(i));
+ }
+ results.add(String.join(" | ", columns));
+ }
+ return results;
+ }
+
+ public List fetchTableContent(TableId tableId, int columnCount) throws SQLException {
+ List results = new ArrayList<>();
+ ResultSet rs =
+ STARROCKS_CONTAINER
+ .createConnection("")
+ .createStatement()
+ .executeQuery(
+ String.format(
+ "SELECT * FROM %s.%s",
+ tableId.getSchemaName(), tableId.getTableName()));
+
+ while (rs.next()) {
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ columns.add(rs.getString(i));
+ }
+ results.add(String.join(" | ", columns));
+ }
+ return results;
+ }
+
+ public static void assertEqualsInAnyOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+ }
+
+ public static void assertMapEquals(Map expected, Map actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ for (String key : expected.keySet()) {
+ assertEquals(expected.get(key), actual.get(key));
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 2326240b6..3b3e60353 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -85,6 +85,12 @@ limitations under the License.
test-jar
test
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-doris
+ ${project.version}
+ test
+
org.apache.flink
flink-cdc-pipeline-connector-doris
@@ -96,7 +102,6 @@ limitations under the License.
org.apache.flink
flink-cdc-pipeline-connector-starrocks
${project.version}
- test-jar
test