[common] Make Flink CDC Compatible with Flink 1.14

Branch: pull/705/head
Author: wangxiaojing (committed by Leonard Xu)
Parent: 3f3ee289f0
Commit: b21466a848

@@ -30,7 +30,8 @@ under the License.
<packaging>jar</packaging>
<properties>
<flink-1.13>1.13.3</flink-1.13>
<flink-1.13>1.13.5</flink-1.13>
<flink-1.14>1.14.3</flink-1.14>
<mysql.driver.version>8.0.27</mysql.driver.version>
</properties>
@@ -186,7 +187,18 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink-1.13}</version>
<destFileName>jdbc-connector.jar</destFileName>
<destFileName>jdbc-connector_${flink-1.13}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink-1.14}</version>
<destFileName>jdbc-connector_${flink-1.14}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>

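The hunk above downloads both the Flink 1.13 and Flink 1.14 builds of flink-connector-jdbc into target/dependencies under version-suffixed names, so the E2E tests can pick the jar that matches the Flink version under test. A minimal sketch of that lookup, not part of this commit: the helper mirrors getJdbcConnectorResourceName further down in this diff, and the directory layout is assumed from the plugin configuration above.

import java.nio.file.Path;
import java.nio.file.Paths;

public class JdbcConnectorJars {

    // Resolve the version-suffixed connector jar copied by maven-dependency-plugin.
    static Path resolve(String flinkVersion) {
        return Paths.get("target", "dependencies")
                .resolve(String.format("jdbc-connector_%s.jar", flinkVersion));
    }

    public static void main(String[] args) {
        System.out.println(resolve("1.13.5")); // target/dependencies/jdbc-connector_1.13.5.jar
        System.out.println(resolve("1.14.3")); // target/dependencies/jdbc-connector_1.14.3.jar
    }
}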
@@ -32,13 +32,13 @@ import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.net.URL;
import java.nio.file.Files;
@@ -51,6 +51,7 @@ import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT;
import static org.junit.Assert.assertNotNull;
@@ -67,36 +68,47 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final Path mongoCdcJar = TestUtils.getResource("mongodb-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final MongoDBContainer MONGODB =
new MongoDBContainer()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
private MongoDBContainer mongodb;
private static MongoClient mongoClient;
private MongoClient mongoClient;
@Before
public void before() {
super.before();
// Tip: if you hit an error like '"errmsg" : "No host described in new configuration 1 for
// replica set rs0 maps to this node"' when starting the container in your local environment,
// check that your '/etc/hosts' file contains a line mapping a non-loopback IP (not 127.0.0.1)
// to your hostname, e.g.: '30.225.0.87 leonard.machine'
mongodb =
new MongoDBContainer()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(mongodb)).join();
@BeforeClass
public static void beforeClass() {
executeCommandFileInMongoDB("mongo_setup", "admin");
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(
MONGODB.getConnectionString(
mongodb.getConnectionString(
MONGO_SUPER_USER, MONGO_SUPER_PASSWORD)))
.build();
mongoClient = MongoClients.create(settings);
}
@AfterClass
public static void afterClass() {
@After
public void after() {
super.after();
if (mongoClient != null) {
mongoClient.close();
}
if (mongodb != null) {
mongodb.stop();
}
}
@Test
@@ -201,8 +213,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
}
/** Executes a mongo command file in the specified database. */
private static String executeCommandFileInMongoDB(
String fileNameIgnoreSuffix, String databaseName) {
private String executeCommandFileInMongoDB(String fileNameIgnoreSuffix, String databaseName) {
final String dbName = databaseName != null ? databaseName : fileNameIgnoreSuffix;
final String ddlFile = String.format("ddl/%s.js", fileNameIgnoreSuffix);
final URL ddlTestFile = MongoDBTestBase.class.getClassLoader().getResource(ddlFile);
@@ -222,7 +233,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
})
.collect(Collectors.joining("\n"));
MONGODB.executeCommand(command0 + command1);
mongodb.executeCommand(command0 + command1);
return dbName;
} catch (Exception e) {

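The MongoDB changes above (and the Oracle and SQL Server changes below) follow one pattern: the shared static @ClassRule container becomes a per-test instance that is started in @Before via Startables.deepStart and stopped in @After, so each parameterized Flink version gets a fresh container. A minimal sketch of that lifecycle, using a generic image purely for illustration:

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;

import java.util.stream.Stream;

public class PerTestContainerSketch {

    private GenericContainer<?> container;

    @Before
    public void before() {
        // Fresh container per test; deepStart().join() blocks until it is running.
        container = new GenericContainer<>("alpine:3.14").withCommand("sleep", "300");
        Startables.deepStart(Stream.of(container)).join();
    }

    @After
    public void after() {
        // Stop explicitly since there is no @ClassRule managing the lifecycle.
        if (container != null) {
            container.stop();
        }
    }

    @Test
    public void containerIsRunning() {
        Assert.assertTrue(container.isRunning());
    }
}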
@@ -40,7 +40,6 @@ public class MySqlE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MySqlE2eITCase.class);
private static final Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
@Test
public void testMySqlCDC() throws Exception {

@@ -21,12 +21,15 @@ package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.ClassRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.nio.file.Path;
import java.sql.Connection;
@@ -36,6 +39,7 @@ import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/** End-to-end tests for oracle-cdc connector uber jar. */
public class OracleE2eITCase extends FlinkContainerTestEnvironment {
@@ -49,16 +53,31 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
private static final int ORACLE_PORT = 1521;
private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final OracleContainer ORACLE =
@Rule
public final OracleContainer oracle =
new OracleContainer(ORACLE_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void before() {
super.before();
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(oracle)).join();
LOG.info("Containers are started.");
}
@After
public void after() {
if (oracle != null) {
oracle.stop();
}
super.after();
}
@Test
public void testOracleCDC() throws Exception {
List<String> sqlLines =
@@ -155,6 +174,6 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
private Connection getOracleJdbcConnection() throws SQLException {
return DriverManager.getConnection(
ORACLE.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD);
oracle.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD);
}
}

@@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -61,7 +62,6 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
private static final Path postgresCdcJar = TestUtils.getResource("postgres-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
@@ -80,6 +80,11 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
initializePostgresTable("postgres_inventory");
}
@After
public void after() {
super.after();
}
@Test
public void testPostgresCDC() throws Exception {
List<String> sqlLines =

@@ -21,13 +21,15 @@ package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.net.URL;
import java.nio.file.Files;
@@ -43,6 +45,7 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
@@ -54,11 +57,10 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
private static final String INTER_CONTAINER_SQL_SERVER_ALIAS = "mssqlserver";
private static final Path sqlServerCdcJar =
TestUtils.getResource("sqlserver-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
@Rule
public MSSQLServerContainer sqlServer =
new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
.withPassword("Password!")
.withEnv("MSSQL_AGENT_ENABLED", "true")
@@ -70,9 +72,20 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
@Before
public void before() {
super.before();
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(sqlServer)).join();
LOG.info("Containers are started.");
initializeSqlServerTable("sqlserver_inventory");
}
@After
public void after() {
if (sqlServer != null) {
sqlServer.stop();
}
super.after();
}
@Test
public void testSqlServerCDC() throws Exception {
List<String> sqlLines =
@@ -86,9 +99,9 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
") WITH (",
" 'connector' = 'sqlserver-cdc',",
" 'hostname' = '" + INTER_CONTAINER_SQL_SERVER_ALIAS + "',",
" 'port' = '" + MSSQL_SERVER_CONTAINER.MS_SQL_SERVER_PORT + "',",
" 'username' = '" + MSSQL_SERVER_CONTAINER.getUsername() + "',",
" 'password' = '" + MSSQL_SERVER_CONTAINER.getPassword() + "',",
" 'port' = '" + sqlServer.MS_SQL_SERVER_PORT + "',",
" 'username' = '" + sqlServer.getUsername() + "',",
" 'password' = '" + sqlServer.getPassword() + "',",
" 'database-name' = 'inventory',",
" 'schema-name' = 'dbo',",
" 'table-name' = 'products'",
@@ -193,8 +206,6 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
private Connection getSqlServerJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MSSQL_SERVER_CONTAINER.getJdbcUrl(),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword());
sqlServer.getJdbcUrl(), sqlServer.getUsername(), sqlServer.getPassword());
}
}

@@ -35,12 +35,15 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import javax.annotation.Nullable;
@@ -55,19 +58,22 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
/** Test environment running job on Flink containers. */
@RunWith(Parameterized.class)
public abstract class FlinkContainerTestEnvironment extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnvironment.class);
@Parameterized.Parameter public String flinkVersion;
// ------------------------------------------------------------------------------------------
// Flink Variables
// ------------------------------------------------------------------------------------------
private static final int JOB_MANAGER_REST_PORT = 8081;
private static final String FLINK_BIN = "bin";
private static final String FLINK_IMAGE_TAG = "flink:1.13.3-scala_2.11";
private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
private static final String FLINK_PROPERTIES =
@@ -91,6 +97,10 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
@ClassRule public static final Network NETWORK = Network.newNetwork();
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer)
@@ -104,36 +114,44 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final GenericContainer<?> jobmanager =
new GenericContainer<>(FLINK_IMAGE_TAG)
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Rule
public final GenericContainer<?> taskmanager =
new GenericContainer<>(FLINK_IMAGE_TAG)
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobmanager)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
protected final UniqueDatabase mysqlInventoryDatabase =
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
protected Path jdbcJar;
private GenericContainer<?> jobManager;
private GenericContainer<?> taskManager;
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.13.5", "1.14.3");
}
@Before
public void before() {
mysqlInventoryDatabase.createAndInitialize();
jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName());
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withLogConsumer(new Slf4jLogConsumer(LOG));
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
}
@After
@@ -141,6 +159,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
if (restClusterClient != null) {
restClusterClient.close();
}
if (jobManager != null) {
jobManager.stop();
}
if (taskManager != null) {
taskManager.stop();
}
}
/**
@@ -155,17 +179,17 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
final List<String> commands = new ArrayList<>();
Path script = temporaryFolder.newFile().toPath();
Files.write(script, job.getSqlLines());
jobmanager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
commands.add("cat /tmp/script.sql | ");
commands.add(FLINK_BIN + "/sql-client.sh");
for (String jar : job.getJars()) {
commands.add("--jar");
String containerPath = copyAndGetContainerPath(jobmanager, jar);
String containerPath = copyAndGetContainerPath(jobManager, jar);
commands.add(containerPath);
}
ExecResult execResult =
jobmanager.execInContainer("bash", "-c", String.join(" ", commands));
jobManager.execInContainer("bash", "-c", String.join(" ", commands));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
if (execResult.getExitCode() != 0) {
@@ -183,13 +207,13 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
return restClusterClient;
}
checkState(
jobmanager.isRunning(),
jobManager.isRunning(),
"Cluster client should only be retrieved for a running cluster");
try {
final Configuration clientConfiguration = new Configuration();
clientConfiguration.set(RestOptions.ADDRESS, jobmanager.getHost());
clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
clientConfiguration.set(
RestOptions.PORT, jobmanager.getMappedPort(JOB_MANAGER_REST_PORT));
RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
this.restClusterClient =
new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
} catch (Exception e) {
@@ -233,4 +257,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
container.copyFileToContainer(MountableFile.forHostPath(path), containerPath);
return containerPath;
}
private String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.11", flinkVersion);
}
protected String getJdbcConnectorResourceName() {
return String.format("jdbc-connector_%s.jar", flinkVersion);
}
}

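Everything in the test environment above now hangs off a JUnit 4 Parameterized runner: the injected flinkVersion picks both the Flink image tag and the matching jdbc connector jar, and the JobManager/TaskManager containers are rebuilt in @Before for every run. A minimal standalone sketch of the runner mechanics; the version list and the image-tag pattern come from the diff:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;

@RunWith(Parameterized.class)
public class FlinkVersionMatrixSketch {

    // The runner injects one value from versions() into this public field
    // for each run of the suite.
    @Parameterized.Parameter public String flinkVersion;

    @Parameterized.Parameters(name = "flinkVersion: {0}")
    public static List<String> versions() {
        return Arrays.asList("1.13.5", "1.14.3");
    }

    @Test
    public void buildsImageTag() {
        // Executes twice, once per Flink version, like the E2E tests in this commit.
        System.out.println(String.format("flink:%s-scala_2.11", flinkVersion));
    }
}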
@@ -17,17 +17,20 @@
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create the inventory database
CREATE DATABASE inventory;
CREATE
DATABASE inventory;
USE inventory;
USE
inventory;
EXEC sys.sp_cdc_enable_db;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
CREATE TABLE products
(
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);

@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Properties;
@@ -425,7 +426,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
debeziumStarted = true;
// initialize metrics
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
// make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
final Method getMetricGroupMethod =
getRuntimeContext().getClass().getMethod("getMetricGroup");
getMetricGroupMethod.setAccessible(true);
final MetricGroup metricGroup =
(MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
metricGroup.gauge(
"currentFetchEventTimeLag",
(Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());

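The reflection above exists because Flink 1.14 narrowed the declared return type of RuntimeContext#getMetricGroup (it now returns OperatorMetricGroup, a MetricGroup subtype). The return type is part of the method descriptor in compiled bytecode, so a connector built against the Flink 1.13 signature fails with NoSuchMethodError on a 1.14 cluster; looking the method up by name sidesteps the descriptor mismatch. The same trick is applied to SourceReaderContext#metricGroup in the MySqlSource hunks below. A standalone sketch of the helper, with illustrative names, assuming only a public no-arg getMetricGroup():

import org.apache.flink.metrics.MetricGroup;

import java.lang.reflect.Method;

final class MetricGroupCompat {

    private MetricGroupCompat() {}

    // Works on Flink 1.13 and 1.14: getMethod matches by name and parameter
    // types only, never by return type.
    static MetricGroup getMetricGroup(Object runtimeContext) throws Exception {
        Method getMetricGroup = runtimeContext.getClass().getMethod("getMetricGroup");
        // The concrete runtime-context class may not be public, so relax access checks.
        getMetricGroup.setAccessible(true);
        return (MetricGroup) getMetricGroup.invoke(runtimeContext);
    }
}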
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.mysql.MySqlValidator;
@@ -58,6 +59,7 @@ import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import java.lang.reflect.Method;
import java.util.List;
import java.util.function.Supplier;
@@ -135,8 +137,13 @@ public class MySqlSource<T>
configFactory.createConfig(readerContext.getIndexOfSubtask());
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
metricGroupMethod.setAccessible(true);
final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(readerContext.metricGroup());
new MySqlSourceReaderMetrics(metricGroup);
sourceReaderMetrics.registerMetrics();
MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);

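Same story here: SourceReaderContext#metricGroup returns MetricGroup in Flink 1.13 but SourceReaderMetricGroup in Flink 1.14, and interfaces get no compiler-generated bridge method for such a change, so a directly compiled call binds to exactly one of the two descriptors. A toy demonstration of why the by-name lookup resolves against either signature; these are stand-in interfaces, not the real Flink types:

import java.lang.reflect.Method;

public class ByNameLookupDemo {

    // Stand-ins for the 1.13 and 1.14 shapes of SourceReaderContext.
    interface Context13 { Object metricGroup(); }
    interface Context14 { CharSequence metricGroup(); }

    public static void main(String[] args) throws Exception {
        // Both lookups succeed: the return type that a compiled call site
        // would have baked into its descriptor plays no part in reflection.
        Method m13 = Context13.class.getMethod("metricGroup");
        Method m14 = Context14.class.getMethod("metricGroup");
        System.out.println(m13.getReturnType().getSimpleName()); // Object
        System.out.println(m14.getReturnType().getSimpleName()); // CharSequence
    }
}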
@@ -58,7 +58,7 @@ import java.util.Random;
import java.util.concurrent.ExecutionException;
import static java.lang.String.format;
import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
import static org.apache.flink.util.Preconditions.checkState;
/** IT tests for {@link MySqlSource}. */
public class MySqlSourceITCase extends MySqlSourceTestBase {

@@ -28,6 +28,7 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
@@ -54,6 +55,7 @@ import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -238,18 +240,24 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
}
}
private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration) {
private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration)
throws Exception {
return createReader(configuration, new TestingReaderContext());
}
private MySqlSourceReader<SourceRecord> createReader(
MySqlSourceConfig configuration, SourceReaderContext readerContext) {
MySqlSourceConfig configuration, SourceReaderContext readerContext) throws Exception {
final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
// make SourceReaderContext#metricGroup compatible between Flink 1.13 and Flink 1.14
final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
metricGroupMethod.setAccessible(true);
final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
final MySqlRecordEmitter<SourceRecord> recordEmitter =
new MySqlRecordEmitter<>(
new ForwardDeserializeSchema(),
new MySqlSourceReaderMetrics(readerContext.metricGroup()),
new MySqlSourceReaderMetrics(metricGroup),
configuration.isIncludeSchemaChanges());
final MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);

@@ -65,6 +65,8 @@ under the License.
<include>org.apache.kafka:*</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>

@@ -67,6 +67,8 @@ under the License.
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>

@@ -65,6 +65,8 @@ under the License.
<include>org.apache.kafka:*</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>

@@ -63,6 +63,8 @@ under the License.
<include>org.apache.kafka:*</include>
<include>org.postgresql:postgresql</include>
<include>com.fasterxml.*:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>

@@ -62,6 +62,8 @@ under the License.
<include>org.apache.kafka:*</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>

@@ -125,6 +125,13 @@ under the License.
<version>${slf4j.version}</version>
</dependency>
<!-- Use fixed version 18.0-13.0 of flink shaded guava -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>18.0-13.0</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>

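The shade-plugin hunks and the dependency pin above all serve one fix: Flink 1.14 moved its shaded guava from the guava18 relocation to guava30, so the org.apache.flink.shaded.guava18 classes the connectors compile against no longer ship with a 1.14 cluster. Pinning flink-shaded-guava 18.0-13.0 and including it in each uber jar keeps those classes available on either Flink version. (The MySqlSourceITCase hunk above dodges the same issue in test code by switching to org.apache.flink.util.Preconditions.) A minimal probe, runnable only with the pinned artifact on the classpath:

// Resolves only via the bundled flink-shaded-guava 18.0-13.0; a bare Flink
// 1.14 classpath exposes the guava30 relocation instead.
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;

public class ShadedGuavaProbe {
    public static void main(String[] args) {
        Preconditions.checkState(true, "guava18 shading present");
        System.out.println(Preconditions.class.getName());
    }
}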