[vitess] Add Vitess CDC connector (#456)

Co-authored-by: gintarasm <gintaras.matulas@gmail.com>
Simonas Gelazevicius 3 years ago committed by Qingsheng Ren
parent 2b017346c3
commit e349cb1a83

@@ -17,6 +17,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto
| [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
| [tidb-cdc](docs/content/connectors/tidb-cdc.md) | <li> [TiDB](https://www.pingcap.com): 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| [Db2-cdc](docs/content/connectors/db2-cdc.md) | <li> [Db2](https://www.ibm.com/products/db2): 11.5 | Db2 Driver: 11.5.0.0 |
| [vitess-cdc](docs/content/connectors/vitess-cdc.md) | <li> [Vitess](https://vitess.io/): 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.16 |
## Features

@@ -16,9 +16,10 @@ The CDC Connectors for Apache Flink<sup>®</sup> integrate Debezium as the engin
| [postgres-cdc](connectors/postgres-cdc.md) | <li> [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.5.1 |
| [sqlserver-cdc](connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
| [tidb-cdc](connectors/tidb-cdc.md) | <li> [TiDB](https://www.pingcap.com/): 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| [db2-cdc](connectors/db2-cdc.md) | <li> [Db2](https://www.ibm.com/products/db2): 11.5 | DB2 Driver: 11.5.0.0 |
| [vitess-cdc](connectors/vitess-cdc.md) | <li> [Vitess](https://vitess.io/): 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.16 |
## Supported Flink Versions
The following table shows the version mapping between Flink<sup>®</sup> CDC Connectors and Flink<sup>®</sup>:
| Flink<sup>®</sup> CDC Version | Flink<sup>®</sup> Version |

@@ -14,4 +14,5 @@ postgres-cdc
sqlserver-cdc
tidb-cdc
db2-cdc
vitess-cdc
```

@@ -0,0 +1,309 @@
# Vitess CDC Connector
The Vitess CDC connector allows for reading incremental data from a Vitess cluster. The connector does not support the snapshot feature at the moment. This document describes how to set up the Vitess CDC connector to run SQL queries against Vitess databases.
See the [Vitess Debezium documentation](https://debezium.io/documentation/reference/connectors/vitess.html) for more details.
Dependencies
------------
In order to set up the Vitess CDC connector, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
### Maven dependency
```xml
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-vitess-cdc</artifactId>
  <version>2.4-SNAPSHOT</version>
</dependency>
```
### SQL Client JAR
Download [flink-sql-connector-vitess-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-vitess-cdc/2.0.0/flink-sql-connector-vitess-cdc-2.0.0.jar) and put it under `<FLINK_HOME>/lib/`.
Setup Vitess server
----------------
You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/local-docker/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. No special setup is needed to support the Vitess connector.
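For quick local experiments you can also start the same `vitess/vttestserver` image that this connector's integration tests use, e.g. via Testcontainers. The following is a minimal sketch, assuming the image tag, environment variables, and port layout of the `VitessContainer` test utility added in this change:
```java
import org.testcontainers.containers.GenericContainer;

public class LocalVitess {
    public static void main(String[] args) {
        int basePort = 15991;
        try (GenericContainer<?> vitess =
                new GenericContainer<>("vitess/vttestserver:mysql80")
                        .withEnv("PORT", String.valueOf(basePort))
                        .withEnv("KEYSPACES", "test")
                        .withEnv("NUM_SHARDS", "1")
                        .withEnv("MYSQL_BIND_HOST", "0.0.0.0")
                        // offsets relative to PORT: +1 VTGate gRPC, +3 MySQL, +8 VTCtld gRPC
                        .withExposedPorts(basePort + 1, basePort + 3, basePort + 8)) {
            vitess.start();
            System.out.println("VTGate gRPC mapped to " + vitess.getMappedPort(basePort + 1));
        }
    }
}
```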
### Checklist
* Make sure that the VTGate host and its gRPC port (default is 15991) are accessible from the machine where the Vitess connector is installed
* Make sure that the VTCtld host and its gRPC port (default is 15999) are accessible from the machine where the Vitess connector is installed
### gRPC authentication
Because the Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances.
Therefore, no special database user or permissions are needed. At the moment, the Vitess connector only supports unauthenticated access to the VTGate gRPC server.
How to create a Vitess CDC table
----------------
The Vitess CDC table can be defined as follows:
```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- register a Vitess table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'vitess-cdc',
    'hostname' = 'localhost',
    'port' = '15991',
    'keyspace' = 'mydb',
    'vtctl.hostname' = 'localhost',
    'table-name' = 'orders');
-- read change events from the orders table
Flink SQL> SELECT * FROM orders;
```
Connector Options
----------------
<div class="&ldquo;highlight&rdquo;">
<table class="&ldquo;colwidths-auto">
<thead>
<tr>
<th class="&ldquo;text-left&rdquo;">Option</th>
<th class="&ldquo;text-left&rdquo;">Required</th>
<th class="&ldquo;text-left&rdquo;">Default</th>
<th class="&ldquo;text-left&rdquo;">Type</th>
<th class="&ldquo;text-left&rdquo;">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>connector</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>Specify what connector to use, here should be <code>'vitess-cdc'</code>.</td>
</tr>
<tr>
<td>hostname</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>IP address or hostname of the Vitess database server (VTGate).</td>
</tr>
<tr>
<td>keyspace</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>The name of the keyspace from which to stream the changes.</td>
</tr>
<tr>
<td>username</td>
<td>optional</td>
<td>(none)</td>
<td>String</td>
<td>An optional username of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.</td>
</tr>
<tr>
<td>password</td>
<td>optional</td>
<td>(none)</td>
<td>String</td>
<td>An optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.</td>
</tr>
<tr>
<td>table-name</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor.</td>
</tr>
<tr>
<td>port</td>
<td>optional</td>
<td>15991</td>
<td>Integer</td>
<td>Integer port number of the VTGate's VStream server.</td>
</tr>
<tr>
<td>vtctl.hostname</td>
<td>required</td>
<td>(none)</td>
<td>String</td>
<td>IP address or hostname of the VTCtld server.</td>
</tr>
<tr>
<td>vtctl.port</td>
<td>optional</td>
<td>15999</td>
<td>Integer</td>
<td>Integer port number of the VTCtld server.</td>
</tr>
<tr>
<td>vtctl.username</td>
<td>optional</td>
<td>(none)</td>
<td>String</td>
<td>An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.</td>
</tr>
<tr>
<td>vtctl.password</td>
<td>optional</td>
<td>(none)</td>
<td>String</td>
<td>An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.</td>
</tr>
<tr>
<td>tablet-type</td>
<td>optional</td>
<td>RDONLY</td>
<td>String</td>
<td>The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents streaming from the master MySQL instance; REPLICA represents streaming from the replica MySQL instance; RDONLY represents streaming from the read-only replica MySQL instance.</td>
</tr>
</tbody>
</table>
</div>
Features
--------
### Incremental Reading
The Vitess connector spends all its time streaming changes from the VTGate's VStream gRPC service to which it is subscribed. The client receives changes from VStream as they are committed in the underlying MySQL server's binlog at certain positions, which are referred to as VGTIDs.
The VGTID in Vitess is the equivalent of GTID in MySQL; it describes the position in the VStream at which a change event happens. Typically, a VGTID has multiple shard GTIDs; each shard GTID is a tuple of (Keyspace, Shard, GTID) that describes the GTID position of a given shard.
When subscribing to a VStream service, the connector needs to provide a VGTID and a Tablet Type (e.g. MASTER, REPLICA). The VGTID describes the position from which VStream should start sending change events; the Tablet Type describes which underlying MySQL instance (master or replica) in each shard the change events are read from.
The first time the connector connects to a Vitess cluster, it gets the current VGTID from a Vitess component called VTCtld and provides it to VStream.
The Debezium Vitess connector acts as a gRPC client of VStream. When the connector receives changes, it transforms the events into Debezium create, update, or delete events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records, in the same order in which they were generated, to the appropriate Kafka topic.
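For example, with the builder API shown in the DataStream example below, the tablet type can be chosen per source. A minimal sketch (hostnames and ports are placeholders):
```java
// Sketch: subscribe to VStream reading from MASTER tablets instead of the
// RDONLY default; hostnames and ports below are placeholders.
SourceFunction<String> source =
    VitessSource.<String>builder()
        .hostname("localhost")
        .port(15991) // VTGate gRPC port
        .keyspace("inventory")
        .vtctldConfig(VtctldConfig.builder()
            .hostname("localhost")
            .port(15999) // VTCtld gRPC port
            .build())
        .tabletType(TabletType.MASTER)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```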
#### Checkpoint
Incremental snapshot reading provides the ability to perform checkpoints at the chunk level. It resolves the checkpoint timeout problem that the old snapshot reading mechanism had in previous versions.
### Exactly-Once Processing
The Vitess CDC connector is a Flink Source connector that reads table snapshot chunks first and then continues to read the binlog.
In both the snapshot phase and the binlog phase, the Vitess CDC connector reads with **exactly-once processing**, even when failures happen.
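Exactly-once recovery relies on Flink checkpointing the source's offset (VGTID) state. In SQL this is the `execution.checkpointing.interval` setting shown above; with the DataStream API it is a one-liner (a sketch, the interval value is arbitrary):
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Persist the source's VGTID offsets every 3 seconds so the job can
// resume from the last completed checkpoint after a failure.
env.enableCheckpointing(3000);
```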
### DataStream Source
The Incremental Reading feature of the Vitess CDC Source is currently only exposed through SQL. If you're using the DataStream API, please use the Vitess Source:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.vitess.VitessSource;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class VitessSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = VitessSource.<String>builder()
                .hostname("localhost")
                .port(15991)
                .keyspace("inventory")
                .username("flinkuser")
                .password("flinkpw")
                .vtctldConfig(VtctldConfig
                        .builder()
                        .hostname("localhost")
                        .port(15999)
                        .build())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```
**Note:** Please refer to [Deserialization](../about.html#deserialization) for more details about the JSON deserialization.
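If JSON strings are not the desired output, a custom `DebeziumDeserializationSchema` can be supplied to `.deserializer(...)` instead. The sketch below simply forwards raw Kafka Connect `SourceRecord`s, mirroring the `ForwardDeserializeSchema` utility used in this connector's tests:
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.kafka.connect.source.SourceRecord;

/** Pass-through deserializer: emits each change event as a raw SourceRecord. */
public class ForwardDeserializeSchema
        implements DebeziumDeserializationSchema<SourceRecord> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(SourceRecord record, Collector<SourceRecord> out) {
        out.collect(record);
    }

    @Override
    public TypeInformation<SourceRecord> getProducedType() {
        return TypeInformation.of(SourceRecord.class);
    }
}
```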
Data Type Mapping
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">MySQL type<a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html"></a></th>
<th class="text-left">Flink SQL type<a href="{% link dev/table/types.md %}"></a></th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
</tr>
<tr>
<td>
SMALLINT<br>
TINYINT UNSIGNED</td>
<td>SMALLINT</td>
</tr>
<tr>
<td>
INT<br>
MEDIUMINT<br>
SMALLINT UNSIGNED</td>
<td>INT</td>
</tr>
<tr>
<td>
BIGINT<br>
INT UNSIGNED</td>
<td>BIGINT</td>
</tr>
<tr>
<td>BIGINT UNSIGNED</td>
<td>DECIMAL(20, 0)</td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
</tr>
<tr>
<td>
DOUBLE<br>
DOUBLE PRECISION</td>
<td>DOUBLE</td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
</tr>
<tr>
<td>
BOOLEAN<br>
TINYINT(1)</td>
<td>BOOLEAN</td>
</tr>
<tr>
<td>
CHAR(n)<br>
VARCHAR(n)<br>
TEXT</td>
<td>STRING</td>
</tr>
</tbody>
</table>
</div>

@@ -0,0 +1,166 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-vitess-cdc</artifactId>
<name>flink-connector-vitess-cdc</name>
<packaging>jar</packaging>
<dependencies>
<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-log4j-appender</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-vitess</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- test dependencies on Debezium -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<scope>test</scope>
</dependency>
<!-- test dependencies on Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.connector.vitess.VitessConnector;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A builder to build a SourceFunction which can read and process vitess database changes. The
* Vitess connector subscribes to VTGate's VStream gRPC service. VTGate is a lightweight, stateless
* gRPC server, which is part of the Vitess cluster setup.
*/
public class VitessSource {
public static <T> Builder<T> builder() {
return new Builder<>();
}
/** Builder class of {@link VitessSource}. */
public static class Builder<T> {
private String pluginName = "decoderbufs";
private String name = "flink";
private int port = 15991; // default 15991 port
private String hostname;
private String keyspace;
private String username;
private String password;
private VtctldConfig vtctldConfig;
private TabletType tabletType = TabletType.RDONLY;
private String[] tableIncludeList;
private String[] tableExcludeList;
private String[] columnIncludeList;
private String[] columnExcludeList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
/**
* The name of the Vitess logical decoding plug-in installed on the server. Supported values
* are decoderbufs.
*/
public Builder<T> decodingPluginName(String name) {
this.pluginName = name;
return this;
}
/** Hostname of the VTGate's VStream server. */
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
return this;
}
/** Integer port number of the VTGate's VStream server. */
public Builder<T> port(int port) {
this.port = port;
return this;
}
/**
* The name of the keyspace (a.k.a database). If no shard is specified, it reads change
* events from all shards in the keyspace.
*/
public Builder<T> keyspace(String keyspace) {
this.keyspace = keyspace;
return this;
}
/** VTCtld server config. */
public Builder<T> vtctldConfig(VtctldConfig vtctldConfig) {
this.vtctldConfig = vtctldConfig;
return this;
}
/**
* The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents
* streaming from the master MySQL instance; REPLICA represents streaming from the replica
* MySQL instance; RDONLY represents streaming from the read-only replica MySQL instance.
*/
public Builder<T> tabletType(TabletType tabletType) {
this.tabletType = tabletType;
return this;
}
/** The username of the Vitess database server (VTGate gRPC). */
public Builder<T> username(String username) {
this.username = username;
return this;
}
/** The password of the Vitess database server (VTGate gRPC). */
public Builder<T> password(String password) {
this.password = password;
return this;
}
/**
* Unique name for the connector. Attempting to register again with the same name will fail.
* This property is required by all Kafka Connect connectors. Default is "flink".
*/
public Builder<T> name(String name) {
this.name = name;
return this;
}
/**
* An optional, comma-separated list of regular expressions that match fully-qualified table
* identifiers for tables whose changes you want to capture. Any table not included in
* table.include.list does not have its changes captured. Each identifier is of the form
* keyspace.tableName. By default, the connector captures changes in every non-system table
* in each schema whose changes are being captured. Do not also set the table.exclude.list
* property.
*/
public Builder<T> tableIncludeList(String... tableIncludeList) {
this.tableIncludeList = tableIncludeList;
return this;
}
/**
* An optional, comma-separated list of regular expressions that match fully-qualified table
* identifiers for tables whose changes you do not want to capture. Any table not included
* in table.exclude.list has its changes captured. Each identifier is of the form
* keyspace.tableName. Do not also set the table.include.list property.
*/
public Builder<T> tableExcludeList(String... tableExcludeList) {
this.tableExcludeList = tableExcludeList;
return this;
}
/**
* An optional, comma-separated list of regular expressions that match the fully-qualified
* names of columns that should be included in change event record values. Fully-qualified
* names for columns are of the form keyspace.tableName.columnName. Do not also set the
* column.exclude.list property.
*/
public Builder<T> columnIncludeList(String... columnIncludeList) {
this.columnIncludeList = columnIncludeList;
return this;
}
/**
* An optional, comma-separated list of regular expressions that match the fully-qualified
* names of columns that should be excluded from change event record values. Fully-qualified
* names for columns are of the form keyspace.tableName.columnName. Do not also set the
* column.include.list property.
*/
public Builder<T> columnExcludeList(String... columnExcludeList) {
this.columnExcludeList = columnExcludeList;
return this;
}
/** The Debezium Vitess connector properties. */
public Builder<T> debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.
*/
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", VitessConnector.class.getCanonicalName());
props.setProperty("plugin.name", pluginName);
props.setProperty("name", name);
// hard code server name, because we don't need to distinguish it, docs:
// Logical name that identifies and provides a namespace for the particular Vitess
// Vtgate server/cluster being monitored. The logical name should be unique across
// all other connectors, since it is used as a prefix for all Kafka topic names coming
// from this connector. Only alphanumeric characters and underscores should be used.
props.setProperty("database.server.name", "vitess_cdc_source");
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.port", String.valueOf(port));
props.setProperty("vitess.keyspace", checkNotNull(keyspace));
props.setProperty("vitess.tablet.type", tabletType.name());
props.setProperty("vitess.vtctld.host", checkNotNull(vtctldConfig.getHostname()));
props.setProperty("vitess.vtctld.port", String.valueOf(vtctldConfig.getPort()));
if (username != null) {
props.setProperty("user", username);
}
if (password != null) {
props.setProperty("password", password);
}
if (vtctldConfig.getUsername() != null) {
props.setProperty("vitess.vtctld.user", vtctldConfig.getUsername());
}
if (vtctldConfig.getPassword() != null) {
props.setProperty("vitess.vtctld.password", vtctldConfig.getPassword());
}
// The maximum number of tasks that should be created for this connector.
// The Vitess connector always uses a single task and therefore does not use this value,
// so the default is always acceptable.
props.setProperty("tasks.max", "1");
if (tableIncludeList != null) {
props.setProperty("table.include.list", String.join(",", tableIncludeList));
}
if (tableExcludeList != null) {
props.setProperty("table.exclude.list", String.join(",", tableExcludeList));
}
if (columnIncludeList != null) {
props.setProperty("column.include.list", String.join(",", columnIncludeList));
}
if (columnExcludeList != null) {
props.setProperty("column.exclude.list", String.join(",", columnExcludeList));
}
if (dbzProperties != null) {
dbzProperties.forEach(props::put);
}
return new DebeziumSourceFunction<>(
deserializer, props, null, new VitessValidator(props));
}
}
}

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.vitess.VitessConnector;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
/** The validator for Vitess. */
public class VitessValidator implements Validator, Serializable {
private static final long serialVersionUID = 1L;
private final Map<String, String> configuration;
public VitessValidator(Properties properties) {
this.configuration = Maps.fromProperties(properties);
}
@Override
public void validate() {
VitessConnector c = new VitessConnector();
c.validate(configuration);
}
}

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.config;
/** The type of Tablet (hence MySQL) from which to stream the changes. */
public enum TabletType {
/** Streaming from the master MySQL instance. */
MASTER,
/** Streaming from the replica slave MySQL instance. */
REPLICA,
/** Streaming from the read-only slave MySQL instance. */
RDONLY;
public static TabletType master() {
return TabletType.MASTER;
}
public static TabletType replica() {
return TabletType.REPLICA;
}
public static TabletType rdonly() {
return TabletType.RDONLY;
}
}

@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.config;
import java.util.Objects;
/** VTCtld server configuration options. */
public class VtctldConfig {
private String hostname;
private int port = 15999; // default 15999 port
private String username;
private String password;
public String getHostname() {
return hostname;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public static VtctldConfig.Builder builder() {
return new VtctldConfig.Builder();
}
/** Builder class of {@link VtctldConfig}. */
public static final class Builder {
private String hostname;
private int port = 15999; // default 15999 port
private String username;
private String password;
/** IP address or hostname of the VTCtld server. */
public Builder hostname(String hostname) {
this.hostname = hostname;
return this;
}
/** Integer port number of the VTCtld server. */
public Builder port(int port) {
this.port = port;
return this;
}
/**
* An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC
* is used.
*/
public Builder username(String username) {
this.username = username;
return this;
}
/**
* An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC
* is used.
*/
public Builder password(String password) {
this.password = password;
return this;
}
public VtctldConfig build() {
VtctldConfig vtctldConfig = new VtctldConfig();
vtctldConfig.password = this.password;
vtctldConfig.username = this.username;
vtctldConfig.hostname = this.hostname;
vtctldConfig.port = this.port;
return vtctldConfig;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VtctldConfig that = (VtctldConfig) o;
return port == that.port
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password);
}
@Override
public int hashCode() {
return Objects.hash(hostname, port, username, password);
}
@Override
public String toString() {
return "VtctldConfig{"
+ "hostname='"
+ hostname
+ '\''
+ ", port="
+ port
+ ", username='"
+ username
+ '\''
+ ", password='"
+ password
+ '\''
+ '}';
}
}

@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
/** Factory for creating configured instance of {@link VitessTableSource}. */
public class VitessTableFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "vitess-cdc";
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("Hostname of the VTGates VStream server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(15991)
.withDescription("Integer port number of the VTGates VStream server.");
private static final ConfigOption<String> KEYSPACE =
ConfigOptions.key("keyspace")
.stringType()
.noDefaultValue()
.withDescription(
"The name of the keyspace (a.k.a database). If no shard is specified, it reads change events from all shards in the keyspace.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("The username of the Vitess database server (VTGate gRPC).");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("The password of the Vitess database server (VTGate gRPC).");
private static final ConfigOption<String> VTCTL_HOSTNAME =
ConfigOptions.key("vtctl.hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the VTCtld server.");
private static final ConfigOption<Integer> VTCTL_PORT =
ConfigOptions.key("vtctl.port")
.intType()
.defaultValue(15999)
.withDescription("Integer port number of the VTCtld server.");
private static final ConfigOption<String> VTCTL_USERNAME =
ConfigOptions.key("vtctl.username")
.stringType()
.noDefaultValue()
.withDescription("The username of the Vitess VTCtld server.");
private static final ConfigOption<String> VTCTL_PASSWORD =
ConfigOptions.key("vtctl.password")
.stringType()
.noDefaultValue()
.withDescription("The password of the Vitess VTCtld server.");
private static final ConfigOption<String> TABLET_TYPE =
ConfigOptions.key("tablet-type")
.stringType()
.defaultValue(TabletType.RDONLY.name())
.withDescription(
"The type of Tablet (hence MySQL) from which to stream the changes:");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the MYSQL database to monitor.");
private static final ConfigOption<String> DECODING_PLUGIN_NAME =
ConfigOptions.key("decoding.plugin.name")
.stringType()
.defaultValue("decoderbufs")
.withDescription(
"The name of the Vitess logical decoding plug-in installed on the server.");
private static final ConfigOption<String> NAME =
ConfigOptions.key("name")
.stringType()
.defaultValue("flink")
.withDescription(
"Unique name for the connector."
+ " Attempting to register again with the same name will fail. "
+ "This property is required by all Kafka Connect connectors. Default is flink.");
@Override
public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
int port = config.get(PORT);
String keyspace = config.get(KEYSPACE);
String tableName = config.get(TABLE_NAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
VtctldConfig vtctldConfig =
new VtctldConfig.Builder()
.hostname(config.get(VTCTL_HOSTNAME))
.port(config.get(VTCTL_PORT))
.username(config.get(VTCTL_USERNAME))
.password(config.get(VTCTL_PASSWORD))
.build();
TabletType tabletType = TabletType.valueOf(config.get(TABLET_TYPE));
String pluginName = config.get(DECODING_PLUGIN_NAME);
String name = config.get(NAME);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
return new VitessTableSource(
physicalSchema,
port,
hostname,
keyspace,
tableName,
username,
password,
vtctldConfig,
tabletType,
pluginName,
name,
getDebeziumProperties(context.getCatalogTable().getOptions()));
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(KEYSPACE);
options.add(VTCTL_HOSTNAME);
options.add(TABLE_NAME);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(VTCTL_PORT);
options.add(USERNAME);
options.add(PASSWORD);
options.add(TABLET_TYPE);
options.add(DECODING_PLUGIN_NAME);
options.add(NAME);
return options;
}
}

@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.vitess.VitessSource;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link DynamicTableSource} that describes how to create a Vitess source from a logical
* description.
*/
public class VitessTableSource implements ScanTableSource {
private final ResolvedSchema physicalSchema;
private final String pluginName;
private final String name;
private final int port;
private final String hostname;
private final String keyspace;
private final String username;
private final String password;
private final String tableName;
private final VtctldConfig vtctldConfig;
private final TabletType tabletType;
private final Properties dbzProperties;
public VitessTableSource(
ResolvedSchema physicalSchema,
int port,
String hostname,
String keyspace,
String tableName,
String username,
String password,
VtctldConfig vtctldConfig,
TabletType tabletType,
String pluginName,
String name,
Properties dbzProperties) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
this.keyspace = checkNotNull(keyspace);
this.tableName = checkNotNull(tableName);
this.username = username;
this.password = password;
this.vtctldConfig = checkNotNull(vtctldConfig);
this.tabletType = checkNotNull(tabletType);
this.pluginName = checkNotNull(pluginName);
this.name = name;
this.dbzProperties = dbzProperties;
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setResultTypeInfo(typeInfo)
.setServerTimeZone(ZoneId.of("UTC"))
.build();
DebeziumSourceFunction<RowData> sourceFunction =
VitessSource.<RowData>builder()
.hostname(hostname)
.port(port)
.keyspace(keyspace)
.tableIncludeList(tableName)
.username(username)
.password(password)
.tabletType(tabletType)
.decodingPluginName(pluginName)
.vtctldConfig(vtctldConfig)
.name(name)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
@Override
public DynamicTableSource copy() {
return new VitessTableSource(
physicalSchema,
port,
hostname,
keyspace,
tableName,
username,
password,
vtctldConfig,
tabletType,
pluginName,
name,
dbzProperties);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VitessTableSource that = (VitessTableSource) o;
return port == that.port
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(pluginName, that.pluginName)
&& Objects.equals(name, that.name)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(keyspace, that.keyspace)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(vtctldConfig, that.vtctldConfig)
&& tabletType == that.tabletType
&& Objects.equals(dbzProperties, that.dbzProperties);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
pluginName,
name,
port,
hostname,
keyspace,
username,
password,
tableName,
vtctldConfig,
tabletType,
dbzProperties);
}
@Override
public String toString() {
return "VitessTableSource{"
+ "physicalSchema="
+ physicalSchema
+ ", pluginName='"
+ pluginName
+ '\''
+ ", name='"
+ name
+ '\''
+ ", port="
+ port
+ ", hostname='"
+ hostname
+ '\''
+ ", keyspace='"
+ keyspace
+ '\''
+ ", username='"
+ username
+ '\''
+ ", password='"
+ password
+ '\''
+ ", tableName='"
+ tableName
+ '\''
+ ", vtctldConfig="
+ vtctldConfig
+ ", tabletType="
+ tabletType
+ ", dbzProperties="
+ dbzProperties
+ '}';
}
@Override
public String asSummaryString() {
return "Vitess-CDC";
}
}

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.connectors.vitess.table.VitessTableFactory

@@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.connectors.vitess.VitessSource;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate;
/** Tests for {@link VitessSource} which also heavily tests {@link DebeziumSourceFunction}. */
public class VitessSourceTest extends VitessTestBase {
@Before
public void before() {
initializeTable("inventory");
}
@Test
public void testConsumingAllEvents() throws Exception {
DebeziumSourceFunction<SourceRecord> source = createVitessSqlSource(0);
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
// start the source
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
waitForSourceToStart(Duration.ofSeconds(60), source);
List<SourceRecord> records;
statement.execute(
"INSERT INTO test.products VALUES (default,'robot','Toy robot',1.304)"); // 110
records = drain(sourceContext, 1);
assertInsert(records.get(0), "id", 101);
statement.execute(
"INSERT INTO test.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
records = drain(sourceContext, 1);
assertInsert(records.get(0), "id", 1001);
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 2 events: DELETE, INSERT
// (TOMBSTONE is dropped)
// ---------------------------------------------------------------------------------------------------------------
statement.execute(
"UPDATE test.products SET id=2001, description='really old robot' WHERE id=1001");
records = drain(sourceContext, 2);
assertDelete(records.get(0), "id", 1001);
assertInsert(records.get(1), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Simple UPDATE (with no schema changes)
// ---------------------------------------------------------------------------------------------------------------
statement.execute("UPDATE test.products SET weight=1345.67 WHERE id=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record
// ...
statement.execute(
"ALTER TABLE test.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL");
// Vitess schema change has eventual consistency, wait few seconds.
Thread.sleep(5000);
statement.execute("UPDATE test.products SET volume=13.5 WHERE id=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "id", 2001);
// cleanup
source.cancel();
source.close();
runThread.sync();
}
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createVitessSqlSource(int heartbeatInterval) {
Properties properties = new Properties();
properties.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval));
return VitessSource.<SourceRecord>builder()
.hostname(VITESS_CONTAINER.getHost())
.port(VITESS_CONTAINER.getGrpcPort())
.keyspace(VITESS_CONTAINER.getKeyspace())
.tabletType(TabletType.MASTER)
.tableIncludeList("test.products")
.vtctldConfig(
VtctldConfig.builder()
.hostname(VITESS_CONTAINER.getHost())
.port(VITESS_CONTAINER.getVtctldGrpcPort())
.build())
.deserializer(new ForwardDeserializeSchema())
.debeziumProperties(properties)
.build();
}
private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
throws Exception {
List<T> allRecords = new ArrayList<>();
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
while (allRecords.size() < expectedRecordCount) {
StreamRecord<T> record = queue.poll(1000, TimeUnit.SECONDS);
if (record != null) {
allRecords.add(record.getValue());
} else {
throw new RuntimeException(
"Can't receive " + expectedRecordCount + " elements before timeout.");
}
}
return allRecords;
}
private boolean waitForSourceToStart(
Duration timeout, DebeziumSourceFunction<SourceRecord> source)
throws InterruptedException {
long now = System.currentTimeMillis();
long stop = now + timeout.toMillis();
while (System.currentTimeMillis() < stop) {
if (source.getDebeziumStarted()) {
break;
}
Thread.sleep(10); // save CPU
}
Thread.sleep(10000); // Wait for full start
return source.getDebeziumStarted();
}
private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
setupSource(
source, false, null, null,
true, // enable checkpointing; auto commit should be ignored
0, 1);
}
private static <T, S1, S2> void setupSource(
DebeziumSourceFunction<T> source,
boolean isRestored,
ListState<S1> restoredOffsetState,
ListState<S2> restoredHistoryState,
boolean isCheckpointingEnabled,
int subtaskIndex,
int totalNumSubtasks)
throws Exception {
// run setup procedure in operator life cycle
source.setRuntimeContext(
new MockStreamingRuntimeContext(
isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
source.initializeState(
new MockFunctionInitializationContext(
isRestored,
new MockOperatorStateStore(restoredOffsetState, restoredHistoryState)));
source.open(new Configuration());
}
private static class ForwardDeserializeSchema
implements DebeziumDeserializationSchema<SourceRecord> {
private static final long serialVersionUID = 2975058057832211228L;
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
private static class MockOperatorStateStore implements OperatorStateStore {
private final ListState<?> restoredOffsetListState;
private final ListState<?> restoredHistoryListState;
private MockOperatorStateStore(
ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
this.restoredOffsetListState = restoredOffsetListState;
this.restoredHistoryListState = restoredHistoryListState;
}
@Override
@SuppressWarnings("unchecked")
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) {
return (ListState<S>) restoredOffsetListState;
} else if (stateDescriptor
.getName()
.equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) {
return (ListState<S>) restoredHistoryListState;
} else {
throw new IllegalStateException("Unknown state.");
}
}
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredStateNames() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredBroadcastStateNames() {
throw new UnsupportedOperationException();
}
}
private static class MockFunctionInitializationContext
implements FunctionInitializationContext {
private final boolean isRestored;
private final OperatorStateStore operatorStateStore;
private MockFunctionInitializationContext(
boolean isRestored, OperatorStateStore operatorStateStore) {
this.isRestored = isRestored;
this.operatorStateStore = operatorStateStore;
}
@Override
public boolean isRestored() {
return isRestored;
}
@Override
public OptionalLong getRestoredCheckpointId() {
throw new UnsupportedOperationException();
}
@Override
public OperatorStateStore getOperatorStateStore() {
return operatorStateStore;
}
@Override
public KeyedStateStore getKeyedStateStore() {
throw new UnsupportedOperationException();
}
}
}

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess;
import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.vitess.container.VitessContainer;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.vitess.container.VitessContainer.GRPC_PORT;
import static com.ververica.cdc.connectors.vitess.container.VitessContainer.MYSQL_PORT;
import static com.ververica.cdc.connectors.vitess.container.VitessContainer.VTCTLD_GRPC_PORT;
import static org.junit.Assert.assertNotNull;
/** Basic class for testing Vitess source, this contains a Vitess container. */
public abstract class VitessTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(VitessTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final VitessContainer VITESS_CONTAINER =
(VitessContainer)
new VitessContainer()
.withKeyspace("test")
.withUsername("flinkuser")
.withPassword("flinkpwd")
.withExposedPorts(MYSQL_PORT, GRPC_PORT, VTCTLD_GRPC_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(VITESS_CONTAINER)).join();
LOG.info("Containers are started.");
}
public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(VITESS_CONTAINER.getJdbcUrl());
}
    /**
     * Executes the statements of the given DDL file (resolved from the {@code ddl/} test
     * resources) over the default JDBC connection, stripping comments and splitting on ';'.
     */
protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = VitessTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
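A concrete test typically just loads a DDL file and then talks to the container over JDBC. A minimal sketch of such a subclass follows; the class name `VitessSmokeTest` is hypothetical, while the `inventory` DDL file and the `test.products` table come from the test resources added later in this commit:

```java
import java.sql.Connection;
import java.sql.Statement;

import org.junit.Test;

/** Illustrative only: a minimal test built on top of VitessTestBase. */
public class VitessSmokeTest extends VitessTestBase {

    @Test
    public void testContainerAcceptsWrites() throws Exception {
        // Loads src/test/resources/ddl/inventory.sql into the running container.
        initializeTable("inventory");
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "INSERT INTO test.products VALUES (default, 'wrench', '15in adjustable wrench', 1.2);");
        }
    }
}
```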

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.container;
import org.testcontainers.containers.JdbcDatabaseContainer;
/** Vitess container. */
public class VitessContainer extends JdbcDatabaseContainer {
public static final String IMAGE = "vitess/vttestserver";
public static final String DEFAULT_TAG = "mysql80";
private static final Integer VITESS_PORT = 15991;
public static final Integer GRPC_PORT = VITESS_PORT + 1;
public static final Integer VTCTLD_GRPC_PORT = VITESS_PORT + 8;
public static final Integer MYSQL_PORT = VITESS_PORT + 3;
private String keyspaces = "test";
private String username = "flinkuser";
private String password = "flinkpwd";
public VitessContainer() {
this(DEFAULT_TAG);
}
public VitessContainer(String tag) {
super(IMAGE + ":" + tag);
}
@Override
protected void configure() {
addEnv("PORT", VITESS_PORT.toString());
addEnv("KEYSPACES", getKeyspace());
addEnv("NUM_SHARDS", "1");
addEnv("MYSQL_BIND_HOST", "0.0.0.0");
}
@Override
public String getDriverClassName() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return "com.mysql.cj.jdbc.Driver";
} catch (ClassNotFoundException e) {
return "com.mysql.jdbc.Driver";
}
}
@Override
public String getJdbcUrl() {
return "jdbc:mysql://" + getHost() + ":" + getMysqlPort() + "/" + getKeyspace();
}
@Override
public String getUsername() {
return username;
}
@Override
public String getPassword() {
return password;
}
public String getKeyspace() {
return keyspaces;
}
public Integer getMysqlPort() {
return this.getMappedPort(MYSQL_PORT);
}
public Integer getGrpcPort() {
return this.getMappedPort(GRPC_PORT);
}
public Integer getVtctldGrpcPort() {
return this.getMappedPort(VTCTLD_GRPC_PORT);
}
@Override
protected String getTestQueryString() {
return "SELECT 1";
}
@Override
public VitessContainer withDatabaseName(final String keyspace) {
this.keyspaces = keyspace;
return this;
}
public VitessContainer withKeyspace(String keyspace) {
this.keyspaces = keyspace;
return this;
}
@Override
public VitessContainer withUsername(final String username) {
this.username = username;
return this;
}
@Override
public VitessContainer withPassword(final String password) {
this.password = password;
return this;
}
}
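Outside the JUnit lifecycle, the container can also be started by hand. A minimal sketch, assuming the Testcontainers and MySQL driver dependencies already used by this module (the class name `VitessContainerDemo` is hypothetical):

```java
import java.sql.Connection;
import java.sql.DriverManager;

import com.ververica.cdc.connectors.vitess.container.VitessContainer;

import static com.ververica.cdc.connectors.vitess.container.VitessContainer.GRPC_PORT;
import static com.ververica.cdc.connectors.vitess.container.VitessContainer.MYSQL_PORT;
import static com.ververica.cdc.connectors.vitess.container.VitessContainer.VTCTLD_GRPC_PORT;

/** Illustrative only: start the container by hand and probe it over JDBC. */
public class VitessContainerDemo {
    public static void main(String[] args) throws Exception {
        VitessContainer container =
                (VitessContainer)
                        new VitessContainer()
                                .withKeyspace("test")
                                .withUsername("flinkuser")
                                .withPassword("flinkpwd")
                                .withExposedPorts(MYSQL_PORT, GRPC_PORT, VTCTLD_GRPC_PORT);
        container.start();
        try (Connection conn = DriverManager.getConnection(container.getJdbcUrl())) {
            // Mapped ports differ per run; always resolve them through the getters.
            System.out.println("VTGate gRPC port: " + container.getGrpcPort());
        } finally {
            container.stop();
        }
    }
}
```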

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.vitess.VitessTestBase;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Integration tests for the Vitess CDC SQL source. */
public class VitessConnectorITCase extends VitessTestBase {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(1);
}
@Test
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
initializeTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'vitess-cdc',"
+ " 'tablet-type' = 'MASTER',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'vtctl.hostname' = '%s',"
+ " 'vtctl.port' = '%s',"
+ " 'keyspace' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getGrpcPort(),
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getVtctldGrpcPort(),
VITESS_CONTAINER.getKeyspace(),
"test.products");
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
+ " weightSum DECIMAL(10,3),"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        // The Vitess source doesn't read snapshot data, so the source is empty at first.
        // There is no readiness signal to observe, so sleep until the source has started.
Thread.sleep(10000);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO test.products \n"
+ "VALUES (default,'scooter','Small 2-wheel scooter',3.14),\n"
+ " (default,'car battery','12V car battery',8.1),\n"
+ " (default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),\n"
+ " (default,'hammer','12oz carpenters hammer',0.75),\n"
+ " (default,'hammer','14oz carpenters hammer',0.875),\n"
+ " (default,'hammer','16oz carpenters hammer',1.0),\n"
+ " (default,'rocks','box of assorted rocks',5.3),\n"
+ " (default,'jacket','water resistent black wind breaker',0.1),\n"
+ " (default,'spare tire','24 inch spare tire',22.2);");
statement.execute(
"UPDATE test.products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE test.products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO test.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO test.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE test.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE test.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM test.products WHERE id=111;");
}
waitForSinkSize("sink", 20);
List<String> expected =
Arrays.asList(
"+I[scooter, 3.140]",
"+I[car battery, 8.100]",
"+I[12-pack drill bits, 0.800]",
"+I[hammer, 2.625]",
"+I[rocks, 5.100]",
"+I[jacket, 0.600]",
"+I[spare tire, 22.200]");
List<String> actual = TestValuesTableFactory.getResults("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testAllTypes() throws Throwable {
initializeTable("column_type_test");
String sourceDDL =
String.format(
"CREATE TABLE full_types (\n"
+ " `id` INT NOT NULL,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT ,\n"
+ " small_c SMALLINT,\n"
+ " small_un_c INT,\n"
+ " int_c INT ,\n"
+ " int_un_c BIGINT,\n"
+ " int11_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " varchar_c STRING,\n"
+ " char_c STRING,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " boolean_c BOOLEAN,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'vitess-cdc',"
+ " 'tablet-type' = 'MASTER',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'vtctl.hostname' = '%s',"
+ " 'vtctl.port' = '%s',"
+ " 'keyspace' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getGrpcPort(),
VITESS_CONTAINER.getHost(),
VITESS_CONTAINER.getVtctldGrpcPort(),
VITESS_CONTAINER.getKeyspace(),
"test.full_types");
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM full_types");
        // The Vitess source doesn't read snapshot data, so the source is empty at first.
        // There is no readiness signal to observe, so sleep until the source has started.
Thread.sleep(10000);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO test.full_types VALUES (\n"
+ " DEFAULT, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807,\n"
+ " 'Hello World', 'abc', 123.102, 404.4443, 123.4567, 345.6, true);");
statement.execute("UPDATE test.full_types SET varchar_c = 'Bye World' WHERE id=1;");
}
waitForSnapshotStarted(result.collect());
List<String> expected =
Arrays.asList(
"+I[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
"-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]");
List<String> actual = fetchRows(result.collect(), expected.size());
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
        assertTrue(expected != null && actual != null);
        assertEquals(
                expected.stream().sorted().collect(Collectors.toList()),
                actual.stream().sorted().collect(Collectors.toList()));
    }
    // Despite the name, this simply blocks until the first change event arrives.
    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}
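The `waitForSinkSize` and `waitForSnapshotStarted` helpers above poll without a deadline, so a regression hangs the build until the CI timeout. A bounded variant along these lines (illustrative, not part of this commit) would fail fast instead:

```java
// Illustrative alternative: the same polling loop, but with an upper bound.
private static void waitForSinkSize(String sinkName, int expectedSize, java.time.Duration timeout)
        throws InterruptedException {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (sinkSize(sinkName) < expectedSize) {
        if (System.nanoTime() > deadlineNanos) {
            throw new IllegalStateException(
                    "Sink '" + sinkName + "' did not reach " + expectedSize
                            + " rows within " + timeout);
        }
        Thread.sleep(100);
    }
}
```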

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.vitess.config.TabletType;
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
import com.ververica.cdc.connectors.vitess.table.VitessTableFactory;
import com.ververica.cdc.connectors.vitess.table.VitessTableSource;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Test for {@link VitessTableSource} created by {@link VitessTableFactory}. */
public class VitessTableFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_SCHEMA = "public";
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_KEYSPACE = "myDB";
private static final String MY_TABLE = "myTable";
private static final Properties PROPERTIES = new Properties();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
VitessTableSource expectedSource =
new VitessTableSource(
SCHEMA,
15991,
MY_LOCALHOST,
MY_KEYSPACE,
MY_TABLE,
null,
null,
VtctldConfig.builder().hostname(MY_LOCALHOST).port(15999).build(),
TabletType.RDONLY,
"decoderbufs",
"flink",
PROPERTIES);
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> options = getAllOptions();
options.put("port", "5444");
options.put("vtctl.port", "5445");
options.put("decoding.plugin.name", "wal2json");
options.put("debezium.snapshot.mode", "never");
options.put("name", "flink");
options.put("tablet-type", "MASTER");
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "never");
VitessTableSource expectedSource =
new VitessTableSource(
SCHEMA,
5444,
MY_LOCALHOST,
MY_KEYSPACE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
VtctldConfig.builder().hostname(MY_LOCALHOST).port(5445).build(),
TabletType.MASTER,
"wal2json",
"flink",
dbzProperties);
assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
// validate illegal port
try {
Map<String, String> properties = getAllOptions();
properties.put("port", "123b");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t, "Could not parse value '123b' for key 'port'.")
.isPresent());
}
// validate missing required
Factory factory = new VitessTableFactory();
for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
Map<String, String> properties = getAllOptions();
properties.remove(requiredOption.key());
try {
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Missing required options are:\n\n" + requiredOption.key())
.isPresent());
}
}
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
properties.put("unknown", "abc");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
.isPresent());
}
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "vitess-cdc");
options.put("hostname", MY_LOCALHOST);
options.put("keyspace", MY_KEYSPACE);
options.put("vtctl.hostname", MY_LOCALHOST);
options.put("table-name", MY_TABLE);
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
VitessTableFactoryTest.class.getClassLoader(),
false);
}
}

@ -0,0 +1,44 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
USE test;
DROP TABLE IF EXISTS full_types;
-- TODO add DATE, DATETIME, TIMESTAMP, TIME type mapping
CREATE TABLE full_types (
id INT AUTO_INCREMENT NOT NULL,
tiny_c TINYINT,
tiny_un_c TINYINT UNSIGNED,
small_c SMALLINT,
small_un_c SMALLINT UNSIGNED,
int_c INTEGER ,
int_un_c INTEGER UNSIGNED,
int11_c INT(11) ,
big_c BIGINT,
varchar_c VARCHAR(255),
char_c CHAR(3),
float_c FLOAT,
double_c DOUBLE,
decimal_c DECIMAL(8, 4),
numeric_c NUMERIC(6, 0),
boolean_c BOOLEAN,
-- date_c DATE,
-- time_c TIME(0),
-- datetime3_c DATETIME(3),
-- datetime6_c DATETIME(6),
-- timestamp_c TIMESTAMP,
-- file_uuid BINARY(16),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

@ -0,0 +1,24 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
USE test;
DROP TABLE IF EXISTS products;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# The root logger is set to INFO so that test and container output shows up in build logs;
# set it to OFF to silence the output.
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-sql-connector-vitess-cdc</artifactId>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-vitess-cdc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>com.ververica.cdc.connectors.vitess.shaded.io.grpc</shadedPattern>
</relocation>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>com.ververica.cdc.connectors.vitess.shaded.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>com.ververica.cdc.connectors.vitess.shaded.com.google</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.vitess;
/** This class exists only so that a non-empty javadoc jar can be generated for this module, as required by the OSS repository rules. */
public class DummyDocs {}

@ -471,6 +471,7 @@ under the License.
<module>flink-connector-sqlserver-cdc</module>
<module>flink-connector-tidb-cdc</module>
<module>flink-connector-db2-cdc</module>
<module>flink-connector-vitess-cdc</module>
<module>flink-sql-connector-mysql-cdc</module>
<module>flink-sql-connector-postgres-cdc</module>
<module>flink-sql-connector-mongodb-cdc</module>
@ -479,6 +480,7 @@ under the License.
<module>flink-sql-connector-sqlserver-cdc</module>
<module>flink-sql-connector-tidb-cdc</module>
<module>flink-sql-connector-db2-cdc</module>
<module>flink-sql-connector-vitess-cdc</module>
<module>flink-cdc-e2e-tests</module>
</modules>
</profile>
