diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml index 1ff13c070..3c6d3d198 100644 --- a/flink-cdc-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/pom.xml @@ -30,7 +30,8 @@ under the License. jar - 1.13.3 + 1.13.5 + 1.14.3 8.0.27 @@ -186,7 +187,18 @@ under the License. org.apache.flink flink-connector-jdbc_2.11 ${flink-1.13} - jdbc-connector.jar + jdbc-connector_${flink-1.13}.jar + jar + ${project.build.directory}/dependencies + + + + + + org.apache.flink + flink-connector-jdbc_2.11 + ${flink-1.14} + jdbc-connector_${flink-1.14}.jar jar ${project.build.directory}/dependencies diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java index 2b97862c5..3a614fa75 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java @@ -32,13 +32,13 @@ import com.ververica.cdc.connectors.tests.utils.JdbcProxy; import com.ververica.cdc.connectors.tests.utils.TestUtils; import org.bson.Document; import org.bson.types.ObjectId; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; import java.net.URL; import java.nio.file.Files; @@ -51,6 +51,7 @@ import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT; import static org.junit.Assert.assertNotNull; @@ -67,36 +68,47 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); private static final Path mongoCdcJar = TestUtils.getResource("mongodb-cdc-connector.jar"); - private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - @ClassRule - public static final MongoDBContainer MONGODB = - new MongoDBContainer() - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + private MongoDBContainer mongodb; - private static MongoClient mongoClient; + private MongoClient mongoClient; + + @Before + public void before() { + super.before(); + // Tips: If you meet issue like 'errmsg" : "No host described in new configuration 1 for + // replica set rs0 maps to this node"' when start the container in you local environment, + // please check your '/etc/hosts' file contains the line 'internet_ip(not 127.0.0.1) + // hostname' e.g: '30.225.0.87 leonard.machine' + mongodb = + new MongoDBContainer() + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + Startables.deepStart(Stream.of(mongodb)).join(); - @BeforeClass - public static void beforeClass() { executeCommandFileInMongoDB("mongo_setup", "admin"); MongoClientSettings settings = MongoClientSettings.builder() .applyConnectionString( new ConnectionString( - MONGODB.getConnectionString( + mongodb.getConnectionString( MONGO_SUPER_USER, MONGO_SUPER_PASSWORD))) .build(); mongoClient = MongoClients.create(settings); } - @AfterClass - public static void afterClass() { + @After + public void after() { + super.after(); if (mongoClient != null) { mongoClient.close(); } + if (mongodb != null) { + mongodb.stop(); + } } @Test @@ -201,8 +213,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { } /** Executes a mongo command file, specify a database name. */ - private static String executeCommandFileInMongoDB( - String fileNameIgnoreSuffix, String databaseName) { + private String executeCommandFileInMongoDB(String fileNameIgnoreSuffix, String databaseName) { final String dbName = databaseName != null ? databaseName : fileNameIgnoreSuffix; final String ddlFile = String.format("ddl/%s.js", fileNameIgnoreSuffix); final URL ddlTestFile = MongoDBTestBase.class.getClassLoader().getResource(ddlFile); @@ -222,7 +233,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { }) .collect(Collectors.joining("\n")); - MONGODB.executeCommand(command0 + command1); + mongodb.executeCommand(command0 + command1); return dbName; } catch (Exception e) { diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java index f447c92cf..7b215fa2c 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java @@ -40,7 +40,6 @@ public class MySqlE2eITCase extends FlinkContainerTestEnvironment { private static final Logger LOG = LoggerFactory.getLogger(MySqlE2eITCase.class); private static final Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-connector.jar"); - private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar"); @Test public void testMySqlCDC() throws Exception { diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java index 041d42fec..ad6c7b09e 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java @@ -21,12 +21,15 @@ package com.ververica.cdc.connectors.tests; import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; import com.ververica.cdc.connectors.tests.utils.JdbcProxy; import com.ververica.cdc.connectors.tests.utils.TestUtils; -import org.junit.ClassRule; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.OracleContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; import java.nio.file.Path; import java.sql.Connection; @@ -36,6 +39,7 @@ import java.sql.Statement; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.stream.Stream; /** End-to-end tests for oracle-cdc connector uber jar. */ public class OracleE2eITCase extends FlinkContainerTestEnvironment { @@ -49,16 +53,31 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { private static final int ORACLE_PORT = 1521; private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar"); - private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - @ClassRule - public static final OracleContainer ORACLE = + @Rule + public final OracleContainer oracle = new OracleContainer(ORACLE_IMAGE) .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS) .withLogConsumer(new Slf4jLogConsumer(LOG)); + @Before + public void before() { + super.before(); + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(oracle)).join(); + LOG.info("Containers are started."); + } + + @After + public void after() { + if (oracle != null) { + oracle.stop(); + } + super.after(); + } + @Test public void testOracleCDC() throws Exception { List sqlLines = @@ -155,6 +174,6 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { private Connection getOracleJdbcConnection() throws SQLException { return DriverManager.getConnection( - ORACLE.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD); + oracle.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD); } } diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/PostgresE2eITCase.java index c95a0dbc0..22f412dd9 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/PostgresE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/PostgresE2eITCase.java @@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.tests; import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; import com.ververica.cdc.connectors.tests.utils.JdbcProxy; import com.ververica.cdc.connectors.tests.utils.TestUtils; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -61,7 +62,6 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment { DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres"); private static final Path postgresCdcJar = TestUtils.getResource("postgres-cdc-connector.jar"); - private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @ClassRule @@ -80,6 +80,11 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment { initializePostgresTable("postgres_inventory"); } + @After + public void after() { + super.after(); + } + @Test public void testPostgresCDC() throws Exception { List sqlLines = diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java index 655e72cd2..e18a2e9c5 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java @@ -21,13 +21,15 @@ package com.ververica.cdc.connectors.tests; import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; import com.ververica.cdc.connectors.tests.utils.JdbcProxy; import com.ververica.cdc.connectors.tests.utils.TestUtils; +import org.junit.After; import org.junit.Before; -import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; import java.net.URL; import java.nio.file.Files; @@ -43,6 +45,7 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertNotNull; @@ -54,11 +57,10 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment { private static final String INTER_CONTAINER_SQL_SERVER_ALIAS = "mssqlserver"; private static final Path sqlServerCdcJar = TestUtils.getResource("sqlserver-cdc-connector.jar"); - private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); - @ClassRule - public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER = + @Rule + public MSSQLServerContainer sqlServer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest") .withPassword("Password!") .withEnv("MSSQL_AGENT_ENABLED", "true") @@ -70,9 +72,20 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment { @Before public void before() { super.before(); + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(sqlServer)).join(); + LOG.info("Containers are started."); initializeSqlServerTable("sqlserver_inventory"); } + @After + public void after() { + if (sqlServer != null) { + sqlServer.stop(); + } + super.after(); + } + @Test public void testSqlServerCDC() throws Exception { List sqlLines = @@ -86,9 +99,9 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment { ") WITH (", " 'connector' = 'sqlserver-cdc',", " 'hostname' = '" + INTER_CONTAINER_SQL_SERVER_ALIAS + "',", - " 'port' = '" + MSSQL_SERVER_CONTAINER.MS_SQL_SERVER_PORT + "',", - " 'username' = '" + MSSQL_SERVER_CONTAINER.getUsername() + "',", - " 'password' = '" + MSSQL_SERVER_CONTAINER.getPassword() + "',", + " 'port' = '" + sqlServer.MS_SQL_SERVER_PORT + "',", + " 'username' = '" + sqlServer.getUsername() + "',", + " 'password' = '" + sqlServer.getPassword() + "',", " 'database-name' = 'inventory',", " 'schema-name' = 'dbo',", " 'table-name' = 'products'", @@ -193,8 +206,6 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment { private Connection getSqlServerJdbcConnection() throws SQLException { return DriverManager.getConnection( - MSSQL_SERVER_CONTAINER.getJdbcUrl(), - MSSQL_SERVER_CONTAINER.getUsername(), - MSSQL_SERVER_CONTAINER.getPassword()); + sqlServer.getJdbcUrl(), sqlServer.getUsername(), sqlServer.getPassword()); } } diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index 72e2a30c7..f626293dc 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -35,12 +35,15 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; import javax.annotation.Nullable; @@ -55,19 +58,22 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkState; /** Test environment running job on Flink containers. */ +@RunWith(Parameterized.class) public abstract class FlinkContainerTestEnvironment extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnvironment.class); + @Parameterized.Parameter public String flinkVersion; + // ------------------------------------------------------------------------------------------ // Flink Variables // ------------------------------------------------------------------------------------------ private static final int JOB_MANAGER_REST_PORT = 8081; private static final String FLINK_BIN = "bin"; - private static final String FLINK_IMAGE_TAG = "flink:1.13.3-scala_2.11"; private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; private static final String FLINK_PROPERTIES = @@ -91,6 +97,10 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { @ClassRule public static final Network NETWORK = Network.newNetwork(); + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Nullable private RestClusterClient restClusterClient; + @ClassRule public static final MySqlContainer MYSQL = (MySqlContainer) @@ -104,36 +114,44 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) .withLogConsumer(new Slf4jLogConsumer(LOG)); - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Rule - public final GenericContainer jobmanager = - new GenericContainer<>(FLINK_IMAGE_TAG) - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) - .withExposedPorts(JOB_MANAGER_REST_PORT) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @Rule - public final GenericContainer taskmanager = - new GenericContainer<>(FLINK_IMAGE_TAG) - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobmanager) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @Nullable private RestClusterClient restClusterClient; - protected final UniqueDatabase mysqlInventoryDatabase = new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + protected Path jdbcJar; + + private GenericContainer jobManager; + private GenericContainer taskManager; + + @Parameterized.Parameters(name = "flinkVersion: {0}") + public static List getFlinkVersion() { + return Arrays.asList("1.13.5", "1.14.3"); + } @Before public void before() { mysqlInventoryDatabase.createAndInitialize(); + jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName()); + + LOG.info("Starting containers..."); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); } @After @@ -141,6 +159,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { if (restClusterClient != null) { restClusterClient.close(); } + if (jobManager != null) { + jobManager.stop(); + } + if (taskManager != null) { + taskManager.stop(); + } } /** @@ -155,17 +179,17 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { final List commands = new ArrayList<>(); Path script = temporaryFolder.newFile().toPath(); Files.write(script, job.getSqlLines()); - jobmanager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); + jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); commands.add("cat /tmp/script.sql | "); commands.add(FLINK_BIN + "/sql-client.sh"); for (String jar : job.getJars()) { commands.add("--jar"); - String containerPath = copyAndGetContainerPath(jobmanager, jar); + String containerPath = copyAndGetContainerPath(jobManager, jar); commands.add(containerPath); } ExecResult execResult = - jobmanager.execInContainer("bash", "-c", String.join(" ", commands)); + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); LOG.info(execResult.getStdout()); LOG.error(execResult.getStderr()); if (execResult.getExitCode() != 0) { @@ -183,13 +207,13 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { return restClusterClient; } checkState( - jobmanager.isRunning(), + jobManager.isRunning(), "Cluster client should only be retrieved for a running cluster"); try { final Configuration clientConfiguration = new Configuration(); - clientConfiguration.set(RestOptions.ADDRESS, jobmanager.getHost()); + clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); clientConfiguration.set( - RestOptions.PORT, jobmanager.getMappedPort(JOB_MANAGER_REST_PORT)); + RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT)); this.restClusterClient = new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); } catch (Exception e) { @@ -233,4 +257,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { container.copyFileToContainer(MountableFile.forHostPath(path), containerPath); return containerPath; } + + private String getFlinkDockerImageTag() { + return String.format("flink:%s-scala_2.11", flinkVersion); + } + + protected String getJdbcConnectorResourceName() { + return String.format("jdbc-connector_%s.jar", flinkVersion); + } } diff --git a/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql b/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql index 8b2015065..1e8bd1340 100644 --- a/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql +++ b/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql @@ -17,17 +17,20 @@ -- DATABASE: inventory -- ---------------------------------------------------------------------------------------------------------------- -- Create the inventory database -CREATE DATABASE inventory; +CREATE +DATABASE inventory; -USE inventory; +USE +inventory; EXEC sys.sp_cdc_enable_db; -- Create and populate our products using a single insert with many rows -CREATE TABLE products ( - id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, - name VARCHAR(255) NOT NULL, - description VARCHAR(512), - weight FLOAT +CREATE TABLE products +( + id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT ); INSERT INTO products(name,description,weight) VALUES ('scooter','Small 2-wheel scooter',3.14); diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java index 33bd71ec7..aaadd07d1 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java @@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Properties; @@ -425,7 +426,13 @@ public class DebeziumSourceFunction extends RichSourceFunction debeziumStarted = true; // initialize metrics - MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14 + final Method getMetricGroupMethod = + getRuntimeContext().getClass().getMethod("getMetricGroup"); + getMetricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = + (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext()); + metricGroup.gauge( "currentFetchEventTimeLag", (Gauge) () -> debeziumChangeFetcher.getFetchDelay()); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java index a5c69188b..4286b4a83 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; import com.ververica.cdc.connectors.mysql.MySqlValidator; @@ -58,6 +59,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import org.apache.kafka.connect.source.SourceRecord; +import java.lang.reflect.Method; import java.util.List; import java.util.function.Supplier; @@ -135,8 +137,13 @@ public class MySqlSource configFactory.createConfig(readerContext.getIndexOfSubtask()); FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); + + final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); + metricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); + final MySqlSourceReaderMetrics sourceReaderMetrics = - new MySqlSourceReaderMetrics(readerContext.metricGroup()); + new MySqlSourceReaderMetrics(metricGroup); sourceReaderMetrics.registerMetrics(); MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java index 2ad405ba3..1bfc8d953 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -58,7 +58,7 @@ import java.util.Random; import java.util.concurrent.ExecutionException; import static java.lang.String.format; -import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState; +import static org.apache.flink.util.Preconditions.checkState; /** IT tests for {@link MySqlSource}. */ public class MySqlSourceITCase extends MySqlSourceTestBase { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index be07caf86..9359e272d 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -28,6 +28,7 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Collector; @@ -54,6 +55,7 @@ import io.debezium.relational.history.TableChanges; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; +import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -238,18 +240,24 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase { } } - private MySqlSourceReader createReader(MySqlSourceConfig configuration) { + private MySqlSourceReader createReader(MySqlSourceConfig configuration) + throws Exception { return createReader(configuration, new TestingReaderContext()); } private MySqlSourceReader createReader( - MySqlSourceConfig configuration, SourceReaderContext readerContext) { + MySqlSourceConfig configuration, SourceReaderContext readerContext) throws Exception { final FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); + // make SourceReaderContext#metricGroup compatible between Flink 1.13 and Flink 1.14 + final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); + metricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); + final MySqlRecordEmitter recordEmitter = new MySqlRecordEmitter<>( new ForwardDeserializeSchema(), - new MySqlSourceReaderMetrics(readerContext.metricGroup()), + new MySqlSourceReaderMetrics(metricGroup), configuration.isIncludeSchemaChanges()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); diff --git a/flink-sql-connector-mongodb-cdc/pom.xml b/flink-sql-connector-mongodb-cdc/pom.xml index 0656ce79c..456ca93dc 100644 --- a/flink-sql-connector-mongodb-cdc/pom.xml +++ b/flink-sql-connector-mongodb-cdc/pom.xml @@ -65,6 +65,8 @@ under the License. org.apache.kafka:* com.fasterxml.*:* com.google.guava:* + + org.apache.flink:flink-shaded-guava diff --git a/flink-sql-connector-mysql-cdc/pom.xml b/flink-sql-connector-mysql-cdc/pom.xml index 2e954d06f..d1297403e 100644 --- a/flink-sql-connector-mysql-cdc/pom.xml +++ b/flink-sql-connector-mysql-cdc/pom.xml @@ -67,6 +67,8 @@ under the License. com.google.guava:* com.esri.geometry:esri-geometry-api com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava diff --git a/flink-sql-connector-oracle-cdc/pom.xml b/flink-sql-connector-oracle-cdc/pom.xml index 024012bb7..46170e806 100644 --- a/flink-sql-connector-oracle-cdc/pom.xml +++ b/flink-sql-connector-oracle-cdc/pom.xml @@ -65,6 +65,8 @@ under the License. org.apache.kafka:* com.fasterxml.*:* com.google.guava:* + + org.apache.flink:flink-shaded-guava diff --git a/flink-sql-connector-postgres-cdc/pom.xml b/flink-sql-connector-postgres-cdc/pom.xml index 70fb6e4c8..e4758125f 100644 --- a/flink-sql-connector-postgres-cdc/pom.xml +++ b/flink-sql-connector-postgres-cdc/pom.xml @@ -63,6 +63,8 @@ under the License. org.apache.kafka:* org.postgresql:postgresql com.fasterxml.*:* + + org.apache.flink:flink-shaded-guava diff --git a/flink-sql-connector-sqlserver-cdc/pom.xml b/flink-sql-connector-sqlserver-cdc/pom.xml index c363a7b6f..cb6865b03 100644 --- a/flink-sql-connector-sqlserver-cdc/pom.xml +++ b/flink-sql-connector-sqlserver-cdc/pom.xml @@ -62,6 +62,8 @@ under the License. org.apache.kafka:* com.fasterxml.*:* com.google.guava:* + + org.apache.flink:flink-shaded-guava diff --git a/pom.xml b/pom.xml index 7bf92a5e7..1faa668c6 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,13 @@ under the License. ${slf4j.version} + + + org.apache.flink + flink-shaded-guava + 18.0-13.0 + + org.hamcrest