[vitess] Migrate connector properties to debezium 1.9.7 (#2260)

* feat[vitess]: Update connector properties for debezium-1.9.7

* fix: Address review comments

* fix: remove useless &ldquo and &rdquo

* fix: remove useless &nbsp
pull/1431/head
Simonas Gelazevicius 2 years ago committed by GitHub
parent 8f25ebbb7c
commit 2cf3723354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -29,7 +29,6 @@ You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-s
### Checklist
* Make sure that the VTGate host and its gRPC port (default is 15991) is accessible from the machine where the Vitess connector is installed
* Make sure that the VTCtld host and its gRPC port (default is 15999) is accessible from the machine where the Vitess connector is installed
### gRPC authentication
Because Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances.
@ -58,7 +57,6 @@ Flink SQL> CREATE TABLE orders (
'hostname' = 'localhost',
'port' = '3306',
'keyspace' = 'mydb',
'vtctl.hostname' = 'localhost',
'table-name' = 'orders');
-- read snapshot and binlogs from orders table
@ -69,15 +67,15 @@ Connector Options
----------------
<div class="&ldquo;highlight&rdquo;">
<table class="&ldquo;colwidths-auto">
<div class="highlight">
<table class="colwidths-auto">
<thead>
<tr>
<th class="&ldquo;text-left&rdquo;">Option</th>
<th class="&ldquo;text-left&rdquo;">Required</th>
<th class="&ldquo;text-left&rdquo;">Default</th>
<th class="&ldquo;text-left&rdquo;">Type</th>
<th class="&ldquo;text-left&rdquo;">Description</th>
<th class="text-left">Option</th>
<th class="text-left">Required</th>
<th class="text-left">Default</th>
<th class="text-left">Type</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
@ -98,8 +96,8 @@ Connector Options
<tr>
<td>keyspace</td>
<td>required</td>
<td>(none)&nbsp;</td>
<td>&nbsp;String</td>
<td>(none)</td>
<td>String</td>
<td>The name of the keyspace from which to stream the changes.</td>
</tr>
<tr>
@ -117,52 +115,59 @@ Connector Options
<td>An optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.</td>
</tr>
<tr>
<td>table-name</td>
<td>required</td>
<td>shard</td>
<td>optional</td>
<td>(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor.</td>
<td>An optional name of the shard from which to stream the changes. If not configured, in case of unsharded keyspace, the connector streams changes from the only shard, in case of sharded keyspace, the connector streams changes from all shards in the keyspace.</td>
</tr>
<tr>
<td>port</td>
<td>gtid</td>
<td>optional</td>
<td>15991</td>
<td>Integer</td>
<td>Integer port number of the VTCtld server.</td>
<td>current</td>
<td>String</td>
<td>An optional GTID position for a shard to stream from.</td>
</tr>
<tr>
<td>vtctld.host</td>
<td>required</td>
<td>(none)</td>
<td>String&nbsp;</td>
<td>IP address or hostname of the VTCtld server.</td>
<td>stopOnReshard</td>
<td>optional</td>
<td>false</td>
<td>Boolean</td>
<td>Controls Vitess flag stop_on_reshard.</td>
</tr>
<tr>
<td>vtctld.port</td>
<td>tombstonesOnDelete</td>
<td>optional</td>
<td>15999</td>
<td>Integer&nbsp;</td>
<td>Integer port number of the VTCtld server.</td>
<td>true</td>
<td>Boolean</td>
<td>Controls whether a delete event is followed by a tombstone event.</td>
</tr>
<tr>
<td>vtctld.user</td>
<td>tombstonesOnDelete</td>
<td>optional</td>
<td>(none)&nbsp;</td>
<td>String&nbsp;</td>
<td>An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.</td>
<td>true</td>
<td>Boolean</td>
<td>Controls whether a delete event is followed by a tombstone event.</td>
</tr>
<tr>
<td>vtctld.password</td>
<td>schemaNameAdjustmentMode</td>
<td>optional</td>
<td>(none)&nbsp;</td>
<td>String&nbsp;</td>
<td>An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.</td>
<td>avro</td>
<td>String</td>
<td>Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.</td>
</tr>
<tr>
<td>table-name</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor.</td>
</tr>
<tr>
<td>tablet.type</td>
<td>optional</td>
<td>RDONLY&nbsp;</td>
<td>String&nbsp;</td>
<td>RDONLY</td>
<td>String</td>
<td>The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents streaming from the master MySQL instance REPLICA represents streaming from the replica slave MySQL instance RDONLY represents streaming from the read-only slave MySQL instance.</td>
</tr>
</tbody>
@ -180,7 +185,7 @@ The VGTID in Vitess is the equivalent of GTID in MySQL, it describes the positio
When subscribing to a VStream service, the connector needs to provide a VGTID and a Tablet Type (e.g. MASTER, REPLICA). The VGTID describes the position from which VStream should starts sending change events; the Tablet type describes which underlying MySQL instance (master or replica) in each shard do we read change events from.
The first time the connector connects to a Vitess cluster, it gets the current VGTID from a Vitess component called VTCtld and provides the current VGTID to VStream.
The first time the connector connects to a Vitess cluster, it gets and provides the current VGTID to VStream.
The Debezium Vitess connector acts as a gRPC client of VStream. When the connector receives changes it transforms the events into Debezium create, update, or delete events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
@ -211,11 +216,6 @@ public class VitessSourceExample {
.keyspace("inventory")
.username("flinkuser")
.password("flinkpw")
.vtctldConfig(VtctldConfig
.builder()
.hostname("localhost")
.port(15999)
.build())
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();

@ -45,7 +45,6 @@ import java.util.stream.Collectors;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.GRPC_PORT;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.MYSQL_PORT;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.VTCTLD_GRPC_PORT;
import static org.junit.Assert.assertNotNull;
/** End-to-end test for Vitess CDC connector. */
@ -63,7 +62,7 @@ public class VitessE2eITCase extends FlinkContainerTestEnvironment {
.withKeyspace("test")
.withUsername("flinkuser")
.withPassword("flinkpwd")
.withExposedPorts(MYSQL_PORT, GRPC_PORT, VTCTLD_GRPC_PORT)
.withExposedPorts(MYSQL_PORT, GRPC_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withNetwork(NETWORK)
.withNetworkAliases(VITESS_CONTAINER_NETWORK_ALIAS);
@ -98,15 +97,11 @@ public class VitessE2eITCase extends FlinkContainerTestEnvironment {
+ " 'tablet-type' = 'MASTER',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'vtctl.hostname' = '%s',"
+ " 'vtctl.port' = '%s',"
+ " 'keyspace' = '%s',"
+ " 'table-name' = '%s'"
+ ");",
VITESS_CONTAINER_NETWORK_ALIAS,
GRPC_PORT,
VITESS_CONTAINER_NETWORK_ALIAS,
VTCTLD_GRPC_PORT,
VITESS_CONTAINER.getKeyspace(),
"test.products");
String sinkDDL =

@ -16,8 +16,8 @@
package com.ververica.cdc.connectors.vitess;
import com.ververica.cdc.connectors.vitess.config.SchemaAdjustmentMode;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.connector.vitess.VitessConnector;
@ -47,8 +47,13 @@ public class VitessSource {
private String keyspace;
private String username;
private String password;
private VtctldConfig vtctldConfig;
private TabletType tabletType = TabletType.RDONLY;
private String shard;
private String gtid = "current";
private Boolean stopOnReshard = false;
private Boolean tombstonesOnDelete = true;
private String[] messageKeyColumns;
private SchemaAdjustmentMode schemaNameAdjustmentMode = SchemaAdjustmentMode.NONE;
private String[] tableIncludeList;
private String[] tableExcludeList;
private String[] columnIncludeList;
@ -86,9 +91,71 @@ public class VitessSource {
return this;
}
/** VTCtld server config. */
public Builder<T> vtctldConfig(VtctldConfig vtctldConfig) {
this.vtctldConfig = vtctldConfig;
/**
* An optional name of the shard from which to stream the changes. If not configured, in
* case of unsharded keyspace, the connector streams changes from the only shard, in case of
* sharded keyspace, the connector streams changes from all shards in the keyspace. We
* recommend not configuring it in order to stream from all shards in the keyspace because
* it has better support for reshard operation. If configured, for example, -80, the
* connector will stream changes from the -80 shard.
*/
public Builder<T> shard(String shard) {
this.shard = shard;
return this;
}
/**
* An optional GTID position for a shard to stream from. This has to be set together with
* vitess.shard. If not configured, the connector streams changes from the latest position
* for the given shard.
*/
public Builder<T> gtid(String gtid) {
this.gtid = gtid;
return this;
}
/**
* Controls Vitess flag stop_on_reshard. true - the stream will be stopped after a reshard
* operation. false - the stream will be automatically migrated for the new shards after a
* reshard operation. If set to true, you should also consider setting vitess.gtid in the
* configuration.
*/
public Builder<T> stopOnReshard(Boolean stopOnReshard) {
this.stopOnReshard = stopOnReshard;
return this;
}
/**
* * Controls whether a delete event is followed by a tombstone event. true - a delete
* operation is represented by a delete event and a subsequent tombstone event. false - only
* a delete event is emitted.
*/
public Builder<T> tombstonesOnDelete(Boolean tombstonesOnDelete) {
this.tombstonesOnDelete = tombstonesOnDelete;
return this;
}
/**
* A semicolon separated list of tables with regular expressions that match table column
* names. The connector maps values in matching columns to key fields in change event
* records that it sends to Kafka topics. This is useful when a table does not have a
* primary key, or when you want to order change event records in a Kafka topic according to
* a field that is not a primary key. Separate entries with semicolons. Insert a colon
* between the fully-qualified table name and its regular expression. The format is:
* keyspace-name.table-name:_regexp_;
*/
public Builder<T> messageKeyColumns(String[] messageKeyColumns) {
this.messageKeyColumns = messageKeyColumns;
return this;
}
/**
* Specifies how schema names should be adjusted for compatibility with the message
* converter used by the connector. Possible settings: avro replaces the characters that
* cannot be used in the Avro type name with underscore. none does not apply any adjustment.
*/
public Builder<T> schemaNameAdjustmentMode(SchemaAdjustmentMode schemaNameAdjustmentMode) {
this.schemaNameAdjustmentMode = schemaNameAdjustmentMode;
return this;
}
@ -199,23 +266,28 @@ public class VitessSource {
props.setProperty("database.port", String.valueOf(port));
props.setProperty("vitess.keyspace", checkNotNull(keyspace));
props.setProperty("vitess.tablet.type", tabletType.name());
props.setProperty("vitess.vtctld.host", checkNotNull(vtctldConfig.getHostname()));
props.setProperty("vitess.vtctld.port", String.valueOf(vtctldConfig.getPort()));
if (username != null) {
props.setProperty("user", username);
props.setProperty("vitess.database.user", username);
}
if (vtctldConfig.getPassword() != null) {
props.setProperty("password", password);
if (password != null) {
props.setProperty("vitess.database.password", password);
}
if (vtctldConfig.getUsername() != null) {
props.setProperty("vitess.vtctld.user", vtctldConfig.getUsername());
if (shard != null) {
props.setProperty("vitess.shard", shard);
}
if (vtctldConfig.getPassword() != null) {
props.setProperty("vitess.vtctld.password", vtctldConfig.getPassword());
props.setProperty("vitess.gtid", checkNotNull(gtid));
if (messageKeyColumns != null) {
props.setProperty("message.key.columns", String.join(",", messageKeyColumns));
}
props.setProperty(
"schema.name.adjustment.mode", schemaNameAdjustmentMode.name().toLowerCase());
props.setProperty("vitess.stop_on_reshard", stopOnReshard.toString());
props.setProperty("tombstones.on.delete", tombstonesOnDelete.toString());
// The maximum number of tasks that should be created for this connector.
// The Vitess connector always uses a single task and therefore does not use this value,
// so the default is always acceptable.

@ -0,0 +1,36 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.config;
/**
* Specifies how schema names should be adjusted for compatibility with the message converter used
* by the connector.
*/
public enum SchemaAdjustmentMode {
/** Replaces the characters that cannot be used in the Avro type name with underscore. */
AVRO,
/** Does not apply any adjustment. */
NONE;
public static SchemaAdjustmentMode avro() {
return SchemaAdjustmentMode.AVRO;
}
public static SchemaAdjustmentMode none() {
return SchemaAdjustmentMode.NONE;
}
}

@ -1,132 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.config;
import java.util.Objects;
/** VTCtld server configuration options. */
public class VtctldConfig {
private String hostname;
private int port = 15999; // default 15999 port
private String username;
private String password;
public String getHostname() {
return hostname;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public static VtctldConfig.Builder builder() {
return new VtctldConfig.Builder();
}
/** Builder class of {@link VtctldConfig}. */
public static final class Builder {
private String hostname;
private int port = 15999; // default 15999 port
private String username;
private String password;
/** IP address or hostname of the VTCtld server. */
public Builder hostname(String hostname) {
this.hostname = hostname;
return this;
}
/** Integer port number of the VTCtld server. */
public Builder port(int port) {
this.port = port;
return this;
}
/**
* An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC
* is used.
*/
public Builder username(String username) {
this.username = username;
return this;
}
/**
* An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC
* is used.
*/
public Builder password(String password) {
this.password = password;
return this;
}
public VtctldConfig build() {
VtctldConfig vtctldConfig = new VtctldConfig();
vtctldConfig.password = this.password;
vtctldConfig.username = this.username;
vtctldConfig.hostname = this.hostname;
vtctldConfig.port = this.port;
return vtctldConfig;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VtctldConfig that = (VtctldConfig) o;
return port == that.port
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password);
}
@Override
public int hashCode() {
return Objects.hash(hostname, port, username, password);
}
@Override
public String toString() {
return "VtctldConfig{"
+ "hostname='"
+ hostname
+ '\''
+ ", port="
+ port
+ ", username='"
+ username
+ '\''
+ ", password='"
+ password
+ '\''
+ '}';
}
}

@ -25,8 +25,8 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import com.ververica.cdc.connectors.vitess.config.SchemaAdjustmentMode;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import java.util.HashSet;
import java.util.Set;
@ -70,29 +70,38 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
.noDefaultValue()
.withDescription("The password of the Vitess database server (VTGate gRPC).");
private static final ConfigOption<String> VTCTL_HOSTNAME =
ConfigOptions.key("vtctl.hostname")
private static final ConfigOption<String> SHARD =
ConfigOptions.key("vitess.shard")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the VTCtld server.");
private static final ConfigOption<Integer> VTCTL_PORT =
ConfigOptions.key("vtctl.port")
.intType()
.defaultValue(15999)
.withDescription("Integer port number of the VTCtld server.");
.withDescription(
"An optional name of the shard from which to stream the changes.");
private static final ConfigOption<String> VTCTL_USERNAME =
ConfigOptions.key("vtctl.username")
private static final ConfigOption<String> GTID =
ConfigOptions.key("vitess.gtid")
.stringType()
.noDefaultValue()
.withDescription("The username of the Vitess VTCtld server.");
.defaultValue("current")
.withDescription("An optional GTID position for a shard to stream from.");
private static final ConfigOption<Boolean> STOP_ON_RESHARD =
ConfigOptions.key("vitess.stop_on_reshard")
.booleanType()
.defaultValue(false)
.withDescription("Controls Vitess flag stop_on_reshard.");
private static final ConfigOption<Boolean> TOMBSTONES_ON_DELETE =
ConfigOptions.key("tombstones.on.delete")
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether a delete event is followed by a tombstone event.");
private static final ConfigOption<String> VTCTL_PASSWORD =
ConfigOptions.key("vtctl.password")
private static final ConfigOption<String> SCHEMA_NAME_ADJUSTMENT_MODE =
ConfigOptions.key("schema.name.adjustment.mode")
.stringType()
.noDefaultValue()
.withDescription("The password of the Vitess VTCtld server.");
.defaultValue("avro")
.withDescription(
"Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.");
private static final ConfigOption<String> TABLET_TYPE =
ConfigOptions.key("tablet-type")
@ -134,15 +143,14 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
int port = config.get(PORT);
String keyspace = config.get(KEYSPACE);
String tableName = config.get(TABLE_NAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
VtctldConfig vtctldConfig =
new VtctldConfig.Builder()
.hostname(config.get(VTCTL_HOSTNAME))
.port(config.get(VTCTL_PORT))
.username(config.get(VTCTL_USERNAME))
.password(config.get(VTCTL_PASSWORD))
.build();
String username = config.getOptional(USERNAME).orElse(null);
String password = config.getOptional(PASSWORD).orElse(null);
String shard = config.getOptional(SHARD).orElse(null);
String gtid = config.get(GTID);
Boolean stopOnReshard = config.get(STOP_ON_RESHARD);
Boolean tombstonesOnDelete = config.get(TOMBSTONES_ON_DELETE);
SchemaAdjustmentMode schemaNameAdjustmentMode =
SchemaAdjustmentMode.valueOf(config.get(SCHEMA_NAME_ADJUSTMENT_MODE).toUpperCase());
TabletType tabletType = TabletType.valueOf(config.get(TABLET_TYPE));
String pluginName = config.get(DECODING_PLUGIN_NAME);
String name = config.get(NAME);
@ -156,7 +164,11 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
tableName,
username,
password,
vtctldConfig,
shard,
gtid,
stopOnReshard,
tombstonesOnDelete,
schemaNameAdjustmentMode,
tabletType,
pluginName,
name,
@ -173,7 +185,6 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(KEYSPACE);
options.add(VTCTL_HOSTNAME);
options.add(TABLE_NAME);
return options;
}
@ -182,7 +193,11 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(VTCTL_PORT);
options.add(SHARD);
options.add(GTID);
options.add(STOP_ON_RESHARD);
options.add(TOMBSTONES_ON_DELETE);
options.add(SCHEMA_NAME_ADJUSTMENT_MODE);
options.add(USERNAME);
options.add(PASSWORD);
options.add(TABLET_TYPE);

@ -27,12 +27,14 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.vitess.VitessSource;
import com.ververica.cdc.connectors.vitess.config.SchemaAdjustmentMode;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Properties;
@ -51,10 +53,14 @@ public class VitessTableSource implements ScanTableSource {
private final int port;
private final String hostname;
private final String keyspace;
private final String username;
private final String password;
@Nullable private final String username;
@Nullable private final String password;
private final String tableName;
private final VtctldConfig vtctldConfig;
@Nullable private String shard;
private String gtid;
private Boolean stopOnReshard;
private Boolean tombstonesOnDelete;
private SchemaAdjustmentMode schemaNameAdjustmentMode;
private final TabletType tabletType;
private final Properties dbzProperties;
@ -64,9 +70,13 @@ public class VitessTableSource implements ScanTableSource {
String hostname,
String keyspace,
String tableName,
String username,
String password,
VtctldConfig vtctldConfig,
@Nullable String username,
@Nullable String password,
@Nullable String shard,
String gtid,
Boolean stopOnReshard,
Boolean tombstonesOnDelete,
SchemaAdjustmentMode schemaNameAdjustmentMode,
TabletType tabletType,
String pluginName,
String name,
@ -78,7 +88,11 @@ public class VitessTableSource implements ScanTableSource {
this.tableName = checkNotNull(tableName);
this.username = username;
this.password = password;
this.vtctldConfig = checkNotNull(vtctldConfig);
this.shard = shard;
this.gtid = gtid;
this.stopOnReshard = stopOnReshard;
this.tombstonesOnDelete = tombstonesOnDelete;
this.schemaNameAdjustmentMode = checkNotNull(schemaNameAdjustmentMode);
this.tabletType = checkNotNull(tabletType);
this.pluginName = checkNotNull(pluginName);
this.name = name;
@ -119,7 +133,11 @@ public class VitessTableSource implements ScanTableSource {
.password(password)
.tabletType(tabletType)
.decodingPluginName(pluginName)
.vtctldConfig(vtctldConfig)
.shard(shard)
.gtid(gtid)
.stopOnReshard(stopOnReshard)
.tombstonesOnDelete(tombstonesOnDelete)
.schemaNameAdjustmentMode(schemaNameAdjustmentMode)
.name(name)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
@ -137,7 +155,11 @@ public class VitessTableSource implements ScanTableSource {
tableName,
username,
password,
vtctldConfig,
shard,
gtid,
stopOnReshard,
tombstonesOnDelete,
schemaNameAdjustmentMode,
tabletType,
pluginName,
name,
@ -162,7 +184,11 @@ public class VitessTableSource implements ScanTableSource {
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(vtctldConfig, that.vtctldConfig)
&& Objects.equals(shard, that.shard)
&& Objects.equals(gtid, that.gtid)
&& Objects.equals(stopOnReshard, that.stopOnReshard)
&& Objects.equals(tombstonesOnDelete, that.tombstonesOnDelete)
&& Objects.equals(schemaNameAdjustmentMode, that.schemaNameAdjustmentMode)
&& tabletType == that.tabletType
&& Objects.equals(dbzProperties, that.dbzProperties);
}
@ -179,7 +205,11 @@ public class VitessTableSource implements ScanTableSource {
username,
password,
tableName,
vtctldConfig,
shard,
gtid,
stopOnReshard,
tombstonesOnDelete,
schemaNameAdjustmentMode,
tabletType,
dbzProperties);
}
@ -204,16 +234,26 @@ public class VitessTableSource implements ScanTableSource {
+ keyspace
+ '\''
+ ", username='"
+ username
+ String.valueOf(username)
+ '\''
+ ", password='"
+ password
+ String.valueOf(password)
+ '\''
+ ", tableName='"
+ tableName
+ '\''
+ ", vtctldConfig="
+ vtctldConfig
+ ", shard='"
+ String.valueOf(shard)
+ '\''
+ ", gtid='"
+ gtid
+ '\''
+ ", stopOnReshard="
+ stopOnReshard
+ ", tombstonesOnDelete="
+ tombstonesOnDelete
+ ", schemaNameAdjustmentMode="
+ schemaNameAdjustmentMode
+ ", tabletType="
+ tabletType
+ ", dbzProperties="

@ -33,7 +33,6 @@ import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.connectors.vitess.VitessSource;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
@ -147,11 +146,6 @@ public class VitessSourceTest extends VitessTestBase {
.keyspace(VITESS_CONTAINER.getKeyspace())
.tabletType(TabletType.MASTER)
.tableIncludeList("test.products")
.vtctldConfig(
VtctldConfig.builder()
.hostname(VITESS_CONTAINER.getHost())
.port(VITESS_CONTAINER.getVtctldGrpcPort())
.build())
.deserializer(new ForwardDeserializeSchema())
.debeziumProperties(properties)
.build();

@ -41,7 +41,6 @@ import java.util.stream.Stream;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.GRPC_PORT;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.MYSQL_PORT;
import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.VTCTLD_GRPC_PORT;
import static org.junit.Assert.assertNotNull;
/** Basic class for testing Vitess source, this contains a Vitess container. */
@ -56,7 +55,7 @@ public abstract class VitessTestBase extends AbstractTestBase {
.withKeyspace("test")
.withUsername("flinkuser")
.withPassword("flinkpwd")
.withExposedPorts(MYSQL_PORT, GRPC_PORT, VTCTLD_GRPC_PORT)
.withExposedPorts(MYSQL_PORT, GRPC_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass

@ -25,7 +25,6 @@ public class VitessContainer extends JdbcDatabaseContainer {
public static final String DEFAULT_TAG = "mysql80";
private static final Integer VITESS_PORT = 15991;
public static final Integer GRPC_PORT = VITESS_PORT + 1;
public static final Integer VTCTLD_GRPC_PORT = VITESS_PORT + 8;
public static final Integer MYSQL_PORT = VITESS_PORT + 3;
private String keyspaces = "test";
@ -85,10 +84,6 @@ public class VitessContainer extends JdbcDatabaseContainer {
return this.getMappedPort(GRPC_PORT);
}
public Integer getVtctldGrpcPort() {
return this.getMappedPort(VTCTLD_GRPC_PORT);
}
@Override
protected String getTestQueryString() {
return "SELECT 1";

@ -73,15 +73,11 @@ public class VitessConnectorITCase extends VitessTestBase {
+ " 'tablet-type' = 'MASTER',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'vtctl.hostname' = '%s',"
+ " 'vtctl.port' = '%s',"
+ " 'keyspace' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getGrpcPort(),
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getVtctldGrpcPort(),
VITESS_CONTAINER.getKeyspace(),
"test.products");
String sinkDDL =
@ -177,15 +173,11 @@ public class VitessConnectorITCase extends VitessTestBase {
+ " 'tablet-type' = 'MASTER',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'vtctl.hostname' = '%s',"
+ " 'vtctl.port' = '%s',"
+ " 'keyspace' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getGrpcPort(),
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getVtctldGrpcPort(),
VITESS_CONTAINER.getKeyspace(),
"test.full_types");
tEnv.executeSql(sourceDDL);

@ -30,8 +30,8 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.vitess.config.SchemaAdjustmentMode;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.connectors.vitess.table.VitessTableFactory;
import com.ververica.cdc.connectors.vitess.table.VitessTableSource;
import org.junit.Test;
@ -61,7 +61,6 @@ public class VitessTableFactoryTest {
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_SCHEMA = "public";
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
@ -84,7 +83,11 @@ public class VitessTableFactoryTest {
MY_TABLE,
null,
null,
VtctldConfig.builder().hostname(MY_LOCALHOST).port(15999).build(),
null,
"current",
false,
true,
SchemaAdjustmentMode.AVRO,
TabletType.RDONLY,
"decoderbufs",
"flink",
@ -96,7 +99,6 @@ public class VitessTableFactoryTest {
public void testOptionalProperties() {
Map<String, String> options = getAllOptions();
options.put("port", "5444");
options.put("vtctl.port", "5445");
options.put("decoding.plugin.name", "wal2json");
options.put("debezium.snapshot.mode", "never");
options.put("name", "flink");
@ -116,7 +118,11 @@ public class VitessTableFactoryTest {
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
VtctldConfig.builder().hostname(MY_LOCALHOST).port(5445).build(),
null,
"current",
false,
true,
SchemaAdjustmentMode.AVRO,
TabletType.MASTER,
"wal2json",
"flink",
@ -177,7 +183,6 @@ public class VitessTableFactoryTest {
options.put("connector", "vitess-cdc");
options.put("hostname", MY_LOCALHOST);
options.put("keyspace", MY_KEYSPACE);
options.put("vtctl.hostname", MY_LOCALHOST);
options.put("table-name", MY_TABLE);
return options;
}

Loading…
Cancel
Save