[FLINK-35540][cdc-common] Fix table missed when database and table are with the same name

This closes  #3409.
pull/3380/head
North Lin 8 months ago committed by GitHub
parent 1112987572
commit 7287eaceca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -98,15 +98,15 @@ public class Selectors {
Predicates.setOf(
tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str);
for (String tableSplit : tableSplitSet) {
Set<String> tableIdSet =
Predicates.setOf(
List<String> tableIdList =
Predicates.listOf(
tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str);
Iterator<String> iterator = tableIdSet.iterator();
if (tableIdSet.size() == 1) {
Iterator<String> iterator = tableIdList.iterator();
if (tableIdList.size() == 1) {
selectors.add(new Selector(null, null, iterator.next()));
} else if (tableIdSet.size() == 2) {
} else if (tableIdList.size() == 2) {
selectors.add(new Selector(null, iterator.next(), iterator.next()));
} else if (tableIdSet.size() == 3) {
} else if (tableIdList.size() == 3) {
selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next()));
} else {
throw new IllegalArgumentException(

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -78,6 +79,21 @@ public class Predicates {
return matches;
}
public static <T> List<T> listOf(
String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) {
return Collections.emptyList();
}
List<T> matches = new LinkedList<>();
for (String item : splitter.apply(input)) {
T obj = factory.apply(item);
if (obj != null) {
matches.add(obj);
}
}
return matches;
}
protected static <T> Function<T, Optional<Pattern>> matchedByPattern(
Collection<Pattern> patterns, Function<T, String> conversion) {
return (t) -> {

@ -32,9 +32,10 @@ class SelectorsTest {
// nameSpace, schemaName, tableName
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+")
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+,db.sc1.sc1")
.build();
assertAllowed(selectors, "db", "sc1", "sc1");
assertAllowed(selectors, "db", "sc1", "A1");
assertAllowed(selectors, "db", "sc1", "A2");
assertAllowed(selectors, "db", "sc2", "B0");
@ -50,9 +51,12 @@ class SelectorsTest {
selectors =
new Selectors.SelectorsBuilder()
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+")
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+,db\\..sc1.sc1,db.sc1.sc1")
.build();
assertAllowed(selectors, "db", "sc1", "sc1");
assertAllowed(selectors, "db1", "sc1", "sc1");
assertAllowed(selectors, "dba", "sc1", "sc1");
assertAllowed(selectors, "db1", "sc1", "A1");
assertAllowed(selectors, "dba", "sc1", "A2");
assertAllowed(selectors, "db", "sc2", "B0");
@ -68,8 +72,11 @@ class SelectorsTest {
// schemaName, tableName
selectors =
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
new Selectors.SelectorsBuilder()
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
.build();
assertAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, "sc1", "A1");
assertAllowed(selectors, null, "sc1", "A2");
assertAllowed(selectors, null, "sc2", "B0");
@ -82,8 +89,12 @@ class SelectorsTest {
assertNotAllowed(selectors, null, "sc1A", "A1");
// tableName
selectors = new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+").build();
selectors =
new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+,sc1").build();
assertAllowed(selectors, null, null, "sc1");
assertNotAllowed(selectors, "db", "sc1", "sc1");
assertNotAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, null, "1A1");
assertAllowed(selectors, null, null, "AA2");
assertAllowed(selectors, null, null, "B0");
@ -94,8 +105,11 @@ class SelectorsTest {
assertNotAllowed(selectors, null, null, "2B");
selectors =
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
new Selectors.SelectorsBuilder()
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
.build();
assertAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, "sc1", "A1");
assertAllowed(selectors, null, "sc1", "A2");
assertAllowed(selectors, null, "sc1", "A2");
@ -107,6 +121,15 @@ class SelectorsTest {
assertNotAllowed(selectors, null, "sc2", "B2");
assertNotAllowed(selectors, null, "sc11", "A1");
assertNotAllowed(selectors, null, "sc1A", "A1");
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc1").build();
assertAllowed(selectors, null, "sc1", "sc1");
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc[0-9]+").build();
assertAllowed(selectors, null, "sc1", "sc1");
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.\\.*").build();
assertAllowed(selectors, null, "sc1", "sc1");
}
protected void assertAllowed(

@ -24,6 +24,9 @@ import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Test;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -124,6 +127,44 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase {
+ tableExclude);
}
@Test
public void testDatabaseAndTableWithTheSameName() throws SQLException {
inventoryDatabase.createAndInitialize();
// create a table with the same name of database
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
String createSameNameTableSql =
String.format(
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+ " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512)\n"
+ ");",
inventoryDatabase.getDatabaseName(),
inventoryDatabase.getDatabaseName());
statement.execute(createSameNameTableSql);
}
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(
TABLES.key(),
inventoryDatabase.getDatabaseName() + "." + inventoryDatabase.getDatabaseName());
Factory.Context context = new MockContext(Configuration.fromMap(options));
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
assertThat(dataSource.getSourceConfig().getTableList())
.isEqualTo(
Arrays.asList(
inventoryDatabase.getDatabaseName()
+ "."
+ inventoryDatabase.getDatabaseName()));
}
class MockContext implements Factory.Context {
Configuration factoryConfiguration;

Loading…
Cancel
Save