[oracle][minor] Clean up useless methods and classes

pull/1590/merge
Leonard Xu 2 years ago committed by Leonard Xu
parent 6fe4fc93e1
commit df4b87f79e

@ -32,12 +32,6 @@ public class JdbcSourceOptions extends SourceOptions {
.noDefaultValue()
.withDescription("IP address or hostname of the database server.");
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(3306)
.withDescription("Integer port number of the database server.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
@ -57,6 +51,12 @@ public class JdbcSourceOptions extends SourceOptions {
.noDefaultValue()
.withDescription("Database name of the database to monitor.");
public static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the database to monitor.");
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()

@ -16,7 +16,7 @@
package com.ververica.cdc.connectors.oracle;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.internal.DebeziumOffset;

@ -133,7 +133,7 @@ public class OracleDialect implements JdbcDataSourceDialect {
@Override
public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (oracleSchema == null) {
oracleSchema = new OracleSchema(sourceConfig);
oracleSchema = new OracleSchema();
}
return oracleSchema.getTableSchema(jdbc, tableId);
}

@ -18,10 +18,10 @@ package com.ververica.cdc.connectors.oracle.source;
import org.apache.flink.annotation.Internal;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import javax.annotation.Nullable;
@ -184,17 +184,7 @@ public class OracleSourceBuilder<T> {
/** Specifies the startup options. */
public OracleSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
com.ververica.cdc.connectors.base.options.StartupOptions baseStartupOptions = null;
switch (startupOptions.startupMode) {
case INITIAL:
baseStartupOptions =
com.ververica.cdc.connectors.base.options.StartupOptions.initial();
break;
case LATEST_OFFSET:
baseStartupOptions =
com.ververica.cdc.connectors.base.options.StartupOptions.latest();
}
this.configFactory.startupOptions(baseStartupOptions);
this.configFactory.startupOptions(startupOptions);
return this;
}

@ -37,12 +37,10 @@ public class OracleSourceConfig extends JdbcSourceConfig {
private static final long serialVersionUID = 1L;
@Nullable private String url;
private List<String> schemaList;
public OracleSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
List<String> schemaList,
List<String> tableList,
int splitSize,
int splitMetaGroupSize,
@ -84,7 +82,6 @@ public class OracleSourceConfig extends JdbcSourceConfig {
connectMaxRetries,
connectionPoolSize);
this.url = url;
this.schemaList = schemaList;
}
@Override
@ -104,8 +101,4 @@ public class OracleSourceConfig extends JdbcSourceConfig {
public String getUrl() {
return url;
}
public List<String> getSchemaList() {
return schemaList;
}
}

@ -107,7 +107,6 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
return new OracleSourceConfig(
startupOptions,
databaseList,
schemaList,
tableList,
splitSize,
splitMetaGroupSize,

@ -31,12 +31,6 @@ public class OracleSourceOptions extends JdbcSourceOptions {
.noDefaultValue()
.withDescription("Url to use when connecting to the Oracle database server.");
public static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the Oracle database to monitor.");
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()

@ -16,8 +16,6 @@
package com.ververica.cdc.connectors.oracle.source.meta.offset;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
@ -26,6 +24,8 @@ import java.util.Map;
/** An offset factory class create {@link RedoLogOffset} instance. */
public class RedoLogOffsetFactory extends OffsetFactory {
private static final long serialVersionUID = 1L;
public RedoLogOffsetFactory() {}
@Override
@ -35,13 +35,14 @@ public class RedoLogOffsetFactory extends OffsetFactory {
@Override
public Offset newOffset(String filename, Long position) {
throw new FlinkRuntimeException(
"not supported create new Offset by filename and position.");
throw new UnsupportedOperationException(
"Do not support to create RedoLogOffset by filename and position.");
}
@Override
public Offset newOffset(Long position) {
throw new FlinkRuntimeException("not supported create new Offset by Long position.");
throw new UnsupportedOperationException(
"Do not support to create RedoLogOffset by position.");
}
@Override

@ -1,43 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.offset;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
/** Serializer implementation for a {@link RedoLogOffset}. */
@Internal
public class RedoLogOffsetSerializer {
public static final RedoLogOffsetSerializer INSTANCE = new RedoLogOffsetSerializer();
public byte[] serialize(RedoLogOffset redoLogOffset) throws IOException {
// use JSON serialization
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsBytes(redoLogOffset.getOffset());
}
public RedoLogOffset deserialize(byte[] bytes) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> offset = objectMapper.readValue(bytes, Map.class);
return new RedoLogOffset(offset);
}
}

@ -172,7 +172,6 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
// task to read binlog and backfill for current split
return new RedoLogSplitReadTask(
new OracleConnectorConfig(dezConf),
oracleOffsetContext,
createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
context.getDispatcher(),
context.getErrorHandler(),

@ -118,7 +118,7 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
.loggingContextSupplier(
() ->
taskContext.configureLoggingContext(
"mysql-cdc-connector-task"))
"oracle-cdc-connector-task"))
// do not buffer any element, we use signal event
// .buffering()
.build();
@ -156,10 +156,6 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
return connection;
}
public OracleTaskContext getTaskContext() {
return taskContext;
}
@Override
public OracleConnectorConfig getDbzConnectorConfig() {
return (OracleConnectorConfig) super.getDbzConnectorConfig();

@ -59,7 +59,6 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
redoLogSplitReadTask =
new RedoLogSplitReadTask(
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getOffsetContext(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getErrorHandler(),
@ -91,14 +90,12 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(RedoLogSplitReadTask.class);
private final StreamSplit redoLogSplit;
private final OracleOffsetContext offsetContext;
private final JdbcSourceEventDispatcher dispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
public RedoLogSplitReadTask(
OracleConnectorConfig connectorConfig,
OracleOffsetContext offsetContext,
OracleConnection connection,
JdbcSourceEventDispatcher dispatcher,
ErrorHandler errorHandler,
@ -117,7 +114,6 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
metrics);
this.redoLogSplit = redoLogSplit;
this.dispatcher = dispatcher;
this.offsetContext = offsetContext;
this.errorHandler = errorHandler;
}

@ -21,21 +21,14 @@ import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import oracle.jdbc.OracleTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -123,56 +116,4 @@ public class OracleConnectionUtils {
return capturedTableIds;
}
/**
* overwrite table catalog in database schema.
*
* @param databaseSchema
* @param connectorConfig
*/
public static void overwriteCatalog(
OracleDatabaseSchema databaseSchema, OracleConnectorConfig connectorConfig) {
Tables tables = databaseSchema.getTables();
Set<TableId> tableIds = databaseSchema.tableIds();
Tables.TableFilter tableFilter = connectorConfig.getTableFilters().dataCollectionFilter();
for (TableId tableId : tableIds) {
TableId tableIdWithCatalog =
new TableId(
connectorConfig.getCatalogName(), tableId.schema(), tableId.table());
if (tableFilter.isIncluded(tableIdWithCatalog)) {
overrideOracleSpecificColumnTypes(tables, tableId, tableIdWithCatalog);
databaseSchema.refresh(tables.forTable(tableIdWithCatalog));
}
}
}
private static void overrideOracleSpecificColumnTypes(
Tables tables, TableId tableId, TableId tableIdWithCatalog) {
TableEditor editor = tables.editTable(tableId);
editor.tableId(tableIdWithCatalog);
List<String> columnNames = new ArrayList<>(editor.columnNames());
for (String columnName : columnNames) {
Column column = editor.columnWithName(columnName);
if (column.jdbcType() == Types.TIMESTAMP) {
editor.addColumn(
column.edit()
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(null)
.create());
}
// NUMBER columns without scale value have it set to -127 instead of null;
// let's rectify that
else if (column.jdbcType() == OracleTypes.NUMBER) {
column.scale()
.filter(s -> s == ORACLE_UNSET_SCALE)
.ifPresent(
s -> {
editor.addColumn(column.edit().scale(null).create());
});
}
}
tables.overwriteTable(editor.create());
}
}

@ -18,10 +18,7 @@ package com.ververica.cdc.connectors.oracle.source.utils;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -35,18 +32,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.createOracleDatabaseSchema;
/** A component used to get schema by table path. */
public class OracleSchema {
private final OracleConnectorConfig connectorConfig;
private final OracleDatabaseSchema databaseSchema;
private final Map<TableId, TableChange> schemasByTableId;
public OracleSchema(OracleSourceConfig sourceConfig) {
this.connectorConfig = sourceConfig.getDbzConnectorConfig();
this.databaseSchema = createOracleDatabaseSchema(connectorConfig);
public OracleSchema() {
this.schemasByTableId = new HashMap<>();
}

@ -29,6 +29,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;

@ -24,6 +24,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import java.util.HashSet;

@ -1,27 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.table;
/**
* Startup modes for the Oracle CDC Consumer.
*
* @see StartupOptions
*/
public enum StartupMode {
INITIAL,
LATEST_OFFSET,
}

@ -1,71 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.table;
import java.util.Objects;
/** Debezium startup options. */
public final class StartupOptions {
public final StartupMode startupMode;
/**
* Performs an initial snapshot on the monitored database tables upon first startup, and
* continue to read the latest logminer.
*/
public static StartupOptions initial() {
return new StartupOptions(StartupMode.INITIAL);
}
/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the end of the logminer which means only have the changes since the connector was started.
*/
public static StartupOptions latest() {
return new StartupOptions(StartupMode.LATEST_OFFSET);
}
private StartupOptions(StartupMode startupMode) {
this.startupMode = startupMode;
switch (startupMode) {
case INITIAL:
case LATEST_OFFSET:
break;
default:
throw new UnsupportedOperationException(startupMode + " mode is not supported.");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StartupOptions that = (StartupOptions) o;
return startupMode == that.startupMode;
}
@Override
public int hashCode() {
return Objects.hash(startupMode);
}
}

@ -23,6 +23,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
@ -99,6 +100,7 @@ public class OracleChangeEventSourceExampleTest {
.password(oracleContainer.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(3)
.build();

@ -86,7 +86,12 @@ public class OracleConnectorITCase extends AbstractTestBase {
TestValuesTableFactory.clearAllData();
env.setParallelism(1);
if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(200);
} else {
env.setParallelism(1);
}
}
@After

@ -33,6 +33,7 @@ import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import org.junit.Test;
import java.util.ArrayList;

Loading…
Cancel
Save