[FLINK-35120][doris] Add Doris integration test cases

pull/3402/head
yuxiqian 8 months ago committed by Leonard Xu
parent 23e6352398
commit a74fb376d7

@ -46,6 +46,49 @@ limitations under the License.
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<!-- Share the version property with the core testcontainers artifact so both modules stay in sync. -->
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

@ -0,0 +1,441 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.doris.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
/** IT tests for {@link DorisMetadataApplier}. */
@RunWith(Parameterized.class)
public class DorisMetadataApplierITCase extends DorisSinkTestBase {
private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
private final boolean batchMode;
public DorisMetadataApplierITCase(boolean batchMode) {
this.batchMode = batchMode;
}
@Parameters(name = "batchMode: {0}")
public static Iterable<?> data() {
return Arrays.asList(true, false);
}
@BeforeClass
public static void before() {
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
}
@Before
public void initializeDatabase() {
createDatabase(DorisContainer.DORIS_DATABASE_NAME);
// waiting for table to be created
DORIS_CONTAINER.waitForLog(
String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
1,
DATABASE_OPERATION_TIMEOUT_SECONDS);
LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
}
@After
public void destroyDatabase() {
dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
// waiting for database to be created
DORIS_CONTAINER.waitForLog(
String.format(
".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
1,
DATABASE_OPERATION_TIMEOUT_SECONDS);
LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
}
private List<Event> generateAddColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_bool", DataTypes.BOOLEAN(), null)))),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_decimal",
DataTypes.DECIMAL(17, 0),
null)))));
}
private List<Event> generateDropColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new DropColumnEvent(tableId, Collections.singletonList("number")));
}
private List<Event> generateRenameColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
}
private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
}
private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
// Double -> Float is a narrowing cast, should fail
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
}
@Test
public void testDorisDataTypes() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
// Doris sink doesn't support BINARY type yet.
// .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
// .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
// Binary"))
.column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
.column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
.column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
.column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
.column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
.column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
.column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
.column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
.column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
// Doris sink doesn't support TIME type yet.
// .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
// .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
// Precision"))
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
.column(
new PhysicalColumn(
"timestamp_3",
DataTypes.TIMESTAMP(3),
"Timestamp With Precision"))
.column(
new PhysicalColumn(
"timestamptz", DataTypes.TIMESTAMP_TZ(), "TimestampTZ"))
.column(
new PhysicalColumn(
"timestamptz_3",
DataTypes.TIMESTAMP_TZ(3),
"TimestampTZ With Precision"))
.column(
new PhysicalColumn(
"timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
.column(
new PhysicalColumn(
"timestampltz_3",
DataTypes.TIMESTAMP_LTZ(3),
"TimestampLTZ With Precision"))
.column(
new PhysicalColumn(
"arrayofint",
DataTypes.ARRAY(DataTypes.INT()),
"Array of Int"))
.column(
new PhysicalColumn(
"arrayofstr",
DataTypes.ARRAY(DataTypes.STRING()),
"Array of String"))
.column(
new PhysicalColumn(
"mapint2str",
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
"Map Int to String"))
.primaryKey("id")
.build();
runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null",
"bytes | TEXT | Yes | false | null",
"boolean | BOOLEAN | Yes | false | null",
"int | INT | Yes | false | null",
"tinyint | TINYINT | Yes | false | null",
"smallint | SMALLINT | Yes | false | null",
"float | FLOAT | Yes | false | null",
"double | DOUBLE | Yes | false | null",
"char | CHAR(51) | Yes | false | null",
"varchar | VARCHAR(51) | Yes | false | null",
"string | TEXT | Yes | false | null",
"decimal | DECIMAL(17, 7) | Yes | false | null",
"date | DATE | Yes | false | null",
"timestamp | DATETIME(6) | Yes | false | null",
"timestamp_3 | DATETIME(3) | Yes | false | null",
"timestamptz | DATETIME(6) | Yes | false | null",
"timestamptz_3 | DATETIME(3) | Yes | false | null",
"timestampltz | DATETIME(6) | Yes | false | null",
"timestampltz_3 | DATETIME(3) | Yes | false | null",
"arrayofint | TEXT | Yes | false | null",
"arrayofstr | TEXT | Yes | false | null",
"mapint2str | TEXT | Yes | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
public void testDorisAddColumn() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateAddColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null",
"number | DOUBLE | Yes | false | null",
"name | VARCHAR(51) | Yes | false | null",
"extra_date | DATE | Yes | false | null",
"extra_bool | BOOLEAN | Yes | false | null",
"extra_decimal | DECIMAL(17, 0) | Yes | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
public void testDorisDropColumn() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateDropColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null", "name | VARCHAR(51) | Yes | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
public void testDorisRenameColumn() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateRenameColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null",
"kazu | DOUBLE | Yes | false | null",
"namae | VARCHAR(51) | Yes | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
@Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.")
public void testDorisAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null",
"number | DOUBLE | Yes | false | null",
"name | VARCHAR(57) | Yes | false | null");
assertEqualsInOrder(expected, actual);
}
@Test(expected = JobExecutionException.class)
public void testDorisNarrowingAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
}
private void runJobWithEvents(List<Event> events) throws Exception {
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
Configuration config =
new Configuration()
.set(FENODES, DORIS_CONTAINER.getFeNodes())
.set(BENODES, DORIS_CONTAINER.getBeNodes())
.set(USERNAME, DorisContainer.DORIS_USERNAME)
.set(PASSWORD, DorisContainer.DORIS_PASSWORD)
.set(SINK_ENABLE_BATCH_MODE, batchMode)
.set(SINK_ENABLE_DELETE, true);
config.addAll(
Configuration.fromMap(
Collections.singletonMap("table.create.properties.replication_num", "1")));
DataSink dorisSink = createDorisDataSink(config);
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
SchemaChangeBehavior.EVOLVE,
"$$_schema_operator_$$",
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
stream =
schemaOperatorTranslator.translate(
stream,
DEFAULT_PARALLELISM,
dorisSink.getMetadataApplier(),
new ArrayList<>());
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
sinkTranslator.translate(
new SinkDef("doris", "Dummy Doris Sink", config),
stream,
dorisSink,
schemaOperatorIDGenerator.generate());
env.execute("Doris Schema Evolution Test");
}
}

@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.doris.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
/**
 * Integration tests for {@link DorisDataSink}.
 *
 * <p>A fixed sequence of insert, update and delete events is written to a pre-created Doris table,
 * once in streaming mode and once in batch mode, and the table content is compared afterwards.
 */
public class DorisPipelineITCase extends DorisSinkTestBase {
    private static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    /** Timeout, in seconds, for a DDL statement to show up in the container log. */
    private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;

    @BeforeClass
    public static void before() {
        env.setParallelism(DEFAULT_PARALLELISM);
        env.enableCheckpointing(3000);
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @Before
    public void initializeDatabaseAndTable() {
        createDatabase(DorisContainer.DORIS_DATABASE_NAME);
        // waiting for database to be created
        awaitContainerLog(
                String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME));
        LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);

        createTable(
                DorisContainer.DORIS_DATABASE_NAME,
                DorisContainer.DORIS_TABLE_NAME,
                "id",
                Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
        // waiting for table to be created
        awaitContainerLog(
                String.format(
                        ".*successfully create table\\[%s;.*\\s", DorisContainer.DORIS_TABLE_NAME));
        LOG.info("Table {} created.", DorisContainer.DORIS_TABLE_NAME);
    }

    @After
    public void destroyDatabaseAndTable() {
        dropTable(DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
        // waiting for table to be dropped
        awaitContainerLog(
                String.format(
                        ".*finished dropping table: %s.*\\s", DorisContainer.DORIS_TABLE_NAME));
        LOG.info("Table {} destroyed.", DorisContainer.DORIS_TABLE_NAME);

        dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
        // waiting for database to be dropped
        awaitContainerLog(
                String.format(
                        ".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME));
        LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
    }

    @Test
    public void testDorisSinkStreamJob() throws Exception {
        runValuesToDorisJob(false);
    }

    @Test
    public void testDorisSinkBatchJob() throws Exception {
        runValuesToDorisJob(true);
    }

    /** Waits until one container log line matches the given regex or the DDL timeout elapses. */
    private static void awaitContainerLog(String logRegex) {
        DORIS_CONTAINER.waitForLog(logRegex, 1, DATABASE_OPERATION_TIMEOUT_SECONDS);
    }

    /** Packs one (id, number, name) row into the field array consumed by the record generator. */
    private static Object[] row(int id, double number, String name) {
        return new Object[] {id, number, BinaryStringData.fromString(name)};
    }

    /** Builds the sink configuration pointing at the test container. */
    private static Configuration buildSinkConfig(boolean batchMode) {
        Configuration sinkConfig =
                new Configuration()
                        .set(FENODES, DORIS_CONTAINER.getFeNodes())
                        .set(BENODES, DORIS_CONTAINER.getBeNodes())
                        .set(USERNAME, DorisContainer.DORIS_USERNAME)
                        .set(PASSWORD, DorisContainer.DORIS_PASSWORD)
                        .set(SINK_ENABLE_BATCH_MODE, batchMode)
                        .set(SINK_ENABLE_DELETE, true);
        sinkConfig.addAll(
                Configuration.fromMap(
                        Collections.singletonMap("table.create.properties.replication_num", "1")));
        return sinkConfig;
    }

    /** Produces a create-table event followed by three inserts, one update and one delete. */
    private List<Event> generateEvents(TableId tableId) {
        Schema tableSchema =
                Schema.newBuilder()
                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
                        .primaryKey("id")
                        .build();

        BinaryRecordDataGenerator records =
                new BinaryRecordDataGenerator(
                        RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));

        return Arrays.asList(
                new CreateTableEvent(tableId, tableSchema),
                DataChangeEvent.insertEvent(tableId, records.generate(row(17, 3.14, "Doris Day"))),
                DataChangeEvent.insertEvent(
                        tableId, records.generate(row(19, 2.718, "Que Sera Sera"))),
                DataChangeEvent.insertEvent(
                        tableId, records.generate(row(21, 1.732, "Disenchanted"))),
                DataChangeEvent.updateEvent(
                        tableId,
                        records.generate(row(17, 3.14, "Doris Day")),
                        records.generate(row(17, 6.28, "Doris Day"))),
                DataChangeEvent.deleteEvent(
                        tableId, records.generate(row(19, 2.718, "Que Sera Sera"))));
    }

    /**
     * Runs the generated events into the Doris sink and checks the surviving rows.
     *
     * @param batchMode value for the sink's batch mode option
     * @throws Exception if job execution fails
     */
    private void runValuesToDorisJob(boolean batchMode) throws Exception {
        TableId tableId =
                TableId.tableId(
                        DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);

        DataStream<Event> source =
                env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));

        Configuration sinkConfig = buildSinkConfig(batchMode);
        FlinkSinkProvider sinkProvider =
                (FlinkSinkProvider) createDorisDataSink(sinkConfig).getEventSinkProvider();
        Sink<Event> dorisSink = sinkProvider.getSink();

        source.sinkTo(dorisSink);
        env.execute("Values to Doris Sink");

        List<String> expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
        List<String> actual = fetchTableContent(tableId, 3);
        assertEqualsInAnyOrder(expected, actual);
    }
}

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.doris.sink.utils;
import org.junit.ClassRule;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
/**
 * Docker container for Doris, based on the all-in-one image.
 *
 * <p>Exposes the frontend HTTP port, the backend HTTP port and the MySQL-protocol query port, and
 * builds connection strings from the host and mapped ports.
 */
public class DorisContainer extends JdbcDatabaseContainer<DorisContainer> {

    private static final String DOCKER_IMAGE_NAME = "apache/doris:doris-all-in-one-2.1.0";

    public static final int FE_INNER_PORT = 8030;
    public static final int BE_INNER_PORT = 8040;
    public static final int DB_INNER_PORT = 9030;

    public static final String DORIS_DATABASE_NAME = "doris_database";
    public static final String DORIS_TABLE_NAME = "fallen_angel";
    public static final String DORIS_USERNAME = "root";
    public static final String DORIS_PASSWORD = "";

    @ClassRule public static final Network NETWORK = Network.newNetwork();

    /** Creates a container attached to the shared {@link #NETWORK}. */
    public DorisContainer() {
        this(NETWORK);
    }

    /** Creates a container attached to the given network. */
    public DorisContainer(Network network) {
        super(DockerImageName.parse(DOCKER_IMAGE_NAME));
        setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT));
        setNetwork(network);
    }

    /** Returns {@code host:port} for the mapped frontend port. */
    public String getFeNodes() {
        return hostWithMappedPort(FE_INNER_PORT);
    }

    /** Returns {@code host:port} for the mapped backend port. */
    public String getBeNodes() {
        return hostWithMappedPort(BE_INNER_PORT);
    }

    /** Returns the default {@code database.table} identifier. */
    public String getTableIdentifier() {
        return DORIS_DATABASE_NAME + "." + DORIS_TABLE_NAME;
    }

    /**
     * Blocks until the container log matches the regex the requested number of times.
     *
     * @param regex pattern applied to the container log
     * @param count number of matches to wait for
     * @param timeoutSeconds maximum waiting time in seconds
     */
    public void waitForLog(String regex, int count, int timeoutSeconds) {
        LogMessageWaitStrategy logWait =
                new LogMessageWaitStrategy()
                        .withRegEx(regex)
                        .withTimes(count)
                        .withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS));
        logWait.waitUntilReady(this);
    }

    @Override
    public String getDriverClassName() {
        // Prefer the Connector/J 8 driver class and fall back to the legacy name if it is absent.
        String preferredDriver = "com.mysql.cj.jdbc.Driver";
        try {
            Class.forName(preferredDriver);
        } catch (ClassNotFoundException e) {
            return "com.mysql.jdbc.Driver";
        }
        return preferredDriver;
    }

    @Override
    public String getJdbcUrl() {
        return getJdbcUrl("");
    }

    /** Returns a JDBC URL for the given database without user information. */
    public String getJdbcUrl(String databaseName) {
        return buildJdbcUrl("", databaseName);
    }

    /** Returns a JDBC URL for the given database that embeds the user name. */
    public String getJdbcUrl(String databaseName, String username) {
        return buildJdbcUrl(username + "@", databaseName);
    }

    /** Returns a JDBC URL for the given database that embeds user name and password. */
    public String getJdbcUrl(String databaseName, String username, String password) {
        return buildJdbcUrl(username + ":" + password + "@", databaseName);
    }

    @Override
    public String getUsername() {
        return DORIS_USERNAME;
    }

    @Override
    public String getPassword() {
        return DORIS_PASSWORD;
    }

    @Override
    protected String getTestQueryString() {
        return "SELECT 1";
    }

    /** Formats the container host together with the mapped port of the given inner port. */
    private String hostWithMappedPort(int innerPort) {
        return String.format("%s:%d", getHost(), getMappedPort(innerPort));
    }

    /**
     * Assembles a MySQL-protocol JDBC URL for the query port.
     *
     * @param userInfo text placed right after the scheme: empty, or credentials ending in "@"
     * @param databaseName database path segment, may be empty
     */
    private String buildJdbcUrl(String userInfo, String databaseName) {
        String additionalUrlParams = constructUrlParameters("?", "&");
        return "jdbc:mysql://"
                + userInfo
                + getHost()
                + ":"
                + getMappedPort(DB_INNER_PORT)
                + "/"
                + databaseName
                + additionalUrlParams;
    }
}

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.doris.sink.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.doris.factory.DorisDataSinkFactory;
import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Basic class for testing {@link DorisDataSink}. */
public class DorisSinkTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(DorisSinkTestBase.class);
protected static final int DEFAULT_PARALLELISM = 1;
protected static final DorisContainer DORIS_CONTAINER = createDorisContainer();
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
/** Creates the Doris container instance shared by all tests deriving from this base class. */
private static DorisContainer createDorisContainer() {
return new DorisContainer();
}
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
/**
 * Starts the Doris container and blocks until {@link #checkBackendAvailability()} succeeds.
 *
 * <p>First waits for the heartbeat message in the container log, then polls the backend status
 * once per second. The whole wait is bounded by {@link #DEFAULT_STARTUP_TIMEOUT_SECONDS}.
 *
 * @throws RuntimeException if the backend does not become available in time, or if the waiting
 *     thread is interrupted
 */
@BeforeClass
public static void startContainers() {
    LOG.info("Starting containers...");
    Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
    LOG.info("Waiting for backends to be available");
    long startWaitingTimestamp = System.currentTimeMillis();

    new LogMessageWaitStrategy()
            .withRegEx(".*get heartbeat from FE.*\\s")
            .withTimes(1)
            .withStartupTimeout(
                    Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
            .waitUntilReady(DORIS_CONTAINER);

    while (!checkBackendAvailability()) {
        if (System.currentTimeMillis() - startWaitingTimestamp
                > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000L) {
            throw new RuntimeException("Doris backend startup timed out.");
        }
        LOG.info("Waiting for backends to be available");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag and abort: swallowing the interrupt would keep the loop running
            // after the thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted while waiting for Doris backend to become available.", e);
        }
    }
    LOG.info("Containers are started.");
}
/** Stops the shared Doris container once all tests of the class have run. */
@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
DORIS_CONTAINER.stop();
LOG.info("Containers are stopped.");
}
/**
 * Minimal {@link Factory.Context} for tests: it exposes the given factory configuration and a
 * pipeline configuration that only sets {@code local-time-zone} to UTC.
 */
static class MockContext implements Factory.Context {

    Configuration factoryConfiguration;

    public MockContext(Configuration factoryConfiguration) {
        this.factoryConfiguration = factoryConfiguration;
    }

    @Override
    public Configuration getFactoryConfiguration() {
        return factoryConfiguration;
    }

    @Override
    public Configuration getPipelineConfiguration() {
        return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
    }

    @Override
    public ClassLoader getClassLoader() {
        // Must go through getClass(): calling this.getClassLoader() here would recurse forever
        // and end in a StackOverflowError.
        return this.getClass().getClassLoader();
    }
}
/**
 * Builds a Doris {@link DataSink} through {@link DorisDataSinkFactory}, using a
 * {@link MockContext} that wraps the given factory configuration.
 *
 * @param factoryConfiguration sink options handed to the factory
 * @return the data sink produced by the factory
 */
public static DataSink createDorisDataSink(Configuration factoryConfiguration) {
    final MockContext context = new MockContext(factoryConfiguration);
    return new DorisDataSinkFactory().createDataSink(context);
}
/**
 * Runs {@code SHOW BACKENDS} through the {@code mysql} client inside the Doris container and
 * inspects the output.
 *
 * @return {@code true} when the command exits with code 0, its output contains a first result
 *     row and does not contain {@code AvailCapacity: 1.000 B}; {@code false} otherwise,
 *     including when running the command fails with an exception
 */
public static boolean checkBackendAvailability() {
    try {
        final Container.ExecResult execResult =
                DORIS_CONTAINER.execInContainer(
                        "mysql",
                        "--protocol=TCP",
                        "-uroot",
                        "-P9030",
                        "-h127.0.0.1",
                        "-e SHOW BACKENDS\\G");
        if (execResult.getExitCode() == 0) {
            final String stdout = execResult.getStdout();
            LOG.info("Doris backend status:\n{}", stdout);
            final boolean hasBackendRow =
                    stdout.contains(
                            "*************************** 1. row ***************************");
            final boolean hasPlaceholderCapacity = stdout.contains("AvailCapacity: 1.000 B");
            return hasBackendRow && !hasPlaceholderCapacity;
        }
        return false;
    } catch (Exception e) {
        LOG.info("Failed to check backend status.", e);
        return false;
    }
}
/**
 * Creates the given database (if it does not exist yet) by running the {@code mysql} client
 * inside the Doris container.
 *
 * @param databaseName name of the database to create; it is backtick-quoted in the statement
 * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
 */
public static void createDatabase(String databaseName) {
    try {
        final String statementArg =
                String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName);
        final Container.ExecResult execResult =
                DORIS_CONTAINER.execInContainer(
                        "mysql", "--protocol=TCP", "-uroot", "-P9030", "-h127.0.0.1", statementArg);
        if (execResult.getExitCode() == 0) {
            return;
        }
        // Thrown inside the try on purpose: it ends up as the cause of the exception below.
        throw new RuntimeException("Failed to create database." + execResult.getStderr());
    } catch (Exception e) {
        throw new RuntimeException("Failed to create database.", e);
    }
}
/**
 * Creates a unique-key table with one bucket and {@code replication_num = 1} by running the
 * {@code mysql} client inside the Doris container.
 *
 * @param databaseName database that will hold the table
 * @param tableName name of the table to create
 * @param primaryKey column used both as unique key and as hash distribution key
 * @param schema column definitions, joined with {@code ", "} into the column list
 * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
 */
public static void createTable(
        String databaseName, String tableName, String primaryKey, List<String> schema) {
    try {
        final String columnDefinitions = String.join(", ", schema);
        final String statementArg =
                String.format(
                        "-e CREATE TABLE `%s`.`%s` (%s) UNIQUE KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
                        databaseName,
                        tableName,
                        columnDefinitions,
                        primaryKey,
                        primaryKey);
        final Container.ExecResult execResult =
                DORIS_CONTAINER.execInContainer(
                        "mysql", "--protocol=TCP", "-uroot", "-P9030", "-h127.0.0.1", statementArg);
        if (execResult.getExitCode() == 0) {
            return;
        }
        // Thrown inside the try on purpose: it ends up as the cause of the exception below.
        throw new RuntimeException("Failed to create table." + execResult.getStderr());
    } catch (Exception e) {
        throw new RuntimeException("Failed to create table.", e);
    }
}
/**
 * Drops the given database (if it exists) by running the {@code mysql} client inside the Doris
 * container.
 *
 * <p>The name is backtick-quoted, matching {@code createDatabase}, so that every database
 * created through that helper can also be dropped through this one.
 *
 * @param databaseName name of the database to drop
 * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
 */
public static void dropDatabase(String databaseName) {
    try {
        Container.ExecResult rs =
                DORIS_CONTAINER.execInContainer(
                        "mysql",
                        "--protocol=TCP",
                        "-uroot",
                        "-P9030",
                        "-h127.0.0.1",
                        String.format("-e DROP DATABASE IF EXISTS `%s`;", databaseName));
        if (rs.getExitCode() != 0) {
            throw new RuntimeException("Failed to drop database." + rs.getStderr());
        }
    } catch (Exception e) {
        throw new RuntimeException("Failed to drop database.", e);
    }
}
/**
 * Drops the given table (if it exists) by running the {@code mysql} client inside the Doris
 * container.
 *
 * <p>Database and table names are backtick-quoted, matching {@code createTable}, so that every
 * table created through that helper can also be dropped through this one.
 *
 * @param databaseName database that holds the table
 * @param tableName name of the table to drop
 * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
 */
public static void dropTable(String databaseName, String tableName) {
    try {
        Container.ExecResult rs =
                DORIS_CONTAINER.execInContainer(
                        "mysql",
                        "--protocol=TCP",
                        "-uroot",
                        "-P9030",
                        "-h127.0.0.1",
                        String.format(
                                "-e DROP TABLE IF EXISTS `%s`.`%s`;", databaseName, tableName));
        if (rs.getExitCode() != 0) {
            throw new RuntimeException("Failed to drop table." + rs.getStderr());
        }
    } catch (Exception e) {
        throw new RuntimeException("Failed to drop table.", e);
    }
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
/**
 * Runs {@code DESCRIBE} on the given table and renders each result row as one string.
 *
 * <p>Only the first five columns of every row are read; they are joined with {@code " | "}.
 * The JDBC connection, statement and result set are closed before returning.
 *
 * @param tableId identifies the table; its schema name is used as the database name
 * @return one formatted line per row of the {@code DESCRIBE} output, in result order
 * @throws SQLException if connecting to Doris or executing the statement fails
 */
public List<String> inspectTableSchema(TableId tableId) throws SQLException {
    List<String> results = new ArrayList<>();
    // Fully qualified JDBC types keep this method independent of the file's import list.
    try (java.sql.Connection connection = DORIS_CONTAINER.createConnection("");
            java.sql.Statement statement = connection.createStatement();
            ResultSet rs =
                    statement.executeQuery(
                            String.format(
                                    "DESCRIBE `%s`.`%s`",
                                    tableId.getSchemaName(), tableId.getTableName()))) {
        while (rs.next()) {
            List<String> columns = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                columns.add(rs.getString(i));
            }
            results.add(String.join(" | ", columns));
        }
    }
    return results;
}
/**
 * Reads every row of the given table and renders each one as a single string.
 *
 * <p>The first {@code columnCount} columns of every row are joined with {@code " | "}.
 * Identifiers are backtick-quoted, matching {@code inspectTableSchema}, and the JDBC
 * connection, statement and result set are closed before returning.
 *
 * @param tableId identifies the table; its schema name is used as the database name
 * @param columnCount number of leading columns to read from every row
 * @return one formatted line per row, in the order returned by the query
 * @throws SQLException if connecting to Doris, executing the query or reading a column fails
 */
public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
    List<String> results = new ArrayList<>();
    // Fully qualified JDBC types keep this method independent of the file's import list.
    try (java.sql.Connection connection = DORIS_CONTAINER.createConnection("");
            java.sql.Statement statement = connection.createStatement();
            ResultSet rs =
                    statement.executeQuery(
                            String.format(
                                    "SELECT * FROM `%s`.`%s`",
                                    tableId.getSchemaName(), tableId.getTableName()))) {
        while (rs.next()) {
            List<String> columns = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                columns.add(rs.getString(i));
            }
            results.add(String.join(" | ", columns));
        }
    }
    return results;
}
/**
 * Asserts that both lists hold the same strings, ignoring element order.
 *
 * <p>Both lists are sorted in natural order and then compared with
 * {@code assertEqualsInOrder}.
 *
 * @param expected expected elements; must not be {@code null}
 * @param actual actual elements; must not be {@code null}
 */
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
    assertTrue(expected != null && actual != null);
    final List<String> sortedExpected = expected.stream().sorted().collect(Collectors.toList());
    final List<String> sortedActual = actual.stream().sorted().collect(Collectors.toList());
    assertEqualsInOrder(sortedExpected, sortedActual);
}
/**
 * Asserts that both lists have the same size and the same strings at every position.
 *
 * @param expected expected elements; must not be {@code null}
 * @param actual actual elements; must not be {@code null}
 */
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
    assertTrue(expected != null && actual != null);
    assertEquals(expected.size(), actual.size());
    final String[] expectedArray = expected.toArray(new String[0]);
    final String[] actualArray = actual.toArray(new String[0]);
    assertArrayEquals(expectedArray, actualArray);
}
/**
 * Asserts that both maps have the same size and that every expected key is present in
 * {@code actual} with an equal value.
 *
 * <p>Key presence is checked explicitly: comparing only {@code get(key)} results would accept
 * a missing key whenever the expected value is {@code null}.
 *
 * @param expected expected entries; must not be {@code null}
 * @param actual actual entries; must not be {@code null}
 */
public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
    assertTrue(expected != null && actual != null);
    assertEquals(expected.size(), actual.size());
    for (Map.Entry<String, ?> entry : expected.entrySet()) {
        String key = entry.getKey();
        assertTrue(actual.containsKey(key));
        assertEquals(entry.getValue(), actual.get(key));
    }
}
}

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Root logger level is currently INFO so that container and pipeline logs are visible
# while debugging; set it to OFF to avoid flooding build logs
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,384 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * End-to-end tests for mysql cdc to Doris pipeline job.
 *
 * <p>A MySQL container acts as the source and a Doris container as the sink. Both are attached
 * to the shared {@code NETWORK} under the aliases {@code mysql} and {@code doris}, which are
 * the host names used in the pipeline definition.
 */
@RunWith(Parameterized.class)
public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);

    // ------------------------------------------------------------------------------------------
    // MySQL Variables (we always use MySQL as the data source for easier verifying)
    // ------------------------------------------------------------------------------------------
    protected static final String MYSQL_TEST_USER = "mysqluser";
    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
    protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";

    /** Upper bound, in seconds, for waiting until the Doris backend becomes available. */
    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;

    /** Upper bound, in seconds, for a single {@code validateSinkResult} call. */
    public static final int TESTCASE_TIMEOUT_SECONDS = 60;

    @ClassRule
    public static final MySqlContainer MYSQL =
            (MySqlContainer)
                    new MySqlContainer(
                                    MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
                            .withConfigurationOverride("docker/mysql/my.cnf")
                            .withSetupSQL("docker/mysql/setup.sql")
                            .withDatabaseName("flink-test")
                            .withUsername("flinkuser")
                            .withPassword("flinkpw")
                            .withNetwork(NETWORK)
                            .withNetworkAliases("mysql")
                            .withLogConsumer(new Slf4jLogConsumer(LOG));

    @ClassRule
    public static final DorisContainer DORIS =
            new DorisContainer(NETWORK)
                    .withNetworkAliases("doris")
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    /** Source database; a Doris database with the same name is created for every test. */
    protected final UniqueDatabase mysqlInventoryDatabase =
            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);

    /**
     * Starts the MySQL and Doris containers and blocks until the Doris backend is available.
     *
     * <p>After start-up this waits for a Doris log line matching {@code get heartbeat from FE}
     * and then polls {@link #checkBackendAvailability()} once per second. Both waits are bounded
     * by {@link #DEFAULT_STARTUP_TIMEOUT_SECONDS}.
     *
     * @throws RuntimeException if the backend does not become available in time, or if the
     *     waiting thread is interrupted
     */
    @BeforeClass
    public static void initializeContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL)).join();
        Startables.deepStart(Stream.of(DORIS)).join();
        LOG.info("Waiting for backends to be available");
        long startWaitingTimestamp = System.currentTimeMillis();

        new LogMessageWaitStrategy()
                .withRegEx(".*get heartbeat from FE.*")
                .withTimes(1)
                .withStartupTimeout(
                        Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
                .waitUntilReady(DORIS);

        // Computed as long so the deadline arithmetic cannot overflow int.
        long timeoutMillis = DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000L;
        while (!checkBackendAvailability()) {
            if (System.currentTimeMillis() - startWaitingTimestamp > timeoutMillis) {
                throw new RuntimeException("Doris backend startup timed out.");
            }
            LOG.info("Waiting for backends to be available");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and abort instead of silently polling on.
                Thread.currentThread().interrupt();
                throw new RuntimeException(
                        "Interrupted while waiting for Doris backend startup.", e);
            }
        }
        LOG.info("Containers are started.");
    }

    /** Initializes the MySQL source database and creates the Doris database of the same name. */
    @Before
    public void before() throws Exception {
        super.before();
        mysqlInventoryDatabase.createAndInitialize();
        createDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
    }

    /**
     * Runs {@code SHOW BACKENDS} through the {@code mysql} client inside the Doris container.
     *
     * @return {@code true} when the command exits with code 0, its output contains a first
     *     result row and does not contain {@code AvailCapacity: 1.000 B}; {@code false}
     *     otherwise, including when running the command fails with an exception
     */
    private static boolean checkBackendAvailability() {
        try {
            Container.ExecResult rs =
                    DORIS.execInContainer(
                            "mysql",
                            "--protocol=TCP",
                            "-uroot",
                            "-P9030",
                            "-h127.0.0.1",
                            "-e SHOW BACKENDS\\G");

            if (rs.getExitCode() != 0) {
                return false;
            }
            String output = rs.getStdout();
            LOG.info("Doris backend status:\n{}", output);
            return output.contains("*************************** 1. row ***************************")
                    && !output.contains("AvailCapacity: 1.000 B");
        } catch (Exception e) {
            LOG.info("Failed to check backend status.", e);
            return false;
        }
    }

    /** Drops the per-test database on both the MySQL and the Doris side. */
    @After
    public void after() {
        super.after();
        mysqlInventoryDatabase.dropDatabase();
        dropDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
    }

    /**
     * Submits a MySQL-to-Doris pipeline for the whole inventory database and verifies the sink
     * after the snapshot phase, after a single insert, and after a mix of updates, a column
     * drop, a delete and further inserts.
     */
    @Test
    public void testSyncWholeDatabase() throws Exception {
        String pipelineJob =
                String.format(
                        "source:\n"
                                + " type: mysql\n"
                                + " hostname: mysql\n"
                                + " port: 3306\n"
                                + " username: %s\n"
                                + " password: %s\n"
                                + " tables: %s.\\.*\n"
                                + " server-id: 5400-5404\n"
                                + " server-time-zone: UTC\n"
                                + "\n"
                                + "sink:\n"
                                + " type: doris\n"
                                + " fenodes: doris:8030\n"
                                + " benodes: doris:8040\n"
                                + " username: %s\n"
                                + " password: \"%s\"\n"
                                + " table.create.properties.replication_num: 1\n"
                                + "\n"
                                + "pipeline:\n"
                                + " parallelism: 1",
                        MYSQL_TEST_USER,
                        MYSQL_TEST_PASSWORD,
                        mysqlInventoryDatabase.getDatabaseName(),
                        DORIS.getUsername(),
                        DORIS.getPassword());
        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
        Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
        submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar);
        waitUntilJobRunning(Duration.ofSeconds(30));
        LOG.info("Pipeline job is running");

        // Snapshot phase: both tables must show up in Doris with their initial content.
        validateSinkResult(
                mysqlInventoryDatabase.getDatabaseName(),
                "products",
                7,
                Arrays.asList(
                        "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
                        "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
                        "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
                        "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
                        "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
                        "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
                        "107 | rocks | box of assorted rocks | 5.3 | null | null | null",
                        "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
                        "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null"));

        validateSinkResult(
                mysqlInventoryDatabase.getDatabaseName(),
                "customers",
                4,
                Arrays.asList(
                        "101 | user_1 | Shanghai | 123567891234",
                        "102 | user_2 | Shanghai | 123567891234",
                        "103 | user_3 | Shanghai | 123567891234",
                        "104 | user_4 | Shanghai | 123567891234"));

        LOG.info("Begin incremental reading stage.");
        // generate binlogs
        String mysqlJdbcUrl =
                String.format(
                        "jdbc:mysql://%s:%s/%s",
                        MYSQL.getHost(),
                        MYSQL.getDatabasePort(),
                        mysqlInventoryDatabase.getDatabaseName());
        try (Connection conn =
                        DriverManager.getConnection(
                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
                Statement stat = conn.createStatement()) {

            stat.execute(
                    "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110

            validateSinkResult(
                    mysqlInventoryDatabase.getDatabaseName(),
                    "products",
                    7,
                    Arrays.asList(
                            "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
                            "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
                            "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
                            "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
                            "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
                            "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
                            "107 | rocks | box of assorted rocks | 5.3 | null | null | null",
                            "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
                            "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
                            "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null"));

            stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");

            // modify table schema
            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
            stat.execute("DELETE FROM products WHERE id=101;");

            stat.execute(
                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111
            stat.execute(
                    "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112
        } catch (SQLException e) {
            LOG.error("Update table for CDC failed.", e);
            throw e;
        }

        // Still validated with 7 columns: validateSinkResult substitutes null for any column
        // that can no longer be read after the column drop.
        validateSinkResult(
                mysqlInventoryDatabase.getDatabaseName(),
                "products",
                7,
                Arrays.asList(
                        "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null",
                        "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null",
                        "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null",
                        "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null",
                        "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null",
                        "107 | rocks | box of assorted rocks | 5.1 | null | null | null",
                        "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
                        "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
                        "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null",
                        "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null",
                        "112 | finally | null | 2.14 | null | null | null"));
    }

    /**
     * Creates the given database in Doris (if it does not exist yet) through the {@code mysql}
     * client inside the container.
     *
     * @param databaseName name of the database to create; it is backtick-quoted
     * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
     */
    public static void createDorisDatabase(String databaseName) {
        try {
            Container.ExecResult rs =
                    DORIS.execInContainer(
                            "mysql",
                            "--protocol=TCP",
                            "-uroot",
                            "-P9030",
                            "-h127.0.0.1",
                            String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));

            if (rs.getExitCode() != 0) {
                throw new RuntimeException("Failed to create database." + rs.getStderr());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to create database.", e);
        }
    }

    /**
     * Drops the given database in Doris (if it exists) through the {@code mysql} client inside
     * the container.
     *
     * <p>The name is backtick-quoted, matching {@link #createDorisDatabase(String)}, so that
     * every database created by that helper can also be dropped by this one.
     *
     * @param databaseName name of the database to drop
     * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
     */
    public static void dropDorisDatabase(String databaseName) {
        try {
            Container.ExecResult rs =
                    DORIS.execInContainer(
                            "mysql",
                            "--protocol=TCP",
                            "-uroot",
                            "-P9030",
                            "-h127.0.0.1",
                            String.format("-e DROP DATABASE IF EXISTS `%s`;", databaseName));

            if (rs.getExitCode() != 0) {
                throw new RuntimeException("Failed to drop database." + rs.getStderr());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to drop database.", e);
        }
    }

    /**
     * Polls a Doris table once per second until it holds as many rows as {@code expected}, then
     * asserts that the rows match in any order.
     *
     * <p>Each row is rendered by joining its first {@code columnCount} columns with
     * {@code " | "}. A column that cannot be read is rendered as {@code null}, which keeps the
     * check usable after a column has been dropped. SQL failures are logged and retried.
     *
     * @param databaseName Doris database to query
     * @param tableName table to read
     * @param columnCount number of leading columns to render per row
     * @param expected expected rendered rows
     * @throws RuntimeException if the row count does not match within
     *     {@link #TESTCASE_TIMEOUT_SECONDS}
     */
    private void validateSinkResult(
            String databaseName, String tableName, int columnCount, List<String> expected)
            throws Exception {
        long startWaitingTimestamp = System.currentTimeMillis();
        long timeoutMillis = TESTCASE_TIMEOUT_SECONDS * 1000L;
        while (true) {
            if (System.currentTimeMillis() - startWaitingTimestamp > timeoutMillis) {
                throw new RuntimeException(
                        String.format(
                                "Timed out waiting for Doris table `%s`.`%s` to contain the %d expected rows.",
                                databaseName, tableName, expected.size()));
            }
            List<String> results = new ArrayList<>();
            try (Connection conn =
                            DriverManager.getConnection(
                                    DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
                    Statement stat = conn.createStatement();
                    ResultSet rs =
                            stat.executeQuery(
                                    String.format(
                                            "SELECT * FROM `%s`.`%s`;", databaseName, tableName))) {

                while (rs.next()) {
                    List<String> columns = new ArrayList<>();
                    for (int i = 1; i <= columnCount; i++) {
                        try {
                            columns.add(rs.getString(i));
                        } catch (SQLException ignored) {
                            // Column count could change after schema evolution
                            columns.add(null);
                        }
                    }
                    results.add(String.join(" | ", columns));
                }

                if (expected.size() == results.size()) {
                    assertEqualsInAnyOrder(expected, results);
                    return;
                }
            } catch (SQLException e) {
                LOG.info("Validate sink result failure, waiting for next turn...", e);
            }
            // Sleep only after the JDBC resources of this round have been released.
            Thread.sleep(1000);
        }
    }

    /**
     * Asserts that both lists hold the same strings, ignoring element order.
     *
     * @param expected expected elements; must not be {@code null}
     * @param actual actual elements; must not be {@code null}
     */
    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
        assertTrue(expected != null && actual != null);
        assertEqualsInOrder(
                expected.stream().sorted().collect(Collectors.toList()),
                actual.stream().sorted().collect(Collectors.toList()));
    }

    /**
     * Asserts that both lists have the same size and the same strings at every position.
     *
     * @param expected expected elements; must not be {@code null}
     * @param actual actual elements; must not be {@code null}
     */
    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
        assertTrue(expected != null && actual != null);
        assertEquals(expected.size(), actual.size());
        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
    }
}
Loading…
Cancel
Save