[oracle] Fix the incremental phase failing to skip data that has already been read during the full snapshot phase (#2215)

Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
molsionmo 2 years ago committed by GitHub
parent aff5b0566d
commit 734982e075

@ -110,6 +110,8 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
} else {
LOG.debug("{} data change event should not emit", event);
}
}
}

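A note for readers of this hunk: shouldEmit is the guard that decides whether a redo-log change event still needs to be emitted or was already covered by the snapshot read of its split; the new else branch simply logs the events that get filtered out. A minimal sketch of the skip-already-read idea, assuming an SCN-style high watermark recorded per snapshot split (the real IncrementalSourceStreamFetcher also tracks per-split key ranges):

/**
 * Minimal sketch, not the actual implementation: emit a redo-log event only if it is
 * positioned strictly after the high watermark captured when the split's snapshot
 * read finished; anything at or before it was already produced by the snapshot phase.
 */
final class EmitFilterSketch {
    // Assumption: the high watermark is representable as a single SCN value.
    private final long snapshotHighWatermarkScn;

    EmitFilterSketch(long snapshotHighWatermarkScn) {
        this.snapshotHighWatermarkScn = snapshotHighWatermarkScn;
    }

    boolean shouldEmit(long recordScn) {
        return recordScn > snapshotHighWatermarkScn;
    }
}
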
@ -157,7 +157,7 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
}
public SchemaNameAdjuster getSchemaNameAdjuster() {
return null;
return schemaNameAdjuster;
}
public abstract RelationalDatabaseSchema getDatabaseSchema();

@ -19,7 +19,6 @@ package com.ververica.cdc.connectors.oracle;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
import io.debezium.connector.oracle.OracleConnector;
import javax.annotation.Nullable;
@ -167,7 +166,6 @@ public class OracleSource {
props.setProperty("table.include.list", String.join(",", tableList));
}
DebeziumOffset specificOffset = null;
switch (startupOptions.startupMode) {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
@ -193,7 +191,7 @@ public class OracleSource {
}
return new DebeziumSourceFunction<>(
deserializer, props, specificOffset, new OracleValidator(props));
deserializer, props, null, new OracleValidator(props));
}
}
}

@ -74,7 +74,8 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = getSplitColumn(table, sourceConfig);
Column splitColumn =
ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@ -377,8 +378,4 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
public static Column getSplitColumn(Table table, JdbcSourceConfig sourceConfig) {
return ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
}
}

@ -25,9 +25,11 @@ import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
import com.ververica.cdc.connectors.oracle.util.ChunkUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
@ -52,14 +54,19 @@ import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import oracle.sql.ROWID;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn;
/** The context for fetch task that fetching data of snapshot split from Oracle data source. */
public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
@ -186,7 +193,43 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
@Override
public RowType getSplitType(Table table) {
return OracleUtils.getSplitType(table);
OracleSourceConfig oracleSourceConfig = getSourceConfig();
return ChunkUtils.getSplitType(
getChunkKeyColumn(table, oracleSourceConfig.getChunkKeyColumn()));
}
@Override
public boolean isDataChangeRecord(SourceRecord record) {
return SourceRecordUtils.isDataChangeRecord(record);
}
@Override
public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
RowType splitKeyType =
getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(record)));
// RowId is chunk key column by default, compare RowId
if (splitKeyType.getFieldNames().contains(ROWID.class.getSimpleName())) {
ConnectHeaders headers = (ConnectHeaders) record.headers();
ROWID rowId = null;
try {
rowId = new ROWID(headers.iterator().next().value().toString());
} catch (SQLException e) {
LOG.error("{} can not convert to RowId", record);
}
Object[] rowIds = new ROWID[] {rowId};
return SourceRecordUtils.splitKeyRangeContains(rowIds, splitStart, splitEnd);
} else {
// config chunk key column compare
Object[] key =
SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
}
}
@Override
public TableId getTableId(SourceRecord record) {
return SourceRecordUtils.getTableId(record);
}
@Override

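The isRecordBetween override above is the core of the Oracle fix: during the redo-log phase, each change record's split key (the ROWID taken from the record header by default, or the configured chunk key column otherwise) is tested against the boundaries of the snapshot split being back-filled. A sketch of the assumed range semantics behind SourceRecordUtils.splitKeyRangeContains, where a null boundary means an open-ended first or last chunk:

/**
 * Sketch of the assumed split-key range test, not the actual SourceRecordUtils code:
 * a key belongs to a split when it is at or after the split start and strictly before
 * the split end; a null boundary means the chunk is unbounded on that side.
 */
final class SplitKeyRangeSketch {
    static <T extends Comparable<T>> boolean contains(T key, T splitStart, T splitEnd) {
        boolean afterStart = splitStart == null || key.compareTo(splitStart) >= 0;
        boolean beforeEnd = splitEnd == null || key.compareTo(splitEnd) < 0;
        return afterStart && beforeEnd;
    }
}
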
@ -16,7 +16,6 @@
package com.ververica.cdc.connectors.oracle.source.utils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
@ -28,8 +27,6 @@ import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.StreamingAdapter;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
@ -40,14 +37,11 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
/** Utils to prepare Oracle SQL statement. */
public class OracleUtils {
@ -245,20 +239,6 @@ public class OracleUtils {
}
}
public static RowType getSplitType(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}
// use first field in primary key as the split key
return getSplitType(primaryKeys.get(0));
}
/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
OracleConnectorConfig dbzOracleConfig) {
@ -313,26 +293,6 @@ public class OracleUtils {
return new RedoLogOffset(offsetStrMap);
}
public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
}
public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}
// use first field in primary key as the split key
return primaryKeys.get(0);
}
public static String quote(String dbOrTableName) {
return "\"" + dbOrTableName + "\"";
}

@ -17,7 +17,9 @@
package com.ververica.cdc.connectors.oracle.util;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import oracle.sql.ROWID;
@ -29,11 +31,20 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
/** Utilities to split chunks of table. */
public class ChunkUtils {
private ChunkUtils() {}
public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), OracleTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
}
public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();

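ChunkUtils.getChunkKeyColumn (only the start of the method is visible above) centralizes split-key selection for both the chunk splitter and the fetch task. The following sketch shows the assumed resolution order; the option name and the fallback literal are illustrative assumptions, not copied from this commit:

final class ChunkKeySketch {
    /**
     * Assumed order: a user-configured chunk key column (e.g. via
     * scan.incremental.snapshot.chunk.key-column) wins; otherwise Oracle's ROWID
     * pseudo-column is used as the default split key, matching the default noted
     * in isRecordBetween above.
     */
    static String resolveChunkKeyColumn(String configuredChunkKeyColumn) {
        if (configuredChunkKeyColumn != null && !configuredChunkKeyColumn.isEmpty()) {
            return configuredChunkKeyColumn;
        }
        return "ROWID"; // oracle.sql.ROWID.class.getSimpleName()
    }
}
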
@ -0,0 +1,268 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import oracle.sql.ROWID;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
/**
* Copied from Debezium 1.6.4.Final.
*
* <p>Emits change record based on a single {@link LogMinerDmlEntry} event.
*
* <p>This class overrides the emit methods to put the ROWID in the header.
*
* <p>Line 59 ~ 257: add ROWID and emit methods.
*/
public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerChangeRecordEmitter.class);
private final int operation;
private final Object[] oldValues;
private final Object[] newValues;
private final String rowId;
public LogMinerChangeRecordEmitter(
OffsetContext offset,
int operation,
Object[] oldValues,
Object[] newValues,
Table table,
Clock clock) {
super(offset, table, clock);
this.operation = operation;
this.oldValues = oldValues;
this.newValues = newValues;
this.rowId = null;
}
public LogMinerChangeRecordEmitter(
OffsetContext offset,
int operation,
Object[] oldValues,
Object[] newValues,
Table table,
Clock clock,
String rowId) {
super(offset, table, clock);
this.operation = operation;
this.oldValues = oldValues;
this.newValues = newValues;
this.rowId = rowId;
}
@Override
protected Operation getOperation() {
switch (operation) {
case RowMapper.INSERT:
return Operation.CREATE;
case RowMapper.UPDATE:
case RowMapper.SELECT_LOB_LOCATOR:
return Operation.UPDATE;
case RowMapper.DELETE:
return Operation.DELETE;
default:
throw new DebeziumException("Unsupported operation type: " + operation);
}
}
@Override
public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver)
throws InterruptedException {
TableSchema tableSchema = (TableSchema) schema;
Operation operation = getOperation();
switch (operation) {
case CREATE:
emitCreateRecord(receiver, tableSchema);
break;
case READ:
emitReadRecord(receiver, tableSchema);
break;
case UPDATE:
emitUpdateRecord(receiver, tableSchema);
break;
case DELETE:
emitDeleteRecord(receiver, tableSchema);
break;
case TRUNCATE:
emitTruncateRecord(receiver, tableSchema);
break;
default:
throw new IllegalArgumentException("Unsupported operation: " + operation);
}
}
@Override
protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope =
tableSchema
.getEnvelopeSchema()
.create(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
// This case can be hit on UPDATE / DELETE when there's no primary key defined while
// using certain decoders
LOGGER.warn(
"no new values found for table '{}' from create message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}
receiver.changeRecord(
tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}
@Override
protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope =
tableSchema
.getEnvelopeSchema()
.read(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset(), headers);
}
@Override
protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
LOGGER.warn(
"no new values found for table '{}' from update message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}
// some configurations does not provide old values in case of updates
// in this case we handle all updates as regular ones
if (oldKey == null || Objects.equals(oldKey, newKey)) {
Struct envelope =
tableSchema
.getEnvelopeSchema()
.update(
oldValue,
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
receiver.changeRecord(
tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), headers);
}
// PK update -> emit as delete and re-insert with new key
else {
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
Struct envelope =
tableSchema
.getEnvelopeSchema()
.delete(
oldValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
envelope =
tableSchema
.getEnvelopeSchema()
.create(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}
}
@Override
protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
LOGGER.warn(
"no old values found for table '{}' from delete message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}
Struct envelope =
tableSchema
.getEnvelopeSchema()
.delete(
oldValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
}
@Override
protected Object[] getOldColumnValues() {
return oldValues;
}
@Override
protected Object[] getNewColumnValues() {
return newValues;
}
}

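On the reading side, the ROWID header that the emit methods above attach to every record can be pulled back out of the Kafka Connect headers, which is what the isRecordBetween change earlier in this commit relies on. A small sketch of that lookup; the class and method names here are made up for illustration:

import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;

final class RowIdHeaderReader {
    /** Returns the ROWID header value attached by LogMinerChangeRecordEmitter, or null. */
    static String readRowId(SourceRecord record) {
        // "ROWID" is the header key, i.e. oracle.sql.ROWID.class.getSimpleName().
        Header header = record.headers().lastWithName("ROWID");
        return header == null ? null : String.valueOf(header.value());
    }
}
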
@ -410,7 +410,8 @@ public final class TransactionalBuffer implements AutoCloseable {
event.getEntry().getOldValues(),
event.getEntry().getNewValues(),
schema.tableFor(event.getTableId()),
clock));
clock,
event.rowId));
}
lastCommittedScn = Scn.valueOf(scn.longValue());

@ -116,7 +116,6 @@ public class OracleChangeEventSourceExampleTest {
.setParallelism(DEFAULT_PARALLELISM)
.print()
.setParallelism(1);
env.execute("Print Oracle Snapshot + RedoLog");
}
}

@ -26,7 +26,6 @@ import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -103,7 +102,6 @@ public class OracleConnectorITCase extends AbstractTestBase {
}
@Test
@Ignore("It can be open until issue 1875 fix")
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
String sourceDDL =
@ -152,11 +150,11 @@ public class OracleConnectorITCase extends AbstractTestBase {
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
waitForSnapshotStarted("sink");
// There are 9 records in the table, wait until the snapshot phase finished
waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
@ -214,6 +212,9 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Test
public void testConsumingAllEventsByChunkKeyColumn()
throws SQLException, ExecutionException, InterruptedException {
if (!parallelismSnapshot) {
return;
}
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -261,7 +262,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
waitForSnapshotStarted("sink");
waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -400,7 +401,6 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Test
public void testStartupFromLatestOffset() throws Exception {
// database.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -441,7 +441,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
Thread.sleep(5000L);
Thread.sleep(10000L);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
