[pipeline-connector][doris] add doris pipeline connector. (#2810)

Co-authored-by: wudi <676366545@qq.com>
Kunni 1 year ago committed by GitHub
parent 8ca3091805
commit 4abd86a500

@@ -24,6 +24,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
/**
* An internal data structure representing data of {@link LocalZonedTimestampType}.
@@ -74,7 +75,9 @@ public final class LocalZonedTimestampData implements Comparable<LocalZonedTimes
milliOfSecond += 1000;
}
long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond;
return Instant.ofEpochSecond(epochSecond, nanoAdjustment)
.atZone(ZoneId.of("UTC"))
.toInstant();
}
@Override

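Note: the hunk above rebuilds the Instant from split epoch fields and then round-trips it through UTC. A minimal standalone sketch of that arithmetic, with placeholder values standing in for the fields used in the hunk:

import java.time.Instant;
import java.time.ZoneId;

public class InstantReassemblySketch {
    public static void main(String[] args) {
        long epochSecond = 1_700_000_000L;     // whole seconds since the epoch
        long milliOfSecond = 123;              // sub-second milliseconds
        long epochNanoOfMillisecond = 456_789; // nanoseconds within that millisecond
        // Same recombination as in LocalZonedTimestampData#toInstant():
        long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond;
        Instant instant = Instant.ofEpochSecond(epochSecond, nanoAdjustment);
        // The added .atZone(ZoneId.of("UTC")).toInstant() round-trip preserves the instant:
        System.out.println(instant.equals(instant.atZone(ZoneId.of("UTC")).toInstant())); // true
    }
}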
@@ -426,8 +426,12 @@ public class DataTypes {
return dataType.accept(LENGTH_EXTRACTOR);
}
public static OptionalInt getScale(DataType dataType) {
return dataType.accept(SCALE_EXTRACTOR);
}
private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor();
private static final ScaleExtractor SCALE_EXTRACTOR = new ScaleExtractor();
private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor();
private static class PrecisionExtractor extends DataTypeDefaultVisitor<OptionalInt> {
@@ -463,6 +467,18 @@ public class DataTypes {
}
}
private static class ScaleExtractor extends DataTypeDefaultVisitor<OptionalInt> {
@Override
public OptionalInt visit(DecimalType decimalType) {
return OptionalInt.of(decimalType.getScale());
}
@Override
protected OptionalInt defaultMethod(DataType dataType) {
return OptionalInt.empty();
}
}
private static class LengthExtractor extends DataTypeDefaultVisitor<OptionalInt> {
@Override

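Note: a quick sketch of how the new getScale accessor pairs with the existing getPrecision one; DataTypes.DECIMAL(10, 2) and DataTypes.BIGINT() are assumed to be the CDC factory methods for these types:

import com.ververica.cdc.common.types.DataTypes;
import java.util.OptionalInt;

public class ScaleExtractorSketch {
    public static void main(String[] args) {
        OptionalInt precision = DataTypes.getPrecision(DataTypes.DECIMAL(10, 2)); // OptionalInt[10]
        OptionalInt scale = DataTypes.getScale(DataTypes.DECIMAL(10, 2));         // OptionalInt[2]
        System.out.println(precision.getAsInt() + ", " + scale.getAsInt()); // 10, 2
        // Non-decimal types fall through ScaleExtractor#defaultMethod and report empty:
        System.out.println(DataTypes.getScale(DataTypes.BIGINT()).isPresent()); // false
    }
}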
@@ -16,6 +16,8 @@
package com.ververica.cdc.common.types.utils;
import org.apache.flink.util.CollectionUtil;
import com.ververica.cdc.common.data.ArrayData;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.MapData;
@@ -24,6 +26,10 @@ import com.ververica.cdc.common.data.StringData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.ZonedTimestampData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.utils.Preconditions;
import java.util.List;
/** Utilities for handling {@link DataType}s. */
public class DataTypeUtils {
@@ -73,4 +79,65 @@ public class DataTypeUtils {
throw new IllegalArgumentException("Illegal type: " + type);
}
}
/**
* Convert CDC's {@link DataType} to Flink's internal {@link
* org.apache.flink.table.types.DataType}.
*/
public static org.apache.flink.table.types.DataType toFlinkDataType(DataType type) {
// ordered by type root definition
List<DataType> children = type.getChildren();
int length = DataTypes.getLength(type).orElse(0);
int precision = DataTypes.getPrecision(type).orElse(0);
int scale = DataTypes.getScale(type).orElse(0);
switch (type.getTypeRoot()) {
case CHAR:
return org.apache.flink.table.api.DataTypes.CHAR(length);
case VARCHAR:
return org.apache.flink.table.api.DataTypes.VARCHAR(length);
case BOOLEAN:
return org.apache.flink.table.api.DataTypes.BOOLEAN();
case BINARY:
return org.apache.flink.table.api.DataTypes.BINARY(length);
case VARBINARY:
return org.apache.flink.table.api.DataTypes.VARBINARY(length);
case DECIMAL:
return org.apache.flink.table.api.DataTypes.DECIMAL(precision, scale);
case TINYINT:
return org.apache.flink.table.api.DataTypes.TINYINT();
case SMALLINT:
return org.apache.flink.table.api.DataTypes.SMALLINT();
case INTEGER:
return org.apache.flink.table.api.DataTypes.INT();
case DATE:
return org.apache.flink.table.api.DataTypes.DATE();
case TIME_WITHOUT_TIME_ZONE:
return org.apache.flink.table.api.DataTypes.TIME(precision);
case BIGINT:
return org.apache.flink.table.api.DataTypes.BIGINT();
case FLOAT:
return org.apache.flink.table.api.DataTypes.FLOAT();
case DOUBLE:
return org.apache.flink.table.api.DataTypes.DOUBLE();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return org.apache.flink.table.api.DataTypes.TIMESTAMP(precision);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision);
case TIMESTAMP_WITH_TIME_ZONE:
return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(precision);
case ARRAY:
Preconditions.checkState(children != null && children.size() > 0);
return org.apache.flink.table.api.DataTypes.ARRAY(toFlinkDataType(children.get(0)));
case MAP:
Preconditions.checkState(children != null && children.size() > 1);
return org.apache.flink.table.api.DataTypes.MAP(
toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
case ROW:
Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
return org.apache.flink.table.api.DataTypes.ROW(
children.toArray(new org.apache.flink.table.types.DataType[] {}));
default:
throw new IllegalArgumentException("Illegal type: " + type);
}
}
}
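Note: a usage sketch for the new conversion, assuming the CDC DataTypes factory methods for DECIMAL, ROW, INT, and VARCHAR exist as in Flink's API; the printed forms are illustrative:

import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.utils.DataTypeUtils;

public class ToFlinkDataTypeSketch {
    public static void main(String[] args) {
        // Precision and scale carry over to the Flink type:
        org.apache.flink.table.types.DataType flinkDecimal =
                DataTypeUtils.toFlinkDataType(DataTypes.DECIMAL(10, 2));
        System.out.println(flinkDecimal); // e.g. DECIMAL(10, 2)
        // ROW converts each child type recursively:
        org.apache.flink.table.types.DataType flinkRow =
                DataTypeUtils.toFlinkDataType(
                        DataTypes.ROW(DataTypes.INT(), DataTypes.VARCHAR(32)));
        System.out.println(flinkRow); // e.g. ROW<`f0` INT, `f1` VARCHAR(32)>
    }
}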

@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2023 Ververica Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<name>flink-cdc-pipeline-connector-doris</name>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>org.apache.doris:*</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,160 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.factory;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.connectors.doris.sink.DorisDataSink;
import com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_COUNT;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_MAX_BYTES;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_SIZE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_CHECK_INTERVAL;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_2PC;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_FLUSH_QUEUE_SIZE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_IGNORE_UPDATE_BEFORE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_LABEL_PREFIX;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
/** A {@link DataSinkFactory} to create {@link DorisDataSink}. */
@Internal
public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
Configuration config = context.getFactoryConfiguration();
DorisOptions.Builder optionsBuilder = DorisOptions.builder();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
config.getOptional(FENODES).ifPresent(optionsBuilder::setFenodes);
config.getOptional(BENODES).ifPresent(optionsBuilder::setBenodes);
config.getOptional(USERNAME).ifPresent(optionsBuilder::setUsername);
config.getOptional(PASSWORD).ifPresent(optionsBuilder::setPassword);
config.getOptional(JDBC_URL).ifPresent(optionsBuilder::setJdbcUrl);
config.getOptional(AUTO_REDIRECT).ifPresent(optionsBuilder::setAutoRedirect);
config.getOptional(SINK_CHECK_INTERVAL).ifPresent(executionBuilder::setCheckInterval);
config.getOptional(SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries);
config.getOptional(SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
config.getOptional(SINK_LABEL_PREFIX).ifPresent(executionBuilder::setLabelPrefix);
config.getOptional(SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
config.getOptional(SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
config.getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
.ifPresent(executionBuilder::setBufferFlushMaxRows);
config.getOptional(SINK_BUFFER_FLUSH_MAX_BYTES)
.ifPresent(executionBuilder::setBufferFlushMaxBytes);
config.getOptional(SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
config.getOptional(SINK_IGNORE_UPDATE_BEFORE)
.ifPresent(executionBuilder::setIgnoreUpdateBefore);
config.getOptional(SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache);
config.getOptional(SINK_BUFFER_FLUSH_INTERVAL)
.ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
config.getOptional(SINK_ENABLE_2PC)
.ifPresent(
b -> {
if (b) {
executionBuilder.enable2PC();
} else {
executionBuilder.disable2PC();
}
});
// default batch mode
executionBuilder.setBatchMode(config.get(SINK_ENABLE_BATCH_MODE));
// set streamload properties
Properties properties = DorisExecutionOptions.defaultsProperties();
Map<String, String> streamLoadProp =
DorisDataSinkOptions.getPropertiesByPrefix(config, STREAM_LOAD_PROP_PREFIX);
properties.putAll(streamLoadProp);
executionBuilder.setStreamLoadProp(properties);
return new DorisDataSink(
optionsBuilder.build(),
DorisReadOptions.builder().build(),
executionBuilder.build(),
config,
ZoneId.of(
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)));
}
@Override
public String identifier() {
return "doris";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(USERNAME);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(BENODES);
options.add(PASSWORD);
options.add(JDBC_URL);
options.add(AUTO_REDIRECT);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
options.add(SINK_MAX_RETRIES);
options.add(SINK_ENABLE_DELETE);
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
options.add(SINK_ENABLE_BATCH_MODE);
options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
options.add(SINK_FLUSH_QUEUE_SIZE);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_IGNORE_UPDATE_BEFORE);
options.add(SINK_USE_CACHE);
return options;
}
}
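Note: the factory above only translates pipeline configuration into the Doris connector's builders. A hedged sketch of the equivalent direct wiring, using only builder calls that appear in createDataSink; the FE address and credentials are placeholders:

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import java.util.Properties;

public class DorisBuilderSketch {
    public static void main(String[] args) {
        // Corresponds to the required options: fenodes and username.
        DorisOptions options = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030") // placeholder FE address
                .setUsername("root")          // placeholder user
                .setPassword("")
                .build();
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.disable2PC();       // sink.enable-2pc defaults to false
        executionBuilder.setBatchMode(true); // sink.enable.batch-mode defaults to true
        // sink.properties.* entries are layered over the connector defaults:
        Properties properties = DorisExecutionOptions.defaultsProperties();
        properties.put("format", "json");    // an illustrative stream-load property
        executionBuilder.setStreamLoadProp(properties);
        System.out.println(executionBuilder.build().enableBatchMode()); // true
    }
}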

@@ -0,0 +1,78 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.common.sink.EventSinkProvider;
import com.ververica.cdc.common.sink.FlinkSinkProvider;
import com.ververica.cdc.common.sink.MetadataApplier;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import java.io.Serializable;
import java.time.ZoneId;
/** A {@link DataSink} for the Doris connector. */
public class DorisDataSink implements DataSink, Serializable {
private final DorisOptions dorisOptions;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final Configuration configuration;
private final ZoneId zoneId;
public DorisDataSink(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions dorisExecutionOptions,
Configuration configuration,
ZoneId zoneId) {
this.dorisOptions = dorisOptions;
this.readOptions = dorisReadOptions;
this.executionOptions = dorisExecutionOptions;
this.configuration = configuration;
this.zoneId = zoneId;
}
@Override
public EventSinkProvider getEventSinkProvider() {
if (!executionOptions.enableBatchMode()) {
return FlinkSinkProvider.of(
new DorisSink<>(
dorisOptions,
readOptions,
executionOptions,
new DorisEventSerializer(zoneId)));
} else {
return FlinkSinkProvider.of(
new DorisBatchSink<>(
dorisOptions,
readOptions,
executionOptions,
new DorisEventSerializer(zoneId)));
}
}
@Override
public MetadataApplier getMetadataApplier() {
return new DorisMetadataApplier(dorisOptions, configuration);
}
}

@@ -0,0 +1,166 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.ConfigOptions;
import com.ververica.cdc.common.configuration.Configuration;
import org.apache.doris.flink.table.DorisConfigOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/** Options for {@link DorisDataSink}, modeled after {@link DorisConfigOptions}. */
public class DorisDataSinkOptions {
public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes")
.stringType()
.noDefaultValue()
.withDescription("Doris FE HTTP address.");
public static final ConfigOption<String> BENODES =
ConfigOptions.key("benodes")
.stringType()
.noDefaultValue()
.withDescription("Doris BE HTTP address.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("the Doris username.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("the Doris password.");
public static final ConfigOption<String> JDBC_URL =
ConfigOptions.key("jdbc-url")
.stringType()
.noDefaultValue()
.withDescription("the Doris JDBC URL.");
public static final ConfigOption<Boolean> AUTO_REDIRECT =
ConfigOptions.key("auto-redirect")
.booleanType()
.defaultValue(false)
.withDescription(
"Use automatic redirection of FE without explicitly obtaining the BE list.");
// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
.booleanType()
.defaultValue(false)
.withDescription("enable 2PC while loading");
public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.intType()
.defaultValue(10000)
.withDescription("the interval (ms) at which to check for load exceptions while loading.");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
public static final ConfigOption<Integer> SINK_BUFFER_SIZE =
ConfigOptions.key("sink.buffer-size")
.intType()
.defaultValue(1024 * 1024)
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
ConfigOptions.key("sink.buffer-count")
.intType()
.defaultValue(3)
.withDescription("the buffer count to cache data for stream load.");
public static final ConfigOption<String> SINK_LABEL_PREFIX =
ConfigOptions.key("sink.label-prefix")
.stringType()
.defaultValue("")
.withDescription("the unique label prefix.");
public static final ConfigOption<Boolean> SINK_ENABLE_DELETE =
ConfigOptions.key("sink.enable-delete")
.booleanType()
.defaultValue(true)
.withDescription("whether to enable the delete function");
// batch sink options
public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
ConfigOptions.key("sink.enable.batch-mode")
.booleanType()
.defaultValue(true)
.withDescription("Whether to enable batch write mode");
public static final ConfigOption<Integer> SINK_FLUSH_QUEUE_SIZE =
ConfigOptions.key("sink.flush.queue-size")
.intType()
.defaultValue(2)
.withDescription("Queue length for async stream load, default is 2");
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(50000)
.withDescription(
"The maximum number of flush items in each batch, the default is 50000");
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions.key("sink.buffer-flush.max-bytes")
.intType()
.defaultValue(10 * 1024 * 1024)
.withDescription(
"The maximum number of bytes flushed in each batch, the default is 10MB");
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"the flush interval in milliseconds; after this time, asynchronous threads will flush data. The "
+ "default value is 10s.");
public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE =
ConfigOptions.key("sink.ignore.update-before")
.booleanType()
.defaultValue(true)
.withDescription(
"In the CDC scenario, when the primary key of the upstream is inconsistent with that of the downstream, the update-before data needs to be passed to the downstream as deleted data, otherwise the data cannot be deleted.\n"
+ "The default is to ignore, that is, perform upsert semantics.");
public static final ConfigOption<Boolean> SINK_USE_CACHE =
ConfigOptions.key("sink.use-cache")
.booleanType()
.defaultValue(false)
.withDescription("Whether to use buffer cache for breakpoint resume");
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
// Prefix for Doris Create table.
public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";
public static Map<String, String> getPropertiesByPrefix(
Configuration tableOptions, String prefix) {
final Map<String, String> props = new HashMap<>();
for (Map.Entry<String, String> entry : tableOptions.toMap().entrySet()) {
if (entry.getKey().startsWith(prefix)) {
String subKey = entry.getKey().substring(prefix.length());
props.put(subKey, entry.getValue());
}
}
return props;
}
}
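Note: a small sketch of getPropertiesByPrefix stripping the sink.properties. prefix. Configuration.fromMap is assumed to exist on the CDC Configuration, mirroring Flink's Configuration API, and the sketch assumes DorisDataSinkOptions is importable:

import com.ververica.cdc.common.configuration.Configuration;
import java.util.HashMap;
import java.util.Map;

public class PropertiesByPrefixSketch {
    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("sink.properties.format", "json");
        raw.put("sink.properties.read_json_by_line", "true");
        raw.put("fenodes", "127.0.0.1:8030"); // no prefix, so it is filtered out
        Configuration config = Configuration.fromMap(raw); // assumed factory method
        Map<String, String> props =
                DorisDataSinkOptions.getPropertiesByPrefix(
                        config, DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX);
        System.out.println(props); // {format=json, read_json_by_line=true} (order may vary)
    }
}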

@@ -0,0 +1,132 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.OperationType;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.utils.Preconditions;
import com.ververica.cdc.common.utils.SchemaUtils;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
/** A serializer that converts an {@link Event} into a {@link DorisRecord}. */
public class DorisEventSerializer implements DorisRecordSerializer<Event> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<TableId, Schema> schemaMaps = new HashMap<>();
/** Format DATE type data. */
public static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd");
/** Format timestamp-related type data. */
public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;
public DorisEventSerializer(ZoneId zoneId) {
pipelineZoneId = zoneId;
}
@Override
public DorisRecord serialize(Event event) throws IOException {
if (event instanceof DataChangeEvent) {
return applyDataChangeEvent((DataChangeEvent) event);
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
TableId tableId = schemaChangeEvent.tableId();
if (event instanceof CreateTableEvent) {
schemaMaps.put(tableId, ((CreateTableEvent) event).getSchema());
} else {
if (!schemaMaps.containsKey(tableId)) {
throw new RuntimeException("schema of " + tableId + " does not exist.");
}
schemaMaps.put(
tableId,
SchemaUtils.applySchemaChangeEvent(
schemaMaps.get(tableId), schemaChangeEvent));
}
}
return null;
}
private DorisRecord applyDataChangeEvent(DataChangeEvent event) throws JsonProcessingException {
TableId tableId = event.tableId();
Schema schema = schemaMaps.get(tableId);
Preconditions.checkNotNull(schema, event.tableId() + " does not exist");
Map<String, Object> valueMap;
OperationType op = event.op();
switch (op) {
case INSERT:
case UPDATE:
case REPLACE:
valueMap = serializerRecord(event.after(), schema);
addDeleteSign(valueMap, false);
break;
case DELETE:
valueMap = serializerRecord(event.before(), schema);
addDeleteSign(valueMap, true);
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + op);
}
return DorisRecord.of(
tableId.getSchemaName(),
tableId.getTableName(),
objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}
/** Serializes {@link RecordData} into a map of Doris field values. */
public Map<String, Object> serializerRecord(RecordData recordData, Schema schema) {
List<Column> columns = schema.getColumns();
Map<String, Object> record = new HashMap<>();
Preconditions.checkState(
columns.size() == recordData.getArity(),
"Column size does not match the data size");
for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
columns.get(i).getType(), pipelineZoneId);
Object field = converter.serialize(i, recordData);
record.put(columns.get(i).getName(), field);
}
return record;
}
}
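Note: for illustration, the delete-sign handling used above can be seen in isolation. addDeleteSign comes from the Doris connector; the exact hidden-column key in the output is the connector's detail, shown here only as an assumed example:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;

public class DeleteSignSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "doris");
        addDeleteSign(row, true); // marks the row as a delete for stream load
        // Expected shape (hidden delete-sign key as defined by the Doris connector),
        // e.g. {"id":1,"name":"doris","__DORIS_DELETE_SIGN__":"1"}
        System.out.println(new ObjectMapper().writeValueAsString(row));
    }
}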

@@ -0,0 +1,195 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import org.apache.flink.util.CollectionUtil;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.AddColumnEvent;
import com.ververica.cdc.common.event.AlterColumnTypeEvent;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.common.types.DataTypeChecks;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import com.ververica.cdc.common.types.utils.DataTypeUtils;
import org.apache.doris.flink.catalog.DorisTypeMapper;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
/** A {@link MetadataApplier} that supports schema evolution for {@link DorisDataSink}. */
public class DorisMetadataApplier implements MetadataApplier {
private static final Logger LOG = LoggerFactory.getLogger(DorisMetadataApplier.class);
private final DorisOptions dorisOptions;
private final SchemaChangeManager schemaChangeManager;
private final Configuration config;
public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
this.config = config;
}
@Override
public void applySchemaChange(SchemaChangeEvent event) {
try {
// send schema change op to doris
if (event instanceof CreateTableEvent) {
applyCreateTableEvent((CreateTableEvent) event);
} else if (event instanceof AddColumnEvent) {
applyAddColumnEvent((AddColumnEvent) event);
} else if (event instanceof DropColumnEvent) {
applyDropColumnEvent((DropColumnEvent) event);
} else if (event instanceof RenameColumnEvent) {
applyRenameColumnEvent((RenameColumnEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
throw new RuntimeException("Unsupported schema change event, " + event);
}
} catch (Exception ex) {
throw new RuntimeException(
"Failed to apply schema change, " + event + ", reason: " + ex.getMessage(), ex);
}
}
private void applyCreateTableEvent(CreateTableEvent event)
throws IOException, IllegalArgumentException {
Schema schema = event.getSchema();
TableId tableId = event.tableId();
TableSchema tableSchema = new TableSchema();
tableSchema.setTable(tableId.getTableName());
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
} else {
tableSchema.setKeys(schema.primaryKeys());
tableSchema.setModel(DataModel.UNIQUE);
}
Map<String, String> tableProperties =
DorisDataSinkOptions.getPropertiesByPrefix(config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);
schemaChangeManager.createTable(tableSchema);
}
private Map<String, FieldSchema> buildFields(Schema schema) {
// Preserve the column order.
Map<String, FieldSchema> fieldSchemaMap = new LinkedHashMap<>();
List<String> columnNameList = schema.getColumnNames();
for (String columnName : columnNameList) {
Column column = schema.getColumn(columnName).get();
String typeString;
if (column.getType() instanceof LocalZonedTimestampType
|| column.getType() instanceof TimestampType
|| column.getType() instanceof ZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(column.getType());
typeString =
String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
} else {
typeString =
DorisTypeMapper.toDorisType(
DataTypeUtils.toFlinkDataType(column.getType()));
}
fieldSchemaMap.put(
column.getName(),
new FieldSchema(column.getName(), typeString, column.getComment()));
}
return fieldSchemaMap;
}
private List<String> buildDistributeKeys(Schema schema) {
if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
return schema.primaryKeys();
}
if (!CollectionUtil.isNullOrEmpty(schema.getColumnNames())) {
return Collections.singletonList(schema.getColumnNames().get(0));
}
return new ArrayList<>();
}
private void applyAddColumnEvent(AddColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
Column column = col.getAddColumn();
String typeString;
if (column.getType() instanceof LocalZonedTimestampType
|| column.getType() instanceof TimestampType
|| column.getType() instanceof ZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(column.getType());
typeString =
String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
} else {
typeString =
DorisTypeMapper.toDorisType(
DataTypeUtils.toFlinkDataType(column.getType()));
}
FieldSchema addFieldSchema =
new FieldSchema(column.getName(), typeString, column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
}
private void applyDropColumnEvent(DropColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<Column> droppedColumns = event.getDroppedColumns();
for (Column col : droppedColumns) {
schemaChangeManager.dropColumn(
tableId.getSchemaName(), tableId.getTableName(), col.getName());
}
}
private void applyRenameColumnEvent(RenameColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
Map<String, String> nameMapping = event.getNameMapping();
for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
schemaChangeManager.renameColumn(
tableId.getSchemaName(),
tableId.getTableName(),
entry.getKey(),
entry.getValue());
}
}
}
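Note: a worked example of the DATETIMEV2 precision clamp used in buildFields and applyAddColumnEvent; Doris' DATETIMEV2 supports at most six fractional-second digits:

public class DatetimePrecisionSketch {
    public static void main(String[] args) {
        for (int precision : new int[] {0, 3, 6, 9}) {
            // Same clamp as in DorisMetadataApplier: bound precision into [0, 6].
            String typeString =
                    String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
            System.out.println(precision + " -> " + typeString); // e.g. 9 -> DATETIMEV2(6)
        }
    }
}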

@@ -0,0 +1,177 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.common.data.ArrayData;
import com.ververica.cdc.common.data.GenericArrayData;
import com.ververica.cdc.common.data.GenericMapData;
import com.ververica.cdc.common.data.MapData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypeChecks;
import com.ververica.cdc.common.types.DecimalType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.DATE_FORMATTER;
import static com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.DATE_TIME_FORMATTER;
/** Converts {@link RecordData} objects into Doris field values. */
public class DorisRowConverter implements Serializable {
private static final long serialVersionUID = 1L;
private static final ObjectMapper objectMapper = new ObjectMapper();
/** Runtime converter to convert {@link RecordData} type object to doris field. */
@FunctionalInterface
interface SerializationConverter extends Serializable {
Object serialize(int index, RecordData field);
}
static SerializationConverter createNullableExternalConverter(
DataType type, ZoneId pipelineZoneId) {
return wrapIntoNullableExternalConverter(createExternalConverter(type, pipelineZoneId));
}
static SerializationConverter wrapIntoNullableExternalConverter(
SerializationConverter serializationConverter) {
return (index, val) -> {
if (val == null || val.isNullAt(index)) {
return null;
} else {
return serializationConverter.serialize(index, val);
}
};
}
static SerializationConverter createExternalConverter(DataType type, ZoneId pipelineZoneId) {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
return (index, val) -> val.getString(index).toString();
case BOOLEAN:
return (index, val) -> val.getBoolean(index);
case BINARY:
case VARBINARY:
return (index, val) -> val.getBinary(index);
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return (index, val) ->
val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
case TINYINT:
return (index, val) -> val.getByte(index);
case SMALLINT:
return (index, val) -> val.getShort(index);
case INTEGER:
return (index, val) -> val.getInt(index);
case BIGINT:
return (index, val) -> val.getLong(index);
case FLOAT:
return (index, val) -> val.getFloat(index);
case DOUBLE:
return (index, val) -> val.getDouble(index);
case DATE:
return (index, val) ->
DATE_FORMATTER.format(
Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (index, val) ->
val.getTimestamp(index, DataTypeChecks.getPrecision(type))
.toLocalDateTime()
.format(DATE_TIME_FORMATTER);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return (index, val) ->
ZonedDateTime.ofInstant(
val.getLocalZonedTimestampData(
index, DataTypeChecks.getPrecision(type))
.toInstant(),
pipelineZoneId)
.toLocalDateTime()
.format(DATE_TIME_FORMATTER);
case TIMESTAMP_WITH_TIME_ZONE:
final int zonedP = ((ZonedTimestampType) type).getPrecision();
return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
case ARRAY:
return (index, val) -> convertArrayData(val.getArray(index), type);
case MAP:
return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type));
case ROW:
return (index, val) ->
writeValueAsString(convertRowData(val, index, type, pipelineZoneId));
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static List<Object> convertArrayData(ArrayData array, DataType type) {
if (array instanceof GenericArrayData) {
return Arrays.asList(((GenericArrayData) array).toObjectArray());
}
throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
}
private static Object convertMapData(MapData map, DataType type) {
Map<Object, Object> result = new HashMap<>();
if (map instanceof GenericMapData) {
GenericMapData gMap = (GenericMapData) map;
for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) {
result.put(key, gMap.get(key));
}
return result;
}
throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
}
private static Object convertRowData(
RecordData val, int index, DataType type, ZoneId pipelineZoneId) {
RowType rowType = (RowType) type;
Map<String, Object> value = new HashMap<>();
RecordData row = val.getRow(index, rowType.getFieldCount());
List<DataField> fields = rowType.getFields();
for (int i = 0; i < fields.size(); i++) {
DataField rowField = fields.get(i);
SerializationConverter converter =
createNullableExternalConverter(rowField.getType(), pipelineZoneId);
Object valTmp = converter.serialize(i, row);
value.put(rowField.getName(), valTmp == null ? null : valTmp.toString());
}
return value;
}
private static String writeValueAsString(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.connectors.doris.factory.DorisDataSinkFactory

@@ -0,0 +1,90 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.doris.sink;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.junit.Assert;
import org.junit.Test;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/** A test for {@link DorisRowConverter}. */
public class DorisRowConverterTest {
@Test
public void testExternalConvert() {
List<Column> columns =
Arrays.asList(
Column.physicalColumn("f2", DataTypes.BOOLEAN()),
Column.physicalColumn("f3", DataTypes.FLOAT()),
Column.physicalColumn("f4", DataTypes.DOUBLE()),
Column.physicalColumn("f7", DataTypes.TINYINT()),
Column.physicalColumn("f8", DataTypes.SMALLINT()),
Column.physicalColumn("f9", DataTypes.INT()),
Column.physicalColumn("f10", DataTypes.BIGINT()),
Column.physicalColumn("f12", DataTypes.TIMESTAMP()),
Column.physicalColumn("f14", DataTypes.DATE()),
Column.physicalColumn("f15", DataTypes.CHAR(1)),
Column.physicalColumn("f16", DataTypes.VARCHAR(256)));
List<DataType> dataTypes =
columns.stream().map(v -> v.getType()).collect(Collectors.toList());
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDate date1 = LocalDate.of(2021, 1, 1);
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(dataTypes.toArray(new DataType[] {})));
BinaryRecordData recordData =
generator.generate(
new Object[] {
true,
1.2F,
1.2345D,
(byte) 1,
(short) 32,
64,
128L,
TimestampData.fromLocalDateTime(time1),
(int) date1.toEpochDay(),
BinaryStringData.fromString("a"),
BinaryStringData.fromString("doris")
});
List<Object> row = new ArrayList<>();
for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
columns.get(i).getType(), ZoneId.systemDefault());
row.add(converter.serialize(i, recordData));
}
Assert.assertEquals(
"[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00, 2021-01-01, a, doris]",
row.toString());
}
}

@@ -28,6 +28,7 @@ under the License.
<modules>
<module>flink-cdc-pipeline-connector-values</module>
<module>flink-cdc-pipeline-connector-mysql</module>
<module>flink-cdc-pipeline-connector-doris</module>
</modules>
<dependencies>

@@ -72,6 +72,7 @@ under the License.
<!-- dependencies versions -->
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<debezium.version>1.9.7.Final</debezium.version>
<tikv.version>3.2.0</tikv.version>
<geometry.version>2.2.0</geometry.version>
