[cdc-base] Improve Incremental snapshot framework code style. ()

pull/1071/head
gongzhongqiang committed by GitHub
parent b89c80d667
commit 1b27ee6b29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -61,26 +61,32 @@ public abstract class BaseSourceConfig implements SourceConfig {
this.dbzConfiguration = dbzConfiguration; this.dbzConfiguration = dbzConfiguration;
} }
@Override
public StartupOptions getStartupOptions() { public StartupOptions getStartupOptions() {
return startupOptions; return startupOptions;
} }
@Override
public int getSplitSize() { public int getSplitSize() {
return splitSize; return splitSize;
} }
@Override
public int getSplitMetaGroupSize() { public int getSplitMetaGroupSize() {
return splitMetaGroupSize; return splitMetaGroupSize;
} }
@Override
public double getDistributionFactorUpper() { public double getDistributionFactorUpper() {
return distributionFactorUpper; return distributionFactorUpper;
} }
@Override
public double getDistributionFactorLower() { public double getDistributionFactorLower() {
return distributionFactorLower; return distributionFactorLower;
} }
@Override
public boolean isIncludeSchemaChanges() { public boolean isIncludeSchemaChanges() {
return includeSchemaChanges; return includeSchemaChanges;
} }

@ -61,7 +61,7 @@ public interface DataSourceDialect<ID extends DataCollectionId, S, C extends Sou
*/ */
Offset displayCurrentOffset(C sourceConfig); Offset displayCurrentOffset(C sourceConfig);
/** Check if the CollectionId is case sensitive or not. */ /** Check if the CollectionId is case-sensitive or not. */
boolean isDataCollectionIdCaseSensitive(C sourceConfig); boolean isDataCollectionIdCaseSensitive(C sourceConfig);
/** Returns the {@link ChunkSplitter} which used to split collection to splits. */ /** Returns the {@link ChunkSplitter} which used to split collection to splits. */

@ -41,9 +41,11 @@ public interface JdbcDataSourceDialect
extends DataSourceDialect<TableId, TableChange, JdbcSourceConfig> { extends DataSourceDialect<TableId, TableChange, JdbcSourceConfig> {
/** Discovers the list of table to capture. */ /** Discovers the list of table to capture. */
@Override
List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig); List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);
/** Discovers the captured tables' schema by {@link SourceConfig}. */ /** Discovers the captured tables' schema by {@link SourceConfig}. */
@Override
Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig); Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig);
/** /**

@ -64,7 +64,7 @@ public class JdbcSourceRecordEmitter<T>
private final SourceReaderMetrics sourceReaderMetrics; private final SourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges; private final boolean includeSchemaChanges;
private final OutputCollector<T> outputCollector; private final OutputCollector<T> outputCollector;
private OffsetFactory offsetFactory; private final OffsetFactory offsetFactory;
public JdbcSourceRecordEmitter( public JdbcSourceRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema, DebeziumDeserializationSchema<T> debeziumDeserializationSchema,

@ -39,13 +39,11 @@ public class JdbcConnectionPools implements ConnectionPools<HikariDataSource, Jd
public static synchronized JdbcConnectionPools getInstance( public static synchronized JdbcConnectionPools getInstance(
JdbcConnectionPoolFactory jdbcConnectionPoolFactory) { JdbcConnectionPoolFactory jdbcConnectionPoolFactory) {
if (instance != null) { if (instance == null) {
return instance;
} else {
JdbcConnectionPools.jdbcConnectionPoolFactory = jdbcConnectionPoolFactory; JdbcConnectionPools.jdbcConnectionPoolFactory = jdbcConnectionPoolFactory;
instance = new JdbcConnectionPools(); instance = new JdbcConnectionPools();
return instance;
} }
return instance;
} }
@Override @Override

@ -66,7 +66,7 @@ public class SnapshotSplitAssigner implements SplitAssigner {
@Nullable private Long checkpointIdToFinish; @Nullable private Long checkpointIdToFinish;
private final DataSourceDialect dialect; private final DataSourceDialect dialect;
private OffsetFactory offsetFactory; private final OffsetFactory offsetFactory;
public SnapshotSplitAssigner( public SnapshotSplitAssigner(
SourceConfig sourceConfig, SourceConfig sourceConfig,

@ -45,7 +45,7 @@ public class FinishedSnapshotSplitInfo implements OffsetDeserializerSerializer {
private final Object[] splitEnd; private final Object[] splitEnd;
private final Offset highWatermark; private final Offset highWatermark;
private OffsetFactory offsetFactory; private final OffsetFactory offsetFactory;
public FinishedSnapshotSplitInfo( public FinishedSnapshotSplitInfo(
TableId tableId, TableId tableId,

@ -41,6 +41,7 @@ public class SnapshotSplitState extends SourceSplitState {
this.highWatermark = highWatermark; this.highWatermark = highWatermark;
} }
@Override
public SnapshotSplit toSourceSplit() { public SnapshotSplit toSourceSplit() {
final SnapshotSplit snapshotSplit = split.asSnapshotSplit(); final SnapshotSplit snapshotSplit = split.asSnapshotSplit();
return new SnapshotSplit( return new SnapshotSplit(

@ -66,6 +66,7 @@ public class StreamSplitState extends SourceSplitState {
this.tableSchemas.put(tableId, latestTableChange); this.tableSchemas.put(tableId, latestTableChange);
} }
@Override
public StreamSplit toSourceSplit() { public StreamSplit toSourceSplit() {
final StreamSplit binlogSplit = split.asStreamSplit(); final StreamSplit binlogSplit = split.asStreamSplit();
return new StreamSplit( return new StreamSplit(

@ -52,9 +52,14 @@ public class SourceReaderMetrics {
} }
public void registerMetrics() { public void registerMetrics() {
metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay); metricGroup.gauge(
metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay); SoureReaderMetricConstants.CURRENT_FETCH_EVENT_TIME_LAG,
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime); (Gauge<Long>) this::getFetchDelay);
metricGroup.gauge(
SoureReaderMetricConstants.CURRENT_EMIT_EVENT_TIME_LAG,
(Gauge<Long>) this::getEmitDelay);
metricGroup.gauge(
SoureReaderMetricConstants.SOURCE_IDLE_TIME, (Gauge<Long>) this::getIdleTime);
} }
public long getFetchDelay() { public long getFetchDelay() {

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.source.metrics;
/** A collection of Source Reader metrics related constant strings. */
public class SoureReaderMetricConstants {
public static final String SOURCE_IDLE_TIME = "sourceIdleTime";
public static final String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag";
public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";
}

@ -52,7 +52,7 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
@Nullable private Fetcher<SourceRecord, SourceSplitBase> currenFetcher; @Nullable private Fetcher<SourceRecord, SourceSplitBase> currenFetcher;
@Nullable private String currentSplitId; @Nullable private String currentSplitId;
private JdbcDataSourceDialect dataSourceDialect; private final JdbcDataSourceDialect dataSourceDialect;
public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) { public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) {
this.subtaskId = subtaskId; this.subtaskId = subtaskId;

@ -75,6 +75,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSpli
this.executor = Executors.newSingleThreadExecutor(threadFactory); this.executor = Executors.newSingleThreadExecutor(threadFactory);
} }
@Override
public void submitTask(FetchTask<SourceSplitBase> fetchTask) { public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
this.streamFetchTask = fetchTask; this.streamFetchTask = fetchTask;
this.currentStreamSplit = fetchTask.getSplit().asStreamSplit(); this.currentStreamSplit = fetchTask.getSplit().asStreamSplit();

@ -76,26 +76,17 @@ public class SourceRecordUtils {
public static boolean isLowWatermarkEvent(SourceRecord record) { public static boolean isLowWatermarkEvent(SourceRecord record) {
Optional<WatermarkKind> watermarkKind = getWatermarkKind(record); Optional<WatermarkKind> watermarkKind = getWatermarkKind(record);
if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.LOW) { return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.LOW;
return true;
}
return false;
} }
public static boolean isHighWatermarkEvent(SourceRecord record) { public static boolean isHighWatermarkEvent(SourceRecord record) {
Optional<WatermarkKind> watermarkKind = getWatermarkKind(record); Optional<WatermarkKind> watermarkKind = getWatermarkKind(record);
if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.HIGH) { return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.HIGH;
return true;
}
return false;
} }
public static boolean isEndWatermarkEvent(SourceRecord record) { public static boolean isEndWatermarkEvent(SourceRecord record) {
Optional<WatermarkKind> watermarkKind = getWatermarkKind(record); Optional<WatermarkKind> watermarkKind = getWatermarkKind(record);
if (watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.BINLOG_END) { return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.BINLOG_END;
return true;
}
return false;
} }
/** /**
@ -134,10 +125,7 @@ public class SourceRecordUtils {
public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) { public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
Schema keySchema = sourceRecord.keySchema(); Schema keySchema = sourceRecord.keySchema();
if (keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) { return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
return true;
}
return false;
} }
public static boolean isDataChangeRecord(SourceRecord record) { public static boolean isDataChangeRecord(SourceRecord record) {

@ -67,8 +67,7 @@ public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
// database history // database history
props.setProperty( props.setProperty(
"database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); "database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
props.setProperty( props.setProperty("database.history.instance.name", UUID.randomUUID() + "_" + subtaskId);
"database.history.instance.name", UUID.randomUUID().toString() + "_" + subtaskId);
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true)); props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
props.setProperty("database.history.refer.ddl", String.valueOf(true)); props.setProperty("database.history.refer.ddl", String.valueOf(true));
props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));

@ -62,7 +62,7 @@ public class ServerIdRange implements Serializable {
+ "please adjust the server id range to " + "please adjust the server id range to "
+ "make the number of server id larger than " + "make the number of server id larger than "
+ "the source parallelism.", + "the source parallelism.",
subTaskId, this.toString())); subTaskId, this));
} }
return startServerId + subTaskId; return startServerId + subTaskId;
} }

@ -150,8 +150,7 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
private MySqlBinlogSplitReadTask createBackfillBinlogReadTask( private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
StreamSplit backfillBinlogSplit, MySqlSourceFetchTaskContext context) { StreamSplit backfillBinlogSplit, MySqlSourceFetchTaskContext context) {
final MySqlOffsetContext.Loader loader = final MySqlOffsetContext.Loader loader =
new MySqlOffsetContext.Loader( new MySqlOffsetContext.Loader(context.getSourceConfig().getDbzConnectorConfig());
(MySqlConnectorConfig) context.getSourceConfig().getDbzConnectorConfig());
final MySqlOffsetContext mySqlOffsetContext = final MySqlOffsetContext mySqlOffsetContext =
(MySqlOffsetContext) (MySqlOffsetContext)
loader.load(backfillBinlogSplit.getStartingOffset().getOffset()); loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

@ -25,7 +25,7 @@ public enum MySqlVersion {
V5_7("5.7"), V5_7("5.7"),
V8_0("8.0"); V8_0("8.0");
private String version; private final String version;
MySqlVersion(String version) { MySqlVersion(String version) {
this.version = version; this.version = version;

Loading…
Cancel
Save