[FLINK-35092][cdc][starrocks] Add starrocks integration test cases

pull/3402/head
yuxiqian 8 months ago committed by Leonard Xu
parent a74fb376d7
commit a18cacec34

@@ -114,6 +114,18 @@ limitations under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@@ -52,6 +52,24 @@ limitations under the License.
<artifactId>flink-cdc-composer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.starrocks.shade.org.apache.commons.compress.utils;
import java.util.ArrayList;
/**
* Dummy class for the shaded apache-commons package: connector 1.2.9 depends on it but does not
* package it. This class should be removed after upgrading to 1.2.10, which no longer uses
* commons-compress.
*/
public class Lists {
public static <E> ArrayList<E> newArrayList() {
return new ArrayList<>();
}
}
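A minimal sketch (hypothetical, not part of this commit) of the one entry point the shim provides, assuming the class above is on the test classpath:

import com.starrocks.shade.org.apache.commons.compress.utils.Lists;
import java.util.ArrayList;

public class ListsShimSmokeTest {
    public static void main(String[] args) {
        // newArrayList() is the only method the shim exposes; it simply
        // delegates to java.util.ArrayList, which is all the 1.2.9
        // connector needs until 1.2.10 drops the commons-compress usage.
        ArrayList<Integer> buffer = Lists.newArrayList();
        buffer.add(42);
        System.out.println("Shim resolved, size = " + buffer.size());
    }
}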

@@ -0,0 +1,388 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
/** IT tests for {@link StarRocksMetadataApplier}. */
public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
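// Shared execution environment for all cases: checkpoint every 3 seconds so
// the sink's flush/commit path is exercised, and disable restarts so a
// failing job surfaces immediately instead of being retried.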
@BeforeClass
public static void before() {
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
}
@Before
public void initializeDatabase() {
executeSql(
String.format(
"CREATE DATABASE IF NOT EXISTS `%s`;",
StarRocksContainer.STARROCKS_DATABASE_NAME));
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
}
@After
public void destroyDatabase() {
executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
}
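// Each generator below yields a CreateTableEvent followed by the schema
// change events under test; the metadata applier is expected to replay them
// as DDL against StarRocks.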
private List<Event> generateAddColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("extra_date", DataTypes.DATE(), null)))),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_bool", DataTypes.BOOLEAN(), null)))),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_decimal",
DataTypes.DECIMAL(17, 0),
null)))));
}
private List<Event> generateDropColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new DropColumnEvent(tableId, Collections.singletonList("number")));
}
private List<Event> generateRenameColumnEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new RenameColumnEvent(tableId, Collections.singletonMap("number", "kazu")),
new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae")));
}
private List<Event> generateAlterColumnTypeEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
}
private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
// Double -> Float is a narrowing cast, should fail
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("number", DataTypes.FLOAT())));
}
@Test
public void testStarRocksDataType() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
// StarRocks sink doesn't support BINARY and BYTES type yet.
// .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
// .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
// Binary"))
// .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
.column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
.column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
.column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
.column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), "Small Int"))
.column(new PhysicalColumn("float", DataTypes.FLOAT(), "Float"))
.column(new PhysicalColumn("double", DataTypes.DOUBLE(), "Double"))
.column(new PhysicalColumn("char", DataTypes.CHAR(17), "Char"))
.column(new PhysicalColumn("varchar", DataTypes.VARCHAR(17), "Var Char"))
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
// StarRocks sink doesn't support TIME type yet.
// .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
// .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
// Precision"))
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
.column(
new PhysicalColumn(
"timestamp_3",
DataTypes.TIMESTAMP(3),
"Timestamp With Precision"))
// StarRocks sink doesn't support TIMESTAMP with non-local TZ yet.
// .column(new PhysicalColumn("timestamptz", DataTypes.TIMESTAMP_TZ(),
// "TimestampTZ"))
// .column(new PhysicalColumn("timestamptz_3", DataTypes.TIMESTAMP_TZ(3),
// "TimestampTZ With Precision"))
.column(
new PhysicalColumn(
"timestampltz", DataTypes.TIMESTAMP_LTZ(), "TimestampLTZ"))
.column(
new PhysicalColumn(
"timestampltz_3",
DataTypes.TIMESTAMP_LTZ(3),
"TimestampLTZ With Precision"))
.primaryKey("id")
.build();
runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | int | NO | true | null",
"boolean | boolean | YES | false | null",
"int | int | YES | false | null",
"tinyint | tinyint | YES | false | null",
"smallint | smallint | YES | false | null",
"float | float | YES | false | null",
"double | double | YES | false | null",
"char | char(51) | YES | false | null",
"varchar | varchar(51) | YES | false | null",
"string | varchar(1048576) | YES | false | null",
"decimal | decimal(17,7) | YES | false | null",
"date | date | YES | false | null",
"timestamp | datetime | YES | false | null",
"timestamp_3 | datetime | YES | false | null",
"timestampltz | datetime | YES | false | null",
"timestampltz_3 | datetime | YES | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
public void testStarRocksAddColumn() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateAddColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | int | NO | true | null",
"number | double | YES | false | null",
"name | varchar(51) | YES | false | null",
"extra_date | date | YES | false | null",
"extra_bool | boolean | YES | false | null",
"extra_decimal | decimal(17,0) | YES | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
public void testStarRocksDropColumn() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateDropColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | int | NO | true | null", "name | varchar(51) | YES | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
@Ignore("Rename column is not supported currently.")
public void testStarRocksRenameColumn() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateRenameColumnEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | int | NO | true | null",
"kazu | double | YES | false | null",
"namae | varchar(51) | YES | false | null");
assertEqualsInOrder(expected, actual);
}
@Test
@Ignore("Alter column type is not supported currently.")
public void testStarRocksAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | int | NO | true | null",
"number | double | YES | false | null",
"name | varchar(57) | YES | false | null");
assertEqualsInOrder(expected, actual);
}
@Test(expected = JobExecutionException.class)
@Ignore("Alter column type is not supported currently.")
public void testStarRocksNarrowingAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
}
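// Wires a minimal pipeline: bounded event stream -> schema operator (EVOLVE
// behavior, applying schema changes through the sink's MetadataApplier) ->
// StarRocks sink, then executes it to completion.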
private void runJobWithEvents(List<Event> events) throws Exception {
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
Configuration config =
new Configuration()
.set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
.set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
.set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
.set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
DataSink starRocksSink = createStarRocksDataSink(config);
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
SchemaChangeBehavior.EVOLVE,
"$$_schema_operator_$$",
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
stream =
schemaOperatorTranslator.translate(
stream,
DEFAULT_PARALLELISM,
starRocksSink.getMetadataApplier(),
new ArrayList<>());
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
sinkTranslator.translate(
new SinkDef("starrocks", "Dummy StarRocks Sink", config),
stream,
starRocksSink,
schemaOperatorIDGenerator.generate());
env.execute("StarRocks Schema Evolution Test");
}
}

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
/** IT tests for {@link StarRocksDataSink}. */
public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@BeforeClass
public static void before() {
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
}
@Before
public void initializeDatabaseAndTable() {
executeSql(
String.format(
"CREATE DATABASE IF NOT EXISTS `%s`;",
StarRocksContainer.STARROCKS_DATABASE_NAME));
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
executeSql(
String.format(
"CREATE TABLE `%s`.`%s` (%s) PRIMARY KEY (`%s`) DISTRIBUTED BY HASH(`%s`) BUCKETS 1 PROPERTIES (\"replication_num\" = \"1\");",
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME,
String.join(", ", schema),
"id",
"id"));
LOG.info("Table {} created.", StarRocksContainer.STARROCKS_TABLE_NAME);
}
@After
public void destroyDatabaseAndTable() {
executeSql(
String.format(
"DROP TABLE %s.%s;",
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME));
LOG.info("Table {} destroyed.", StarRocksContainer.STARROCKS_TABLE_NAME);
executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
}
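// Emits a CreateTableEvent plus three inserts, one delete and one update;
// after replay the table should hold exactly the two rows asserted below.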
private List<Event> generateEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.primaryKey("id")
.build();
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
return Arrays.asList(
new CreateTableEvent(tableId, schema),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
21, 1.732, BinaryStringData.fromString("Disenchanted")
})),
DataChangeEvent.deleteEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
})),
DataChangeEvent.updateEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
generator.generate(
new Object[] {
17, 6.28, BinaryStringData.fromString("StarRocks")
})));
}
@Test
public void testValuesToStarRocks() throws Exception {
TableId tableId =
TableId.tableId(
StarRocksContainer.STARROCKS_DATABASE_NAME,
StarRocksContainer.STARROCKS_TABLE_NAME);
DataStream<Event> stream =
env.fromCollection(generateEvents(tableId), TypeInformation.of(Event.class));
Configuration config =
new Configuration()
.set(LOAD_URL, STARROCKS_CONTAINER.getLoadUrl())
.set(JDBC_URL, STARROCKS_CONTAINER.getJdbcUrl())
.set(USERNAME, StarRocksContainer.STARROCKS_USERNAME)
.set(PASSWORD, StarRocksContainer.STARROCKS_PASSWORD);
Sink<Event> starRocksSink =
((FlinkSinkProvider) createStarRocksDataSink(config).getEventSinkProvider())
.getSink();
stream.sinkTo(starRocksSink);
env.execute("Values to StarRocks Sink");
List<String> actual = fetchTableContent(tableId, 3);
List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
assertEqualsInAnyOrder(expected, actual);
}
}

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.starrocks.sink.utils;
import org.junit.ClassRule;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/** Docker container for StarRocks. */
public class StarRocksContainer extends JdbcDatabaseContainer<StarRocksContainer> {
private static final String DOCKER_IMAGE_NAME = "starrocks/allin1-ubuntu:3.2.6";
// exposed ports
public static final int FE_HTTP_SERVICE_PORT = 8080;
public static final int FE_QUERY_PORT = 9030;
public static final String STARROCKS_DATABASE_NAME = "starrocks_database";
public static final String STARROCKS_TABLE_NAME = "fallen_angel";
public static final String STARROCKS_USERNAME = "root";
public static final String STARROCKS_PASSWORD = "";
@ClassRule public static final Network NETWORK = Network.newNetwork();
public StarRocksContainer() {
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
setNetwork(NETWORK);
}
public StarRocksContainer(Network network) {
super(DockerImageName.parse(DOCKER_IMAGE_NAME));
setExposedPorts(Arrays.asList(FE_HTTP_SERVICE_PORT, FE_QUERY_PORT));
setNetwork(network);
}
public List<String> getLoadUrl() {
return Collections.singletonList(
String.format("%s:%d", getHost(), getMappedPort(FE_HTTP_SERVICE_PORT)));
}
public void waitForLog(String regex, int count, int timeoutSeconds) {
new LogMessageWaitStrategy()
.withRegEx(regex)
.withTimes(count)
.withStartupTimeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
.waitUntilReady(this);
}
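// Prefer the Connector/J 8.x driver class when it is on the classpath and
// fall back to the legacy 5.x driver name otherwise.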
@Override
public String getDriverClassName() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return "com.mysql.cj.jdbc.Driver";
} catch (ClassNotFoundException e) {
return "com.mysql.jdbc.Driver";
}
}
@Override
public String getJdbcUrl() {
return getJdbcUrl("");
}
public String getJdbcUrl(String databaseName) {
String additionalUrlParams = constructUrlParameters("?", "&");
return "jdbc:mysql://"
+ getHost()
+ ":"
+ getMappedPort(FE_QUERY_PORT)
+ "/"
+ databaseName
+ additionalUrlParams;
}
@Override
public String getUsername() {
return STARROCKS_USERNAME;
}
@Override
public String getPassword() {
return STARROCKS_PASSWORD;
}
@Override
protected String getTestQueryString() {
return "SELECT 1";
}
}

@@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.starrocks.sink.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSink;
import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Basic class for testing {@link StarRocksDataSink}. */
public class StarRocksSinkTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTestBase.class);
protected static final int DEFAULT_PARALLELISM = 1;
protected static final StarRocksContainer STARROCKS_CONTAINER = createStarRocksContainer();
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
private static StarRocksContainer createStarRocksContainer() {
return new StarRocksContainer();
}
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(STARROCKS_CONTAINER)).join();
LOG.info("Waiting for StarRocks to launch");
long startWaitingTimestamp = System.currentTimeMillis();
new LogMessageWaitStrategy()
.withRegEx(".*Enjoy the journal to StarRocks blazing-fast lake-house engine!.*\\s")
.withTimes(1)
.withStartupTimeout(
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
.waitUntilReady(STARROCKS_CONTAINER);
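// The log line above only proves the frontend is up; keep polling
// SHOW BACKENDS until at least one BE has registered, or fail once the
// startup timeout elapses.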
while (!checkBackendAvailability()) {
try {
if (System.currentTimeMillis() - startWaitingTimestamp
> DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
throw new RuntimeException("StarRocks backend startup timed out.");
}
LOG.info("Waiting for backends to be available");
Thread.sleep(1000);
} catch (InterruptedException ignored) {
// ignore and check next round
}
}
LOG.info("Containers are started.");
}
@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
STARROCKS_CONTAINER.stop();
LOG.info("Containers are stopped.");
}
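// Minimal Factory.Context stub: tests only need the factory configuration
// plus a fixed UTC pipeline time zone to construct the sink under test.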
static class MockContext implements Factory.Context {
Configuration factoryConfiguration;
public MockContext(Configuration factoryConfiguration) {
this.factoryConfiguration = factoryConfiguration;
}
@Override
public Configuration getFactoryConfiguration() {
return factoryConfiguration;
}
@Override
public Configuration getPipelineConfiguration() {
return Configuration.fromMap(Collections.singletonMap("local-time-zone", "UTC"));
}
@Override
public ClassLoader getClassLoader() {
return null;
}
}
public static DataSink createStarRocksDataSink(Configuration factoryConfiguration) {
StarRocksDataSinkFactory factory = new StarRocksDataSinkFactory();
return factory.createDataSink(new MockContext(factoryConfiguration));
}
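// Executes SQL with the mysql client bundled in the container, connecting to
// the FE query port (9030) on the container loopback.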
public static void executeSql(String sql) {
try {
Container.ExecResult rs =
STARROCKS_CONTAINER.execInContainer(
"mysql",
"--protocol=TCP",
"-uroot",
"-P9030",
"-h127.0.0.1",
"-e " + sql);
if (rs.getExitCode() != 0) {
throw new RuntimeException("Failed to execute SQL." + rs.getStderr());
}
} catch (Exception e) {
throw new RuntimeException("Failed to execute SQL.", e);
}
}
public static boolean checkBackendAvailability() {
try {
Container.ExecResult rs =
STARROCKS_CONTAINER.execInContainer(
"mysql",
"--protocol=TCP",
"-uroot",
"-P9030",
"-h127.0.0.1",
"-e SHOW BACKENDS\\G");
if (rs.getExitCode() != 0) {
return false;
}
return rs.getStdout()
.contains("*************************** 1. row ***************************");
} catch (Exception e) {
LOG.info("Failed to check backend status.", e);
return false;
}
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
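// inspectTableSchema joins the first five DESCRIBE columns (Field, Type,
// Null, Key, Default) with " | ", yielding one assertable line per column.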
public List<String> inspectTableSchema(TableId tableId) throws SQLException {
List<String> results = new ArrayList<>();
ResultSet rs =
STARROCKS_CONTAINER
.createConnection("")
.createStatement()
.executeQuery(
String.format(
"DESCRIBE `%s`.`%s`",
tableId.getSchemaName(), tableId.getTableName()));
while (rs.next()) {
List<String> columns = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
columns.add(rs.getString(i));
}
results.add(String.join(" | ", columns));
}
return results;
}
public List<String> fetchTableContent(TableId tableId, int columnCount) throws SQLException {
List<String> results = new ArrayList<>();
ResultSet rs =
STARROCKS_CONTAINER
.createConnection("")
.createStatement()
.executeQuery(
String.format(
"SELECT * FROM %s.%s",
tableId.getSchemaName(), tableId.getTableName()));
while (rs.next()) {
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
columns.add(rs.getString(i));
}
results.add(String.join(" | ", columns));
}
return results;
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
for (String key : expected.keySet()) {
assertEquals(expected.get(key), actual.get(key));
}
}
}

@@ -85,6 +85,12 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
@@ -96,7 +102,6 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
