[cdc-base][oracle] Improve the base methods base on Oracle specific requirements

pull/1590/merge
molsion 2 years ago committed by Leonard Xu
parent 4d9c0e41e1
commit f6c34cd616

@ -16,7 +16,7 @@
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source;
package com.ververica.cdc.connectors.base.source;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import io.debezium.config.Configuration;

@ -93,19 +93,6 @@ public class IncrementalSource<T, C extends SourceConfig>
};
}
public JdbcIncrementalSource(
JdbcSourceConfigFactory configFactory,
DebeziumDeserializationSchema<T> deserializationSchema,
OffsetFactory offsetFactory,
JdbcDataSourceDialect dataSourceDialect,
SourceSplitSerializer sourceSplitSerializer) {
this.configFactory = configFactory;
this.deserializationSchema = deserializationSchema;
this.offsetFactory = offsetFactory;
this.dataSourceDialect = dataSourceDialect;
this.sourceSplitSerializer = sourceSplitSerializer;
}
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;

@ -28,6 +28,7 @@ import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.SchemaNameAdjuster;
@ -169,4 +170,6 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
public abstract JdbcSourceEventDispatcher getDispatcher();
public abstract OffsetContext getOffsetContext();
public abstract RelationalTableFilters getRelationalTableFilters();
}

@ -94,7 +94,7 @@ public class OracleDialect implements JdbcDataSourceDialect {
}
@Override
public ChunkSplitter<TableId> createChunkSplitter(JdbcSourceConfig sourceConfig) {
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new OracleChunkSplitter(sourceConfig, this);
}

@ -21,7 +21,7 @@ package com.ververica.cdc.connectors.oracle.source;
import org.apache.flink.annotation.Internal;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

@ -19,7 +19,7 @@
package com.ververica.cdc.connectors.oracle.source.config;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnector;

@ -19,10 +19,10 @@
package com.ververica.cdc.connectors.oracle.source.reader.fetch;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.WatermarkKind;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;

@ -23,10 +23,10 @@ import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
@ -51,6 +51,7 @@ import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
@ -205,6 +206,11 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
return queue;
}
@Override
public Tables.TableFilter getTableFilter() {
return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
}
@Override
public Offset getStreamOffset(SourceRecord sourceRecord) {
return OracleUtils.getRedoLogPosition(sourceRecord);

@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.oracle.source.reader.fetch;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;
@ -143,7 +144,7 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
offsetContext.getPartition(),
redoLogSplit,
currentRedoLogOffset,
JdbcSourceEventDispatcher.WatermarkKind.END);
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(

@ -53,6 +53,9 @@ public class OracleConnectionUtils {
/** Returned by column metadata in Oracle if no scale is set. */
private static final int ORACLE_UNSET_SCALE = -127;
/** show current scn sql in oracle. */
private static final String SHOW_CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE";
/** Creates a new {@link OracleConnection}, but not open the connection. */
public static OracleConnection createOracleConnection(Configuration dbzConfiguration) {
Configuration configuration = dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true);
@ -63,10 +66,9 @@ public class OracleConnectionUtils {
/** Fetch current redoLog offsets in Oracle Server. */
public static RedoLogOffset currentRedoLogOffset(JdbcConnection jdbc) {
final String showCurrentScn = "SELECT CURRENT_SCN FROM V$DATABASE";
try {
return jdbc.queryAndMap(
showCurrentScn,
SHOW_CURRENT_SCN,
rs -> {
if (rs.next()) {
final String scn = rs.getString(1);
@ -74,14 +76,14 @@ public class OracleConnectionUtils {
} else {
throw new FlinkRuntimeException(
"Cannot read the scn via '"
+ showCurrentScn
+ SHOW_CURRENT_SCN
+ "'. Make sure your server is correctly configured");
}
});
} catch (SQLException e) {
throw new FlinkRuntimeException(
"Cannot read the redo log position via '"
+ showCurrentScn
+ SHOW_CURRENT_SCN
+ "'. Make sure your server is correctly configured",
e);
}

@ -29,7 +29,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

@ -25,7 +25,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

Loading…
Cancel
Save