diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml index 54ae076fc..ed166767c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml @@ -46,6 +46,49 @@ limitations under the License. flink-doris-connector-${flink.major.version} 1.6.0 + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + junit + junit + + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.slf4j + slf4j-api + ${slf4j.version} + test + + + org.testcontainers + jdbc + 1.18.3 + test + + + mysql + mysql-connector-java + 8.0.26 + test + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java new file mode 100644 index 000000000..0d3c0d99c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; +import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; +import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; + +/** IT tests for {@link DorisMetadataApplier}. 
*/
+@RunWith(Parameterized.class)
+public class DorisMetadataApplierITCase extends DorisSinkTestBase {
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+    private final boolean batchMode;
+
+    public DorisMetadataApplierITCase(boolean batchMode) {
+        this.batchMode = batchMode;
+    }
+
+    @Parameters(name = "batchMode: {0}")
+    public static Iterable<Boolean> data() {
+        return Arrays.asList(true, false);
+    }
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabase() {
+        createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+        // waiting for database to be created
+        DORIS_CONTAINER.waitForLog(
+                String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
+    }
+
+    @After
+    public void destroyDatabase() {
+        dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
+        // waiting for database to be dropped
+        DORIS_CONTAINER.waitForLog(
+                String.format(
+                        ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
+    }
+
+    private List<Event> generateAddColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_bool", DataTypes.BOOLEAN(), null)))),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_decimal",
+                                                DataTypes.DECIMAL(17, 0),
+                                                null)))));
+    }
+
+    private List<Event> generateDropColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new DropColumnEvent(tableId, Collections.singletonList("number")));
+    }
+
+    private List<Event> generateRenameColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
+                new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
+    }
+
+    private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + } + + private List generateNarrowingAlterColumnTypeEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + // Double -> Float is a narrowing cast, should fail + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("number", DataTypes.FLOAT()))); + } + + @Test + public void testDorisDataTypes() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID")) + // Doris sink doesn't support BINARY type yet. + // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary")) + // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var + // Binary")) + .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes")) + .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean")) + .column(new PhysicalColumn("int", DataTypes.INT(), "Int")) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int")) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int")) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float")) + .column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double")) + .column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char")) + .column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char")) + .column(new PhysicalColumn("string", DataTypes.STRING(), "String")) + .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal")) + .column(new PhysicalColumn("date", DataTypes.DATE(), "Date")) + // Doris sink doesn't support TIME type yet. 
+ // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) + // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With + // Precision")) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp")) + .column( + new PhysicalColumn( + "timestamp_3", + DataTypes.TIMESTAMP(3), + "Timestamp With Precision")) + .column( + new PhysicalColumn( + "timestamptz", DataTypes.TIMESTAMP_TZ(), "TimestampTZ")) + .column( + new PhysicalColumn( + "timestamptz_3", + DataTypes.TIMESTAMP_TZ(3), + "TimestampTZ With Precision")) + .column( + new PhysicalColumn( + "timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ")) + .column( + new PhysicalColumn( + "timestampltz_3", + DataTypes.TIMESTAMP_LTZ(3), + "TimestampLTZ With Precision")) + .column( + new PhysicalColumn( + "arrayofint", + DataTypes.ARRAY(DataTypes.INT()), + "Array of Int")) + .column( + new PhysicalColumn( + "arrayofstr", + DataTypes.ARRAY(DataTypes.STRING()), + "Array of String")) + .column( + new PhysicalColumn( + "mapint2str", + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), + "Map Int to String")) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List actual = inspectTableSchema(tableId); + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "bytes | TEXT | Yes | false | null", + "boolean | BOOLEAN | Yes | false | null", + "int | INT | Yes | false | null", + "tinyint | TINYINT | Yes | false | null", + "smallint | SMALLINT | Yes | false | null", + "float | FLOAT | Yes | false | null", + "double | DOUBLE | Yes | false | null", + "char | CHAR(51) | Yes | false | null", + "varchar | VARCHAR(51) | Yes | false | null", + "string | TEXT | Yes | false | null", + "decimal | DECIMAL(17, 7) | Yes | false | null", + "date | DATE | Yes | false | null", + "timestamp | DATETIME(6) | Yes | false | null", + "timestamp_3 | DATETIME(3) | Yes | false | null", + "timestamptz | DATETIME(6) | Yes | false | null", + "timestamptz_3 | DATETIME(3) | Yes | false | null", + "timestampltz | DATETIME(6) | Yes | false | null", + "timestampltz_3 | DATETIME(3) | Yes | false | null", + "arrayofint | TEXT | Yes | false | null", + "arrayofstr | TEXT | Yes | false | null", + "mapint2str | TEXT | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateAddColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "number | DOUBLE | Yes | false | null", + "name | VARCHAR(51) | Yes | false | null", + "extra_date | DATE | Yes | false | null", + "extra_bool | BOOLEAN | Yes | false | null", + "extra_decimal | DECIMAL(17, 0) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisDropColumn() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateDropColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", "name | VARCHAR(51) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + public void testDorisRenameColumn() throws Exception { + TableId tableId = + TableId.tableId( + 
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateRenameColumnEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "kazu | DOUBLE | Yes | false | null", + "namae | VARCHAR(51) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test + @Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.") + public void testDorisAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateAlterColumnTypeEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "number | DOUBLE | Yes | false | null", + "name | VARCHAR(57) | Yes | false | null"); + + assertEqualsInOrder(expected, actual); + } + + @Test(expected = JobExecutionException.class) + public void testDorisNarrowingAlterColumnType() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); + } + + private void runJobWithEvents(List events) throws Exception { + DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(FENODES, DORIS_CONTAINER.getFeNodes()) + .set(BENODES, DORIS_CONTAINER.getBeNodes()) + .set(USERNAME, DorisContainer.DORIS_USERNAME) + .set(PASSWORD, DorisContainer.DORIS_PASSWORD) + .set(SINK_ENABLE_BATCH_MODE, batchMode) + .set(SINK_ENABLE_DELETE, true); + + config.addAll( + Configuration.fromMap( + Collections.singletonMap("table.create.properties.replication_num", "1"))); + + DataSink dorisSink = createDorisDataSink(config); + + SchemaOperatorTranslator schemaOperatorTranslator = + new SchemaOperatorTranslator( + SchemaChangeBehavior.EVOLVE, + "$$_schema_operator_$$", + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + + OperatorIDGenerator schemaOperatorIDGenerator = + new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); + + stream = + schemaOperatorTranslator.translate( + stream, + DEFAULT_PARALLELISM, + dorisSink.getMetadataApplier(), + new ArrayList<>()); + + DataSinkTranslator sinkTranslator = new DataSinkTranslator(); + sinkTranslator.translate( + new SinkDef("doris", "Dummy Doris Sink", config), + stream, + dorisSink, + schemaOperatorIDGenerator.generate()); + + env.execute("Doris Schema Evolution Test"); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java new file mode 100644 index 000000000..bb209a793 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; + +/** IT tests for {@link DorisDataSink}. 
*/
+public class DorisPipelineITCase extends DorisSinkTestBase {
+
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabaseAndTable() {
+        createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+        // waiting for database to be created
+        DORIS_CONTAINER.waitForLog(
+                String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
+
+        createTable(
+                DorisContainer.DORIS_DATABASE_NAME,
+                DorisContainer.DORIS_TABLE_NAME,
+                "id",
+                Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
+
+        // waiting for table to be created
+        DORIS_CONTAINER.waitForLog(
+                String.format(
+                        ".*successfully create table\\[%s;.*\\s", DorisContainer.DORIS_TABLE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Table {} created.", DorisContainer.DORIS_TABLE_NAME);
+    }
+
+    @After
+    public void destroyDatabaseAndTable() {
+        dropTable(DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+        // waiting for table to be dropped
+        DORIS_CONTAINER.waitForLog(
+                String.format(
+                        ".*finished dropping table: %s.*\\s", DorisContainer.DORIS_TABLE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Table {} destroyed.", DorisContainer.DORIS_TABLE_NAME);
+
+        dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
+        // waiting for database to be dropped
+        DORIS_CONTAINER.waitForLog(
+                String.format(
+                        ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
+
+        LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
+    }
+
+    @Test
+    public void testDorisSinkStreamJob() throws Exception {
+        runValuesToDorisJob(false);
+    }
+
+    @Test
+    public void testDorisSinkBatchJob() throws Exception {
+        runValuesToDorisJob(true);
+    }
+
+    private List<Event> generateEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    21, 1.732, BinaryStringData.fromString("Disenchanted")
+                                })),
+                DataChangeEvent.updateEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
+                        generator.generate(
+                                new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
+                DataChangeEvent.deleteEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    19, 2.718, BinaryStringData.fromString("Que Sera Sera")
}))); + } + + private void runValuesToDorisJob(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + DataStream stream = + env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class)); + + Configuration config = + new Configuration() + .set(FENODES, DORIS_CONTAINER.getFeNodes()) + .set(BENODES, DORIS_CONTAINER.getBeNodes()) + .set(USERNAME, DorisContainer.DORIS_USERNAME) + .set(PASSWORD, DorisContainer.DORIS_PASSWORD) + .set(SINK_ENABLE_BATCH_MODE, batchMode) + .set(SINK_ENABLE_DELETE, true); + + config.addAll( + Configuration.fromMap( + Collections.singletonMap("table.create.properties.replication_num", "1"))); + + Sink dorisSink = + ((FlinkSinkProvider) createDorisDataSink(config).getEventSinkProvider()).getSink(); + + stream.sinkTo(dorisSink); + + env.execute("Values to Doris Sink"); + + List actual = fetchTableContent(tableId, 3); + + List expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted"); + + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java new file mode 100644 index 000000000..aa63c336b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisContainer.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink.utils; + +import org.junit.ClassRule; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; + +/** Docker container for Doris. 
*/ +public class DorisContainer extends JdbcDatabaseContainer { + + private static final String DOCKER_IMAGE_NAME = "apache/doris:doris-all-in-one-2.1.0"; + + public static final int FE_INNER_PORT = 8030; + public static final int BE_INNER_PORT = 8040; + public static final int DB_INNER_PORT = 9030; + + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + public String getFeNodes() { + return String.format("%s:%d", getHost(), getMappedPort(FE_INNER_PORT)); + } + + public String getBeNodes() { + return String.format("%s:%d", getHost(), getMappedPort(BE_INNER_PORT)); + } + + public String getTableIdentifier() { + return String.format("%s.%s", DORIS_DATABASE_NAME, DORIS_TABLE_NAME); + } + + public static final String DORIS_DATABASE_NAME = "doris_database"; + public static final String DORIS_TABLE_NAME = "fallen_angel"; + public static final String DORIS_USERNAME = "root"; + public static final String DORIS_PASSWORD = ""; + + public DorisContainer() { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT)); + setNetwork(NETWORK); + } + + public DorisContainer(Network network) { + super(DockerImageName.parse(DOCKER_IMAGE_NAME)); + setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT)); + setNetwork(network); + } + + public void waitForLog(String regex, int count, int timeoutSeconds) { + new LogMessageWaitStrategy() + .withRegEx(regex) + .withTimes(count) + .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .waitUntilReady(this); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(""); + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + public String getJdbcUrl(String databaseName, String username) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + username + + "@" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + public String getJdbcUrl(String databaseName, String username, String password) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + username + + ":" + + password + + "@" + + getHost() + + ":" + + getMappedPort(DB_INNER_PORT) + + "/" + + databaseName + + additionalUrlParams; + } + + @Override + public String getUsername() { + return DORIS_USERNAME; + } + + @Override + public String getPassword() { + return DORIS_PASSWORD; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java new file mode 100644 index 000000000..bad46a8df --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/utils/DorisSinkTestBase.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink.utils; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.doris.factory.DorisDataSinkFactory; +import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Basic class for testing {@link DorisDataSink}. 
*/
+public class DorisSinkTestBase extends TestLogger {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DorisSinkTestBase.class);
+
+    protected static final int DEFAULT_PARALLELISM = 1;
+    protected static final DorisContainer DORIS_CONTAINER = createDorisContainer();
+
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+
+    private static DorisContainer createDorisContainer() {
+        return new DorisContainer();
+    }
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+        LOG.info("Waiting for backends to be available");
+        long startWaitingTimestamp = System.currentTimeMillis();
+
+        new LogMessageWaitStrategy()
+                .withRegEx(".*get heartbeat from FE.*\\s")
+                .withTimes(1)
+                .withStartupTimeout(
+                        Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+                .waitUntilReady(DORIS_CONTAINER);
+
+        while (!checkBackendAvailability()) {
+            try {
+                if (System.currentTimeMillis() - startWaitingTimestamp
+                        > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+                    throw new RuntimeException("Doris backend startup timed out.");
+                }
+                LOG.info("Waiting for backends to be available");
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+                // ignore and check next round
+            }
+        }
+
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        DORIS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    static class MockContext implements Factory.Context {
+
+        Configuration factoryConfiguration;
+
+        public MockContext(Configuration factoryConfiguration) {
+            this.factoryConfiguration = factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getFactoryConfiguration() {
+            return factoryConfiguration;
+        }
+
+        @Override
+        public Configuration getPipelineConfiguration() {
+            return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
+        }
+
+        @Override
+        public ClassLoader getClassLoader() {
+            // use the loader of this class; calling this.getClassLoader() here
+            // would recurse into this method forever
+            return this.getClass().getClassLoader();
+        }
+    }
+
+    public static DataSink createDorisDataSink(Configuration factoryConfiguration) {
+        DorisDataSinkFactory factory = new DorisDataSinkFactory();
+        return factory.createDataSink(new MockContext(factoryConfiguration));
+    }
+
+    public static boolean checkBackendAvailability() {
+        try {
+            Container.ExecResult rs =
+                    DORIS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            "-e SHOW BACKENDS\\G");
+
+            if (rs.getExitCode() != 0) {
+                return false;
+            }
+            String output = rs.getStdout();
+            LOG.info("Doris backend status:\n{}", output);
+            return output.contains("*************************** 1. row ***************************")
+                    && !output.contains("AvailCapacity: 1.000 B");
+        } catch (Exception e) {
+            LOG.info("Failed to check backend status.", e);
+            return false;
+        }
+    }
+
+    public static void createDatabase(String databaseName) {
+        try {
+            Container.ExecResult rs =
+                    DORIS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
+
+            if (rs.getExitCode() != 0) {
+                throw new RuntimeException("Failed to create database. " + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create database.", e);
+        }
+    }
+
+    public static void createTable(
+            String databaseName, String tableName, String primaryKey, List<String> schema) {
+        try {
+            Container.ExecResult rs =
+                    DORIS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            String.format(
+                                    "-e CREATE TABLE `%s`.`%s` (%s) UNIQUE KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
+                                    databaseName,
+                                    tableName,
+                                    String.join(", ", schema),
+                                    primaryKey,
+                                    primaryKey));
+
+            if (rs.getExitCode() != 0) {
+                throw new RuntimeException("Failed to create table. " + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create table.", e);
+        }
+    }
+
+    public static void dropDatabase(String databaseName) {
+        try {
+            Container.ExecResult rs =
+                    DORIS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
+
+            if (rs.getExitCode() != 0) {
+                throw new RuntimeException("Failed to drop database. " + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to drop database.", e);
+        }
+    }
+
+    public static void dropTable(String databaseName, String tableName) {
+        try {
+            Container.ExecResult rs =
+                    DORIS_CONTAINER.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            String.format(
+                                    "-e DROP TABLE IF EXISTS %s.%s;", databaseName, tableName));
+
+            if (rs.getExitCode() != 0) {
+                throw new RuntimeException("Failed to drop table. "
+                        + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to drop table.", e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    public List<String> inspectTableSchema(TableId tableId) throws SQLException {
+        List<String> results = new ArrayList<>();
+        ResultSet rs =
+                DORIS_CONTAINER
+                        .createConnection("")
+                        .createStatement()
+                        .executeQuery(
+                                String.format(
+                                        "DESCRIBE `%s`.`%s`",
+                                        tableId.getSchemaName(), tableId.getTableName()));
+
+        while (rs.next()) {
+            List<String> columns = new ArrayList<>();
+            for (int i = 1; i <= 5; i++) {
+                columns.add(rs.getString(i));
+            }
+            results.add(String.join(" | ", columns));
+        }
+        return results;
+    }
+
+    public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
+        List<String> results = new ArrayList<>();
+        ResultSet rs =
+                DORIS_CONTAINER
+                        .createConnection("")
+                        .createStatement()
+                        .executeQuery(
+                                String.format(
+                                        "SELECT * FROM %s.%s",
+                                        tableId.getSchemaName(), tableId.getTableName()));
+
+        while (rs.next()) {
+            List<String> columns = new ArrayList<>();
+            for (int i = 1; i <= columnCount; i++) {
+                columns.add(rs.getString(i));
+            }
+            results.add(String.join(" | ", columns));
+        }
+        return results;
+    }
+
+    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+    }
+
+    public static void assertMapEquals(Map<String, String> expected, Map<String, String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        for (String key : expected.keySet()) {
+            assertEquals(expected.get(key), actual.get(key));
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..f0d32fb59
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/resources/log4j2-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java new file mode 100644 index 000000000..39997cdb6 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** End-to-end tests for mysql cdc to Doris pipeline job. 
*/
+@RunWith(Parameterized.class)
+public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);
+
+    // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier verification)
+    // ------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(
+                                    MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql")
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @ClassRule
+    public static final DorisContainer DORIS =
+            new DorisContainer(NETWORK)
+                    .withNetworkAliases("doris")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+    @BeforeClass
+    public static void initializeContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL)).join();
+        Startables.deepStart(Stream.of(DORIS)).join();
+        LOG.info("Waiting for backends to be available");
+        long startWaitingTimestamp = System.currentTimeMillis();
+
+        new LogMessageWaitStrategy()
+                .withRegEx(".*get heartbeat from FE.*")
+                .withTimes(1)
+                .withStartupTimeout(
+                        Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+                .waitUntilReady(DORIS);
+
+        while (!checkBackendAvailability()) {
+            try {
+                if (System.currentTimeMillis() - startWaitingTimestamp
+                        > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
+                    throw new RuntimeException("Doris backend startup timed out.");
+                }
+                LOG.info("Waiting for backends to be available");
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+                // ignore and check next round
+            }
+        }
+        LOG.info("Containers are started.");
+    }
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+        createDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
+    }
+
+    private static boolean checkBackendAvailability() {
+        try {
+            Container.ExecResult rs =
+                    DORIS.execInContainer(
+                            "mysql",
+                            "--protocol=TCP",
+                            "-uroot",
+                            "-P9030",
+                            "-h127.0.0.1",
+                            "-e SHOW BACKENDS\\G");
+
+            if (rs.getExitCode() != 0) {
+                return false;
+            }
+            String output = rs.getStdout();
+            LOG.info("Doris backend status:\n{}", output);
+            return output.contains("*************************** 1. 
row ***************************") + && !output.contains("AvailCapacity: 1.000 B"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + @After + public void after() { + super.after(); + mysqlInventoryDatabase.dropDatabase(); + dropDorisDatabase(mysqlInventoryDatabase.getDatabaseName()); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: doris\n" + + " fenodes: doris:8030\n" + + " benodes: doris:8040\n" + + " username: %s\n" + + " password: \"%s\"\n" + + " table.create.properties.replication_num: 1\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + DORIS.getUsername(), + DORIS.getPassword()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "customers", + 4, + Arrays.asList( + "101 | user_1 | Shanghai | 123567891234", + "102 | user_2 | Shanghai | 123567891234", + "103 | user_3 | Shanghai | 123567891234", + "104 | user_4 | Shanghai | 123567891234")); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | 
{\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); + + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products DROP COLUMN point_c;"); + stat.execute("DELETE FROM products WHERE id=101;"); + + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111 + stat.execute( + "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112 + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", + "112 | finally | null | 2.14 | null | null | null")); + } + + public static void createDorisDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to create database." + rs.getStderr()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create database.", e); + } + } + + public static void dropDorisDatabase(String databaseName) { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + String.format("-e DROP DATABASE IF EXISTS %s;", databaseName)); + + if (rs.getExitCode() != 0) { + throw new RuntimeException("Failed to drop database." 
+                        + rs.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to drop database.", e);
+        }
+    }
+
+    private void validateSinkResult(
+            String databaseName, String tableName, int columnCount, List<String> expected)
+            throws Exception {
+        long startWaitingTimestamp = System.currentTimeMillis();
+        while (true) {
+            if (System.currentTimeMillis() - startWaitingTimestamp
+                    > TESTCASE_TIMEOUT_SECONDS * 1000) {
+                throw new RuntimeException("Sink result validation timed out.");
+            }
+            List<String> results = new ArrayList<>();
+            try (Connection conn =
+                            DriverManager.getConnection(
+                                    DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
+                    Statement stat = conn.createStatement()) {
+                ResultSet rs =
+                        stat.executeQuery(
+                                String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName));
+
+                while (rs.next()) {
+                    List<String> columns = new ArrayList<>();
+                    for (int i = 1; i <= columnCount; i++) {
+                        try {
+                            columns.add(rs.getString(i));
+                        } catch (SQLException ignored) {
+                            // Column count could change after schema evolution
+                            columns.add(null);
+                        }
+                    }
+                    results.add(String.join(" | ", columns));
+                }
+
+                if (expected.size() == results.size()) {
+                    assertEqualsInAnyOrder(expected, results);
+                    break;
+                } else {
+                    Thread.sleep(1000);
+                }
+            } catch (SQLException e) {
+                LOG.info("Failed to validate sink result, waiting for the next attempt...", e);
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+    }
+}
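
Note: beyond the ITCases above, the helpers introduced in DorisSinkTestBase can be exercised
directly over the JDBC endpoint that DorisContainer exposes. Below is a minimal smoke-test
sketch of that pattern; the class name and the demo_db/demo_table identifiers are
illustrative only and are not part of this patch.

    package org.apache.flink.cdc.connectors.doris.sink.utils;

    import org.junit.Test;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Arrays;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    /** Hypothetical smoke test built on the utilities introduced in this patch. */
    public class DorisJdbcSmokeTest extends DorisSinkTestBase {

        @Test
        public void testJdbcRoundTrip() throws Exception {
            // create a database and a single-bucket UNIQUE KEY table via the mysql CLI helpers;
            // both helpers run synchronously inside the container
            createDatabase("demo_db");
            createTable(
                    "demo_db",
                    "demo_table",
                    "id",
                    Arrays.asList("id INT NOT NULL", "name VARCHAR(51)"));
            try (Connection conn = DORIS_CONTAINER.createConnection("");
                    Statement stmt = conn.createStatement()) {
                // tiny fixtures don't need stream load; a plain INSERT over JDBC suffices
                stmt.execute("INSERT INTO demo_db.demo_table VALUES (1, 'doris')");
                try (ResultSet rs =
                        stmt.executeQuery("SELECT name FROM demo_db.demo_table WHERE id = 1")) {
                    assertTrue(rs.next());
                    assertEquals("doris", rs.getString(1));
                }
            } finally {
                dropDatabase("demo_db");
            }
        }
    }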