[FLINK-36315][cdc-connector][base&pg&mongodb]The flink-cdc-base module supports source metric statistics (#3619)

Co-authored-by: molin.lxd <molin.lxd@alibaba-inc.com>
Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
liuxiaodong 2 months ago committed by GitHub
parent 26f5880fbf
commit ee9cd828c3

@@ -121,7 +121,6 @@ public class IncrementalSource<T, C extends SourceConfig>
final SourceReaderMetrics sourceReaderMetrics =
new SourceReaderMetrics(readerContext.metricGroup());
- sourceReaderMetrics.registerMetrics();
IncrementalSourceReaderContext incrementalSourceReaderContext =
new IncrementalSourceReaderContext(readerContext);
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
@@ -161,13 +160,16 @@ public class IncrementalSource<T, C extends SourceConfig>
remainingTables,
isTableIdCaseSensitive,
dataSourceDialect,
- offsetFactory);
offsetFactory,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
}
} else {
- splitAssigner = new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory);
splitAssigner =
new StreamSplitAssigner(
sourceConfig, dataSourceDialect, offsetFactory, enumContext);
}
return new IncrementalSourceEnumerator(
@@ -187,14 +189,16 @@ public class IncrementalSource<T, C extends SourceConfig>
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint,
dataSourceDialect,
- offsetFactory);
offsetFactory,
enumContext);
} else if (checkpoint instanceof StreamPendingSplitsState) {
splitAssigner =
new StreamSplitAssigner(
sourceConfig,
(StreamPendingSplitsState) checkpoint,
dataSourceDialect,
- offsetFactory);
offsetFactory,
enumContext);
} else {
throw new UnsupportedOperationException(
"Unsupported restored PendingSplitsState: " + checkpoint);

@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.base.source.assigner;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
@@ -27,6 +29,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import io.debezium.relational.TableId;
import org.slf4j.Logger;
@@ -61,13 +64,17 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
private final OffsetFactory offsetFactory;
private final SplitEnumeratorContext<? extends SourceSplit> enumeratorContext;
private SourceEnumeratorMetrics enumeratorMetrics;
public HybridSplitAssigner(
C sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
DataSourceDialect<C> dialect,
- OffsetFactory offsetFactory) {
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this(
sourceConfig,
new SnapshotSplitAssigner<>(
@@ -79,7 +86,8 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
offsetFactory),
false,
sourceConfig.getSplitMetaGroupSize(),
- offsetFactory);
offsetFactory,
enumeratorContext);
}
public HybridSplitAssigner(
@@ -87,7 +95,8 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
int currentParallelism,
HybridPendingSplitsState checkpoint,
DataSourceDialect<C> dialect,
- OffsetFactory offsetFactory) {
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this(
sourceConfig,
new SnapshotSplitAssigner<>(
@@ -98,7 +107,8 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
offsetFactory),
checkpoint.isStreamSplitAssigned(),
sourceConfig.getSplitMetaGroupSize(),
- offsetFactory);
offsetFactory,
enumeratorContext);
}
private HybridSplitAssigner(
@@ -106,17 +116,29 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
SnapshotSplitAssigner<C> snapshotSplitAssigner,
boolean isStreamSplitAssigned,
int splitMetaGroupSize,
- OffsetFactory offsetFactory) {
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this.sourceConfig = sourceConfig;
this.snapshotSplitAssigner = snapshotSplitAssigner;
this.isStreamSplitAssigned = isStreamSplitAssigned;
this.splitMetaGroupSize = splitMetaGroupSize;
this.offsetFactory = offsetFactory;
this.enumeratorContext = enumeratorContext;
}
@Override
public void open() {
this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
if (isStreamSplitAssigned) {
enumeratorMetrics.enterStreamReading();
} else {
enumeratorMetrics.exitStreamReading();
}
snapshotSplitAssigner.open();
// init enumerator metrics
snapshotSplitAssigner.initEnumeratorMetrics(enumeratorMetrics);
}
@Override
@@ -126,6 +148,7 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
return Optional.empty();
}
if (snapshotSplitAssigner.noMoreSplits()) {
enumeratorMetrics.exitSnapshotPhase();
// stream split assigning
if (isStreamSplitAssigned) {
// no more splits for the assigner
@@ -137,6 +160,7 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
// assigning the stream split. Otherwise, records emitted from stream split
// might be out-of-order in terms of same primary key with snapshot splits.
isStreamSplitAssigned = true;
enumeratorMetrics.enterStreamReading();
StreamSplit streamSplit = createStreamSplit();
LOG.trace(
"SnapshotSplitAssigner is finished: creating a new stream split {}",
@@ -145,6 +169,7 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
} else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) {
// do not need to create stream split, but send event to wake up the binlog reader
isStreamSplitAssigned = true;
enumeratorMetrics.enterStreamReading();
return Optional.empty();
} else {
// stream split is not ready by now
@@ -184,6 +209,9 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
isStreamSplitAssigned = false;
}
}
if (!snapshotSplits.isEmpty()) {
enumeratorMetrics.exitStreamReading();
}
snapshotSplitAssigner.addSplits(snapshotSplits);
}
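Note on the assigner changes above: the new enumerator gauges isSnapshotting and isStreamReading are driven purely by the assigner lifecycle. open() republishes the restored state, exhausting the snapshot splits flips the assigner into stream reading, and re-adding snapshot splits (newly added tables) drops it back out. A minimal plain-Java sketch of that 0/1 phase-flag pattern, illustrative only and independent of Flink's metric groups:

// Illustrative sketch of the snapshot/stream phase flags exposed as gauges.
// The real gauges live on the SplitEnumeratorMetricGroup; a volatile int
// stands in here for the Gauge<Integer> value that would be reported.
public class PhaseFlagsSketch {
    private volatile int isSnapshotting = 0; // 0 = off/undefined, 1 = on
    private volatile int isStreamReading = 0;

    void enterSnapshotPhase() { isSnapshotting = 1; }
    void exitSnapshotPhase() { isSnapshotting = 0; }
    void enterStreamReading() { isStreamReading = 1; }
    void exitStreamReading() { isStreamReading = 0; }

    public static void main(String[] args) {
        PhaseFlagsSketch m = new PhaseFlagsSketch();
        // open(): snapshot phase running, stream split not yet assigned
        m.enterSnapshotPhase();
        m.exitStreamReading();
        System.out.println("after open: " + m.isSnapshotting + "/" + m.isStreamReading); // 1/0
        // snapshot assigner has no more splits: hand out the stream split
        m.exitSnapshotPhase();
        m.enterStreamReading();
        System.out.println("streaming:  " + m.isSnapshotting + "/" + m.isStreamReading); // 0/1
    }
}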

@@ -27,6 +27,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -49,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus.INITIAL_ASSIGNING;
@@ -81,6 +83,10 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private final DataSourceDialect<C> dialect;
private final OffsetFactory offsetFactory;
private SourceEnumeratorMetrics enumeratorMetrics;
private final Map<String, Long> splitFinishedCheckpointIds;
private static final long UNDEFINED_CHECKPOINT_ID = -1;
public SnapshotSplitAssigner(
C sourceConfig,
int currentParallelism,
@@ -101,7 +107,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
isTableIdCaseSensitive,
true,
dialect,
- offsetFactory);
offsetFactory,
new ConcurrentHashMap<>());
}
public SnapshotSplitAssigner(
@@ -123,7 +130,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed(),
dialect,
- offsetFactory);
offsetFactory,
new ConcurrentHashMap<>());
}
private SnapshotSplitAssigner(
@@ -139,7 +147,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
DataSourceDialect<C> dialect,
- OffsetFactory offsetFactory) {
OffsetFactory offsetFactory,
Map<String, Long> splitFinishedCheckpointIds) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
@@ -163,6 +172,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
this.offsetFactory = offsetFactory;
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
}
@Override
@@ -269,6 +279,46 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
}
}
/** This should be invoked after this class's open method. */
public void initEnumeratorMetrics(SourceEnumeratorMetrics enumeratorMetrics) {
this.enumeratorMetrics = enumeratorMetrics;
this.enumeratorMetrics.enterSnapshotPhase();
this.enumeratorMetrics.registerMetrics(
alreadyProcessedTables::size, assignedSplits::size, remainingSplits::size);
this.enumeratorMetrics.addNewTables(computeTablesPendingSnapshot());
for (SchemalessSnapshotSplit snapshotSplit : remainingSplits) {
this.enumeratorMetrics
.getTableMetrics(snapshotSplit.getTableId())
.addNewSplit(snapshotSplit.splitId());
}
for (SchemalessSnapshotSplit snapshotSplit : assignedSplits.values()) {
this.enumeratorMetrics
.getTableMetrics(snapshotSplit.getTableId())
.addProcessedSplit(snapshotSplit.splitId());
}
for (String splitId : splitFinishedOffsets.keySet()) {
TableId tableId = SnapshotSplit.extractTableId(splitId);
this.enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
}
}
// remainingTables + tables has been split but not processed
private int computeTablesPendingSnapshot() {
int numTablesPendingSnapshot = remainingTables.size();
Set<TableId> computedTables = new HashSet<>();
for (SchemalessSnapshotSplit split : remainingSplits) {
TableId tableId = split.getTableId();
if (!computedTables.contains(tableId)
&& !alreadyProcessedTables.contains(tableId)
&& !remainingTables.contains(tableId)) {
computedTables.add(tableId);
numTablesPendingSnapshot++;
}
}
return numTablesPendingSnapshot;
}
@Override
public Optional<SourceSplitBase> getNext() {
if (!remainingSplits.isEmpty()) {
@@ -277,6 +327,9 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
SchemalessSnapshotSplit split = iterator.next();
iterator.remove();
assignedSplits.put(split.splitId(), split);
enumeratorMetrics
.getTableMetrics(split.getTableId())
.finishProcessSplit(split.splitId());
return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
} else {
// it's turn for new table
@@ -294,7 +347,15 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
.collect(Collectors.toList());
remainingSplits.addAll(schemalessSnapshotSplits);
tableSchemas.putAll(tableSchema);
if (!alreadyProcessedTables.contains(nextTable)) {
enumeratorMetrics.startSnapshotTables(1);
}
alreadyProcessedTables.add(nextTable);
List<String> splitIds =
schemalessSnapshotSplits.stream()
.map(SchemalessSnapshotSplit::splitId)
.collect(Collectors.toList());
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
return getNext();
} else {
return Optional.empty();
@@ -335,6 +396,12 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
@Override
public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
for (String splitId : splitFinishedOffsets.keySet()) {
splitFinishedCheckpointIds.put(splitId, UNDEFINED_CHECKPOINT_ID);
}
LOG.info(
"splitFinishedCheckpointIds size in onFinishedSplits: {}",
splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size());
if (allSnapshotSplitsFinished() && isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and stream split.
@@ -359,11 +426,31 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
// because they are failed
assignedSplits.remove(split.splitId());
splitFinishedOffsets.remove(split.splitId());
enumeratorMetrics
.getTableMetrics(split.asSnapshotSplit().getTableId())
.reprocessSplit(split.splitId());
TableId tableId = split.asSnapshotSplit().getTableId();
enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
}
}
@Override
public SnapshotPendingSplitsState snapshotState(long checkpointId) {
if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
for (Map.Entry<String, Long> splitFinishedCheckpointId :
splitFinishedCheckpointIds.entrySet()) {
if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) {
splitFinishedCheckpointId.setValue(checkpointId);
}
}
LOG.info(
"SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.",
checkpointId,
splitFinishedCheckpointIds.size());
}
SnapshotPendingSplitsState state =
new SnapshotPendingSplitsState(
alreadyProcessedTables,
@@ -374,7 +461,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
assignerStatus,
remainingTables,
isTableIdCaseSensitive,
- true);
true,
splitFinishedCheckpointIds);
// we need a complete checkpoint before mark this assigner to be finished, to wait for all
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null
@@ -397,6 +485,27 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
}
LOG.info("Snapshot split assigner is turn into finished status.");
}
if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
Iterator<Map.Entry<String, Long>> iterator =
splitFinishedCheckpointIds.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next();
String splitId = splitFinishedCheckpointId.getKey();
Long splitCheckpointId = splitFinishedCheckpointId.getValue();
if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID
&& checkpointId >= splitCheckpointId) {
// record table-level splits metrics
TableId tableId = SnapshotSplit.extractTableId(splitId);
enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
iterator.remove();
}
}
LOG.info(
"Checkpoint completed on checkpoint {} with splitFinishedCheckpointIds size {}.",
checkpointId,
splitFinishedCheckpointIds.size());
}
}
@Override
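Note: the splitFinishedCheckpointIds bookkeeping introduced above is a three-step protocol. onFinishedSplits registers the split with a placeholder id (-1), snapshotState stamps every placeholder with the current checkpoint id, and when a checkpoint with an equal or larger id later completes the split is counted as finished in the table metrics and dropped from the map. A stripped-down sketch of that protocol; the class name, method names and split-id strings here are illustrative, not the connector API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of "finished split -> stamp with checkpoint id -> count once the checkpoint completes".
public class SplitFinishTracker {
    private static final long UNDEFINED_CHECKPOINT_ID = -1;
    private final Map<String, Long> splitFinishedCheckpointIds = new HashMap<>();

    // Mirrors onFinishedSplits(): remember the split, checkpoint id not known yet.
    void onFinishedSplit(String splitId) {
        splitFinishedCheckpointIds.put(splitId, UNDEFINED_CHECKPOINT_ID);
    }

    // Mirrors snapshotState(checkpointId): stamp all pending splits with this checkpoint.
    void snapshotState(long checkpointId) {
        splitFinishedCheckpointIds.replaceAll(
                (splitId, cp) -> cp == UNDEFINED_CHECKPOINT_ID ? checkpointId : cp);
    }

    // Called once checkpoint `checkpointId` is confirmed complete: splits stamped with an
    // id <= the completed checkpoint are now safe to report as finished and are removed.
    List<String> onCheckpointComplete(long checkpointId) {
        List<String> finished = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = splitFinishedCheckpointIds.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() != UNDEFINED_CHECKPOINT_ID && checkpointId >= e.getValue()) {
                finished.add(e.getKey());
                it.remove();
            }
        }
        return finished;
    }

    public static void main(String[] args) {
        SplitFinishTracker tracker = new SplitFinishTracker();
        tracker.onFinishedSplit("db.users:0");
        tracker.snapshotState(12);               // split stamped with checkpoint 12
        tracker.onFinishedSplit("db.users:1");   // arrives after the snapshot, stays at -1
        System.out.println(tracker.onCheckpointComplete(12)); // [db.users:0]
        tracker.snapshotState(13);
        System.out.println(tracker.onCheckpointComplete(13)); // [db.users:1]
    }
}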

@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.base.source.assigner;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
@@ -27,6 +29,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,32 +52,53 @@ public class StreamSplitAssigner implements SplitAssigner {
private final DataSourceDialect dialect;
private final OffsetFactory offsetFactory;
private final SplitEnumeratorContext<? extends SourceSplit> enumeratorContext;
private SourceEnumeratorMetrics enumeratorMetrics;
public StreamSplitAssigner(
- SourceConfig sourceConfig, DataSourceDialect dialect, OffsetFactory offsetFactory) {
- this(sourceConfig, false, dialect, offsetFactory);
SourceConfig sourceConfig,
DataSourceDialect dialect,
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this(sourceConfig, false, dialect, offsetFactory, enumContext);
}
public StreamSplitAssigner(
SourceConfig sourceConfig,
StreamPendingSplitsState checkpoint,
DataSourceDialect dialect,
- OffsetFactory offsetFactory) {
- this(sourceConfig, checkpoint.isStreamSplitAssigned(), dialect, offsetFactory);
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this(
sourceConfig,
checkpoint.isStreamSplitAssigned(),
dialect,
offsetFactory,
enumeratorContext);
}
private StreamSplitAssigner(
SourceConfig sourceConfig,
boolean isStreamSplitAssigned,
DataSourceDialect dialect,
- OffsetFactory offsetFactory) {
OffsetFactory offsetFactory,
SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
this.sourceConfig = sourceConfig;
this.isStreamSplitAssigned = isStreamSplitAssigned;
this.dialect = dialect;
this.offsetFactory = offsetFactory;
this.enumeratorContext = enumeratorContext;
}
@Override
- public void open() {}
public void open() {
this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
if (isStreamSplitAssigned) {
enumeratorMetrics.enterStreamReading();
} else {
enumeratorMetrics.exitStreamReading();
}
}
@Override
public Optional<SourceSplitBase> getNext() {
@@ -82,6 +106,7 @@ public class StreamSplitAssigner implements SplitAssigner {
return Optional.empty();
} else {
isStreamSplitAssigned = true;
enumeratorMetrics.enterStreamReading();
return Optional.of(createStreamSplit());
}
}
@@ -105,6 +130,7 @@ public class StreamSplitAssigner implements SplitAssigner {
public void addSplits(Collection<SourceSplitBase> splits) {
// we don't store the split, but will re-create stream split later
isStreamSplitAssigned = false;
enumeratorMetrics.exitStreamReading();
}
@Override

@@ -53,7 +53,7 @@ import static org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplit
*/
public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
- private static final int VERSION = 6;
private static final int VERSION = 7;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@@ -114,6 +114,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
case 4:
case 5:
case 6:
case 7:
return deserializePendingSplitsState(version, serialized);
default:
throw new IOException("Unknown version: " + version);
@@ -168,6 +169,8 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
writeTableSchemas(state.getTableSchemas(), out);
writeSplitFinishedCheckpointIds(state.getSplitFinishedCheckpointIds(), out);
}
private void serializeHybridPendingSplitsState(
@@ -226,7 +229,8 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
assignerStatus,
new ArrayList<>(),
false,
- false);
false,
new HashMap<>());
}
private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState(
@@ -277,6 +281,10 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
if (version >= 4) {
tableSchemas.putAll(readTableSchemas(splitVersion, in));
}
Map<String, Long> splitFinishedCheckpointIds = new HashMap<>();
if (version >= 7) {
splitFinishedCheckpointIds = readSplitFinishedCheckpointIds(in);
}
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
remainingSchemalessSplits,
@@ -286,7 +294,8 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
assignerStatus,
remainingTableIds,
isTableIdCaseSensitive,
- true);
true,
splitFinishedCheckpointIds);
}
private HybridPendingSplitsState deserializeHybridPendingSplitsState(
@@ -306,6 +315,30 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
// Utilities
// ------------------------------------------------------------------------------------------
private void writeSplitFinishedCheckpointIds(
Map<String, Long> splitFinishedCheckpointIds, DataOutputSerializer out)
throws IOException {
final int size = splitFinishedCheckpointIds.size();
out.writeInt(size);
for (Map.Entry<String, Long> splitFinishedCheckpointId :
splitFinishedCheckpointIds.entrySet()) {
out.writeUTF(splitFinishedCheckpointId.getKey());
out.writeLong(splitFinishedCheckpointId.getValue());
}
}
private Map<String, Long> readSplitFinishedCheckpointIds(DataInputDeserializer in)
throws IOException {
Map<String, Long> splitFinishedCheckpointIds = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
String splitId = in.readUTF();
Long checkpointId = in.readLong();
splitFinishedCheckpointIds.put(splitId, checkpointId);
}
return splitFinishedCheckpointIds;
}
private void writeFinishedOffsets(Map<String, Offset> splitsInfo, DataOutputSerializer out)
throws IOException {
final int size = splitsInfo.size();
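Note: state version 7 appends the splitFinishedCheckpointIds map as an int count followed by (UTF split id, long checkpoint id) pairs, and deserialization only consumes it when the restored version is at least 7, so older snapshots stay readable. A rough round-trip sketch of that layout using plain java.io streams; the production code uses Flink's DataOutputSerializer/DataInputDeserializer, but the framing idea is the same:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Round-trip sketch of the version-7 splitFinishedCheckpointIds layout:
// an int count, then one (UTF splitId, long checkpointId) pair per entry.
public class SplitFinishedCheckpointIdsRoundTrip {

    static void write(Map<String, Long> ids, DataOutputStream out) throws IOException {
        out.writeInt(ids.size());
        for (Map.Entry<String, Long> e : ids.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
    }

    static Map<String, Long> read(DataInputStream in) throws IOException {
        Map<String, Long> ids = new HashMap<>();
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            ids.put(in.readUTF(), in.readLong());
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> original = new HashMap<>();
        original.put("db.users:0", 12L);
        original.put("db.users:1", 13L);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(original, new DataOutputStream(bytes));

        Map<String, Long> restored =
                read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original.equals(restored)); // true
    }
}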

@@ -68,6 +68,9 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
private final Map<TableId, TableChanges.TableChange> tableSchemas;
/** Map to record splitId and the checkpointId mark the split is finished. */
private final Map<String, Long> splitFinishedCheckpointIds;
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
List<SchemalessSnapshotSplit> remainingSplits,
@@ -77,7 +80,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
- boolean isRemainingTablesCheckpointed) {
boolean isRemainingTablesCheckpointed,
Map<String, Long> splitFinishedCheckpointIds) {
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
@@ -87,6 +91,11 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.tableSchemas = tableSchemas;
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
}
public Map<String, Long> getSplitFinishedCheckpointIds() {
return splitFinishedCheckpointIds;
}
public List<TableId> getAlreadyProcessedTables() {
@@ -141,7 +150,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
&& Objects.equals(alreadyProcessedTables, that.alreadyProcessedTables)
&& Objects.equals(remainingSplits, that.remainingSplits)
&& Objects.equals(assignedSplits, that.assignedSplits)
- && Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets);
&& Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets)
&& Objects.equals(splitFinishedCheckpointIds, that.splitFinishedCheckpointIds);
}
@Override
@@ -154,7 +164,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
splitFinishedOffsets,
assignerStatus,
isTableIdCaseSensitive,
- isRemainingTablesCheckpointed);
isRemainingTablesCheckpointed,
splitFinishedCheckpointIds);
}
@Override
@@ -176,6 +187,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
+ isTableIdCaseSensitive
+ ", isRemainingTablesCheckpointed="
+ isRemainingTablesCheckpointed
+ ", splitFinishedCheckpointIds="
+ splitFinishedCheckpointIds
+ '}';
}
}

@@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.metrics;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/** A collection class for handling metrics in {@link SourceEnumeratorMetrics}. */
public class SourceEnumeratorMetrics {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceEnumeratorMetrics.class);
// Constants
public static final int UNDEFINED = 0;
// Metric names
public static final String IS_SNAPSHOTTING = "isSnapshotting";
public static final String IS_STREAM_READING = "isStreamReading";
public static final String NUM_TABLES_SNAPSHOTTED = "numTablesSnapshotted";
public static final String NUM_TABLES_REMAINING = "numTablesRemaining";
public static final String NUM_SNAPSHOT_SPLITS_PROCESSED = "numSnapshotSplitsProcessed";
public static final String NUM_SNAPSHOT_SPLITS_REMAINING = "numSnapshotSplitsRemaining";
public static final String NUM_SNAPSHOT_SPLITS_FINISHED = "numSnapshotSplitsFinished";
public static final String SNAPSHOT_START_TIME = "snapshotStartTime";
public static final String SNAPSHOT_END_TIME = "snapshotEndTime";
public static final String NAMESPACE_GROUP_KEY = "namespace";
public static final String SCHEMA_GROUP_KEY = "schema";
public static final String TABLE_GROUP_KEY = "table";
private final SplitEnumeratorMetricGroup metricGroup;
private volatile int isSnapshotting = UNDEFINED;
private volatile int isStreamReading = UNDEFINED;
private volatile int numTablesRemaining = 0;
// Map for managing per-table metrics by table identifier
// Key: Identifier of the table
// Value: TableMetrics related to the table
private final Map<TableId, TableMetrics> tableMetricsMap = new ConcurrentHashMap<>();
public SourceEnumeratorMetrics(SplitEnumeratorMetricGroup metricGroup) {
this.metricGroup = metricGroup;
metricGroup.gauge(IS_SNAPSHOTTING, () -> isSnapshotting);
metricGroup.gauge(IS_STREAM_READING, () -> isStreamReading);
metricGroup.gauge(NUM_TABLES_REMAINING, () -> numTablesRemaining);
}
public void enterSnapshotPhase() {
this.isSnapshotting = 1;
}
public void exitSnapshotPhase() {
this.isSnapshotting = 0;
}
public void enterStreamReading() {
this.isStreamReading = 1;
}
public void exitStreamReading() {
this.isStreamReading = 0;
}
public void registerMetrics(
Gauge<Integer> numTablesSnapshotted,
Gauge<Integer> numSnapshotSplitsProcessed,
Gauge<Integer> numSnapshotSplitsRemaining) {
metricGroup.gauge(NUM_TABLES_SNAPSHOTTED, numTablesSnapshotted);
metricGroup.gauge(NUM_SNAPSHOT_SPLITS_PROCESSED, numSnapshotSplitsProcessed);
metricGroup.gauge(NUM_SNAPSHOT_SPLITS_REMAINING, numSnapshotSplitsRemaining);
}
public void addNewTables(int numNewTables) {
numTablesRemaining += numNewTables;
}
public void startSnapshotTables(int numSnapshottedTables) {
numTablesRemaining -= numSnapshottedTables;
}
public TableMetrics getTableMetrics(TableId tableId) {
return tableMetricsMap.computeIfAbsent(
tableId,
key -> new TableMetrics(key.catalog(), key.schema(), key.table(), metricGroup));
}
// ----------------------------------- Helper classes --------------------------------
/**
* Collection class for managing metrics of a table.
*
* <p>Metrics of table level are registered in its corresponding subgroup under the {@link
* SplitEnumeratorMetricGroup}.
*/
public static class TableMetrics {
private AtomicInteger numSnapshotSplitsProcessed = new AtomicInteger(0);
private AtomicInteger numSnapshotSplitsRemaining = new AtomicInteger(0);
private AtomicInteger numSnapshotSplitsFinished = new AtomicInteger(0);
private volatile long snapshotStartTime = UNDEFINED;
private volatile long snapshotEndTime = UNDEFINED;
private Set<Integer> remainingSplitChunkIds =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private Set<Integer> processedSplitChunkIds =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private Set<Integer> finishedSplitChunkIds =
Collections.newSetFromMap(new ConcurrentHashMap<>());
public TableMetrics(
String databaseName, String schemaName, String tableName, MetricGroup parentGroup) {
databaseName = processNull(databaseName);
schemaName = processNull(schemaName);
tableName = processNull(tableName);
MetricGroup metricGroup =
parentGroup
.addGroup(NAMESPACE_GROUP_KEY, databaseName)
.addGroup(SCHEMA_GROUP_KEY, schemaName)
.addGroup(TABLE_GROUP_KEY, tableName);
metricGroup.gauge(
NUM_SNAPSHOT_SPLITS_PROCESSED, () -> numSnapshotSplitsProcessed.intValue());
metricGroup.gauge(
NUM_SNAPSHOT_SPLITS_REMAINING, () -> numSnapshotSplitsRemaining.intValue());
metricGroup.gauge(
NUM_SNAPSHOT_SPLITS_FINISHED, () -> numSnapshotSplitsFinished.intValue());
metricGroup.gauge(SNAPSHOT_START_TIME, () -> snapshotStartTime);
metricGroup.gauge(SNAPSHOT_END_TIME, () -> snapshotEndTime);
snapshotStartTime = System.currentTimeMillis();
}
private String processNull(String name) {
if (StringUtils.isBlank(name)) {
// If null, convert to an empty string
return "";
}
return name;
}
public void addNewSplit(String newSplitId) {
int chunkId = SnapshotSplit.extractChunkId(newSplitId);
if (!remainingSplitChunkIds.contains(chunkId)) {
remainingSplitChunkIds.add(chunkId);
numSnapshotSplitsRemaining.getAndAdd(1);
LOGGER.info("add remaining split: {}", newSplitId);
}
}
public void addNewSplits(List<String> newSplitIds) {
if (newSplitIds != null) {
for (String newSplitId : newSplitIds) {
addNewSplit(newSplitId);
}
}
}
public void removeRemainingSplit(String removeSplitId) {
int chunkId = SnapshotSplit.extractChunkId(removeSplitId);
if (remainingSplitChunkIds.contains(chunkId)) {
remainingSplitChunkIds.remove(chunkId);
numSnapshotSplitsRemaining.getAndUpdate(num -> num - 1);
LOGGER.info("remove remaining split: {}", removeSplitId);
}
}
public void addProcessedSplit(String processedSplitId) {
int chunkId = SnapshotSplit.extractChunkId(processedSplitId);
if (!processedSplitChunkIds.contains(chunkId)) {
processedSplitChunkIds.add(chunkId);
numSnapshotSplitsProcessed.getAndAdd(1);
LOGGER.info("add processed split: {}", processedSplitId);
}
}
public void removeProcessedSplit(String removeSplitId) {
int chunkId = SnapshotSplit.extractChunkId(removeSplitId);
if (processedSplitChunkIds.contains(chunkId)) {
processedSplitChunkIds.remove(chunkId);
numSnapshotSplitsProcessed.getAndUpdate(num -> num - 1);
LOGGER.info("remove processed split: {}", removeSplitId);
}
}
public void reprocessSplit(String reprocessSplitId) {
addNewSplit(reprocessSplitId);
removeProcessedSplit(reprocessSplitId);
}
public void finishProcessSplit(String processedSplitId) {
addProcessedSplit(processedSplitId);
removeRemainingSplit(processedSplitId);
}
public void tryToMarkSnapshotEndTime() {
if (numSnapshotSplitsRemaining.get() == 0
&& (numSnapshotSplitsFinished.get() == numSnapshotSplitsProcessed.get())) {
// Mark the end time of snapshot when remained splits is zero and processed splits
// are all finished
snapshotEndTime = System.currentTimeMillis();
}
}
public void addFinishedSplits(Set<String> finishedSplitIds) {
if (finishedSplitIds != null) {
for (String finishedSplitId : finishedSplitIds) {
addFinishedSplit(finishedSplitId);
}
}
}
public void addFinishedSplit(String finishedSplitId) {
int chunkId = SnapshotSplit.extractChunkId(finishedSplitId);
if (!finishedSplitChunkIds.contains(chunkId)) {
finishedSplitChunkIds.add(chunkId);
numSnapshotSplitsFinished.getAndAdd(1);
tryToMarkSnapshotEndTime();
LOGGER.info("add finished split: {}", finishedSplitId);
}
}
public void removeFinishedSplit(String removeSplitId) {
int chunkId = SnapshotSplit.extractChunkId(removeSplitId);
if (finishedSplitChunkIds.contains(chunkId)) {
finishedSplitChunkIds.remove(chunkId);
numSnapshotSplitsFinished.getAndUpdate(num -> num - 1);
LOGGER.info("remove finished split: {}", removeSplitId);
}
}
}
}
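Note on the class above: per-table split counters are deduplicated on the chunk id extracted from the split id, so registering the same split twice (for example after a failover re-adds splits) does not inflate the gauges. A small standalone sketch of that idempotent counting; the split-id format and parsing below are assumptions for illustration, not the actual SnapshotSplit.extractChunkId:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: count each split at most once by remembering the chunk ids already seen.
public class IdempotentSplitCounter {
    private final Set<Integer> seenChunkIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final AtomicInteger numSplits = new AtomicInteger(0);

    // Assumption for the sketch: split ids look like "<table>:<chunkId>".
    private static int extractChunkId(String splitId) {
        return Integer.parseInt(splitId.substring(splitId.lastIndexOf(':') + 1));
    }

    void addSplit(String splitId) {
        if (seenChunkIds.add(extractChunkId(splitId))) { // add() returns false for duplicates
            numSplits.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        IdempotentSplitCounter counter = new IdempotentSplitCounter();
        counter.addSplit("db.users:0");
        counter.addSplit("db.users:0"); // re-registered after failover: ignored
        counter.addSplit("db.users:1");
        System.out.println(counter.numSplits.get()); // 2
    }
}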

@@ -20,16 +20,56 @@ package org.apache.flink.cdc.connectors.base.source.metrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.util.clock.SystemClock;
import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
/** A collection class for handling metrics in {@link IncrementalSourceReader}. */
public class SourceReaderMetrics {
private static final Logger LOG = LoggerFactory.getLogger(SourceReaderMetrics.class);
public static final long UNDEFINED = -1;
// Metric group keys
public static final String NAMESPACE_GROUP_KEY = "namespace";
public static final String SCHEMA_GROUP_KEY = "schema";
public static final String TABLE_GROUP_KEY = "table";
// Metric names
public static final String NUM_SNAPSHOT_RECORDS = "numSnapshotRecords";
public static final String NUM_INSERT_DML_RECORDS = "numInsertDMLRecords";
public static final String NUM_UPDATE_DML_RECORDS = "numUpdateDMLRecords";
public static final String NUM_DELETE_DML_RECORDS = "numDeleteDMLRecords";
public static final String NUM_DDL_RECORDS = "numDDLRecords";
public static final String CURRENT_EVENT_TIME_LAG = "currentEventTimeLag";
private final SourceReaderMetricGroup metricGroup;
// Reader-level metrics
private final Counter snapshotCounter;
private final Counter insertCounter;
private final Counter updateCounter;
private final Counter deleteCounter;
private final Counter schemaChangeCounter;
private final Map<TableId, TableMetrics> tableMetricsMap = new HashMap<>();
/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
@@ -38,15 +78,22 @@ public class SourceReaderMetrics {
/** The total number of record that failed to consume, process or emit. */
private final Counter numRecordsInErrorsCounter;
/** The timestamp of the last record received. */
private volatile long lastReceivedEventTime = UNDEFINED;
public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.numRecordsInErrorsCounter = metricGroup.getNumRecordsInErrorsCounter();
- }
- public void registerMetrics() {
metricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
metricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag);
snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS);
insertCounter = metricGroup.counter(NUM_INSERT_DML_RECORDS);
updateCounter = metricGroup.counter(NUM_UPDATE_DML_RECORDS);
deleteCounter = metricGroup.counter(NUM_DELETE_DML_RECORDS);
schemaChangeCounter = metricGroup.counter(NUM_DDL_RECORDS);
}
public long getFetchDelay() {
@@ -60,4 +107,150 @@ public class SourceReaderMetrics {
public void addNumRecordsInErrors(long delta) {
this.numRecordsInErrorsCounter.inc(delta);
}
public void updateLastReceivedEventTime(Long eventTimestamp) {
if (eventTimestamp != null && eventTimestamp > 0L) {
lastReceivedEventTime = eventTimestamp;
}
}
public void markRecord() {
try {
metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
} catch (Exception e) {
LOG.warn("Failed to update record counters.", e);
}
}
public void updateRecordCounters(SourceRecord record) {
catchAndWarnLogAllExceptions(
() -> {
// Increase reader and table level input counters
if (isDataChangeRecord(record)) {
TableMetrics tableMetrics = getTableMetrics(getTableId(record));
Envelope.Operation op = Envelope.operationFor(record);
switch (op) {
case READ:
snapshotCounter.inc();
tableMetrics.markSnapshotRecord();
break;
case CREATE:
insertCounter.inc();
tableMetrics.markInsertRecord();
break;
case DELETE:
deleteCounter.inc();
tableMetrics.markDeleteRecord();
break;
case UPDATE:
updateCounter.inc();
tableMetrics.markUpdateRecord();
break;
}
} else if (isSchemaChangeEvent(record)) {
schemaChangeCounter.inc();
TableId tableId = getTableId(record);
if (tableId != null) {
getTableMetrics(tableId).markSchemaChangeRecord();
}
}
});
}
private TableMetrics getTableMetrics(TableId tableId) {
return tableMetricsMap.computeIfAbsent(
tableId,
id -> new TableMetrics(id.catalog(), id.schema(), id.table(), metricGroup));
}
// ------------------------------- Helper functions -----------------------------
private void catchAndWarnLogAllExceptions(Runnable runnable) {
try {
runnable.run();
} catch (Exception e) {
// Catch all exceptions as errors in metric handling should not fail the job
LOG.warn("Failed to update metrics", e);
}
}
private long getCurrentEventTimeLag() {
if (lastReceivedEventTime == UNDEFINED) {
return UNDEFINED;
}
return SystemClock.getInstance().absoluteTimeMillis() - lastReceivedEventTime;
}
// ----------------------------------- Helper classes --------------------------------
/**
* Collection class for managing metrics of a table.
*
* <p>Metrics of table level are registered in its corresponding subgroup under the {@link
* SourceReaderMetricGroup}.
*/
private static class TableMetrics {
// Snapshot + Stream
private final Counter recordsCounter;
// Snapshot phase
private final Counter snapshotCounter;
// Stream phase
private final Counter insertCounter;
private final Counter updateCounter;
private final Counter deleteCounter;
private final Counter schemaChangeCounter;
public TableMetrics(
String databaseName, String schemaName, String tableName, MetricGroup parentGroup) {
databaseName = processNull(databaseName);
schemaName = processNull(schemaName);
tableName = processNull(tableName);
MetricGroup metricGroup =
parentGroup
.addGroup(NAMESPACE_GROUP_KEY, databaseName)
.addGroup(SCHEMA_GROUP_KEY, schemaName)
.addGroup(TABLE_GROUP_KEY, tableName);
recordsCounter = metricGroup.counter(MetricNames.IO_NUM_RECORDS_IN);
snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS);
insertCounter = metricGroup.counter(NUM_INSERT_DML_RECORDS);
updateCounter = metricGroup.counter(NUM_UPDATE_DML_RECORDS);
deleteCounter = metricGroup.counter(NUM_DELETE_DML_RECORDS);
schemaChangeCounter = metricGroup.counter(NUM_DDL_RECORDS);
}
private String processNull(String name) {
if (StringUtils.isBlank(name)) {
// If null, convert to an empty string
return "";
}
return name;
}
public void markSnapshotRecord() {
recordsCounter.inc();
snapshotCounter.inc();
}
public void markInsertRecord() {
recordsCounter.inc();
insertCounter.inc();
}
public void markDeleteRecord() {
recordsCounter.inc();
deleteCounter.inc();
}
public void markUpdateRecord() {
recordsCounter.inc();
updateCounter.inc();
}
public void markSchemaChangeRecord() {
recordsCounter.inc();
schemaChangeCounter.inc();
}
}
}
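Note: currentEventTimeLag above is computed as wall-clock time minus the timestamp of the last received change event and reports UNDEFINED (-1) until the first event arrives, while counter updates are wrapped so a metrics failure is only logged and never fails the job. A compact sketch of the lag-gauge semantics (plain Java, clock and names are illustrative):

// Sketch of the currentEventTimeLag semantics: -1 until the first event,
// afterwards "wall clock minus the event's source timestamp".
public class EventTimeLagSketch {
    static final long UNDEFINED = -1;
    private volatile long lastReceivedEventTime = UNDEFINED;

    void updateLastReceivedEventTime(Long eventTimestamp) {
        if (eventTimestamp != null && eventTimestamp > 0L) {
            lastReceivedEventTime = eventTimestamp;
        }
    }

    long currentEventTimeLag() {
        if (lastReceivedEventTime == UNDEFINED) {
            return UNDEFINED;
        }
        return System.currentTimeMillis() - lastReceivedEventTime;
    }

    public static void main(String[] args) {
        EventTimeLagSketch lag = new EventTimeLagSketch();
        System.out.println(lag.currentEventTimeLag());            // -1, nothing received yet
        lag.updateLastReceivedEventTime(System.currentTimeMillis() - 5_000L);
        System.out.println(lag.currentEventTimeLag() >= 5_000L);  // true, roughly 5s of lag
    }
}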

@@ -152,6 +152,9 @@ public class IncrementalSourceRecordEmitter<T>
}
protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
sourceReaderMetrics.markRecord();
sourceReaderMetrics.updateRecordCounters(element);
outputCollector.output = output;
outputCollector.currentMessageTimestamp = getMessageTimestamp(element);
debeziumDeserializationSchema.deserialize(element, outputCollector);
@@ -169,9 +172,10 @@ public class IncrementalSourceRecordEmitter<T>
}
}
- private static class OutputCollector<T> implements Collector<T> {
- private SourceOutput<T> output;
- private Long currentMessageTimestamp;
/** An adapter between {@link SourceOutput} and {@link Collector}. */
protected static class OutputCollector<T> implements Collector<T> {
public SourceOutput<T> output;
public Long currentMessageTimestamp;
@Override
public void collect(T record) {

@@ -24,6 +24,7 @@ import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -119,7 +120,11 @@ public class SourceRecordUtils {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String dbName = source.getString(DATABASE_NAME_KEY);
- String schemaName = source.getString(SCHEMA_NAME_KEY);
Field field = source.schema().field(SCHEMA_NAME_KEY);
String schemaName = null;
if (field != null) {
schemaName = source.getString(SCHEMA_NAME_KEY);
}
String tableName = source.getString(TABLE_NAME_KEY);
return new TableId(dbName, schemaName, tableName);
}
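Note: the guard above matters because not every Debezium source struct declares a "schema" field (MySQL's source block carries database and table only, while Postgres also has a schema), and Struct.getString throws a DataException for a field that is not part of the schema; probing source.schema().field(...) first makes the lookup null-safe. A small Kafka Connect sketch of the guarded lookup, with illustrative field names:

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Sketch: read an optional field from a Connect Struct without risking a DataException.
public class OptionalSourceField {
    public static void main(String[] args) {
        // A MySQL-like "source" block: database and table, but no "schema" field.
        Schema sourceSchema =
                SchemaBuilder.struct()
                        .field("db", Schema.STRING_SCHEMA)
                        .field("table", Schema.STRING_SCHEMA)
                        .build();
        Struct source = new Struct(sourceSchema).put("db", "inventory").put("table", "users");

        // Guarded lookup: only call getString if the schema actually declares the field.
        Field schemaField = source.schema().field("schema");
        String schemaName = schemaField != null ? source.getString("schema") : null;

        System.out.println(source.getString("db") + " / " + schemaName + " / "
                + source.getString("table")); // inventory / null / users
    }
}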

@@ -0,0 +1,396 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.base.experimental.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.base.source.MySqlEventDeserializer;
import org.apache.flink.cdc.connectors.base.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.base.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.base.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** MySQL Source Metrics Tests. */
public class MySqlSourceMetricsTest {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceMetricsTest.class);
private static final int DEFAULT_PARALLELISM = 4;
private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
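// The mini cluster below is configured to use this in-memory metric reporter, so the
// reader- and enumerator-side metric groups created by the source can be looked up
// and asserted on directly within the test.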
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.setConfiguration(
metricReporter.addToConfiguration(new Configuration()))
.build());
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started.");
}
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "metrics", "mysqluser", "mysqlpw");
@Test
public void testSourceMetrics() throws Exception {
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
inventoryDatabase.createAndInitialize();
final String tableId = inventoryDatabase.getDatabaseName() + ".users";
MySqlSourceBuilder.MySqlIncrementalSource<Event> mySqlChangeEventSource =
new MySqlSourceBuilder<Event>()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(tableId)
.username(inventoryDatabase.getUsername())
.password(inventoryDatabase.getPassword())
.serverId("5401-5404")
.deserializer(buildRowDataDebeziumDeserializeSchema())
.includeSchemaChanges(true) // output the schema changes as well
.splitSize(2)
.build();
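// With 9 rows in the users table and a split size of 2, the snapshot phase is expected
// to produce 5 snapshot splits, which matches the split gauges asserted further below.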
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 4
CloseableIterator<Event> iterator =
env.fromSource(
mySqlChangeEventSource,
WatermarkStrategy.noWatermarks(),
"MySqlParallelSource")
.setParallelism(1)
.executeAndCollect(); // collect record
String[] snapshotExpectedRecords =
new String[] {
"+I[101, Tom, 3]",
"+I[102, Jack, 5]",
"+I[103, Allen, 10]",
"+I[104, Andrew, 13]",
"+I[105, Arnold, 15]",
"+I[106, Claud, 19]",
"+I[107, Howard, 37]",
"+I[108, Jacob, 46]",
"+I[109, Lionel, 58]"
};
// step-1: consume snapshot data
List<Event> snapshotRowDataList = new ArrayList<>();
for (int i = 0; i < snapshotExpectedRecords.length && iterator.hasNext(); i++) {
snapshotRowDataList.add(iterator.next());
}
List<String> snapshotActualRecords = formatResult(snapshotRowDataList, dataType);
assertEqualsInAnyOrder(Arrays.asList(snapshotExpectedRecords), snapshotActualRecords);
// step-2: make 6 change events in one MySQL transaction
makeBinlogEvents(getConnection(), tableId);
// mock ddl events
makeDdlEvents(getConnection(), tableId);
String[] binlogExpectedRecords =
new String[] {
"-U[103, Allen, 10]",
"+U[103, Oswald, 10]",
"+I[110, Terence, 78]",
"-D[110, Terence, 78]",
"-U[103, Oswald, 10]",
"+U[103, Marry, 10]"
};
// step-3: consume binlog change events
List<Event> binlogRowDataList = new ArrayList<>();
for (int i = 0; i < 4 && iterator.hasNext(); i++) {
binlogRowDataList.add(iterator.next());
}
List<String> binlogActualRecords = formatResult(binlogRowDataList, dataType);
assertEqualsInAnyOrder(Arrays.asList(binlogExpectedRecords), binlogActualRecords);
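// Two kinds of metric groups are checked below: per-table reader groups (scope
// contains the table name but not "enumerator") expose record counters, while
// enumerator groups expose split progress and snapshot-phase gauges.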
Set<MetricGroup> metricGroups = metricReporter.findGroups("users");
for (MetricGroup enumeratorGroup : metricGroups) {
boolean isTableMetric = true;
for (String scopeComponent : enumeratorGroup.getScopeComponents()) {
if (scopeComponent.contains("enumerator")) {
isTableMetric = false;
break;
}
}
if (!isTableMetric) {
break;
}
Map<String, Metric> enumeratorMetrics =
metricReporter.getMetricsByGroup(enumeratorGroup);
Assert.assertEquals(
1, ((Counter) enumeratorMetrics.get("numDeleteDMLRecords")).getCount());
Assert.assertEquals(
1, ((Counter) enumeratorMetrics.get("numInsertDMLRecords")).getCount());
Assert.assertEquals(
9, ((Counter) enumeratorMetrics.get("numSnapshotRecords")).getCount());
// ddl events
Assert.assertEquals(1, ((Counter) enumeratorMetrics.get("numDDLRecords")).getCount());
Assert.assertEquals(13, ((Counter) enumeratorMetrics.get("numRecordsIn")).getCount());
Assert.assertEquals(
2, ((Counter) enumeratorMetrics.get("numUpdateDMLRecords")).getCount());
}
Set<MetricGroup> enumeratorGroups = metricReporter.findGroups("enumerator");
for (MetricGroup enumeratorGroup : enumeratorGroups) {
boolean isTableMetric = false;
for (String scopeComponent : enumeratorGroup.getScopeComponents()) {
if (scopeComponent.contains("users")) {
isTableMetric = true;
break;
}
}
Map<String, Metric> enumeratorMetrics =
metricReporter.getMetricsByGroup(enumeratorGroup);
if (isTableMetric) {
Assert.assertEquals(
0,
((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsRemaining"))
.getValue()
.intValue());
Assert.assertEquals(
5,
((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsProcessed"))
.getValue()
.intValue());
Assert.assertEquals(
5,
((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsFinished"))
.getValue()
.intValue());
Assert.assertTrue(
((Gauge<Long>) enumeratorMetrics.get("snapshotEndTime"))
.getValue()
.longValue()
> 0);
Assert.assertTrue(
((Gauge<Long>) enumeratorMetrics.get("snapshotStartTime"))
.getValue()
.longValue()
> 0);
} else {
Assert.assertEquals(
0,
((Gauge<Integer>) enumeratorMetrics.get("isSnapshotting"))
.getValue()
.intValue());
Assert.assertEquals(
1,
((Gauge<Integer>) enumeratorMetrics.get("isStreamReading"))
.getValue()
.intValue());
Assert.assertEquals(
1,
((Gauge<Integer>) enumeratorMetrics.get("numTablesSnapshotted"))
.getValue()
.intValue());
Assert.assertEquals(
0,
((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsRemaining"))
.getValue()
.intValue());
Assert.assertEquals(
5,
((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsProcessed"))
.getValue()
.intValue());
}
}
// stop the worker
iterator.close();
}
private MySqlEventDeserializer buildRowDataDebeziumDeserializeSchema() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(DebeziumChangelogMode.ALL, true);
return deserializer;
}
private List<String> formatResult(List<Event> records, DataType dataType) {
RowRowConverter rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
return records.stream()
.flatMap(
item -> {
DataChangeEvent changeEvent = ((DataChangeEvent) item);
RecordData before = changeEvent.before();
RecordData after = changeEvent.after();
switch (changeEvent.op()) {
case INSERT:
GenericRowData insertData = new GenericRowData(3);
insertData.setRowKind(RowKind.INSERT);
convertData(changeEvent.after(), insertData);
return Arrays.stream(new GenericRowData[] {insertData});
case DELETE:
GenericRowData deleteData = new GenericRowData(3);
deleteData.setRowKind(RowKind.DELETE);
convertData(before, deleteData);
return Arrays.stream(new GenericRowData[] {deleteData});
case UPDATE:
case REPLACE:
GenericRowData beforeData = new GenericRowData(3);
beforeData.setRowKind(RowKind.UPDATE_BEFORE);
convertData(before, beforeData);
GenericRowData afterData = new GenericRowData(3);
afterData.setRowKind(RowKind.UPDATE_AFTER);
convertData(after, afterData);
return Stream.of(beforeData, afterData)
.filter(row -> row != null);
}
return Stream.empty();
})
.map(rowRowConverter::toExternal)
.map(Object::toString)
.collect(Collectors.toList());
}
private void convertData(RecordData inputData, GenericRowData outputData) {
outputData.setField(0, inputData.getLong(0));
outputData.setField(1, StringData.fromString(inputData.getString(1).toString()));
outputData.setField(2, inputData.getInt(2));
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", inventoryDatabase.getUsername());
properties.put("database.password", inventoryDatabase.getPassword());
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
// properties.put("transaction.topic", "transaction_topic");
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return new MySqlConnection(new MySqlConnection.MySqlConnectionConfiguration(configuration));
}
private void makeBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events
connection.execute(
"UPDATE " + tableId + " SET name = 'Oswald' where id = 103",
"INSERT INTO " + tableId + " VALUES(110,'Terence',78)",
"DELETE FROM " + tableId + " where id = 110",
"UPDATE " + tableId + " SET name = 'Marry' where id = 103");
connection.commit();
} finally {
connection.close();
}
}
private void makeDdlEvents(JdbcConnection connection, String tableId) throws SQLException {
try {
connection.setAutoCommit(false);
// make a ddl event
connection.execute("alter table " + tableId + " add test_add_col int null");
connection.commit();
} finally {
connection.close();
}
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
private static MySqlContainer createMySqlContainer(MySqlVersion version) {
return new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
}

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.base.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.data.Envelope;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
/** Event deserializer for {@link MySqlDataSource}. */
@Internal
public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final long serialVersionUID = 1L;
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
}
@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
tables = new Tables();
}
try {
HistoryRecord historyRecord = getHistoryRecord(record);
String databaseName =
historyRecord.document().getString(HistoryRecord.Fields.DATABASE_NAME);
String ddl =
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
customParser.setCurrentDatabase(databaseName);
customParser.parse(ddl, tables);
return customParser.getAndClearParsedEvents();
} catch (IOException e) {
throw new IllegalStateException("Failed to parse the schema change: " + record, e);
}
}
return Collections.emptyList();
}
@Override
protected boolean isDataChangeRecord(SourceRecord record) {
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
return value != null
&& valueSchema != null
&& valueSchema.field(Envelope.FieldName.OPERATION) != null
&& value.getString(Envelope.FieldName.OPERATION) != null;
}
@Override
protected boolean isSchemaChangeRecord(SourceRecord record) {
Schema keySchema = record.keySchema();
return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
}
@Override
protected TableId getTableId(SourceRecord record) {
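// Debezium MySQL topic names follow the "<serverName>.<database>.<table>" pattern,
// so the database and table names are taken from the 2nd and 3rd segments.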
String[] parts = record.topic().split("\\.");
return TableId.tableId(parts[1], parts[2]);
}
@Override
protected Map<String, String> getMetadata(SourceRecord record) {
return Collections.emptyMap();
}
@Override
protected Object convertToString(Object dbzObj, Schema schema) {
return BinaryStringData.fromString(dbzObj.toString());
}
}

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import org.apache.kafka.connect.data.Schema;
/** {@link DataType} inference for MySQL debezium {@link Schema}. */
@Internal
public class MySqlSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
private static final long serialVersionUID = 1L;
protected DataType inferStruct(Object value, Schema schema) {
// the Geometry datatype in MySQL will be converted to
// a String in JSON format
if (Point.LOGICAL_NAME.equals(schema.name())
|| Geometry.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.STRING();
} else {
return super.inferStruct(value, schema);
}
}
}

@ -21,6 +21,9 @@ import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.HybridPendingSplitsStateVersion5;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.PendingSplitsStateSerializerVersion5;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.SnapshotPendingSplitsStateVersion5;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.HybridPendingSplitsStateVersion6;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.PendingSplitsStateSerializerVersion6;
import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.SnapshotPendingSplitsStateVersion6;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
@ -57,14 +60,14 @@ public class PendingSplitsStateSerializerTest {
new PendingSplitsStateSerializer(constructSourceSplitSerializer());
PendingSplitsState streamSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); 7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter);
SnapshotPendingSplitsState snapshotPendingSplitsStateBefore =
constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING);
PendingSplitsState snapshotPendingSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6, 7,
pendingSplitsStateSerializer.serialize(snapshotPendingSplitsStateBefore));
Assert.assertEquals(snapshotPendingSplitsStateBefore, snapshotPendingSplitsStateAfter);
@ -72,12 +75,12 @@ public class PendingSplitsStateSerializerTest {
new HybridPendingSplitsState(snapshotPendingSplitsStateBefore, false);
PendingSplitsState hybridPendingSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6, pendingSplitsStateSerializer.serialize(hybridPendingSplitsStateBefore)); 7, pendingSplitsStateSerializer.serialize(hybridPendingSplitsStateBefore));
Assert.assertEquals(hybridPendingSplitsStateBefore, hybridPendingSplitsStateAfter);
}
@Test
public void testPendingSplitsStateSerializerCompatibility() throws IOException { public void testPendingSplitsStateSerializerCompatibilityVersion5() throws IOException {
StreamPendingSplitsState streamPendingSplitsStateBefore =
new StreamPendingSplitsState(true);
PendingSplitsStateSerializer pendingSplitsStateSerializer =
@ -95,7 +98,7 @@ public class PendingSplitsStateSerializerTest {
pendingSplitsStateSerializer.deserializePendingSplitsState(
5,
PendingSplitsStateSerializerVersion5.serialize(
constructSnapshotPendingSplitsStateVersion4(false))); constructSnapshotPendingSplitsStateVersion5(false)));
Assert.assertEquals(expectedSnapshotSplitsState, snapshotPendingSplitsStateAfter);
HybridPendingSplitsState expectedHybridPendingSplitsState =
@ -108,7 +111,46 @@ public class PendingSplitsStateSerializerTest {
5,
PendingSplitsStateSerializerVersion5.serialize(
new HybridPendingSplitsStateVersion5(
constructSnapshotPendingSplitsStateVersion4(true), false))); constructSnapshotPendingSplitsStateVersion5(true), false)));
Assert.assertEquals(expectedHybridPendingSplitsState, hybridPendingSplitsStateAfter);
}
@Test
public void testPendingSplitsStateSerializerCompatibilityVersion6() throws IOException {
StreamPendingSplitsState streamPendingSplitsStateBefore =
new StreamPendingSplitsState(true);
PendingSplitsStateSerializer pendingSplitsStateSerializer =
new PendingSplitsStateSerializer(constructSourceSplitSerializer());
PendingSplitsState streamSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6,
PendingSplitsStateSerializerVersion6.serialize(
streamPendingSplitsStateBefore));
Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter);
SnapshotPendingSplitsState expectedSnapshotSplitsState =
constructSnapshotPendingSplitsState(AssignerStatus.INITIAL_ASSIGNING);
PendingSplitsState snapshotPendingSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6,
PendingSplitsStateSerializerVersion6.serialize(
constructSnapshotPendingSplitsStateVersion6(
AssignerStatus.INITIAL_ASSIGNING)));
Assert.assertEquals(expectedSnapshotSplitsState, snapshotPendingSplitsStateAfter);
HybridPendingSplitsState expectedHybridPendingSplitsState =
new HybridPendingSplitsState(
constructSnapshotPendingSplitsState(
AssignerStatus.INITIAL_ASSIGNING_FINISHED),
false);
PendingSplitsState hybridPendingSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
6,
PendingSplitsStateSerializerVersion6.serialize(
new HybridPendingSplitsStateVersion6(
constructSnapshotPendingSplitsStateVersion6(
AssignerStatus.INITIAL_ASSIGNING_FINISHED),
false)));
Assert.assertEquals(expectedHybridPendingSplitsState, hybridPendingSplitsStateAfter);
}
@ -181,10 +223,11 @@ public class PendingSplitsStateSerializerTest {
assignerStatus,
Arrays.asList(TableId.parse("catalog2.schema2.table2")),
true,
true); true,
new HashMap<>());
}
private SnapshotPendingSplitsStateVersion5 constructSnapshotPendingSplitsStateVersion4( private SnapshotPendingSplitsStateVersion5 constructSnapshotPendingSplitsStateVersion5(
boolean isAssignerFinished) {
SchemalessSnapshotSplit schemalessSnapshotSplit = constuctSchemalessSnapshotSplit();
Map<String, SchemalessSnapshotSplit> assignedSplits = new HashMap<>();
@ -206,6 +249,28 @@ public class PendingSplitsStateSerializerTest {
true);
}
private SnapshotPendingSplitsStateVersion6 constructSnapshotPendingSplitsStateVersion6(
AssignerStatus assignerStatus) {
SchemalessSnapshotSplit schemalessSnapshotSplit = constuctSchemalessSnapshotSplit();
Map<String, SchemalessSnapshotSplit> assignedSplits = new HashMap<>();
assignedSplits.put(tableId.toQuotedString('`'), schemalessSnapshotSplit);
Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
tableSchemas.put(
tableId,
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE, createTable(tableId)));
return new SnapshotPendingSplitsStateVersion6(
Arrays.asList(tableId),
Arrays.asList(schemalessSnapshotSplit),
assignedSplits,
tableSchemas,
new HashMap<>(),
assignerStatus,
Arrays.asList(TableId.parse("catalog2.schema2.table2")),
true,
true);
}
private static Table createTable(TableId id) {
TableEditor editor = Table.editor().tableId(id).setDefaultCharsetName("UTF8");
editor.setComment("comment");

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.assigner.state.version6;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
/** The 6th version of HybridPendingSplitsState. */
public class HybridPendingSplitsStateVersion6 extends PendingSplitsState {
private final SnapshotPendingSplitsStateVersion6 snapshotPendingSplits;
private final boolean isStreamSplitAssigned;
public HybridPendingSplitsStateVersion6(
SnapshotPendingSplitsStateVersion6 snapshotPendingSplits,
boolean isStreamSplitAssigned) {
this.snapshotPendingSplits = snapshotPendingSplits;
this.isStreamSplitAssigned = isStreamSplitAssigned;
}
public SnapshotPendingSplitsStateVersion6 getSnapshotPendingSplits() {
return snapshotPendingSplits;
}
public boolean isStreamSplitAssigned() {
return isStreamSplitAssigned;
}
}

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.assigner.state.version6;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.version4.LegacySourceSplitSerializierVersion4;
import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/** The 6th version of PendingSplitsStateSerializer. */
public class PendingSplitsStateSerializerVersion6 {
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
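// A thread-local output buffer is reused across serialize() calls and cleared after each use.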
private static final int SNAPSHOT_PENDING_SPLITS_STATE_FLAG = 1;
private static final int STREAM_PENDING_SPLITS_STATE_FLAG = 2;
private static final int HYBRID_PENDING_SPLITS_STATE_FLAG = 3;
public static byte[] serialize(PendingSplitsState state) throws IOException {
final DataOutputSerializer out = SERIALIZER_CACHE.get();
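// Serialized layout: an int version marker, a flag identifying the concrete state
// type, then the state payload written by the matching serialize* helper below.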
out.writeInt(5);
if (state instanceof SnapshotPendingSplitsStateVersion6) {
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsStateVersion6) state, out);
} else if (state instanceof HybridPendingSplitsStateVersion6) {
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
serializeHybridPendingSplitsState((HybridPendingSplitsStateVersion6) state, out);
} else if (state instanceof StreamPendingSplitsState) {
out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
}
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
}
private static void serializeHybridPendingSplitsState(
HybridPendingSplitsStateVersion6 state, DataOutputSerializer out) throws IOException {
serializeSnapshotPendingSplitsState(state.getSnapshotPendingSplits(), out);
out.writeBoolean(state.isStreamSplitAssigned());
}
private static void serializeSnapshotPendingSplitsState(
SnapshotPendingSplitsStateVersion6 state, DataOutputSerializer out) throws IOException {
writeTableIds(state.getAlreadyProcessedTables(), out);
writeRemainingSplits(state.getRemainingSplits(), out);
writeAssignedSnapshotSplits(state.getAssignedSplits(), out);
writeFinishedOffsets(state.getSplitFinishedOffsets(), out);
out.writeInt(state.getSnapshotAssignerStatus().getStatusCode());
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
writeTableSchemas(state.getTableSchemas(), out);
}
private static void serializeStreamPendingSplitsState(
StreamPendingSplitsState state, DataOutputSerializer out) throws IOException {
out.writeBoolean(state.isStreamSplitAssigned());
}
private static void writeTableIds(Collection<TableId> tableIds, DataOutputSerializer out)
throws IOException {
final int size = tableIds.size();
out.writeInt(size);
for (TableId tableId : tableIds) {
boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(tableId.toString());
}
}
private static void writeRemainingSplits(
List<SchemalessSnapshotSplit> remainingSplits, DataOutputSerializer out)
throws IOException {
final int size = remainingSplits.size();
out.writeInt(size);
for (SchemalessSnapshotSplit split : remainingSplits) {
byte[] splitBytes = LegacySourceSplitSerializierVersion4.serialize(split);
out.writeInt(splitBytes.length);
out.write(splitBytes);
}
}
private static void writeAssignedSnapshotSplits(
Map<String, SchemalessSnapshotSplit> assignedSplits, DataOutputSerializer out)
throws IOException {
final int size = assignedSplits.size();
out.writeInt(size);
for (Map.Entry<String, SchemalessSnapshotSplit> entry : assignedSplits.entrySet()) {
out.writeUTF(entry.getKey());
byte[] splitBytes = LegacySourceSplitSerializierVersion4.serialize(entry.getValue());
out.writeInt(splitBytes.length);
out.write(splitBytes);
}
}
private static void writeFinishedOffsets(
Map<String, Offset> splitsInfo, DataOutputSerializer out) throws IOException {
final int size = splitsInfo.size();
out.writeInt(size);
for (Map.Entry<String, Offset> splitInfo : splitsInfo.entrySet()) {
out.writeUTF(splitInfo.getKey());
LegacySourceSplitSerializierVersion4.writeOffsetPosition(splitInfo.getValue(), out);
}
}
private static void writeTableSchemas(
Map<TableId, TableChanges.TableChange> tableSchemas, DataOutputSerializer out)
throws IOException {
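// For every entry: a catalog-before-schema flag, the table id string, then the schema
// as a length-prefixed, UTF-8 encoded JSON document produced by FlinkJsonTableChangeSerializer.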
FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer();
DocumentWriter documentWriter = DocumentWriter.defaultWriter();
final int size = tableSchemas.size();
out.writeInt(size);
for (Map.Entry<TableId, TableChanges.TableChange> entry : tableSchemas.entrySet()) {
boolean useCatalogBeforeSchema =
SerializerUtils.shouldUseCatalogBeforeSchema(entry.getKey());
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(entry.getKey().toString());
final String tableChangeStr =
documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
final byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8);
out.writeInt(tableChangeBytes.length);
out.write(tableChangeBytes);
}
}
}

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.assigner.state.version6;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.List;
import java.util.Map;
/**
* The 6th version of SnapshotPendingSplitsState. The 7th version adds the
* splitFinishedCheckpointIds field.
*/
public class SnapshotPendingSplitsStateVersion6 extends PendingSplitsState {
/** The tables in the checkpoint. */
private final List<TableId> remainingTables;
/**
* The tables that are no longer in the enumerator checkpoint, but have been processed before and
* should thus be ignored. Relevant only for sources in continuous monitoring mode.
*/
private final List<TableId> alreadyProcessedTables;
/** The splits in the checkpoint. */
private final List<SchemalessSnapshotSplit> remainingSplits;
/**
* The snapshot splits that the {@link IncrementalSourceEnumerator} has assigned to {@link
* IncrementalSourceSplitReader}s.
*/
private final Map<String, SchemalessSnapshotSplit> assignedSplits;
/** The {@link AssignerStatus} that indicates the snapshot assigner status. */
private final AssignerStatus assignerStatus;
/**
* The offsets of finished (snapshot) splits that the {@link IncrementalSourceEnumerator} has
* received from {@link IncrementalSourceSplitReader}s.
*/
private final Map<String, Offset> splitFinishedOffsets;
/** Whether the table identifier is case sensitive. */
private final boolean isTableIdCaseSensitive;
/** Whether the remaining tables are kept when the state is snapshotted. */
private final boolean isRemainingTablesCheckpointed;
private final Map<TableId, TableChanges.TableChange> tableSchemas;
public SnapshotPendingSplitsStateVersion6(
List<TableId> alreadyProcessedTables,
List<SchemalessSnapshotSplit> remainingSplits,
Map<String, SchemalessSnapshotSplit> assignedSplits,
Map<TableId, TableChanges.TableChange> tableSchemas,
Map<String, Offset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
this.remainingTables = remainingTables;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.tableSchemas = tableSchemas;
}
public List<TableId> getAlreadyProcessedTables() {
return alreadyProcessedTables;
}
public List<SchemalessSnapshotSplit> getRemainingSplits() {
return remainingSplits;
}
public Map<String, SchemalessSnapshotSplit> getAssignedSplits() {
return assignedSplits;
}
public Map<TableId, TableChanges.TableChange> getTableSchemas() {
return tableSchemas;
}
public Map<String, Offset> getSplitFinishedOffsets() {
return splitFinishedOffsets;
}
public List<TableId> getRemainingTables() {
return remainingTables;
}
public boolean isTableIdCaseSensitive() {
return isTableIdCaseSensitive;
}
public boolean isRemainingTablesCheckpointed() {
return isRemainingTablesCheckpointed;
}
public AssignerStatus getSnapshotAssignerStatus() {
return assignerStatus;
}
}

@ -0,0 +1,443 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.parser;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils.fromDbzColumn;
/** Copied from {@link AlterTableParserListener} in Debezium 1.9.8.Final. */
public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private static final int STARTING_INDEX = 1;
private static final Logger LOG = LoggerFactory.getLogger(CustomAlterTableParserListener.class);
private final MySqlAntlrDdlParser parser;
private final List<ParseTreeListener> listeners;
private final LinkedList<SchemaChangeEvent> changes;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
private TableEditor tableEditor;
private int parsingColumnIndex = STARTING_INDEX;
public CustomAlterTableParserListener(
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners,
LinkedList<SchemaChangeEvent> changes) {
this.parser = parser;
this.listeners = listeners;
this.changes = changes;
}
@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
}
super.enterColumnCreateTable(ctx);
}
@Override
public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
parser.runIfNotNull(
() -> {
// Make sure that the table's character set has been set ...
if (!tableEditor.hasDefaultCharsetName()) {
tableEditor.setDefaultCharsetName(
parser.charsetForTable(tableEditor.tableId()));
}
listeners.remove(columnDefinitionListener);
columnDefinitionListener = null;
// remove column definition parser listener
final String defaultCharsetName = tableEditor.create().defaultCharsetName();
tableEditor.setColumns(
tableEditor.columns().stream()
.map(
column -> {
final ColumnEditor columnEditor = column.edit();
if (columnEditor.charsetNameOfTable() == null) {
columnEditor.charsetNameOfTable(
defaultCharsetName);
}
return columnEditor;
})
.map(ColumnEditor::create)
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);
Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(
new CreateTableEvent(
toCdcTableId(tableEditor.tableId()), builder.build()));
},
tableEditor);
super.exitColumnCreateTable(ctx);
}
@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
String columnName = parser.parseName(ctx.uid());
ColumnEditor columnEditor = Column.editor().name(columnName);
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
}
},
tableEditor);
super.enterColumnDeclaration(ctx);
}
@Override
public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
tableEditor.addColumn(columnDefinitionListener.getColumn());
},
tableEditor,
columnDefinitionListener);
super.exitColumnDeclaration(ctx);
}
@Override
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
},
tableEditor);
super.enterPrimaryKeyTableConstraint(ctx);
}
@Override
public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
if (!tableEditor.hasPrimaryKey()) {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
}
},
tableEditor);
super.enterUniqueKeyTableConstraint(ctx);
}
@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
super.enterAlterTable(ctx);
}
@Override
public void exitAlterTable(MySqlParser.AlterTableContext ctx) {
listeners.remove(columnDefinitionListener);
super.exitAlterTable(ctx);
this.currentTable = null;
}
@Override
public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
@Override
public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
if (ctx.FIRST() != null) {
changes.add(
new AddColumnEvent(
currentTable,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
toCdcColumn(column),
AddColumnEvent.ColumnPosition.FIRST,
null))));
} else if (ctx.AFTER() != null) {
String afterColumn = parser.parseName(ctx.uid(1));
changes.add(
new AddColumnEvent(
currentTable,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
toCdcColumn(column),
AddColumnEvent.ColumnPosition.AFTER,
afterColumn))));
} else {
changes.add(
new AddColumnEvent(
currentTable,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
toCdcColumn(column)))));
}
listeners.remove(columnDefinitionListener);
},
columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
@Override
public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
// multiple columns are added. Initialize a list of column editors for them
columnEditors = new ArrayList<>(ctx.uid().size());
for (MySqlParser.UidContext uidContext : ctx.uid()) {
String columnName = parser.parseName(uidContext);
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
@Override
public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
parser.runIfNotNull(
() -> {
if (columnEditors != null) {
// column editor list is not null when multiple columns are parsed in one
// statement
if (columnEditors.size() > parsingColumnIndex) {
// assign next column editor to parse another column definition
columnDefinitionListener.setColumnEditor(
columnEditors.get(parsingColumnIndex++));
}
}
},
columnEditors);
super.exitColumnDefinition(ctx);
}
@Override
public void exitAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
parser.runIfNotNull(
() -> {
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
columnEditors.forEach(
columnEditor -> {
Column column = columnEditor.create();
addedColumns.add(
new AddColumnEvent.ColumnWithPosition(toCdcColumn(column)));
});
changes.add(new AddColumnEvent(currentTable, addedColumns));
listeners.remove(columnDefinitionListener);
columnEditors = null;
parsingColumnIndex = STARTING_INDEX;
},
columnEditors);
super.exitAlterByAddColumns(ctx);
}
@Override
public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) {
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
@Override
public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) {
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
String newColumnName = parser.parseName(ctx.newColumn);
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), convertDataType(fromDbzColumn(column)));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
Map<String, String> renameMap = new HashMap<>();
renameMap.put(column.name(), newColumnName);
changes.add(new RenameColumnEvent(currentTable, renameMap));
}
listeners.remove(columnDefinitionListener);
},
columnDefinitionListener);
super.exitAlterByChangeColumn(ctx);
}
private DataType convertDataType(org.apache.flink.table.types.DataType dataType) {
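// Only a few common logical type roots are mapped to dedicated CDC types here;
// any other type falls back to STRING.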
if (dataType.getLogicalType().is(LogicalTypeRoot.INTEGER)) {
return DataTypes.INT();
}
if (dataType.getLogicalType().is(LogicalTypeRoot.BIGINT)) {
return DataTypes.BIGINT();
}
if (dataType.getLogicalType().is(LogicalTypeRoot.FLOAT)) {
return DataTypes.FLOAT();
}
if (dataType.getLogicalType().is(LogicalTypeRoot.DATE)) {
return DataTypes.DATE();
}
return DataTypes.STRING();
}
@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
String removedColName = parser.parseName(ctx.uid());
changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
super.enterAlterByDropColumn(ctx);
}
@Override
public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) {
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
@Override
public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
String oldColumnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
@Override
public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), convertDataType(fromDbzColumn(column)));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
listeners.remove(columnDefinitionListener);
},
columnDefinitionListener);
super.exitAlterByModifyColumn(ctx);
}
@Override
public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) {
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
String newColumnName = parser.parseName(ctx.newColumn);
if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
Map<String, String> renameMap = new HashMap<>();
renameMap.put(column.name(), newColumnName);
changes.add(new RenameColumnEvent(currentTable, renameMap));
}
listeners.remove(columnDefinitionListener);
},
columnDefinitionListener);
super.exitAlterByRenameColumn(ctx);
}
@Override
public void exitTruncateTable(MySqlParser.TruncateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
changes.add(new TruncateTableEvent(toCdcTableId(tableId)));
super.exitTruncateTable(ctx);
}
@Override
public void exitDropTable(MySqlParser.DropTableContext ctx) {
ctx.tables()
.tableName()
.forEach(
evt -> {
TableId tableId = parser.parseQualifiedTableId(evt.fullId());
changes.add(new DropTableEvent(toCdcTableId(tableId)));
});
super.exitDropTable(ctx);
}
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
convertDataType(fromDbzColumn(dbzColumn)),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}
private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {
return org.apache.flink.cdc.common.event.TableId.tableId(
dbzTableId.catalog(), dbzTableId.table());
}
}

@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.parser;
import io.debezium.antlr.AntlrDdlParser;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Types;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/** Parser listener that is parsing column definition part of MySQL statements. */
public class CustomColumnDefinitionParserListener extends MySqlParserBaseListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(CustomColumnDefinitionParserListener.class);
private static final Pattern DOT = Pattern.compile("\\.");
private final MySqlAntlrDdlParser parser;
private final DataTypeResolver dataTypeResolver;
private ColumnEditor columnEditor;
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
private final TableEditor tableEditor;
private final List<ParseTreeListener> listeners;
public CustomColumnDefinitionParserListener(
TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
this.listeners = listeners;
}
public void setColumnEditor(ColumnEditor columnEditor) {
this.columnEditor = columnEditor;
}
public ColumnEditor getColumnEditor() {
return columnEditor;
}
public Column getColumn() {
return columnEditor.create();
}
@Override
public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
uniqueColumn = false;
optionalColumn = new AtomicReference<>();
resolveColumnDataType(ctx.dataType());
defaultValueListener = new DefaultValueParserListener(columnEditor, optionalColumn);
listeners.add(defaultValueListener);
super.enterColumnDefinition(ctx);
}
@Override
public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
if (optionalColumn.get() != null) {
columnEditor.optional(optionalColumn.get().booleanValue());
}
defaultValueListener.exitDefaultValue(false);
listeners.remove(defaultValueListener);
super.exitColumnDefinition(ctx);
}
@Override
public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext ctx) {
uniqueColumn = true;
super.enterUniqueKeyColumnConstraint(ctx);
}
@Override
public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) {
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
optionalColumn.set(Boolean.FALSE);
tableEditor.addColumn(columnEditor.create());
tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}
@Override
public void enterCommentColumnConstraint(MySqlParser.CommentColumnConstraintContext ctx) {
if (!parser.skipComments()) {
if (ctx.STRING_LITERAL() != null) {
columnEditor.comment(parser.withoutQuotes(ctx.STRING_LITERAL().getText()));
}
}
super.enterCommentColumnConstraint(ctx);
}
@Override
public void enterNullNotnull(MySqlParser.NullNotnullContext ctx) {
optionalColumn.set(Boolean.valueOf(ctx.NOT() == null));
super.enterNullNotnull(ctx);
}
@Override
public void enterAutoIncrementColumnConstraint(
MySqlParser.AutoIncrementColumnConstraintContext ctx) {
columnEditor.autoIncremented(true);
columnEditor.generated(true);
super.enterAutoIncrementColumnConstraint(ctx);
}
@Override
public void enterSerialDefaultColumnConstraint(
MySqlParser.SerialDefaultColumnConstraintContext ctx) {
serialColumn();
super.enterSerialDefaultColumnConstraint(ctx);
}
private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) {
String charsetName = null;
DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext);
if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) {
// Same as LongVarcharDataTypeContext but with dimension handling
MySqlParser.StringDataTypeContext stringDataTypeContext =
(MySqlParser.StringDataTypeContext) dataTypeContext;
if (stringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
stringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
charsetName =
parser.extractCharset(
stringDataTypeContext.charsetName(),
stringDataTypeContext.collationName());
} else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) {
// Same as StringDataTypeContext but without dimension handling
MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext =
(MySqlParser.LongVarcharDataTypeContext) dataTypeContext;
charsetName =
parser.extractCharset(
longVarcharTypeContext.charsetName(),
longVarcharTypeContext.collationName());
} else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) {
MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext =
(MySqlParser.NationalStringDataTypeContext) dataTypeContext;
if (nationalStringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
nationalStringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
} else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) {
MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext =
(MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext;
if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
nationalVaryingStringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
} else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) {
MySqlParser.DimensionDataTypeContext dimensionDataTypeContext =
(MySqlParser.DimensionDataTypeContext) dataTypeContext;
Integer length = null;
Integer scale = null;
if (dimensionDataTypeContext.lengthOneDimension() != null) {
length =
parseLength(
dimensionDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
}
if (dimensionDataTypeContext.lengthTwoDimension() != null) {
List<MySqlParser.DecimalLiteralContext> decimalLiterals =
dimensionDataTypeContext.lengthTwoDimension().decimalLiteral();
length = parseLength(decimalLiterals.get(0).getText());
scale = Integer.valueOf(decimalLiterals.get(1).getText());
}
if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) {
List<MySqlParser.DecimalLiteralContext> decimalLiterals =
dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral();
if (decimalLiterals.get(0).REAL_LITERAL() != null) {
String[] digits = DOT.split(decimalLiterals.get(0).getText());
if (Strings.isNullOrEmpty(digits[0]) || Integer.valueOf(digits[0]) == 0) {
                        // Set the default length to 10, matching the MySQL engine's default
length = 10;
} else {
length = parseLength(digits[0]);
}
} else {
length = parseLength(decimalLiterals.get(0).getText());
}
if (decimalLiterals.size() > 1) {
scale = Integer.valueOf(decimalLiterals.get(1).getText());
}
}
if (length != null) {
columnEditor.length(length);
}
if (scale != null) {
columnEditor.scale(scale);
}
} else if (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) {
MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
(MySqlParser.CollectionDataTypeContext) dataTypeContext;
if (collectionDataTypeContext.charsetName() != null) {
charsetName = collectionDataTypeContext.charsetName().getText();
}
if (dataType.name().equalsIgnoreCase("SET")) {
// After DBZ-132, it will always be comma separated
int optionsSize =
collectionDataTypeContext.collectionOptions().collectionOption().size();
columnEditor.length(
Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas
} else {
columnEditor.length(1);
}
}
String dataTypeName = dataType.name().toUpperCase();
if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) {
// type expression has to be set, because the value converter needs to know the enum or
// set options
MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
(MySqlParser.CollectionDataTypeContext) dataTypeContext;
List<String> collectionOptions =
collectionDataTypeContext.collectionOptions().collectionOption().stream()
.map(AntlrDdlParser::getText)
.collect(Collectors.toList());
columnEditor.type(dataTypeName);
columnEditor.enumValues(collectionOptions);
} else if (dataTypeName.equals("SERIAL")) {
// SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
columnEditor.type("BIGINT UNSIGNED");
serialColumn();
} else {
columnEditor.type(dataTypeName);
}
int jdbcDataType = dataType.jdbcType();
columnEditor.jdbcType(jdbcDataType);
if (columnEditor.length() == -1) {
columnEditor.length((int) dataType.length());
}
if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) {
columnEditor.scale(dataType.scale());
}
if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
            // NCHAR and NVARCHAR columns always use utf8 as their charset
columnEditor.charsetName("utf8");
} else {
columnEditor.charsetName(charsetName);
}
}
private Integer parseLength(String lengthStr) {
Long length = Long.parseLong(lengthStr);
if (length > Integer.MAX_VALUE) {
LOGGER.warn(
"The length '{}' of the column `{}` is too large to be supported, truncating it to '{}'",
length,
columnEditor.name(),
Integer.MAX_VALUE);
length = (long) Integer.MAX_VALUE;
}
return length.intValue();
}
private void serialColumn() {
if (optionalColumn.get() == null) {
optionalColumn.set(Boolean.FALSE);
}
uniqueColumn = true;
columnEditor.autoIncremented(true);
columnEditor.generated(true);
}
}
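For readers unfamiliar with Debezium's column model: the listener above ultimately drives a ColumnEditor. Below is a minimal, hand-built sketch (not part of this change; the column name, type, and length are illustrative) of the editor calls the listener would issue for a definition such as `name VARCHAR(255) NOT NULL`.
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;

import java.sql.Types;

public class ColumnEditorSketch {
    public static void main(String[] args) {
        // Roughly what the listener derives from `name VARCHAR(255) NOT NULL`:
        // resolveColumnDataType() sets type/jdbcType/length, enterNullNotnull() together with
        // exitColumnDefinition() resolves the optional flag, and getColumn() calls create().
        ColumnEditor editor = Column.editor().name("name");
        editor.type("VARCHAR").jdbcType(Types.VARCHAR).length(255).optional(false);
        Column column = editor.create();
        System.out.println(column);
    }
}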

@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.parser;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/** A DDL parser that uses customized listeners. */
public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
private final LinkedList<SchemaChangeEvent> parsedEvents;
public CustomMySqlAntlrDdlParser() {
super();
this.parsedEvents = new LinkedList<>();
}
    // Overriding this method because the BIT type requires a default length dimension of 1.
    // Remove this override once Debezium fixes the issue.
@Override
protected DataTypeResolver initializeDataTypeResolver() {
DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder();
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.StringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT),
new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR),
new DataTypeResolver.DataTypeEntry(
Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR)
.setSuffixTokens(MySqlParser.BINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR,
MySqlParser.NATIONAL,
MySqlParser.CHAR,
MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR,
MySqlParser.NATIONAL,
MySqlParser.CHARACTER,
MySqlParser.VARYING)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.DimensionDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE)
.setSuffixTokens(
MySqlParser.PRECISION,
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8)
.setSuffixTokens(
MySqlParser.PRECISION,
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT)
.setDefaultLengthDimension(1),
new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME),
new DataTypeResolver.DataTypeEntry(
Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP),
new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME),
new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.SimpleDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB),
new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL),
new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.CollectionDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET)
.setSuffixTokens(MySqlParser.BINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.SpatialDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.OTHER, MySqlParser.GEOMETRYCOLLECTION),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING),
new DataTypeResolver.DataTypeEntry(
Types.OTHER, MySqlParser.MULTILINESTRING),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG)
.setSuffixTokens(MySqlParser.VARBINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG)
.setSuffixTokens(MySqlParser.VARCHAR)));
return dataTypeResolverBuilder.build();
}
@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
}
public List<SchemaChangeEvent> getAndClearParsedEvents() {
List<SchemaChangeEvent> result = new ArrayList<>(parsedEvents);
parsedEvents.clear();
return result;
}
}
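A minimal usage sketch (not part of this change; the DDL statements, database, and table names are illustrative) of how the parser above could be driven and how the accumulated schema change events are drained afterwards:
import org.apache.flink.cdc.common.event.SchemaChangeEvent;

import io.debezium.relational.Tables;

import java.util.List;

public class DdlParserSketch {
    public static void main(String[] args) {
        CustomMySqlAntlrDdlParser ddlParser = new CustomMySqlAntlrDdlParser();
        Tables tables = new Tables();

        // The table has to be known to the parser before ALTER statements can be resolved.
        ddlParser.parse(
                "CREATE TABLE metrics.users (id INT PRIMARY KEY, name VARCHAR(255))", tables);
        ddlParser.parse("ALTER TABLE metrics.users ADD COLUMN note VARCHAR(64)", tables);

        // Events are accumulated by the customized listeners and cleared on read.
        List<SchemaChangeEvent> changes = ddlParser.getAndClearParsedEvents();
        changes.forEach(System.out::println);
    }
}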

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.parser;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.ProxyParseTreeListenerUtil;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.antlr.listener.AlterViewParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateAndAlterDatabaseParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateTableParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateUniqueIndexParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateViewParserListener;
import io.debezium.connector.mysql.antlr.listener.DropDatabaseParserListener;
import io.debezium.connector.mysql.antlr.listener.DropTableParserListener;
import io.debezium.connector.mysql.antlr.listener.DropViewParserListener;
import io.debezium.connector.mysql.antlr.listener.MySqlAntlrDdlParserListener;
import io.debezium.connector.mysql.antlr.listener.RenameTableParserListener;
import io.debezium.connector.mysql.antlr.listener.SetStatementParserListener;
import io.debezium.connector.mysql.antlr.listener.TruncateTableParserListener;
import io.debezium.connector.mysql.antlr.listener.UseStatementParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.text.ParsingException;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ErrorNode;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.antlr.v4.runtime.tree.TerminalNode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.8.Final.
 *
 * <p>This listener's constructor registers some customized listeners.
*/
public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
implements AntlrDdlParserListener {
/** Collection of listeners for delegation of events. */
private final List<ParseTreeListener> listeners = new CopyOnWriteArrayList<>();
/** Flag for skipping phase. */
private boolean skipNodes;
    /**
     * Count of skipped nodes. Each enter event during the skipping phase increases the counter
     * and each exit event decreases it. When the counter drops back to 0, the skipping phase
     * ends.
     */
private int skippedNodesCount = 0;
    /** Collection of caught exceptions. */
private final Collection<ParsingException> errors = new ArrayList<>();
public CustomMySqlAntlrDdlParserListener(
MySqlAntlrDdlParser parser, LinkedList<SchemaChangeEvent> parsedEvents) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));
listeners.add(new CreateViewParserListener(parser, listeners));
listeners.add(new AlterViewParserListener(parser, listeners));
listeners.add(new DropViewParserListener(parser));
listeners.add(new CreateUniqueIndexParserListener(parser));
listeners.add(new SetStatementParserListener(parser));
listeners.add(new UseStatementParserListener(parser));
}
/**
* Returns all caught errors during tree walk.
*
* @return list of Parsing exceptions
*/
@Override
public Collection<ParsingException> getErrors() {
return errors;
}
@Override
public void enterEveryRule(ParserRuleContext ctx) {
if (skipNodes) {
skippedNodesCount++;
} else {
ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors);
}
}
@Override
public void exitEveryRule(ParserRuleContext ctx) {
if (skipNodes) {
if (skippedNodesCount == 0) {
// back in the node where skipping started
skipNodes = false;
} else {
                // going up the tree means decreasing the number of skipped nodes
skippedNodesCount--;
}
} else {
ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors);
}
}
@Override
public void visitErrorNode(ErrorNode node) {
ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors);
}
@Override
public void visitTerminal(TerminalNode node) {
ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors);
}
@Override
public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) {
        // This is the grammar rule for the BEGIN ... END part of statements. Skip it.
skipNodes = true;
}
}
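The BEGIN ... END handling above relies on a small counter-based skipping scheme; the standalone sketch below (not part of this change) reproduces just that pattern, in case the mechanics are easier to follow in isolation.
/** Standalone sketch of the subtree-skipping pattern used in the listener above. */
final class SkipTrackerSketch {
    private boolean skipNodes;
    private int skippedNodesCount;

    /** Called from the specific enter rule (here: the routine body). */
    void startSkipping() {
        skipNodes = true;
    }

    /** Mirrors enterEveryRule: returns true if the rule should be delegated to listeners. */
    boolean onEnter() {
        if (skipNodes) {
            skippedNodesCount++;
            return false;
        }
        return true;
    }

    /** Mirrors exitEveryRule: returns true if the rule should be delegated to listeners. */
    boolean onExit() {
        if (skipNodes) {
            if (skippedNodesCount == 0) {
                // back at the node where skipping started; delegation resumes afterwards
                skipNodes = false;
            } else {
                skippedNodesCount--;
            }
            return false;
        }
        return true;
    }
}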

@ -0,0 +1,36 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: metrics
-- ----------------------------------------------------------------------------------------------------------------
CREATE TABLE users (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
    age INTEGER
);
ALTER TABLE users AUTO_INCREMENT = 101;
INSERT INTO users
VALUES (default,"Tom",3),
(default,"Jack",5),
(default,"Allen",10),
(default,"Andrew",13),
(default,"Arnold",15),
(default,"Claud",19),
(default,"Howard",37),
(default,"Jacob",46),
(default,"Lionel",58);

@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader; import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter; import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset; import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordEmitter;
@ -71,10 +72,14 @@ public final class MongoDBRecordEmitter<T> extends IncrementalSourceRecordEmitte
} else if (isHeartbeatEvent(element)) { } else if (isHeartbeatEvent(element)) {
if (splitState.isStreamSplitState()) { if (splitState.isStreamSplitState()) {
updatePositionForStreamSplit(element, splitState); updatePositionForStreamSplit(element, splitState);
sourceReaderMetrics.updateLastReceivedEventTime(
MongoRecordUtils.getMessageTimestamp(element));
} }
} else if (isDataChangeRecord(element)) { } else if (isDataChangeRecord(element)) {
if (splitState.isStreamSplitState()) { if (splitState.isStreamSplitState()) {
updatePositionForStreamSplit(element, splitState); updatePositionForStreamSplit(element, splitState);
sourceReaderMetrics.updateLastReceivedEventTime(
MongoRecordUtils.getMessageTimestamp(element));
} }
reportMetrics(element); reportMetrics(element);
emitElement(element, output); emitElement(element, output);
@ -94,6 +99,17 @@ public final class MongoDBRecordEmitter<T> extends IncrementalSourceRecordEmitte
splitState.asStreamSplitState().setStartingOffset(offset); splitState.asStreamSplitState().setStartingOffset(offset);
} }
@Override
protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
sourceReaderMetrics.markRecord();
sourceReaderMetrics.updateRecordCounters(element);
outputCollector.output = output;
// use mongo timestamp as the current message timestamp
outputCollector.currentMessageTimestamp = MongoRecordUtils.getMessageTimestamp(element);
debeziumDeserializationSchema.deserialize(element, outputCollector);
}
@Override @Override
protected void reportMetrics(SourceRecord element) { protected void reportMetrics(SourceRecord element) {
Long messageTimestamp = getMessageTimestamp(element); Long messageTimestamp = getMessageTimestamp(element);

@ -321,14 +321,16 @@ public class PostgresSourceBuilder<T> {
remainingTables, remainingTables,
isTableIdCaseSensitive, isTableIdCaseSensitive,
dataSourceDialect, dataSourceDialect,
offsetFactory); offsetFactory,
enumContext);
} catch (Exception e) { } catch (Exception e) {
throw new FlinkRuntimeException( throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e); "Failed to discover captured tables for enumerator", e);
} }
} else { } else {
splitAssigner = splitAssigner =
new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory); new StreamSplitAssigner(
sourceConfig, dataSourceDialect, offsetFactory, enumContext);
} }
return new PostgresSourceEnumerator( return new PostgresSourceEnumerator(
@ -352,14 +354,16 @@ public class PostgresSourceBuilder<T> {
enumContext.currentParallelism(), enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint, (HybridPendingSplitsState) checkpoint,
dataSourceDialect, dataSourceDialect,
offsetFactory); offsetFactory,
enumContext);
} else if (checkpoint instanceof StreamPendingSplitsState) { } else if (checkpoint instanceof StreamPendingSplitsState) {
splitAssigner = splitAssigner =
new StreamSplitAssigner( new StreamSplitAssigner(
sourceConfig, sourceConfig,
(StreamPendingSplitsState) checkpoint, (StreamPendingSplitsState) checkpoint,
dataSourceDialect, dataSourceDialect,
offsetFactory); offsetFactory,
enumContext);
} else { } else {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Unsupported restored PendingSplitsState: " + checkpoint); "Unsupported restored PendingSplitsState: " + checkpoint);
@ -385,7 +389,6 @@ public class PostgresSourceBuilder<T> {
final SourceReaderMetrics sourceReaderMetrics = final SourceReaderMetrics sourceReaderMetrics =
new SourceReaderMetrics(readerContext.metricGroup()); new SourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
IncrementalSourceReaderContext incrementalSourceReaderContext = IncrementalSourceReaderContext incrementalSourceReaderContext =
new IncrementalSourceReaderContext(readerContext); new IncrementalSourceReaderContext(readerContext);
Supplier<IncrementalSourceSplitReader<JdbcSourceConfig>> splitReaderSupplier = Supplier<IncrementalSourceSplitReader<JdbcSourceConfig>> splitReaderSupplier =
