[FLINK-35120][doris] Add Doris integration test cases
parent
d8a9c8c63e
commit
8eb7e53965
@ -0,0 +1,441 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.doris.sink;
|
||||
|
||||
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.event.AddColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DropColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.RenameColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
|
||||
import org.apache.flink.cdc.common.schema.PhysicalColumn;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.sink.DataSink;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.composer.definition.SinkDef;
|
||||
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
|
||||
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
|
||||
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
|
||||
import org.apache.flink.runtime.client.JobExecutionException;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
|
||||
|
||||
/** IT tests for {@link DorisMetadataApplier}. */
|
||||
@RunWith(Parameterized.class)
|
||||
public class DorisMetadataApplierITCase extends DorisSinkTestBase {
|
||||
private static final StreamExecutionEnvironment env =
|
||||
StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
|
||||
|
||||
private final boolean batchMode;
|
||||
|
||||
public DorisMetadataApplierITCase(boolean batchMode) {
|
||||
this.batchMode = batchMode;
|
||||
}
|
||||
|
||||
@Parameters(name = "batchMode: {0}")
|
||||
public static Iterable<?> data() {
|
||||
return Arrays.asList(true, false);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void before() {
|
||||
env.setParallelism(DEFAULT_PARALLELISM);
|
||||
env.enableCheckpointing(3000);
|
||||
env.setRestartStrategy(RestartStrategies.noRestart());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initializeDatabase() {
|
||||
createDatabase(DorisContainer.DORIS_DATABASE_NAME);
|
||||
|
||||
// waiting for table to be created
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroyDatabase() {
|
||||
dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
|
||||
// waiting for database to be created
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(
|
||||
".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
private List<Event> generateAddColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn(
|
||||
"extra_bool", DataTypes.BOOLEAN(), null)))),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn(
|
||||
"extra_decimal",
|
||||
DataTypes.DECIMAL(17, 0),
|
||||
null)))));
|
||||
}
|
||||
|
||||
private List<Event> generateDropColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new DropColumnEvent(tableId, Collections.singletonList("number")));
|
||||
}
|
||||
|
||||
private List<Event> generateRenameColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
|
||||
new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
|
||||
}
|
||||
|
||||
private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new AlterColumnTypeEvent(
|
||||
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
|
||||
}
|
||||
|
||||
private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
// Double -> Float is a narrowing cast, should fail
|
||||
new AlterColumnTypeEvent(
|
||||
tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisDataTypes() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
|
||||
// Doris sink doesn't support BINARY type yet.
|
||||
// .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
|
||||
// .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
|
||||
// Binary"))
|
||||
.column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
|
||||
.column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
|
||||
.column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
|
||||
.column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
|
||||
.column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
|
||||
.column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
|
||||
.column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
|
||||
.column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
|
||||
.column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
|
||||
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
|
||||
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
|
||||
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
|
||||
// Doris sink doesn't support TIME type yet.
|
||||
// .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
|
||||
// .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
|
||||
// Precision"))
|
||||
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestamp_3",
|
||||
DataTypes.TIMESTAMP(3),
|
||||
"Timestamp With Precision"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestamptz", DataTypes.TIMESTAMP_TZ(), "TimestampTZ"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestamptz_3",
|
||||
DataTypes.TIMESTAMP_TZ(3),
|
||||
"TimestampTZ With Precision"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestampltz_3",
|
||||
DataTypes.TIMESTAMP_LTZ(3),
|
||||
"TimestampLTZ With Precision"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"arrayofint",
|
||||
DataTypes.ARRAY(DataTypes.INT()),
|
||||
"Array of Int"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"arrayofstr",
|
||||
DataTypes.ARRAY(DataTypes.STRING()),
|
||||
"Array of String"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"mapint2str",
|
||||
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
|
||||
"Map Int to String"))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | INT | Yes | true | null",
|
||||
"bytes | TEXT | Yes | false | null",
|
||||
"boolean | BOOLEAN | Yes | false | null",
|
||||
"int | INT | Yes | false | null",
|
||||
"tinyint | TINYINT | Yes | false | null",
|
||||
"smallint | SMALLINT | Yes | false | null",
|
||||
"float | FLOAT | Yes | false | null",
|
||||
"double | DOUBLE | Yes | false | null",
|
||||
"char | CHAR(51) | Yes | false | null",
|
||||
"varchar | VARCHAR(51) | Yes | false | null",
|
||||
"string | TEXT | Yes | false | null",
|
||||
"decimal | DECIMAL(17, 7) | Yes | false | null",
|
||||
"date | DATE | Yes | false | null",
|
||||
"timestamp | DATETIME(6) | Yes | false | null",
|
||||
"timestamp_3 | DATETIME(3) | Yes | false | null",
|
||||
"timestamptz | DATETIME(6) | Yes | false | null",
|
||||
"timestamptz_3 | DATETIME(3) | Yes | false | null",
|
||||
"timestampltz | DATETIME(6) | Yes | false | null",
|
||||
"timestampltz_3 | DATETIME(3) | Yes | false | null",
|
||||
"arrayofint | TEXT | Yes | false | null",
|
||||
"arrayofstr | TEXT | Yes | false | null",
|
||||
"mapint2str | TEXT | Yes | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisAddColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateAddColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | INT | Yes | true | null",
|
||||
"number | DOUBLE | Yes | false | null",
|
||||
"name | VARCHAR(51) | Yes | false | null",
|
||||
"extra_date | DATE | Yes | false | null",
|
||||
"extra_bool | BOOLEAN | Yes | false | null",
|
||||
"extra_decimal | DECIMAL(17, 0) | Yes | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisDropColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateDropColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | INT | Yes | true | null", "name | VARCHAR(51) | Yes | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisRenameColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateRenameColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | INT | Yes | true | null",
|
||||
"kazu | DOUBLE | Yes | false | null",
|
||||
"namae | VARCHAR(51) | Yes | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.")
|
||||
public void testDorisAlterColumnType() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | INT | Yes | true | null",
|
||||
"number | DOUBLE | Yes | false | null",
|
||||
"name | VARCHAR(57) | Yes | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test(expected = JobExecutionException.class)
|
||||
public void testDorisNarrowingAlterColumnType() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
|
||||
}
|
||||
|
||||
private void runJobWithEvents(List<Event> events) throws Exception {
|
||||
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
|
||||
|
||||
Configuration config =
|
||||
new Configuration()
|
||||
.set(FENODES, DORIS_CONTAINER.getFeNodes())
|
||||
.set(BENODES, DORIS_CONTAINER.getBeNodes())
|
||||
.set(USERNAME, DorisContainer.DORIS_USERNAME)
|
||||
.set(PASSWORD, DorisContainer.DORIS_PASSWORD)
|
||||
.set(SINK_ENABLE_BATCH_MODE, batchMode)
|
||||
.set(SINK_ENABLE_DELETE, true);
|
||||
|
||||
config.addAll(
|
||||
Configuration.fromMap(
|
||||
Collections.singletonMap("table.create.properties.replication_num", "1")));
|
||||
|
||||
DataSink dorisSink = createDorisDataSink(config);
|
||||
|
||||
SchemaOperatorTranslator schemaOperatorTranslator =
|
||||
new SchemaOperatorTranslator(
|
||||
SchemaChangeBehavior.EVOLVE,
|
||||
"$$_schema_operator_$$",
|
||||
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
|
||||
|
||||
OperatorIDGenerator schemaOperatorIDGenerator =
|
||||
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
|
||||
|
||||
stream =
|
||||
schemaOperatorTranslator.translate(
|
||||
stream,
|
||||
DEFAULT_PARALLELISM,
|
||||
dorisSink.getMetadataApplier(),
|
||||
new ArrayList<>());
|
||||
|
||||
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
|
||||
sinkTranslator.translate(
|
||||
new SinkDef("doris", "Dummy Doris Sink", config),
|
||||
stream,
|
||||
dorisSink,
|
||||
schemaOperatorIDGenerator.generate());
|
||||
|
||||
env.execute("Doris Schema Evolution Test");
|
||||
}
|
||||
}
|
@ -0,0 +1,210 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.doris.sink;
|
||||
|
||||
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.connector.sink2.Sink;
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DataChangeEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.PhysicalColumn;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.common.types.RowType;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
|
||||
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
|
||||
|
||||
/** IT tests for {@link DorisDataSink}. */
|
||||
public class DorisPipelineITCase extends DorisSinkTestBase {
|
||||
|
||||
private static final StreamExecutionEnvironment env =
|
||||
StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
|
||||
|
||||
@BeforeClass
|
||||
public static void before() {
|
||||
env.setParallelism(DEFAULT_PARALLELISM);
|
||||
env.enableCheckpointing(3000);
|
||||
env.setRestartStrategy(RestartStrategies.noRestart());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initializeDatabaseAndTable() {
|
||||
createDatabase(DorisContainer.DORIS_DATABASE_NAME);
|
||||
|
||||
// waiting for table to be created
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Database {} created.", DorisContainer.DORIS_DATABASE_NAME);
|
||||
|
||||
createTable(
|
||||
DorisContainer.DORIS_DATABASE_NAME,
|
||||
DorisContainer.DORIS_TABLE_NAME,
|
||||
"id",
|
||||
Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
|
||||
|
||||
// waiting for table to be created
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(
|
||||
".*successfully create table\\[%s;.*\\s", DorisContainer.DORIS_TABLE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Table {} created.", DorisContainer.DORIS_TABLE_NAME);
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroyDatabaseAndTable() {
|
||||
dropTable(DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
// waiting for table to be dropped
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(
|
||||
".*finished dropping table: %s.*\\s", DorisContainer.DORIS_TABLE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Table {} destroyed.", DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
dropDatabase(DorisContainer.DORIS_DATABASE_NAME);
|
||||
// waiting for database to be created
|
||||
DORIS_CONTAINER.waitForLog(
|
||||
String.format(
|
||||
".*finish drop database\\[%s\\].*\\s", DorisContainer.DORIS_DATABASE_NAME),
|
||||
1,
|
||||
DATABASE_OPERATION_TIMEOUT_SECONDS);
|
||||
|
||||
LOG.info("Database {} destroyed.", DorisContainer.DORIS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisSinkStreamJob() throws Exception {
|
||||
runValuesToDorisJob(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDorisSinkBatchJob() throws Exception {
|
||||
runValuesToDorisJob(true);
|
||||
}
|
||||
|
||||
private List<Event> generateEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
BinaryRecordDataGenerator generator =
|
||||
new BinaryRecordDataGenerator(
|
||||
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
|
||||
})),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
21, 1.732, BinaryStringData.fromString("Disenchanted")
|
||||
})),
|
||||
DataChangeEvent.updateEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
|
||||
generator.generate(
|
||||
new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
|
||||
DataChangeEvent.deleteEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
|
||||
})));
|
||||
}
|
||||
|
||||
private void runValuesToDorisJob(boolean batchMode) throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
|
||||
|
||||
DataStream<Event> stream =
|
||||
env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
|
||||
|
||||
Configuration config =
|
||||
new Configuration()
|
||||
.set(FENODES, DORIS_CONTAINER.getFeNodes())
|
||||
.set(BENODES, DORIS_CONTAINER.getBeNodes())
|
||||
.set(USERNAME, DorisContainer.DORIS_USERNAME)
|
||||
.set(PASSWORD, DorisContainer.DORIS_PASSWORD)
|
||||
.set(SINK_ENABLE_BATCH_MODE, batchMode)
|
||||
.set(SINK_ENABLE_DELETE, true);
|
||||
|
||||
config.addAll(
|
||||
Configuration.fromMap(
|
||||
Collections.singletonMap("table.create.properties.replication_num", "1")));
|
||||
|
||||
Sink<Event> dorisSink =
|
||||
((FlinkSinkProvider) createDorisDataSink(config).getEventSinkProvider()).getSink();
|
||||
|
||||
stream.sinkTo(dorisSink);
|
||||
|
||||
env.execute("Values to Doris Sink");
|
||||
|
||||
List<String> actual = fetchTableContent(tableId, 3);
|
||||
|
||||
List<String> expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
|
||||
|
||||
assertEqualsInAnyOrder(expected, actual);
|
||||
}
|
||||
}
|
@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.doris.sink.utils;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.testcontainers.containers.JdbcDatabaseContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Arrays;
|
||||
|
||||
/** Docker container for Doris. */
|
||||
public class DorisContainer extends JdbcDatabaseContainer<DorisContainer> {
|
||||
|
||||
private static final String DOCKER_IMAGE_NAME = "apache/doris:doris-all-in-one-2.1.0";
|
||||
|
||||
public static final int FE_INNER_PORT = 8030;
|
||||
public static final int BE_INNER_PORT = 8040;
|
||||
public static final int DB_INNER_PORT = 9030;
|
||||
|
||||
@ClassRule public static final Network NETWORK = Network.newNetwork();
|
||||
|
||||
public String getFeNodes() {
|
||||
return String.format("%s:%d", getHost(), getMappedPort(FE_INNER_PORT));
|
||||
}
|
||||
|
||||
public String getBeNodes() {
|
||||
return String.format("%s:%d", getHost(), getMappedPort(BE_INNER_PORT));
|
||||
}
|
||||
|
||||
public String getTableIdentifier() {
|
||||
return String.format("%s.%s", DORIS_DATABASE_NAME, DORIS_TABLE_NAME);
|
||||
}
|
||||
|
||||
public static final String DORIS_DATABASE_NAME = "doris_database";
|
||||
public static final String DORIS_TABLE_NAME = "fallen_angel";
|
||||
public static final String DORIS_USERNAME = "root";
|
||||
public static final String DORIS_PASSWORD = "";
|
||||
|
||||
public DorisContainer() {
|
||||
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
|
||||
setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT));
|
||||
setNetwork(NETWORK);
|
||||
}
|
||||
|
||||
public DorisContainer(Network network) {
|
||||
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
|
||||
setExposedPorts(Arrays.asList(FE_INNER_PORT, BE_INNER_PORT, DB_INNER_PORT));
|
||||
setNetwork(network);
|
||||
}
|
||||
|
||||
public void waitForLog(String regex, int count, int timeoutSeconds) {
|
||||
new LogMessageWaitStrategy()
|
||||
.withRegEx(regex)
|
||||
.withTimes(count)
|
||||
.withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
|
||||
.waitUntilReady(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDriverClassName() {
|
||||
try {
|
||||
Class.forName("com.mysql.cj.jdbc.Driver");
|
||||
return "com.mysql.cj.jdbc.Driver";
|
||||
} catch (ClassNotFoundException e) {
|
||||
return "com.mysql.jdbc.Driver";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJdbcUrl() {
|
||||
return getJdbcUrl("");
|
||||
}
|
||||
|
||||
public String getJdbcUrl(String databaseName) {
|
||||
String additionalUrlParams = constructUrlParameters("?", "&");
|
||||
return "jdbc:mysql://"
|
||||
+ getHost()
|
||||
+ ":"
|
||||
+ getMappedPort(DB_INNER_PORT)
|
||||
+ "/"
|
||||
+ databaseName
|
||||
+ additionalUrlParams;
|
||||
}
|
||||
|
||||
public String getJdbcUrl(String databaseName, String username) {
|
||||
String additionalUrlParams = constructUrlParameters("?", "&");
|
||||
return "jdbc:mysql://"
|
||||
+ username
|
||||
+ "@"
|
||||
+ getHost()
|
||||
+ ":"
|
||||
+ getMappedPort(DB_INNER_PORT)
|
||||
+ "/"
|
||||
+ databaseName
|
||||
+ additionalUrlParams;
|
||||
}
|
||||
|
||||
public String getJdbcUrl(String databaseName, String username, String password) {
|
||||
String additionalUrlParams = constructUrlParameters("?", "&");
|
||||
return "jdbc:mysql://"
|
||||
+ username
|
||||
+ ":"
|
||||
+ password
|
||||
+ "@"
|
||||
+ getHost()
|
||||
+ ":"
|
||||
+ getMappedPort(DB_INNER_PORT)
|
||||
+ "/"
|
||||
+ databaseName
|
||||
+ additionalUrlParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUsername() {
|
||||
return DORIS_USERNAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPassword() {
|
||||
return DORIS_PASSWORD;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTestQueryString() {
|
||||
return "SELECT 1";
|
||||
}
|
||||
}
|
@ -0,0 +1,319 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.doris.sink.utils;
|
||||
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.factories.Factory;
|
||||
import org.apache.flink.cdc.common.sink.DataSink;
|
||||
import org.apache.flink.cdc.connectors.doris.factory.DorisDataSinkFactory;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink;
|
||||
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
|
||||
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
||||
import org.apache.flink.test.util.MiniClusterWithClientResource;
|
||||
import org.apache.flink.util.TestLogger;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.Container;
|
||||
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
|
||||
import org.testcontainers.lifecycle.Startables;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/** Basic class for testing {@link DorisDataSink}. */
|
||||
public class DorisSinkTestBase extends TestLogger {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(DorisSinkTestBase.class);
|
||||
|
||||
protected static final int DEFAULT_PARALLELISM = 1;
|
||||
protected static final DorisContainer DORIS_CONTAINER = createDorisContainer();
|
||||
|
||||
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
|
||||
|
||||
private static DorisContainer createDorisContainer() {
|
||||
return new DorisContainer();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final MiniClusterWithClientResource miniClusterResource =
|
||||
new MiniClusterWithClientResource(
|
||||
new MiniClusterResourceConfiguration.Builder()
|
||||
.setNumberTaskManagers(1)
|
||||
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
|
||||
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
|
||||
.withHaLeadershipControl()
|
||||
.build());
|
||||
|
||||
@BeforeClass
|
||||
public static void startContainers() {
|
||||
LOG.info("Starting containers...");
|
||||
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
|
||||
LOG.info("Waiting for backends to be available");
|
||||
long startWaitingTimestamp = System.currentTimeMillis();
|
||||
|
||||
new LogMessageWaitStrategy()
|
||||
.withRegEx(".*get heartbeat from FE.*\\s")
|
||||
.withTimes(1)
|
||||
.withStartupTimeout(
|
||||
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
|
||||
.waitUntilReady(DORIS_CONTAINER);
|
||||
|
||||
while (!checkBackendAvailability()) {
|
||||
try {
|
||||
if (System.currentTimeMillis() - startWaitingTimestamp
|
||||
> DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
|
||||
throw new RuntimeException("Doris backend startup timed out.");
|
||||
}
|
||||
LOG.info("Waiting for backends to be available");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ignored) {
|
||||
// ignore and check next round
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Containers are started.");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopContainers() {
|
||||
LOG.info("Stopping containers...");
|
||||
DORIS_CONTAINER.stop();
|
||||
LOG.info("Containers are stopped.");
|
||||
}
|
||||
|
||||
static class MockContext implements Factory.Context {
|
||||
|
||||
Configuration factoryConfiguration;
|
||||
|
||||
public MockContext(Configuration factoryConfiguration) {
|
||||
this.factoryConfiguration = factoryConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getFactoryConfiguration() {
|
||||
return factoryConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getPipelineConfiguration() {
|
||||
return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader getClassLoader() {
|
||||
return this.getClassLoader();
|
||||
}
|
||||
}
|
||||
|
||||
public static DataSink createDorisDataSink(Configuration factoryConfiguration) {
|
||||
DorisDataSinkFactory factory = new DorisDataSinkFactory();
|
||||
return factory.createDataSink(new MockContext(factoryConfiguration));
|
||||
}
|
||||
|
||||
public static boolean checkBackendAvailability() {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
"-e SHOW BACKENDS\\G");
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
return false;
|
||||
}
|
||||
String output = rs.getStdout();
|
||||
LOG.info("Doris backend status:\n{}", output);
|
||||
return output.contains("*************************** 1. row ***************************")
|
||||
&& !output.contains("AvailCapacity: 1.000 B");
|
||||
} catch (Exception e) {
|
||||
LOG.info("Failed to check backend status.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static void createDatabase(String databaseName) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to create database." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to create database.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createTable(
|
||||
String databaseName, String tableName, String primaryKey, List<String> schema) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format(
|
||||
"-e CREATE TABLE `%s`.`%s` (%s) UNIQUE KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
|
||||
databaseName,
|
||||
tableName,
|
||||
String.join(", ", schema),
|
||||
primaryKey,
|
||||
primaryKey));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to create table." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to create table.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void dropDatabase(String databaseName) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to drop database." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to drop database.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void dropTable(String databaseName, String tableName) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format(
|
||||
"-e DROP TABLE IF EXISTS %s.%s;", databaseName, tableName));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to drop table." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to drop table.", e);
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// test utilities
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
public List<String> inspectTableSchema(TableId tableId) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
ResultSet rs =
|
||||
DORIS_CONTAINER
|
||||
.createConnection("")
|
||||
.createStatement()
|
||||
.executeQuery(
|
||||
String.format(
|
||||
"DESCRIBE `%s`.`%s`",
|
||||
tableId.getSchemaName(), tableId.getTableName()));
|
||||
|
||||
while (rs.next()) {
|
||||
List<String> columns = new ArrayList<>();
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
columns.add(rs.getString(i));
|
||||
}
|
||||
results.add(String.join(" | ", columns));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
ResultSet rs =
|
||||
DORIS_CONTAINER
|
||||
.createConnection("")
|
||||
.createStatement()
|
||||
.executeQuery(
|
||||
String.format(
|
||||
"SELECT * FROM %s.%s",
|
||||
tableId.getSchemaName(), tableId.getTableName()));
|
||||
|
||||
while (rs.next()) {
|
||||
List<String> columns = new ArrayList<>();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
columns.add(rs.getString(i));
|
||||
}
|
||||
results.add(String.join(" | ", columns));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEqualsInOrder(
|
||||
expected.stream().sorted().collect(Collectors.toList()),
|
||||
actual.stream().sorted().collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEquals(expected.size(), actual.size());
|
||||
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
|
||||
}
|
||||
|
||||
public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEquals(expected.size(), actual.size());
|
||||
for (String key : expected.keySet()) {
|
||||
assertEquals(expected.get(key), actual.get(key));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
################################################################################
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
################################################################################
|
||||
|
||||
# Set root logger level to OFF to not flood build logs
|
||||
# set manually to INFO for debugging purposes
|
||||
rootLogger.level=INFO
|
||||
rootLogger.appenderRef.test.ref = TestLogger
|
||||
|
||||
appender.testlogger.name = TestLogger
|
||||
appender.testlogger.type = CONSOLE
|
||||
appender.testlogger.target = SYSTEM_ERR
|
||||
appender.testlogger.layout.type = PatternLayout
|
||||
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
|
@ -0,0 +1,384 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.pipeline.tests;
|
||||
|
||||
import org.apache.flink.cdc.common.test.utils.TestUtils;
|
||||
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
|
||||
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
|
||||
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
|
||||
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
|
||||
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.Container;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
|
||||
import org.testcontainers.lifecycle.Startables;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/** End-to-end tests for mysql cdc to Doris pipeline job. */
|
||||
@RunWith(Parameterized.class)
|
||||
public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);
|
||||
|
||||
// ------------------------------------------------------------------------------------------
|
||||
// MySQL Variables (we always use MySQL as the data source for easier verifying)
|
||||
// ------------------------------------------------------------------------------------------
|
||||
protected static final String MYSQL_TEST_USER = "mysqluser";
|
||||
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
|
||||
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
|
||||
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
|
||||
public static final int TESTCASE_TIMEOUT_SECONDS = 60;
|
||||
|
||||
@ClassRule
|
||||
public static final MySqlContainer MYSQL =
|
||||
(MySqlContainer)
|
||||
new MySqlContainer(
|
||||
MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
|
||||
.withConfigurationOverride("docker/mysql/my.cnf")
|
||||
.withSetupSQL("docker/mysql/setup.sql")
|
||||
.withDatabaseName("flink-test")
|
||||
.withUsername("flinkuser")
|
||||
.withPassword("flinkpw")
|
||||
.withNetwork(NETWORK)
|
||||
.withNetworkAliases("mysql")
|
||||
.withLogConsumer(new Slf4jLogConsumer(LOG));
|
||||
|
||||
@ClassRule
|
||||
public static final DorisContainer DORIS =
|
||||
new DorisContainer(NETWORK)
|
||||
.withNetworkAliases("doris")
|
||||
.withLogConsumer(new Slf4jLogConsumer(LOG));
|
||||
|
||||
protected final UniqueDatabase mysqlInventoryDatabase =
|
||||
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
|
||||
|
||||
@BeforeClass
|
||||
public static void initializeContainers() {
|
||||
LOG.info("Starting containers...");
|
||||
Startables.deepStart(Stream.of(MYSQL)).join();
|
||||
Startables.deepStart(Stream.of(DORIS)).join();
|
||||
LOG.info("Waiting for backends to be available");
|
||||
long startWaitingTimestamp = System.currentTimeMillis();
|
||||
|
||||
new LogMessageWaitStrategy()
|
||||
.withRegEx(".*get heartbeat from FE.*")
|
||||
.withTimes(1)
|
||||
.withStartupTimeout(
|
||||
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
|
||||
.waitUntilReady(DORIS);
|
||||
|
||||
while (!checkBackendAvailability()) {
|
||||
try {
|
||||
if (System.currentTimeMillis() - startWaitingTimestamp
|
||||
> DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
|
||||
throw new RuntimeException("Doris backend startup timed out.");
|
||||
}
|
||||
LOG.info("Waiting for backends to be available");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ignored) {
|
||||
// ignore and check next round
|
||||
}
|
||||
}
|
||||
LOG.info("Containers are started.");
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
super.before();
|
||||
mysqlInventoryDatabase.createAndInitialize();
|
||||
createDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
|
||||
}
|
||||
|
||||
private static boolean checkBackendAvailability() {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
"-e SHOW BACKENDS\\G");
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
return false;
|
||||
}
|
||||
String output = rs.getStdout();
|
||||
LOG.info("Doris backend status:\n{}", output);
|
||||
return output.contains("*************************** 1. row ***************************")
|
||||
&& !output.contains("AvailCapacity: 1.000 B");
|
||||
} catch (Exception e) {
|
||||
LOG.info("Failed to check backend status.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
super.after();
|
||||
mysqlInventoryDatabase.dropDatabase();
|
||||
dropDorisDatabase(mysqlInventoryDatabase.getDatabaseName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncWholeDatabase() throws Exception {
|
||||
String pipelineJob =
|
||||
String.format(
|
||||
"source:\n"
|
||||
+ " type: mysql\n"
|
||||
+ " hostname: mysql\n"
|
||||
+ " port: 3306\n"
|
||||
+ " username: %s\n"
|
||||
+ " password: %s\n"
|
||||
+ " tables: %s.\\.*\n"
|
||||
+ " server-id: 5400-5404\n"
|
||||
+ " server-time-zone: UTC\n"
|
||||
+ "\n"
|
||||
+ "sink:\n"
|
||||
+ " type: doris\n"
|
||||
+ " fenodes: doris:8030\n"
|
||||
+ " benodes: doris:8040\n"
|
||||
+ " username: %s\n"
|
||||
+ " password: \"%s\"\n"
|
||||
+ " table.create.properties.replication_num: 1\n"
|
||||
+ "\n"
|
||||
+ "pipeline:\n"
|
||||
+ " parallelism: 1",
|
||||
MYSQL_TEST_USER,
|
||||
MYSQL_TEST_PASSWORD,
|
||||
mysqlInventoryDatabase.getDatabaseName(),
|
||||
DORIS.getUsername(),
|
||||
DORIS.getPassword());
|
||||
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
|
||||
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
|
||||
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
|
||||
submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar);
|
||||
waitUntilJobRunning(Duration.ofSeconds(30));
|
||||
LOG.info("Pipeline job is running");
|
||||
|
||||
validateSinkResult(
|
||||
mysqlInventoryDatabase.getDatabaseName(),
|
||||
"products",
|
||||
7,
|
||||
Arrays.asList(
|
||||
"101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
|
||||
"102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
|
||||
"103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
|
||||
"104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
|
||||
"105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
|
||||
"106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
|
||||
"107 | rocks | box of assorted rocks | 5.3 | null | null | null",
|
||||
"108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
|
||||
"109 | spare tire | 24 inch spare tire | 22.2 | null | null | null"));
|
||||
|
||||
validateSinkResult(
|
||||
mysqlInventoryDatabase.getDatabaseName(),
|
||||
"customers",
|
||||
4,
|
||||
Arrays.asList(
|
||||
"101 | user_1 | Shanghai | 123567891234",
|
||||
"102 | user_2 | Shanghai | 123567891234",
|
||||
"103 | user_3 | Shanghai | 123567891234",
|
||||
"104 | user_4 | Shanghai | 123567891234"));
|
||||
|
||||
LOG.info("Begin incremental reading stage.");
|
||||
// generate binlogs
|
||||
String mysqlJdbcUrl =
|
||||
String.format(
|
||||
"jdbc:mysql://%s:%s/%s",
|
||||
MYSQL.getHost(),
|
||||
MYSQL.getDatabasePort(),
|
||||
mysqlInventoryDatabase.getDatabaseName());
|
||||
try (Connection conn =
|
||||
DriverManager.getConnection(
|
||||
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
|
||||
Statement stat = conn.createStatement()) {
|
||||
|
||||
stat.execute(
|
||||
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110
|
||||
|
||||
validateSinkResult(
|
||||
mysqlInventoryDatabase.getDatabaseName(),
|
||||
"products",
|
||||
7,
|
||||
Arrays.asList(
|
||||
"101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
|
||||
"102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
|
||||
"103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
|
||||
"104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
|
||||
"105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
|
||||
"106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
|
||||
"107 | rocks | box of assorted rocks | 5.3 | null | null | null",
|
||||
"108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
|
||||
"109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
|
||||
"110 | jacket | water resistent white wind breaker | 0.2 | null | null | null"));
|
||||
|
||||
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
|
||||
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
|
||||
|
||||
// modify table schema
|
||||
stat.execute("ALTER TABLE products DROP COLUMN point_c;");
|
||||
stat.execute("DELETE FROM products WHERE id=101;");
|
||||
|
||||
stat.execute(
|
||||
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111
|
||||
stat.execute(
|
||||
"INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112
|
||||
} catch (SQLException e) {
|
||||
LOG.error("Update table for CDC failed.", e);
|
||||
throw e;
|
||||
}
|
||||
validateSinkResult(
|
||||
mysqlInventoryDatabase.getDatabaseName(),
|
||||
"products",
|
||||
7,
|
||||
Arrays.asList(
|
||||
"102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null",
|
||||
"103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null",
|
||||
"104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null",
|
||||
"105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null",
|
||||
"106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null",
|
||||
"107 | rocks | box of assorted rocks | 5.1 | null | null | null",
|
||||
"108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
|
||||
"109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
|
||||
"110 | jacket | water resistent white wind breaker | 0.2 | null | null | null",
|
||||
"111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null",
|
||||
"112 | finally | null | 2.14 | null | null | null"));
|
||||
}
|
||||
|
||||
public static void createDorisDatabase(String databaseName) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to create database." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to create database.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void dropDorisDatabase(String databaseName) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
DORIS.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to drop database." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to drop database.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateSinkResult(
|
||||
String databaseName, String tableName, int columnCount, List<String> expected)
|
||||
throws Exception {
|
||||
long startWaitingTimestamp = System.currentTimeMillis();
|
||||
while (true) {
|
||||
if (System.currentTimeMillis() - startWaitingTimestamp
|
||||
> TESTCASE_TIMEOUT_SECONDS * 1000) {
|
||||
throw new RuntimeException("Doris backend startup timed out.");
|
||||
}
|
||||
List<String> results = new ArrayList<>();
|
||||
try (Connection conn =
|
||||
DriverManager.getConnection(
|
||||
DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
|
||||
Statement stat = conn.createStatement()) {
|
||||
ResultSet rs =
|
||||
stat.executeQuery(
|
||||
String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName));
|
||||
|
||||
while (rs.next()) {
|
||||
List<String> columns = new ArrayList<>();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
try {
|
||||
columns.add(rs.getString(i));
|
||||
} catch (SQLException ignored) {
|
||||
// Column count could change after schema evolution
|
||||
columns.add(null);
|
||||
}
|
||||
}
|
||||
results.add(String.join(" | ", columns));
|
||||
}
|
||||
|
||||
if (expected.size() == results.size()) {
|
||||
assertEqualsInAnyOrder(expected, results);
|
||||
break;
|
||||
} else {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.info("Validate sink result failure, waiting for next turn...", e);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEqualsInOrder(
|
||||
expected.stream().sorted().collect(Collectors.toList()),
|
||||
actual.stream().sorted().collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEquals(expected.size(), actual.size());
|
||||
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue