The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.
To set up the OceanBase CDC connector, use the dependency information in the following table, which covers both projects using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
<!-- The dependency is available only for stable releases, SNAPSHOT dependencies need to be built based on master or release- branches by yourself. -->
If you want to use the OceanBase JDBC driver to connect to an enterprise edition database, you should also include the following dependency in your classpath.
Download [flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.5-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.
**Note:** A flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT version corresponds to the code on the development branch. To use it, you need to download the source code and compile the jar yourself. Users should instead use a released version, such as [flink-sql-connector-oceanbase-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc); released versions are available in the Maven Central repository.
The cdc jar above already contains the MySQL JDBC driver 5.1.47, which is our recommended version. Due to a license issue, we cannot include the OceanBase JDBC driver in the cdc jar. If you need it, you can download it from [here](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar) and put it under `<FLINK_HOME>/lib/`. You also need to set the start option `jdbc.driver` to `com.oceanbase.jdbc.Driver`.
5. Set up OceanBase LogProxy. Users of OceanBase Community Edition can follow the [quick start](https://github.com/oceanbase/oblogproxy#getting-started).
If you want to use OceanBase Oracle mode, you need to add the OceanBase JDBC jar file to Flink and set up the enterprise edition of oblogproxy. You can then create a table in Flink as follows:
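A minimal sketch of such a table definition is shown here. Only `jdbc.driver`, `port`, `database-name`, `table-name`, and `scan.startup.mode` are named in this document; the other option names (`connector`, `username`, `password`, `tenant-name`, `hostname`, `compatible-mode`, `config-url`, `logproxy.host`, `logproxy.port`), the table schema, and every value are assumptions for illustration and should be checked against the options table for your connector version.

```sql
-- Hypothetical table: column names and types are placeholders.
CREATE TABLE orders (
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',            -- assumed connector identifier
    'scan.startup.mode' = 'initial',          -- snapshot first, then commit log
    'username' = '<username>',                -- placeholder
    'password' = '<password>',                -- placeholder
    'tenant-name' = '<tenant>',               -- placeholder
    'database-name' = '^test_db$',            -- regex, valid in initial mode
    'table-name' = '^orders$',                -- regex, valid in initial mode
    'hostname' = '127.0.0.1',                 -- placeholder host
    'port' = '2881',                          -- default SQL port of OceanBase server
    'compatible-mode' = 'oracle',             -- assumed option for Oracle mode
    'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
    'config-url' = '<your-config-url>',       -- placeholder, enterprise edition
    'logproxy.host' = '127.0.0.1',            -- placeholder LogProxy host
    'logproxy.port' = '2983'                  -- assumed LogProxy port
);
```

The `jdbc.driver` setting only works if the OceanBase JDBC jar has been placed under `<FLINK_HOME>/lib/`, as described above.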
You can also try the quickstart tutorial that syncs data from OceanBase to Elasticsearch; refer to the [Flink CDC Tutorial](https://ververica.github.io/flink-cdc-connectors/release-2.3//content/quickstart/oceanbase-tutorial.html) for more information.
The OceanBase CDC Connector provides options for both the SQL and DataStream APIs, as listed in the following table.
*Note*: The connector supports two ways to specify the list of tables to listen to, and takes the union of the results when both ways are used at the same time.
1. Use `database-name` and `table-name` to match database and table names by regex. Because `obcdc` (formerly `liboblog`) currently supports only `fnmatch`, regex cannot be used directly to filter change events, so these two options can only be used in `initial` startup mode.
2. Use `table-list` to match the exact value of database and table names.
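
The sketch below combines both approaches in one table definition, so the connector would listen to the union of the regex matches and the exact names. It assumes that `table-list` takes comma-separated `database.table` names and that the source tables share one schema. The connection option names (`connector`, `username`, `password`, `tenant-name`, `hostname`, `rootserver-list`, `logproxy.host`, `logproxy.port`), the database and table names, and all values are hypothetical and not taken from this document.

```sql
-- Hypothetical sharded tables with identical schemas.
CREATE TABLE all_orders (
    order_id INT,
    price    DECIMAL(10, 5),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',              -- assumed connector identifier
    'scan.startup.mode' = 'initial',            -- regex options require initial mode
    -- Way 1: regex match on database and table names.
    'database-name' = '^shard_db$',
    'table-name' = '^orders_[0-9]+$',
    -- Way 2: exact names (format assumed: db.table, comma-separated).
    'table-list' = 'archive_db.orders',
    'username' = '<username>',                  -- placeholder
    'password' = '<password>',                  -- placeholder
    'tenant-name' = '<tenant>',                 -- placeholder
    'hostname' = '127.0.0.1',                   -- placeholder host
    'port' = '2881',                            -- default SQL port of OceanBase server
    'rootserver-list' = '127.0.0.1:2882:2881',  -- assumed option and format
    'logproxy.host' = '127.0.0.1',              -- placeholder LogProxy host
    'logproxy.port' = '2983'                    -- assumed LogProxy port
);
```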
<td>Integer port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default.</td>
</tr>
<tr>
<td>connect.timeout</td>
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out.</td>
<td>The session timezone, which controls how temporal types are converted to STRING in OceanBase. It can be a UTC offset in the format "±hh:mm", or a named time zone if the time zone information tables in the mysql database have been created and populated.</td>
The OceanBase CDC connector is a Flink Source connector that reads a database snapshot first and then continues to read change events with **at-least-once processing**.
OceanBase is a distributed database whose log files are spread across different servers. Because there is no position information like the MySQL binlog offset, we can only use a timestamp as the position mark. To ensure that no data is missed, `liboblog` (a C++ library for reading OceanBase log records) might read some log data from before the given timestamp. As a result, we may read duplicate data whose timestamp is around the start point, so only 'at-least-once' can be guaranteed.
### Startup Reading Position
The config option `scan.startup.mode` specifies the startup mode for the OceanBase CDC consumer. The valid values are:
- `initial`: Performs an initial snapshot of the monitored table upon first startup, then continues to read the latest commit log.
- `latest-offset`: Never performs a snapshot of the monitored table upon first startup; reads only the commit log from the point at which the connector is started.
- `timestamp`: Never performs a snapshot of the monitored table upon first startup; reads the commit log starting from the given `scan.startup.timestamp`.
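
As an illustration of the `timestamp` mode, the sketch below starts reading the commit log from a fixed point without taking a snapshot. It uses `table-list` rather than the regex options, since those are limited to `initial` mode. It assumes that `scan.startup.timestamp` is given in Unix epoch seconds. The connection option names (`connector`, `username`, `password`, `tenant-name`, `hostname`, `rootserver-list`, `logproxy.host`, `logproxy.port`), the table, and all values are hypothetical and should be verified against the options table.

```sql
-- Hypothetical table: column names and types are placeholders.
CREATE TABLE orders_from_ts (
    order_id INT,
    price    DECIMAL(10, 5),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',              -- assumed connector identifier
    'scan.startup.mode' = 'timestamp',          -- no snapshot, start from a timestamp
    'scan.startup.timestamp' = '1667232000',    -- assumed unit: epoch seconds
    'table-list' = 'test_db.orders',            -- exact names (format assumed)
    'username' = '<username>',                  -- placeholder
    'password' = '<password>',                  -- placeholder
    'tenant-name' = '<tenant>',                 -- placeholder
    'hostname' = '127.0.0.1',                   -- placeholder host
    'port' = '2881',                            -- default SQL port of OceanBase server
    'rootserver-list' = '127.0.0.1:2882:2881',  -- assumed option and format
    'logproxy.host' = '127.0.0.1',              -- placeholder LogProxy host
    'logproxy.port' = '2983'                    -- assumed LogProxy port
);
```

Because the connector only guarantees at-least-once processing, records with timestamps close to the chosen start point may be delivered more than once.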
### Consume Commit Log
The OceanBase CDC Connector uses [oblogclient](https://github.com/oceanbase/oblogclient) to consume the commit log from OceanBase LogProxy.
### DataStream Source
The OceanBase CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows: