[FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remove dependency on host network ()

pull/3551/head
He Wang committed by GitHub
parent c5396fbf29
commit cbb33bb870
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -42,16 +42,23 @@ under the License.
version: '2.1'
services:
observer:
image: oceanbase/oceanbase-ce:4.0.0.0
image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
network_mode: "host"
environment:
- 'MODE=mini'
- 'OB_SYS_PASSWORD=123456'
- 'OB_TENANT_PASSWORD=654321'
ports:
- '2881:2881'
- '2882:2882'
oblogproxy:
image: whhe/oblogproxy:1.1.0_4x
image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- 'OB_SYS_PASSWORD=pswd'
network_mode: "host"
- 'OB_SYS_PASSWORD=123456'
ports:
- '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
@ -85,42 +92,26 @@ services:
docker-compose up -d
```
### 设置密码
OceanBase 中 root 用户默认是没有密码的,但是 oblogproxy 需要配置一个使用非空密码的系统租户用户,因此这里我们需要先为 root@sys 用户设置一个密码。
### 查询 Root Service List
登陆 sys 租户的 root 用户:
```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys
```
设置密码,注意这里的密码需要与上一步中 oblogproxy 服务的环境变量 'OB_SYS_PASSWORD' 保持一样。
```mysql
ALTER USER root IDENTIFIED BY 'pswd';
```
OceanBase 从社区版 4.0.0.0 开始只支持对非 sys 租户的增量数据拉取,这里我们使用 test 租户的 root 用户作为示例。
登陆 test 租户的 root 用户:
```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```
设置密码:
执行以下 sql 以查询 root service list将 VALUE 列的值保存下来。
```mysql
ALTER USER root IDENTIFIED BY 'test';
SHOW PARAMETERS LIKE 'rootservice_list';
```
### 准备数据
使用 'root@test' 用户登陆。
使用测试用的 test 租户的 root 用户登陆。
```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```
```sql
@ -169,6 +160,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
### 在 Flink SQL CLI 中使用 Flink DDL 创建表
注意在 OceanBase 源表的 SQL 中替换 root_service_list 为真实值。
```sql
-- 设置间隔时间为3秒
Flink SQL> SET execution.checkpointing.interval = 3s;
@ -189,13 +182,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
@ -211,13 +204,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'

@ -37,25 +37,27 @@ under the License.
Create `docker-compose.yml`.
*Note*: `host` network mode is required in this demo, so it can only work on Linux, see [network-tutorial-host](https://docs.docker.com/network/network-tutorial-host/).
```yaml
version: '2.1'
services:
observer:
image: oceanbase/oceanbase-ce:4.2.0.0
image: 'oceanbase/oceanbase-ce:4.2.1.6-106000012024042515'
container_name: observer
environment:
- 'MODE=slim'
- 'OB_ROOT_PASSWORD=pswd'
network_mode: "host"
- 'MODE=mini'
- 'OB_SYS_PASSWORD=123456'
- 'OB_TENANT_PASSWORD=654321'
ports:
- '2881:2881'
- '2882:2882'
oblogproxy:
image: whhe/oblogproxy:1.1.3_4x
image: 'oceanbase/oblogproxy-ce:latest'
container_name: oblogproxy
environment:
- 'OB_SYS_USERNAME=root'
- 'OB_SYS_PASSWORD=pswd'
network_mode: "host"
- 'OB_SYS_PASSWORD=123456'
ports:
- '2983:2983'
elasticsearch:
image: 'elastic/elasticsearch:7.6.0'
container_name: elasticsearch
@ -89,22 +91,18 @@ Execute the following command in the directory where `docker-compose.yml` is loc
docker-compose up -d
```
### Set password
### Query Root Service List
From OceanBase 4.0.0.0 CE, we can only fetch the commit log of non-sys tenant.
Here we use the 'test' tenant for example.
Login with 'root' user of 'test' tenant:
Login with 'root' user of 'sys' tenant:
```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@sys -p123456
```
Set a password:
Query the root service list by following SQL and store the value.
```mysql
ALTER USER root IDENTIFIED BY 'test';
SHOW PARAMETERS LIKE 'rootservice_list';
```
### Create data for reading snapshot
@ -112,7 +110,7 @@ ALTER USER root IDENTIFIED BY 'test';
Login 'root' user of 'test' tenant.
```shell
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -ptest
docker-compose exec observer obclient -h127.0.0.1 -P2881 -uroot@test -p654321
```
Insert data:
@ -163,6 +161,8 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
### Use Flink DDL to create dynamic table in Flink SQL CLI
Note that in the SQL of the OceanBase source table, replace root_service_list with the actual value.
```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;
@ -183,13 +183,13 @@ Flink SQL> CREATE TABLE orders (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^orders$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'
@ -205,13 +205,13 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'root@test',
'password' = 'test',
'password' = '654321',
'tenant-name' = 'test',
'database-name' = '^ob$',
'table-name' = '^products$',
'hostname' = 'localhost',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'rootserver-list' = '${root_service_list}',
'logproxy.host' = 'localhost',
'logproxy.port' = '2983',
'working-mode' = 'memory'

@ -163,4 +163,21 @@ limitations under the License.
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -17,20 +17,18 @@
package org.apache.flink.cdc.connectors.oceanbase;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.ClassRule;
import org.junit.Rule;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
@ -43,51 +41,13 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Basic class for testing OceanBase source. */
public abstract class OceanBaseTestBase extends TestLogger {
public abstract class OceanBaseTestBase extends AbstractTestBase {
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final int DEFAULT_PARALLELISM = 4;
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
protected final String compatibleMode;
protected final String username;
protected final String password;
protected final String hostname;
protected final int port;
protected final String logProxyHost;
protected final int logProxyPort;
protected final String tenant;
public OceanBaseTestBase(
String compatibleMode,
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant) {
this.compatibleMode = compatibleMode;
this.username = username;
this.password = password;
this.hostname = hostname;
this.port = port;
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.tenant = tenant;
}
protected abstract OceanBaseCdcMetadata metadata();
protected String commonOptionsString() {
return String.format(
@ -96,8 +56,14 @@ public abstract class OceanBaseTestBase extends TestLogger {
+ " 'password' = '%s', "
+ " 'hostname' = '%s', "
+ " 'port' = '%s', "
+ " 'compatible-mode' = '%s'",
username, password, hostname, port, compatibleMode);
+ " 'compatible-mode' = '%s', "
+ " 'jdbc.driver' = '%s'",
metadata().getUsername(),
metadata().getPassword(),
metadata().getHostname(),
metadata().getPort(),
metadata().getCompatibleMode(),
metadata().getDriverClass());
}
protected String logProxyOptionsString() {
@ -106,7 +72,9 @@ public abstract class OceanBaseTestBase extends TestLogger {
+ " 'tenant-name' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'",
tenant, logProxyHost, logProxyPort);
metadata().getTenantName(),
metadata().getLogProxyHost(),
metadata().getLogProxyPort());
}
protected String initialOptionsString() {
@ -120,7 +88,10 @@ public abstract class OceanBaseTestBase extends TestLogger {
return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
}
protected abstract Connection getJdbcConnection() throws SQLException;
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
metadata().getJdbcUrl(), metadata().getUsername(), metadata().getPassword());
}
protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
try (Connection connection = getJdbcConnection();
@ -130,7 +101,8 @@ public abstract class OceanBaseTestBase extends TestLogger {
}
protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile);
final String ddlFile =
String.format("ddl/%s/%s.sql", metadata().getCompatibleMode(), sqlFile);
final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase;
import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.time.Duration;
/** Utils to help test. */
@SuppressWarnings("resource")
public class OceanBaseTestUtils {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestUtils.class);
private static final String LATEST_VERSION = "latest";
private static final String CDC_TEST_OB_VERSION = "4.2.1.6-106000012024042515";
private static final String SYS_PASSWORD = "123456";
private static final String TEST_PASSWORD = "654321";
public static OceanBaseContainer createOceanBaseContainerForCDC() {
return createOceanBaseContainer(CDC_TEST_OB_VERSION, "mini")
.withSysPassword(SYS_PASSWORD)
.withStartupTimeout(Duration.ofMinutes(4));
}
public static OceanBaseContainer createOceanBaseContainerForJdbc() {
return createOceanBaseContainer(LATEST_VERSION, "slim")
.withStartupTimeout(Duration.ofMinutes(2));
}
public static OceanBaseContainer createOceanBaseContainer(String version, String mode) {
return new OceanBaseContainer(version)
.withMode(mode)
.withTenantPassword(TEST_PASSWORD)
.withEnv("OB_DATAFILE_SIZE", "2G")
.withEnv("OB_LOG_DISK_SIZE", "4G")
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
public static LogProxyContainer createLogProxyContainer() {
return new LogProxyContainer(LATEST_VERSION)
.withSysPassword(SYS_PASSWORD)
.withStartupTimeout(Duration.ofMinutes(1))
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
}

@ -18,89 +18,55 @@
package org.apache.flink.cdc.connectors.oceanbase.table;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import org.testcontainers.containers.Network;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** Integration tests for OceanBase MySQL mode table source. */
@RunWith(Parameterized.class)
public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
private static final String NETWORK_MODE = "host";
private static final String OB_SYS_PASSWORD = "123456";
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final GenericContainer<?> OB_SERVER =
new GenericContainer<>("oceanbase/oceanbase-ce:4.2.0.0")
.withNetworkMode(NETWORK_MODE)
.withEnv("MODE", "slim")
.withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
.withEnv("OB_DATAFILE_SIZE", "1G")
.withEnv("OB_LOG_DISK_SIZE", "4G")
.withCopyFileToContainer(
MountableFile.forClasspathResource("ddl/mysql/docker_init.sql"),
"/root/boot/init.d/init.sql")
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(4))
.withLogConsumer(new Slf4jLogConsumer(LOG));
public static final OceanBaseContainer OB_SERVER =
createOceanBaseContainerForCDC().withNetwork(NETWORK);
@ClassRule
public static final GenericContainer<?> LOG_PROXY =
new GenericContainer<>("whhe/oblogproxy:1.1.3_4x")
.withNetworkMode(NETWORK_MODE)
.withEnv("OB_SYS_PASSWORD", OB_SYS_PASSWORD)
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(1))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(OB_SERVER, LOG_PROXY)).join();
LOG.info("Containers are started.");
}
public static final LogProxyContainer LOG_PROXY =
createLogProxyContainer().withNetwork(NETWORK);
private static final OceanBaseCdcMetadata METADATA =
new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
Stream.of(OB_SERVER, LOG_PROXY).forEach(GenericContainer::stop);
LOG.info("Containers are stopped.");
@Override
protected OceanBaseCdcMetadata metadata() {
return METADATA;
}
@Before
@ -110,47 +76,11 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}
private final String rsList;
public OceanBaseMySQLModeITCase(
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant,
String rsList) {
super("mysql", username, password, hostname, port, logProxyHost, logProxyPort, tenant);
this.rsList = rsList;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Collections.singletonList(
new Object[] {
"root@test",
"123456",
"127.0.0.1",
2881,
"127.0.0.1",
2983,
"test",
"127.0.0.1:2882:2881"
});
}
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
+ String.format(" 'rootserver-list' = '%s'", rsList);
}
@Override
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
"jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", username, password);
+ String.format(" 'rootserver-list' = '%s'", METADATA.getRsList());
}
@Test
@ -312,6 +242,8 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
waitForSinkSize("sink", snapshotSize + 1);
String tenant = metadata().getTenantName();
List<String> expected =
Arrays.asList(
"+I("

@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.oceanbase.table;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseOracleCdcMetadata;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
@ -26,20 +28,14 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/** Integration tests for OceanBase Oracle mode table source. */
@Ignore("Test ignored before oceanbase-xe docker image is available")
@RunWith(Parameterized.class)
public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
private final StreamExecutionEnvironment env =
@ -48,61 +44,25 @@ public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
private final String schema;
private final String configUrl;
public OceanBaseOracleModeITCase(
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant,
String schema,
String configUrl) {
super("oracle", username, password, hostname, port, logProxyHost, logProxyPort, tenant);
this.schema = schema;
this.configUrl = configUrl;
}
@Parameterized.Parameters
public static List<Object[]> parameters() {
return Collections.singletonList(
new Object[] {
"SYS@test",
"123456",
"127.0.0.1",
2881,
"127.0.0.1",
2983,
"test",
"SYS",
"http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster"
});
}
private static final OceanBaseCdcMetadata METADATA = new OceanBaseOracleCdcMetadata();
@Override
protected String commonOptionsString() {
return super.commonOptionsString() + " , " + " 'jdbc.driver' = 'com.oceanbase.jdbc.Driver'";
protected OceanBaseCdcMetadata metadata() {
return METADATA;
}
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
+ String.format(" 'config-url' = '%s'", configUrl);
}
@Override
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
"jdbc:oceanbase://" + hostname + ":" + port + "/" + schema, username, password);
+ String.format(" 'config-url' = '%s'", METADATA.getConfigUrl());
}
@Test
public void testAllDataTypes() throws Exception {
initializeTable("column_type_test");
String schema = metadata().getDatabase();
String sourceDDL =
String.format(
"CREATE TABLE full_types ("

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import java.util.Collections;
import java.util.Set;
/** OceanBase Log Proxy container. */
public class LogProxyContainer extends GenericContainer<LogProxyContainer> {
private static final String IMAGE = "oceanbase/oblogproxy-ce";
private static final int PORT = 2983;
private static final String ROOT_USER = "root";
private String sysPassword;
public LogProxyContainer(String version) {
super(DockerImageName.parse(IMAGE + ":" + version));
addExposedPorts(PORT);
setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
}
@Override
protected void configure() {
addEnv("OB_SYS_USERNAME", ROOT_USER);
addEnv("OB_SYS_PASSWORD", sysPassword);
}
public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
return Collections.singleton(this.getMappedPort(PORT));
}
public int getPort() {
return getMappedPort(PORT);
}
public LogProxyContainer withSysPassword(String sysPassword) {
this.sysPassword = sysPassword;
return this;
}
}

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
import java.io.Serializable;
/** OceanBase CDC metadata. */
public interface OceanBaseCdcMetadata extends Serializable {
String getCompatibleMode();
String getHostname();
int getPort();
String getUsername();
String getPassword();
String getDriverClass();
String getDatabase();
String getJdbcUrl();
String getTenantName();
String getLogProxyHost();
int getLogProxyPort();
default String getConfigUrl() {
return null;
}
default String getRsList() {
return null;
}
}

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import java.util.Collections;
import java.util.Set;
/** OceanBase container. */
public class OceanBaseContainer extends JdbcDatabaseContainer<OceanBaseContainer> {
private static final String IMAGE = "oceanbase/oceanbase-ce";
private static final int SQL_PORT = 2881;
private static final int RPC_PORT = 2882;
private static final String ROOT_USER = "root";
private static final String TEST_DATABASE = "test";
private static final String DEFAULT_TENANT = "test";
private static final String DEFAULT_PASSWORD = "";
private String mode = "mini";
private String tenantName = DEFAULT_TENANT;
private String sysPassword = DEFAULT_PASSWORD;
private String tenantPassword = DEFAULT_PASSWORD;
public OceanBaseContainer(String version) {
super(DockerImageName.parse(IMAGE + ":" + version));
addExposedPorts(SQL_PORT, RPC_PORT);
setWaitStrategy(Wait.forLogMessage(".*boot success!.*", 1));
}
@Override
protected void configure() {
addEnv("MODE", mode);
addEnv("OB_CLUSTER_NAME", "flink-cdc-ci");
if (!DEFAULT_PASSWORD.equals(sysPassword)) {
addEnv("OB_SYS_PASSWORD", sysPassword);
}
if (!DEFAULT_TENANT.equals(tenantName)) {
addEnv("OB_TENANT_NAME", tenantName);
}
if (!DEFAULT_PASSWORD.equals(tenantPassword)) {
addEnv("OB_TENANT_PASSWORD", tenantPassword);
}
}
protected void waitUntilContainerStarted() {
this.getWaitStrategy().waitUntilReady(this);
}
public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
return Collections.singleton(this.getMappedPort(SQL_PORT));
}
@Override
public String getDriverClassName() {
return "com.mysql.cj.jdbc.Driver";
}
public String getJdbcUrl(String databaseName) {
return "jdbc:mysql://"
+ getHost()
+ ":"
+ getDatabasePort()
+ "/"
+ databaseName
+ "?useSSL=false";
}
@Override
public String getJdbcUrl() {
return getJdbcUrl("");
}
public int getDatabasePort() {
return getMappedPort(SQL_PORT);
}
public String getTenantName() {
return tenantName;
}
@Override
public String getDatabaseName() {
return TEST_DATABASE;
}
@Override
public String getUsername() {
return ROOT_USER + "@" + tenantName;
}
@Override
public String getPassword() {
return tenantPassword;
}
@Override
protected String getTestQueryString() {
return "SELECT 1";
}
public OceanBaseContainer withMode(String mode) {
this.mode = mode;
return this;
}
public OceanBaseContainer withTenantName(String tenantName) {
this.tenantName = tenantName;
return this;
}
public OceanBaseContainer withSysPassword(String sysPassword) {
this.sysPassword = sysPassword;
return this;
}
public OceanBaseContainer withTenantPassword(String tenantPassword) {
this.tenantPassword = tenantPassword;
return this;
}
}

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/** OceanBase CDC MySQL mode metadata. */
public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata {
private final OceanBaseContainer obServerContainer;
private final LogProxyContainer logProxyContainer;
private String rsList;
public OceanBaseMySQLCdcMetadata(
OceanBaseContainer obServerContainer, LogProxyContainer logProxyContainer) {
this.obServerContainer = obServerContainer;
this.logProxyContainer = logProxyContainer;
}
@Override
public String getCompatibleMode() {
return "mysql";
}
@Override
public String getHostname() {
return obServerContainer.getHost();
}
@Override
public int getPort() {
return obServerContainer.getDatabasePort();
}
@Override
public String getUsername() {
return obServerContainer.getUsername();
}
@Override
public String getPassword() {
return obServerContainer.getPassword();
}
@Override
public String getDriverClass() {
return obServerContainer.getDriverClassName();
}
@Override
public String getDatabase() {
return obServerContainer.getDatabaseName();
}
@Override
public String getJdbcUrl() {
return "jdbc:mysql://" + getHostname() + ":" + getPort() + "/?useSSL=false";
}
@Override
public String getTenantName() {
return obServerContainer.getTenantName();
}
@Override
public String getLogProxyHost() {
return logProxyContainer.getHost();
}
@Override
public int getLogProxyPort() {
return logProxyContainer.getPort();
}
@Override
public String getRsList() {
if (rsList == null) {
try (Connection connection =
DriverManager.getConnection(
getJdbcUrl(), getUsername(), getPassword());
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'rootservice_list'");
rsList = rs.next() ? rs.getString("VALUE") : null;
} catch (SQLException e) {
throw new RuntimeException("Failed to query rs list", e);
}
if (rsList == null) {
throw new RuntimeException("Got empty rs list");
}
}
return rsList;
}
}

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
/** OceanBase CDC Oracle mode metadata. */
public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata {
@Override
public String getCompatibleMode() {
return "oracle";
}
@Override
public String getHostname() {
return System.getenv("host");
}
@Override
public int getPort() {
return Integer.parseInt(System.getenv("port"));
}
@Override
public String getUsername() {
return System.getenv("username");
}
@Override
public String getPassword() {
return System.getenv("password");
}
@Override
public String getDatabase() {
return System.getenv("schema");
}
@Override
public String getDriverClass() {
return "com.oceanbase.jdbc.Driver";
}
@Override
public String getJdbcUrl() {
return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" + getDatabase();
}
@Override
public String getTenantName() {
return System.getenv("tenant");
}
@Override
public String getLogProxyHost() {
return System.getenv("log_proxy_host");
}
@Override
public int getLogProxyPort() {
return Integer.parseInt(System.getenv("log_proxy_port"));
}
@Override
public String getConfigUrl() {
return System.getenv("config_url");
}
}

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oceanbase.testutils;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
/**
* Create and populate a unique instance of an OceanBase database for each run of JUnit test. A user
* of class needs to provide a logical name for Debezium and database name. It is expected that
* there is an init file in <code>src/test/resources/ddl/&lt;database_name&gt;.sql</code>. The
* database name is enriched with a unique suffix that guarantees complete isolation between runs
* <code>
* &lt;database_name&gt_&lt;suffix&gt</code>
*
* <p>This class is inspired from Debezium project.
*/
public class UniqueDatabase {
private static final String[] CREATE_DATABASE_DDL =
new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS `$DBNAME$`;";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private final OceanBaseContainer container;
private final String databaseName;
private final String templateName;
public UniqueDatabase(OceanBaseContainer container, String databaseName) {
this(container, databaseName, Integer.toUnsignedString(new Random().nextInt(), 36));
}
private UniqueDatabase(
OceanBaseContainer container, String databaseName, final String identifier) {
this.container = container;
this.databaseName = databaseName + "_" + identifier;
this.templateName = databaseName;
}
public String getHost() {
return container.getHost();
}
public int getDatabasePort() {
return container.getDatabasePort();
}
public String getDatabaseName() {
return databaseName;
}
public String getUsername() {
return container.getUsername();
}
public String getPassword() {
return container.getPassword();
}
/** @return Fully qualified table name <code>&lt;databaseName&gt;.&lt;tableName&gt;</code> */
public String qualifiedTableName(final String tableName) {
return String.format("%s.%s", databaseName, tableName);
}
/** Creates the database and populates it with initialization SQL script. */
public void createAndInitialize() {
final String ddlFile = String.format("ddl/%s.sql", templateName);
final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
try (Connection connection =
DriverManager.getConnection(
container.getJdbcUrl(), getUsername(), getPassword());
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Stream.concat(
Arrays.stream(CREATE_DATABASE_DDL),
Files.readAllLines(
Paths.get(ddlTestFile.toURI()))
.stream())
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.map(this::convertSQL)
.collect(Collectors.joining("\n"))
.split(";"))
.map(x -> x.replace("$$", ";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
}
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}
/** Drop the database if it is existing. */
public void dropDatabase() {
try {
try (Connection connection =
DriverManager.getConnection(
container.getJdbcUrl(), getUsername(), getPassword());
Statement statement = connection.createStatement()) {
final String dropDatabaseStatement = convertSQL(DROP_DATABASE_DDL);
statement.execute(dropDatabaseStatement);
}
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}
public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
container.getJdbcUrl(databaseName), getUsername(), getPassword());
}
private String convertSQL(final String sql) {
return sql.replace("$DBNAME$", databaseName);
}
}

@ -1,17 +0,0 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- Set the root user password of test tenant
ALTER USER root IDENTIFIED BY '123456';

@ -102,7 +102,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@ -111,7 +110,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)

@ -91,6 +91,13 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>

@ -19,99 +19,68 @@ package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** End-to-end tests for oceanbase-cdc connector uber jar. */
public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final String INTER_CONTAINER_OB_SERVER_ALIAS = "oceanbase";
private static final String INTER_CONTAINER_LOG_PROXY_ALIAS = "oblogproxy";
private static final Path obCdcJar = TestUtils.getResource("oceanbase-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
// ------------------------------------------------------------------------------------------
// OceanBase container variables
// ------------------------------------------------------------------------------------------
private static final String OB_SERVER_IMAGE = "oceanbase/oceanbase-ce:4.2.0.0";
private static final String OB_LOG_PROXY_IMAGE = "whhe/oblogproxy:1.1.3_4x";
private static final String NETWORK_MODE = "host";
private static final String INTER_CONTAINER_OB_HOST = "host.docker.internal";
private static final String SYS_PASSWORD = "1234567";
private static final String TEST_TENANT = "test";
private static final String TEST_USER = "root@" + TEST_TENANT;
private static final String TEST_PASSWORD = "7654321";
@ClassRule
public static final GenericContainer<?> OB_SERVER =
new GenericContainer<>(OB_SERVER_IMAGE)
.withNetworkMode(NETWORK_MODE)
.withEnv("MODE", "slim")
.withEnv("OB_DATAFILE_SIZE", "1G")
.withEnv("OB_LOG_DISK_SIZE", "4G")
.withEnv("OB_ROOT_PASSWORD", SYS_PASSWORD)
.withEnv("OB_TENANT_NAME", TEST_TENANT)
.withCopyFileToContainer(
MountableFile.forClasspathResource("docker/oceanbase/setup.sql"),
"/root/boot/init.d/init.sql")
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(3))
.withLogConsumer(new Slf4jLogConsumer(LOG));
public static final OceanBaseContainer OB_SERVER =
createOceanBaseContainerForCDC()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_OB_SERVER_ALIAS);
@ClassRule
public static final GenericContainer<?> LOG_PROXY =
new GenericContainer<>(OB_LOG_PROXY_IMAGE)
.withNetworkMode(NETWORK_MODE)
.withEnv("OB_SYS_PASSWORD", SYS_PASSWORD)
.waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
.withStartupTimeout(Duration.ofMinutes(1))
.withLogConsumer(new Slf4jLogConsumer(LOG));
public static final LogProxyContainer LOG_PROXY =
createLogProxyContainer()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_LOG_PROXY_ALIAS);
private static final OceanBaseCdcMetadata METADATA =
new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
protected final UniqueDatabase obInventoryDatabase =
new UniqueDatabase(OB_SERVER, "oceanbase_inventory");
@Before
public void before() {
super.before();
initializeTable("oceanbase_inventory");
obInventoryDatabase.createAndInitialize();
}
private Connection getTestConnection(String databaseName) {
try {
Class.forName(MYSQL_DRIVER_CLASS);
return DriverManager.getConnection(
String.format("jdbc:mysql://127.0.0.1:2881/%s?useSSL=false", databaseName),
TEST_USER,
TEST_PASSWORD);
} catch (Exception e) {
throw new RuntimeException("Failed to get test jdbc connection", e);
}
@After
public void after() {
super.after();
obInventoryDatabase.dropDatabase();
}
@Test
@ -131,16 +100,18 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
") WITH (",
" 'connector' = 'oceanbase-cdc',",
" 'scan.startup.mode' = 'initial',",
" 'username' = '" + TEST_USER + "',",
" 'password' = '" + TEST_PASSWORD + "',",
" 'tenant-name' = '" + TEST_TENANT + "',",
" 'table-list' = 'inventory.products_source',",
" 'hostname' = '" + INTER_CONTAINER_OB_HOST + "',",
" 'username' = '" + METADATA.getUsername() + "',",
" 'password' = '" + METADATA.getPassword() + "',",
" 'tenant-name' = '" + METADATA.getTenantName() + "',",
" 'table-list' = '"
+ obInventoryDatabase.qualifiedTableName("products_source")
+ "',",
" 'hostname' = '" + INTER_CONTAINER_OB_SERVER_ALIAS + "',",
" 'port' = '2881',",
" 'jdbc.driver' = '" + MYSQL_DRIVER_CLASS + "',",
" 'logproxy.host' = '" + INTER_CONTAINER_OB_HOST + "',",
" 'jdbc.driver' = '" + METADATA.getDriverClass() + "',",
" 'logproxy.host' = '" + INTER_CONTAINER_LOG_PROXY_ALIAS + "',",
" 'logproxy.port' = '2983',",
" 'rootserver-list' = '127.0.0.1:2882:2881',",
" 'rootserver-list' = '" + METADATA.getRsList() + "',",
" 'working-mode' = 'memory',",
" 'jdbc.properties.useSSL' = 'false'",
");",
@ -168,7 +139,7 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
submitSQLJob(sqlLines, obCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
try (Connection conn = getTestConnection("inventory");
try (Connection conn = obInventoryDatabase.getJdbcConnection();
Statement stat = conn.createStatement()) {
stat.execute(
"UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;");
@ -211,32 +182,4 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
new String[] {"id", "name", "description", "weight", "enum_c", "json_c"},
60000L);
}
protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getTestConnection("");
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@ -135,7 +135,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@ -143,7 +142,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)

@ -16,13 +16,8 @@
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
DROP DATABASE IF EXISTS inventory;
CREATE DATABASE inventory;
USE inventory;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products_source (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',

@ -1,16 +0,0 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
ALTER USER root IDENTIFIED BY '7654321';
Loading…
Cancel
Save