[oracle] Support custom JDBC URL for Oracle (#1258)

This closes #1244.
pull/1461/head
dujie 3 years ago committed by Leonard Xu
parent 45fd69692f
commit 0292a39a3c

@ -240,10 +240,10 @@ Connector Options
</tr>
<tr>
<td>hostname</td>
<td>required</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the Oracle database server.</td>
<td>IP address or hostname of the Oracle database server. If the url is not empty, hostname may not be configured, otherwise hostname can not be empty</td>
</tr>
<tr>
<td>username</td>
@ -287,6 +287,13 @@ Connector Options
<td>Integer</td>
<td>Integer port number of the Oracle database server.</td>
</tr>
<tr>
<td>url</td>
<td>optional</td>
<td style="word-wrap: break-word;">jdbc:oracle:thin:@{hostname}:{port}:{database-name}</td>
<td>String</td>
<td>JdbcUrl of the oracle database server . If the hostname and port parameter is configured, the URL is concatenated by hostname port database-name in SID format by default. Otherwise, you need to configure the URL parameter</td>
</tr>
<tr>
<td>scan.startup.mode</td>
<td>optional</td>
@ -419,7 +426,7 @@ import com.ververica.cdc.connectors.oracle.OracleSource;
public class OracleSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname()
.url("jdbc:oracle:thin:@{hostname}:{port}:{database}")
.port(1521)
.database("XE") // monitor XE database
.schemaList("inventory") // monitor inventory schema

@ -22,6 +22,8 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
import io.debezium.connector.oracle.OracleConnector;
import javax.annotation.Nullable;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -40,28 +42,35 @@ public class OracleSource {
/** Builder class of {@link OracleSource}. */
public static class Builder<T> {
private int port = 1521; // default 1521 port
private Integer port = 1521; // default 1521 port
private String hostname;
private String database;
private String username;
private String password;
private String url;
private String[] tableList;
private String[] schemaList;
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
public Builder<T> hostname(String hostname) {
public Builder<T> hostname(@Nullable String hostname) {
this.hostname = hostname;
return this;
}
/** Integer port number of the Oracle database server. */
public Builder<T> port(int port) {
public Builder<T> port(@Nullable Integer port) {
this.port = port;
return this;
}
/** Url to use when connecting to the Oracle database server. */
public Builder<T> url(@Nullable String url) {
this.url = url;
return this;
}
/**
* An optional list of regular expressions that match database names to be monitored; any
* database name not included in the whitelist will be excluded from monitoring. By default
@ -137,10 +146,18 @@ public class OracleSource {
// and
// underscores should be used.
props.setProperty("database.server.name", DATABASE_SERVER_NAME);
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.port", String.valueOf(port));
if (url != null) {
props.setProperty("database.url", url);
}
if (hostname != null) {
props.setProperty("database.hostname", hostname);
}
if (port != null) {
props.setProperty("database.port", String.valueOf(port));
}
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
props.setProperty("database.dbname", checkNotNull(database));
if (schemaList != null) {
@ -168,6 +185,13 @@ public class OracleSource {
props.putAll(dbzProperties);
}
if (url == null) {
checkNotNull(hostname, "hostname is required when url is not configured");
props.setProperty("database.hostname", hostname);
checkNotNull(port, "port is required when url is not configured");
props.setProperty("database.port", String.valueOf(port));
}
return new DebeziumSourceFunction<>(
deserializer, props, specificOffset, new OracleValidator(props));
}

@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.oracle;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import com.ververica.cdc.connectors.oracle.util.JdbcConfigurationUtil;
import com.ververica.cdc.debezium.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,12 +64,9 @@ public class OracleValidator implements Validator {
public static Connection openConnection(Properties properties) throws SQLException {
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
String hostname = properties.getProperty("database.hostname");
String port = properties.getProperty("database.port");
String dbname = properties.getProperty("database.dbname");
String url = JdbcConfigurationUtil.getConnectionUrlWithSid(properties);
String userName = properties.getProperty("database.user");
String userpwd = properties.getProperty("database.password");
return DriverManager.getConnection(
"jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
return DriverManager.getConnection(url, userName, userpwd);
}
}

@ -34,6 +34,8 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -51,7 +53,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
private final int port;
private final String url;
private final Integer port;
private final String hostname;
private final String database;
private final String username;
@ -73,8 +76,9 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
public OracleTableSource(
ResolvedSchema physicalSchema,
int port,
String hostname,
@Nullable String url,
@Nullable Integer port,
@Nullable String hostname,
String database,
String tableName,
String schemaName,
@ -83,8 +87,9 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
Properties dbzProperties,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.url = url;
this.port = port;
this.hostname = checkNotNull(hostname);
this.hostname = hostname;
this.database = checkNotNull(database);
this.tableName = checkNotNull(tableName);
this.schemaName = checkNotNull(schemaName);
@ -123,6 +128,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
.build();
OracleSource.Builder<RowData> builder =
OracleSource.<RowData>builder()
.url(url)
.hostname(hostname)
.port(port)
.database(database)
@ -159,6 +165,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
OracleTableSource source =
new OracleTableSource(
physicalSchema,
url,
port,
hostname,
database,
@ -182,8 +189,9 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
return false;
}
OracleTableSource that = (OracleTableSource) o;
return port == that.port
return Objects.equals(port, that.port)
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(url, that.url)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(username, that.username)
@ -200,6 +208,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
public int hashCode() {
return Objects.hash(
physicalSchema,
url,
port,
hostname,
database,

@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Factory for creating configured instance of {@link
@ -52,6 +53,13 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
.defaultValue(1521)
.withDescription("Integer port number of the Oracle database server.");
private static final ConfigOption<String> URL =
ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"Complete JDBC URL as an alternative to specifying hostname, port and database provided as a way to support alternative connection scenarios.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
@ -105,12 +113,17 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
String url = config.get(URL);
Integer port = config.get(PORT);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
if (url == null) {
checkNotNull(hostname, "hostname is required when url is not configured");
checkNotNull(port, "port is required when url is not configured");
}
return new OracleTableSource(
physicalSchema,
url,
port,
hostname,
databaseName,
@ -130,7 +143,6 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(DATABASE_NAME);
@ -142,7 +154,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(PORT);
options.add(URL);
options.add(SCAN_STARTUP_MODE);
return options;

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.util;
import java.util.Properties;
/** Get the JDBC configuration utility class according to debezium properties. */
public class JdbcConfigurationUtil {
/**
* Gets the URL in SID format.
*
* @param properties
* @return jdbcUrl in SID format.
*/
public static String getConnectionUrlWithSid(Properties properties) {
String url;
if (properties.containsKey("database.url")) {
url = properties.getProperty("database.url");
} else {
String hostname = properties.getProperty("database.hostname");
String port = properties.getProperty("database.port");
String dbname = properties.getProperty("database.dbname");
url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;
}
return url;
}
}

@ -89,15 +89,56 @@ public class OracleTableSourceFactoryTest {
private static final String MY_SCHEMA = "mySchema";
private static final Properties PROPERTIES = new Properties();
@Test
public void testRequiredProperties() {
try {
Map<String, String> properties = getAllRequiredOptions();
// validation for source
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t, "hostname is required when url is not configured")
.isPresent());
}
}
@Test
public void testRequiredPropertiesWithUrl() {
String url = "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE;
Map<String, String> properties = getAllRequiredOptions();
properties.put("url", url);
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
url,
1521,
null,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_LOCALHOST,
MY_DATABASE,
@ -112,8 +153,10 @@ public class OracleTableSourceFactoryTest {
@Test
public void testOptionalProperties() {
Map<String, String> options = getAllOptions();
Map<String, String> options = getAllRequiredOptions();
options.put("port", "1521");
options.put("hostname", MY_LOCALHOST);
options.put("url", "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE);
options.put("debezium.snapshot.mode", "initial");
DynamicTableSource actualSource = createTableSource(options);
@ -122,6 +165,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
"jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE,
1521,
MY_LOCALHOST,
MY_DATABASE,
@ -136,7 +180,7 @@ public class OracleTableSourceFactoryTest {
@Test
public void testStartupFromInitial() {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.put("scan.startup.mode", "initial");
// validation for source
@ -144,6 +188,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_LOCALHOST,
MY_DATABASE,
@ -158,7 +203,7 @@ public class OracleTableSourceFactoryTest {
@Test
public void testStartupFromLatestOffset() {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.put("scan.startup.mode", "latest-offset");
// validation for source
@ -166,6 +211,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_LOCALHOST,
MY_DATABASE,
@ -180,7 +226,7 @@ public class OracleTableSourceFactoryTest {
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
@ -192,6 +238,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA_WITH_METADATA,
null,
1521,
MY_LOCALHOST,
MY_DATABASE,
@ -219,7 +266,7 @@ public class OracleTableSourceFactoryTest {
public void testValidation() {
// validate illegal port
try {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.put("port", "123b");
createTableSource(properties);
@ -234,7 +281,7 @@ public class OracleTableSourceFactoryTest {
// validate missing required
Factory factory = new OracleTableSourceFactory();
for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.remove(requiredOption.key());
try {
@ -251,7 +298,7 @@ public class OracleTableSourceFactoryTest {
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.put("unknown", "abc");
createTableSource(properties);
@ -264,7 +311,7 @@ public class OracleTableSourceFactoryTest {
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
Map<String, String> properties = getAllRequiredOptionsWithHost();
properties.put("scan.startup.mode", "abc");
createTableSource(properties);
@ -279,10 +326,9 @@ public class OracleTableSourceFactoryTest {
}
}
private Map<String, String> getAllOptions() {
private Map<String, String> getAllRequiredOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "oracle-cdc");
options.put("hostname", MY_LOCALHOST);
options.put("database-name", MY_DATABASE);
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
@ -291,6 +337,12 @@ public class OracleTableSourceFactoryTest {
return options;
}
private Map<String, String> getAllRequiredOptionsWithHost() {
Map<String, String> options = getAllRequiredOptions();
options.put("hostname", MY_LOCALHOST);
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}

Loading…
Cancel
Save