[FLINK-35092][cdc][starrocks] Add starrocks integration test cases
parent
8eb7e53965
commit
1a360635c7
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.starrocks.shade.org.apache.commons.compress.utils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but not package it.
|
||||
* This package should be removed after upgrading to 1.2.10 which will not use commons-compress
|
||||
* anymore.
|
||||
*/
|
||||
public class Lists {
|
||||
public static <E> ArrayList<E> newArrayList() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
@ -0,0 +1,388 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.starrocks.sink;
|
||||
|
||||
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.event.AddColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DropColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.RenameColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
|
||||
import org.apache.flink.cdc.common.schema.PhysicalColumn;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.sink.DataSink;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.composer.definition.SinkDef;
|
||||
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
|
||||
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
|
||||
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
|
||||
import org.apache.flink.runtime.client.JobExecutionException;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
|
||||
|
||||
/** IT tests for {@link StarRocksMetadataApplier}. */
|
||||
public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
|
||||
private static final StreamExecutionEnvironment env =
|
||||
StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
@BeforeClass
|
||||
public static void before() {
|
||||
env.setParallelism(DEFAULT_PARALLELISM);
|
||||
env.enableCheckpointing(3000);
|
||||
env.setRestartStrategy(RestartStrategies.noRestart());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initializeDatabase() {
|
||||
executeSql(
|
||||
String.format(
|
||||
"CREATE DATABASE IF NOT EXISTS `%s`;",
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME));
|
||||
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroyDatabase() {
|
||||
executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
|
||||
LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
private List<Event> generateAddColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn(
|
||||
"extra_bool", DataTypes.BOOLEAN(), null)))),
|
||||
new AddColumnEvent(
|
||||
tableId,
|
||||
Collections.singletonList(
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
new PhysicalColumn(
|
||||
"extra_decimal",
|
||||
DataTypes.DECIMAL(17, 0),
|
||||
null)))));
|
||||
}
|
||||
|
||||
private List<Event> generateDropColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new DropColumnEvent(tableId, Collections.singletonList("number")));
|
||||
}
|
||||
|
||||
private List<Event> generateRenameColumnEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
|
||||
new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
|
||||
}
|
||||
|
||||
private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
new AlterColumnTypeEvent(
|
||||
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
|
||||
}
|
||||
|
||||
private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
// Double -> Float is a narrowing cast, should fail
|
||||
new AlterColumnTypeEvent(
|
||||
tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStarRocksDataType() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
|
||||
// StarRocks sink doesn't support BINARY and BYTES type yet.
|
||||
// .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
|
||||
// .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
|
||||
// Binary"))
|
||||
// .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
|
||||
.column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
|
||||
.column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
|
||||
.column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
|
||||
.column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
|
||||
.column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
|
||||
.column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
|
||||
.column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
|
||||
.column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
|
||||
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
|
||||
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
|
||||
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
|
||||
// StarRocks sink doesn't support TIME type yet.
|
||||
// .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
|
||||
// .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
|
||||
// Precision"))
|
||||
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestamp_3",
|
||||
DataTypes.TIMESTAMP(3),
|
||||
"Timestamp With Precision"))
|
||||
// StarRocks sink doesn't support TIMESTAMP with non-local TZ yet.
|
||||
// .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(),
|
||||
// "TimestampTZ"))
|
||||
// .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3),
|
||||
// "TimestampTZ With Precision"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
|
||||
.column(
|
||||
new PhysicalColumn(
|
||||
"timestampltz_3",
|
||||
DataTypes.TIMESTAMP_LTZ(3),
|
||||
"TimestampLTZ With Precision"))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
|
||||
runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | int | NO | true | null",
|
||||
"boolean | boolean | YES | false | null",
|
||||
"int | int | YES | false | null",
|
||||
"tinyint | tinyint | YES | false | null",
|
||||
"smallint | smallint | YES | false | null",
|
||||
"float | float | YES | false | null",
|
||||
"double | double | YES | false | null",
|
||||
"char | char(51) | YES | false | null",
|
||||
"varchar | varchar(51) | YES | false | null",
|
||||
"string | varchar(1048576) | YES | false | null",
|
||||
"decimal | decimal(17,7) | YES | false | null",
|
||||
"date | date | YES | false | null",
|
||||
"timestamp | datetime | YES | false | null",
|
||||
"timestamp_3 | datetime | YES | false | null",
|
||||
"timestampltz | datetime | YES | false | null",
|
||||
"timestampltz_3 | datetime | YES | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStarRocksAddColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateAddColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | int | NO | true | null",
|
||||
"number | double | YES | false | null",
|
||||
"name | varchar(51) | YES | false | null",
|
||||
"extra_date | date | YES | false | null",
|
||||
"extra_bool | boolean | YES | false | null",
|
||||
"extra_decimal | decimal(17,0) | YES | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStarRocksDropColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateDropColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | int | NO | true | null", "name | varchar(51) | YES | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Rename column is not supported currently.")
|
||||
public void testStarRocksRenameColumn() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateRenameColumnEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | int | NO | true | null",
|
||||
"kazu | double | YES | false | null",
|
||||
"namae | varchar(51) | YES | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Alter column type is not supported currently.")
|
||||
public void testStarRocksAlterColumnType() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
|
||||
|
||||
List<String> actual = inspectTableSchema(tableId);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"id | int | NO | true | null",
|
||||
"number | double | YES | false | null",
|
||||
"name | varchar(57) | YES | false | null");
|
||||
|
||||
assertEqualsInOrder(expected, actual);
|
||||
}
|
||||
|
||||
@Test(expected = JobExecutionException.class)
|
||||
@Ignore("Alter column type is not supported currently.")
|
||||
public void testStarRocksNarrowingAlterColumnType() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
|
||||
}
|
||||
|
||||
private void runJobWithEvents(List<Event> events) throws Exception {
|
||||
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
|
||||
|
||||
Configuration config =
|
||||
new Configuration()
|
||||
.set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
|
||||
.set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
|
||||
.set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
|
||||
.set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
|
||||
|
||||
DataSink starRocksSink = createStarRocksDataSink(config);
|
||||
|
||||
SchemaOperatorTranslator schemaOperatorTranslator =
|
||||
new SchemaOperatorTranslator(
|
||||
SchemaChangeBehavior.EVOLVE,
|
||||
"$$_schema_operator_$$",
|
||||
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
|
||||
|
||||
OperatorIDGenerator schemaOperatorIDGenerator =
|
||||
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
|
||||
|
||||
stream =
|
||||
schemaOperatorTranslator.translate(
|
||||
stream,
|
||||
DEFAULT_PARALLELISM,
|
||||
starRocksSink.getMetadataApplier(),
|
||||
new ArrayList<>());
|
||||
|
||||
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
|
||||
sinkTranslator.translate(
|
||||
new SinkDef("starrocks", "Dummy StarRocks Sink", config),
|
||||
stream,
|
||||
starRocksSink,
|
||||
schemaOperatorIDGenerator.generate());
|
||||
|
||||
env.execute("StarRocks Schema Evolution Test");
|
||||
}
|
||||
}
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.starrocks.sink;
|
||||
|
||||
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.connector.sink2.Sink;
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DataChangeEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.PhysicalColumn;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.common.types.RowType;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
|
||||
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
|
||||
|
||||
/** IT tests for {@link StarRocksDataSink}. */
|
||||
public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
|
||||
private static final StreamExecutionEnvironment env =
|
||||
StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
@BeforeClass
|
||||
public static void before() {
|
||||
env.setParallelism(DEFAULT_PARALLELISM);
|
||||
env.enableCheckpointing(3000);
|
||||
env.setRestartStrategy(RestartStrategies.noRestart());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initializeDatabaseAndTable() {
|
||||
executeSql(
|
||||
String.format(
|
||||
"CREATE DATABASE IF NOT EXISTS `%s`;",
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME));
|
||||
|
||||
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
|
||||
|
||||
List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
|
||||
|
||||
executeSql(
|
||||
String.format(
|
||||
"CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME,
|
||||
String.join(", ", schema),
|
||||
"id",
|
||||
"id"));
|
||||
|
||||
LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroyDatabaseAndTable() {
|
||||
|
||||
executeSql(
|
||||
String.format(
|
||||
"DROP TABLE %s.%s;",
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME));
|
||||
|
||||
LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
|
||||
executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
|
||||
|
||||
LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
|
||||
}
|
||||
|
||||
private List<Event> generateEvents(TableId tableId) {
|
||||
Schema schema =
|
||||
Schema.newBuilder()
|
||||
.column(new PhysicalColumn("id", DataTypes.INT(), null))
|
||||
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
|
||||
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
|
||||
.primaryKey("id")
|
||||
.build();
|
||||
BinaryRecordDataGenerator generator =
|
||||
new BinaryRecordDataGenerator(
|
||||
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
|
||||
|
||||
return Arrays.asList(
|
||||
new CreateTableEvent(tableId, schema),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
|
||||
})),
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
21, 1.732, BinaryStringData.fromString("Disenchanted")
|
||||
})),
|
||||
DataChangeEvent.deleteEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
|
||||
})),
|
||||
DataChangeEvent.updateEvent(
|
||||
tableId,
|
||||
generator.generate(
|
||||
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
|
||||
generator.generate(
|
||||
new Object[] {
|
||||
17, 6.28, BinaryStringData.fromString("StarRocks")
|
||||
})));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValuesToStarRocks() throws Exception {
|
||||
TableId tableId =
|
||||
TableId.tableId(
|
||||
StarRocksContainer.STARROCKS_DATABASE_NAME,
|
||||
StarRocksContainer.STARROCKS_TABLE_NAME);
|
||||
DataStream<Event> stream =
|
||||
env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
|
||||
|
||||
Configuration config =
|
||||
new Configuration()
|
||||
.set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
|
||||
.set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
|
||||
.set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
|
||||
.set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
|
||||
|
||||
Sink<Event> starRocksSink =
|
||||
((FlinkSinkProvider) createStarRocksDataSink(config).getEventSinkProvider())
|
||||
.getSink();
|
||||
stream.sinkTo(starRocksSink);
|
||||
|
||||
env.execute("Values to StarRocks Sink");
|
||||
|
||||
List<String> actual = fetchTableContent(tableId, 3);
|
||||
List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
|
||||
|
||||
assertEqualsInAnyOrder(expected, actual);
|
||||
}
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.starrocks.sink.utils;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.testcontainers.containers.JdbcDatabaseContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/** Docker container for StarRocks. */
|
||||
public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
|
||||
|
||||
private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
|
||||
|
||||
// exposed ports
|
||||
public static final int FE_HTTP_SERVICE_PORT = 8080;
|
||||
public static final int FE_QUERY_PORT = 9030;
|
||||
|
||||
public static final String STARROCKS_DATABASE_NAME = "starrocks_database";
|
||||
public static final String STARROCKS_TABLE_NAME = "fallen_angel";
|
||||
public static final String STARROCKS_USERNAME = "root";
|
||||
public static final String STARROCKS_PASSWORD = "";
|
||||
|
||||
@ClassRule public static final Network NETWORK = Network.newNetwork();
|
||||
|
||||
public StarRocksContainer() {
|
||||
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
|
||||
setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
|
||||
setNetwork(NETWORK);
|
||||
}
|
||||
|
||||
public StarRocksContainer(Network network) {
|
||||
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
|
||||
setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
|
||||
setNetwork(network);
|
||||
}
|
||||
|
||||
public List<String> getLoadUrl() {
|
||||
return Collections.singletonList(
|
||||
String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT)));
|
||||
}
|
||||
|
||||
public void waitForLog(String regex, int count, int timeoutSeconds) {
|
||||
new LogMessageWaitStrategy()
|
||||
.withRegEx(regex)
|
||||
.withTimes(count)
|
||||
.withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
|
||||
.waitUntilReady(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDriverClassName() {
|
||||
try {
|
||||
Class.forName("com.mysql.cj.jdbc.Driver");
|
||||
return "com.mysql.cj.jdbc.Driver";
|
||||
} catch (ClassNotFoundException e) {
|
||||
return "com.mysql.jdbc.Driver";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJdbcUrl() {
|
||||
return getJdbcUrl("");
|
||||
}
|
||||
|
||||
public String getJdbcUrl(String databaseName) {
|
||||
String additionalUrlParams = constructUrlParameters("?", "&");
|
||||
return "jdbc:mysql://"
|
||||
+ getHost()
|
||||
+ ":"
|
||||
+ getMappedPort(FE_QUERY_PORT)
|
||||
+ "/"
|
||||
+ databaseName
|
||||
+ additionalUrlParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUsername() {
|
||||
return STARROCKS_USERNAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPassword() {
|
||||
return STARROCKS_PASSWORD;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTestQueryString() {
|
||||
return "SELECT 1";
|
||||
}
|
||||
}
|
@ -0,0 +1,252 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.starrocks.sink.utils;
|
||||
|
||||
import org.apache.flink.cdc.common.configuration.Configuration;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.factories.Factory;
|
||||
import org.apache.flink.cdc.common.sink.DataSink;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink;
|
||||
import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
|
||||
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
|
||||
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
||||
import org.apache.flink.test.util.MiniClusterWithClientResource;
|
||||
import org.apache.flink.util.TestLogger;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.Container;
|
||||
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
|
||||
import org.testcontainers.lifecycle.Startables;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/** Basic class for testing {@link StarRocksDataSink}. */
|
||||
public class StarRocksSinkTestBase extends TestLogger {
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class);
|
||||
|
||||
protected static final int DEFAULT_PARALLELISM = 1;
|
||||
|
||||
protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer();
|
||||
|
||||
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
|
||||
|
||||
private static StarRocksContainer createStarRocksContainer() {
|
||||
return new StarRocksContainer();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final MiniClusterWithClientResource miniClusterResource =
|
||||
new MiniClusterWithClientResource(
|
||||
new MiniClusterResourceConfiguration.Builder()
|
||||
.setNumberTaskManagers(1)
|
||||
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
|
||||
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
|
||||
.withHaLeadershipControl()
|
||||
.build());
|
||||
|
||||
@BeforeClass
|
||||
public static void startContainers() {
|
||||
LOG.info("Starting containers...");
|
||||
Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join();
|
||||
LOG.info("Waiting for StarRocks to launch");
|
||||
|
||||
long startWaitingTimestamp = System.currentTimeMillis();
|
||||
|
||||
new LogMessageWaitStrategy()
|
||||
.withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s")
|
||||
.withTimes(1)
|
||||
.withStartupTimeout(
|
||||
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
|
||||
.waitUntilReady(STARROCKS_CONTAINER);
|
||||
|
||||
while (!checkBackendAvailability()) {
|
||||
try {
|
||||
if (System.currentTimeMillis() - startWaitingTimestamp
|
||||
> DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
|
||||
throw new RuntimeException("StarRocks backend startup timed out.");
|
||||
}
|
||||
LOG.info("Waiting for backends to be available");
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ignored) {
|
||||
// ignore and check next round
|
||||
}
|
||||
}
|
||||
LOG.info("Containers are started.");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopContainers() {
|
||||
LOG.info("Stopping containers...");
|
||||
STARROCKS_CONTAINER.stop();
|
||||
LOG.info("Containers are stopped.");
|
||||
}
|
||||
|
||||
static class MockContext implements Factory.Context {
|
||||
|
||||
Configuration factoryConfiguration;
|
||||
|
||||
public MockContext(Configuration factoryConfiguration) {
|
||||
this.factoryConfiguration = factoryConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getFactoryConfiguration() {
|
||||
return factoryConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getPipelineConfiguration() {
|
||||
return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader getClassLoader() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) {
|
||||
StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory();
|
||||
return factory.createDataSink(new MockContext(factoryConfiguration));
|
||||
}
|
||||
|
||||
public static void executeSql(String sql) {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
STARROCKS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
"-e " + sql);
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
throw new RuntimeException("Failed to execute SQL." + rs.getStderr());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to execute SQL.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean checkBackendAvailability() {
|
||||
try {
|
||||
Container.ExecResult rs =
|
||||
STARROCKS_CONTAINER.execInContainer(
|
||||
"mysql",
|
||||
"--protocol=TCP",
|
||||
"-uroot",
|
||||
"-P9030",
|
||||
"-h127.0.0.1",
|
||||
"-e SHOW BACKENDS\\G");
|
||||
|
||||
if (rs.getExitCode() != 0) {
|
||||
return false;
|
||||
}
|
||||
return rs.getStdout()
|
||||
.contains("*************************** 1. row ***************************");
|
||||
} catch (Exception e) {
|
||||
LOG.info("Failed to check backend status.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// test utilities
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
public List<String> inspectTableSchema(TableId tableId) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
ResultSet rs =
|
||||
STARROCKS_CONTAINER
|
||||
.createConnection("")
|
||||
.createStatement()
|
||||
.executeQuery(
|
||||
String.format(
|
||||
"DESCRIBE `%s`.`%s`",
|
||||
tableId.getSchemaName(), tableId.getTableName()));
|
||||
|
||||
while (rs.next()) {
|
||||
List<String> columns = new ArrayList<>();
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
columns.add(rs.getString(i));
|
||||
}
|
||||
results.add(String.join(" | ", columns));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
ResultSet rs =
|
||||
STARROCKS_CONTAINER
|
||||
.createConnection("")
|
||||
.createStatement()
|
||||
.executeQuery(
|
||||
String.format(
|
||||
"SELECT * FROM %s.%s",
|
||||
tableId.getSchemaName(), tableId.getTableName()));
|
||||
|
||||
while (rs.next()) {
|
||||
List<String> columns = new ArrayList<>();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
columns.add(rs.getString(i));
|
||||
}
|
||||
results.add(String.join(" | ", columns));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEqualsInOrder(
|
||||
expected.stream().sorted().collect(Collectors.toList()),
|
||||
actual.stream().sorted().collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEquals(expected.size(), actual.size());
|
||||
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
|
||||
}
|
||||
|
||||
public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
|
||||
assertTrue(expected != null && actual != null);
|
||||
assertEquals(expected.size(), actual.size());
|
||||
for (String key : expected.keySet()) {
|
||||
assertEquals(expected.get(key), actual.get(key));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue