From 1b27ee6b29c9869ba2353240bb7eb5b9ca5087c9 Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Mon, 11 Apr 2022 11:51:05 +0800 Subject: [PATCH] [cdc-base] Improve Incremental snapshot framework code style. (#1054) --- .../base/config/BaseSourceConfig.java | 6 +++++ .../base/dialect/DataSourceDialect.java | 2 +- .../base/dialect/JdbcDataSourceDialect.java | 2 ++ .../relational/JdbcSourceRecordEmitter.java | 2 +- .../connection/JdbcConnectionPools.java | 6 ++--- .../assigner/SnapshotSplitAssigner.java | 2 +- .../meta/split/FinishedSnapshotSplitInfo.java | 2 +- .../source/meta/split/SnapshotSplitState.java | 1 + .../source/meta/split/StreamSplitState.java | 1 + .../source/metrics/SourceReaderMetrics.java | 11 +++++--- .../metrics/SoureReaderMetricConstants.java | 27 +++++++++++++++++++ .../source/reader/JdbcSourceSplitReader.java | 2 +- .../external/JdbcSourceStreamFetcher.java | 1 + .../base/utils/SourceRecordUtils.java | 20 +++----------- .../config/MySqlSourceConfigFactory.java | 3 +-- .../experimental/config/ServerIdRange.java | 2 +- .../fetch/MySqlScanFetchTask.java | 3 +-- .../base/testutils/MySqlVersion.java | 2 +- 18 files changed, 61 insertions(+), 34 deletions(-) create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SoureReaderMetricConstants.java diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java index 0cc4578ef..c0c9b29fe 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java @@ -61,26 +61,32 @@ public abstract class BaseSourceConfig implements SourceConfig { this.dbzConfiguration = dbzConfiguration; } + @Override public StartupOptions getStartupOptions() { return startupOptions; } + @Override public int getSplitSize() { return splitSize; } + @Override public int getSplitMetaGroupSize() { return splitMetaGroupSize; } + @Override public double getDistributionFactorUpper() { return distributionFactorUpper; } + @Override public double getDistributionFactorLower() { return distributionFactorLower; } + @Override public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java index e48d4b385..6d8f9269a 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java @@ -61,7 +61,7 @@ public interface DataSourceDialect { /** Discovers the list of table to capture. */ + @Override List discoverDataCollections(JdbcSourceConfig sourceConfig); /** Discovers the captured tables' schema by {@link SourceConfig}. */ + @Override Map discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig); /** diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java index a6299611d..cb7385fe7 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java @@ -64,7 +64,7 @@ public class JdbcSourceRecordEmitter private final SourceReaderMetrics sourceReaderMetrics; private final boolean includeSchemaChanges; private final OutputCollector outputCollector; - private OffsetFactory offsetFactory; + private final OffsetFactory offsetFactory; public JdbcSourceRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPools.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPools.java index 512cfe089..7f03f7773 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPools.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPools.java @@ -39,13 +39,11 @@ public class JdbcConnectionPools implements ConnectionPools) this::getFetchDelay); - metricGroup.gauge("currentEmitEventTimeLag", (Gauge) this::getEmitDelay); - metricGroup.gauge("sourceIdleTime", (Gauge) this::getIdleTime); + metricGroup.gauge( + SoureReaderMetricConstants.CURRENT_FETCH_EVENT_TIME_LAG, + (Gauge) this::getFetchDelay); + metricGroup.gauge( + SoureReaderMetricConstants.CURRENT_EMIT_EVENT_TIME_LAG, + (Gauge) this::getEmitDelay); + metricGroup.gauge( + SoureReaderMetricConstants.SOURCE_IDLE_TIME, (Gauge) this::getIdleTime); } public long getFetchDelay() { diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SoureReaderMetricConstants.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SoureReaderMetricConstants.java new file mode 100644 index 000000000..b6f5e457f --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SoureReaderMetricConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.source.metrics; + +/** A collection of Source Reader metrics related constant strings. */ +public class SoureReaderMetricConstants { + + public static final String SOURCE_IDLE_TIME = "sourceIdleTime"; + public static final String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag"; + public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag"; +} diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java index 21d6a134c..d8d2f9547 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java @@ -52,7 +52,7 @@ public class JdbcSourceSplitReader implements SplitReader currenFetcher; @Nullable private String currentSplitId; - private JdbcDataSourceDialect dataSourceDialect; + private final JdbcDataSourceDialect dataSourceDialect; public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) { this.subtaskId = subtaskId; diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java index 199fcfdc1..c5d4c809b 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java @@ -75,6 +75,7 @@ public class JdbcSourceStreamFetcher implements Fetcher fetchTask) { this.streamFetchTask = fetchTask; this.currentStreamSplit = fetchTask.getSplit().asStreamSplit(); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java index f670bd807..e18ec8456 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java @@ -76,26 +76,17 @@ public class SourceRecordUtils { public static boolean isLowWatermarkEvent(SourceRecord record) { Optional watermarkKind = getWatermarkKind(record); - if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.LOW) { - return true; - } - return false; + return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.LOW; } public static boolean isHighWatermarkEvent(SourceRecord record) { Optional watermarkKind = getWatermarkKind(record); - if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.HIGH) { - return true; - } - return false; + return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.HIGH; } public static boolean isEndWatermarkEvent(SourceRecord record) { Optional watermarkKind = getWatermarkKind(record); - if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.BINLOG_END) { - return true; - } - return false; + return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.BINLOG_END; } /** @@ -134,10 +125,7 @@ public class SourceRecordUtils { public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) { Schema keySchema = sourceRecord.keySchema(); - if (keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) { - return true; - } - return false; + return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); } public static boolean isDataChangeRecord(SourceRecord record) { diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java index 144e4ddaa..2a261df64 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java @@ -67,8 +67,7 @@ public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory { // database history props.setProperty( "database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); - props.setProperty( - "database.history.instance.name", UUID.randomUUID().toString() + "_" + subtaskId); + props.setProperty("database.history.instance.name", UUID.randomUUID() + "_" + subtaskId); props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true)); props.setProperty("database.history.refer.ddl", String.valueOf(true)); props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/ServerIdRange.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/ServerIdRange.java index dc915113b..1977c38ed 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/ServerIdRange.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/ServerIdRange.java @@ -62,7 +62,7 @@ public class ServerIdRange implements Serializable { + "please adjust the server id range to " + "make the number of server id larger than " + "the source parallelism.", - subTaskId, this.toString())); + subTaskId, this)); } return startServerId + subTaskId; } diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java index b9d594c6c..2cda60adf 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java @@ -150,8 +150,7 @@ public class MySqlScanFetchTask implements FetchTask { private MySqlBinlogSplitReadTask createBackfillBinlogReadTask( StreamSplit backfillBinlogSplit, MySqlSourceFetchTaskContext context) { final MySqlOffsetContext.Loader loader = - new MySqlOffsetContext.Loader( - (MySqlConnectorConfig) context.getSourceConfig().getDbzConnectorConfig()); + new MySqlOffsetContext.Loader(context.getSourceConfig().getDbzConnectorConfig()); final MySqlOffsetContext mySqlOffsetContext = (MySqlOffsetContext) loader.load(backfillBinlogSplit.getStartingOffset().getOffset()); diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/testutils/MySqlVersion.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/testutils/MySqlVersion.java index c5caed937..8bc3cf2fc 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/testutils/MySqlVersion.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/testutils/MySqlVersion.java @@ -25,7 +25,7 @@ public enum MySqlVersion { V5_7("5.7"), V8_0("8.0"); - private String version; + private final String version; MySqlVersion(String version) { this.version = version;