From be9b52750c7bb4835a7b8631ebdb17666aff6bb1 Mon Sep 17 00:00:00 2001 From: Runkang He Date: Mon, 11 Nov 2024 13:55:02 +0800 Subject: [PATCH] [FLINK-36572][pipeline-connector][starrocks] Fix the issue that the local time zone is wrongly set This closes #3655. --- .../starrocks/sink/StarRocksDataSink.java | 6 ++++ .../sink/StarRocksDataSinkFactory.java | 2 +- .../sink/StarRocksDataSinkFactoryTest.java | 33 +++++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java index 4c1969ae9..9811a010d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.starrocks.sink; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.common.sink.EventSinkProvider; @@ -78,4 +79,9 @@ public class StarRocksDataSink implements DataSink, Serializable { sinkOptions.getPassword()); return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig); } + + @VisibleForTesting + public ZoneId getZoneId() { + return zoneId; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java index d1995fee6..f9fe58034 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java @@ -51,7 +51,7 @@ public class StarRocksDataSinkFactory implements DataSinkFactory { TableCreateConfig.from(context.getFactoryConfiguration()); SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(context.getFactoryConfiguration()); - String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE); + String zoneStr = context.getPipelineConfiguration().get(PIPELINE_LOCAL_TIME_ZONE); ZoneId zoneId = PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr) ? ZoneId.systemDefault() diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java index 7f498beb8..9b178874e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java @@ -30,6 +30,7 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -150,4 +151,36 @@ public class StarRocksDataSinkFactoryTest { conf, conf, Thread.currentThread().getContextClassLoader())); Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class); } + + @Test + void testCreateDataSinkWithSpecificedTimeZone() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class); + + Configuration factoryConfiguration = + Configuration.fromMap( + ImmutableMap.builder() + .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030") + .put("load-url", "127.0.0.1:8030") + .put("username", "root") + .put("password", "") + .build()); + Configuration pipelineConfiguration = + Configuration.fromMap( + ImmutableMap.builder() + .put("local-time-zone", "America/Los_Angeles") + .build()); + DataSink dataSink = + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + factoryConfiguration, + pipelineConfiguration, + Thread.currentThread().getContextClassLoader())); + Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class); + + ZoneId zoneId = ((StarRocksDataSink) dataSink).getZoneId(); + ZoneId expectedZondId = ZoneId.of("America/Los_Angeles"); + Assertions.assertThat(zoneId).isEqualTo(expectedZondId); + } }