diff --git a/docs/content/connectors/sqlserver-cdc.md b/docs/content/connectors/sqlserver-cdc.md
index 4f6c2d818..09167c8a5 100644
--- a/docs/content/connectors/sqlserver-cdc.md
+++ b/docs/content/connectors/sqlserver-cdc.md
@@ -210,6 +210,14 @@ Connector Options
If the Flink version is 1.15 or newer, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' no longer needs to be configured explicitly.
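+
+On Flink versions older than 1.15, the option can still be enabled explicitly in the SQL client, for example:
+
+```sql
+SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
+```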
+
+
+ scan.incremental.snapshot.chunk.key-column |
+ optional |
+ (none) |
+ String |
+      The chunk key column of the table snapshot: captured tables are split into multiple chunks by the chunk key column when reading the table snapshot.
+        By default, the chunk key is the first column of the primary key. The chunk key column must be a column of the primary key. |
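+
+A usage sketch follows: it splits the snapshot of a `dbo.orders` table by the primary-key column `order_id`; the hostname, credentials, and table names are placeholders.
+
+```sql
+CREATE TABLE orders (
+    order_id INT,
+    customer_id INT,
+    PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+    -- connection settings below are illustrative placeholders
+    'connector' = 'sqlserver-cdc',
+    'hostname' = 'localhost',
+    'port' = '1433',
+    'username' = 'sa',
+    'password' = 'Password!',
+    'database-name' = 'inventory',
+    'table-name' = 'dbo.orders',
+    'scan.incremental.snapshot.enabled' = 'true',
+    -- split snapshot chunks by 'order_id', which must be a primary-key column
+    'scan.incremental.snapshot.chunk.key-column' = 'order_id'
+);
+```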
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
index dd5572667..262139730 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.options.StartupOptions;
@@ -60,6 +61,7 @@ import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
/** IT tests for {@link OracleSourceBuilder.OracleIncrementalSource}. */
public class OracleSourceITCase extends OracleSourceTestBase {
@@ -127,7 +129,9 @@ public class OracleSourceITCase extends OracleSourceTestBase {
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {"CUSTOMERS"},
- true);
+ true,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ null);
}
@Test
@@ -302,6 +306,29 @@ public class OracleSourceITCase extends OracleSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
+ @Test
+ public void testTableWithChunkColumnOfNoPrimaryKey() {
+ String chunkColumn = "NAME";
+ try {
+ testOracleParallelSource(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"CUSTOMERS"},
+ false,
+ RestartStrategies.noRestart(),
+ chunkColumn);
+ } catch (Exception e) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ e,
+ String.format(
+ "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+ chunkColumn, "ID", "customer.DEBEZIUM.CUSTOMERS"))
+ .isPresent());
+ }
+ }
+
    private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
createAndInitialize("customer.sql");
@@ -400,7 +427,13 @@ public class OracleSourceITCase extends OracleSourceTestBase {
String[] captureCustomerTables)
throws Exception {
testOracleParallelSource(
- parallelism, failoverType, failoverPhase, captureCustomerTables, false);
+ parallelism,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ false,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ null);
}
private void testOracleParallelSource(
@@ -408,7 +441,9 @@ public class OracleSourceITCase extends OracleSourceTestBase {
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables,
- boolean skipSnapshotBackfill)
+ boolean skipSnapshotBackfill,
+ RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+ String chunkColumn)
throws Exception {
createAndInitialize("customer.sql");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -416,7 +451,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.setRestartStrategy(restartStrategyConfiguration);
String sourceDDL =
format(
@@ -439,6 +474,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ + "%s"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
@@ -447,7 +483,12 @@ public class OracleSourceITCase extends OracleSourceTestBase {
ORACLE_DATABASE,
ORACLE_SCHEMA,
getTableNameRegex(captureCustomerTables), // (customer|customer_1)
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ chunkColumn == null
+ ? ""
+ : ",'scan.incremental.snapshot.chunk.key-column'='"
+ + chunkColumn
+ + "'");
// first step: check the snapshot data
String[] snapshotForSingleTable =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
index f08d30326..76503faa7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -238,7 +238,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
- false);
+ false,
+ null);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
@@ -256,7 +257,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
- false);
+ false,
+ null);
}
}
@@ -269,7 +271,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
FailoverPhase.SNAPSHOT,
new String[] {"customers"},
RestartStrategies.fixedDelayRestart(1, 0),
- true);
+ true,
+ null);
}
@Test
@@ -621,6 +624,33 @@ public class PostgresSourceITCase extends PostgresTestBase {
Thread.sleep(1000L);
}
+ @Test
+ public void testTableWithChunkColumnOfNoPrimaryKey() {
+ if (!DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+ return;
+ }
+ String chunkColumn = "name";
+ try {
+ testPostgresParallelSource(
+ 1,
+ scanStartupMode,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"customers"},
+ RestartStrategies.noRestart(),
+ false,
+ chunkColumn);
+ } catch (Exception e) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ e,
+ String.format(
+ "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+ chunkColumn, "id", "customer.customers"))
+ .isPresent());
+ }
+ }
+
    private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
@@ -721,7 +751,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
failoverPhase,
captureCustomerTables,
RestartStrategies.fixedDelayRestart(1, 0),
- false);
+ false,
+ null);
}
private void testPostgresParallelSource(
@@ -731,7 +762,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
- boolean skipSnapshotBackfill)
+ boolean skipSnapshotBackfill,
+ String chunkColumn)
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -761,6 +793,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+                        + "%s"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
@@ -771,7 +804,12 @@ public class PostgresSourceITCase extends PostgresTestBase {
getTableNameRegex(captureCustomerTables),
scanStartupMode,
slotName,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ chunkColumn == null
+ ? ""
+ : ",'scan.incremental.snapshot.chunk.key-column'='"
+ + chunkColumn
+ + "'");
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 3800e0cb4..a21156336 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -89,7 +89,8 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn = SqlServerUtils.getSplitColumn(table);
+ Column splitColumn =
+ SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
        final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index ee9b7d38f..fb7bcaf2d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -49,6 +49,7 @@ import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
@@ -205,7 +206,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
@Override
public RowType getSplitType(Table table) {
- return SqlServerUtils.getSplitType(table);
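+        // resolve the split column, honoring the user-configured chunk key column if present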
+ Column splitColumn = SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
+ return SqlServerUtils.getSplitType(splitColumn);
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index 9743384c6..184d1d3e9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -37,6 +37,8 @@ import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;
+import javax.annotation.Nullable;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -158,7 +160,7 @@ public class SqlServerUtils {
});
}
- public static Column getSplitColumn(Table table) {
+ public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
        List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
@@ -168,15 +170,27 @@ public class SqlServerUtils {
table.id()));
}
+ if (chunkKeyColumn != null) {
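+            // use the user-specified chunk key column if it is one of the primary-key columns; otherwise fail fast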
+            Optional<Column> targetPkColumn =
+ primaryKeys.stream()
+ .filter(col -> chunkKeyColumn.equals(col.name()))
+ .findFirst();
+ if (targetPkColumn.isPresent()) {
+ return targetPkColumn.get();
+ }
+ throw new ValidationException(
+ String.format(
+ "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+ chunkKeyColumn,
+ primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
+ table.id()));
+ }
+
// use first field in primary key as the split key
return primaryKeys.get(0);
}
- public static RowType getSplitType(Table table) {
- return getSplitType(getSplitColumn(table));
- }
-
- private static RowType getSplitType(Column splitColumn) {
+ public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), SqlServerTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index 9e2a55e37..16039edd2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
@@ -55,6 +56,7 @@ import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
/** IT tests for {@link SqlServerSourceBuilder.SqlServerIncrementalSource}. */
@@ -117,7 +119,9 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {"dbo.customers"},
- true);
+ true,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ null);
}
@Test
@@ -273,6 +277,29 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
+ @Test
+ public void testTableWithChunkColumnOfNoPrimaryKey() {
+ String chunkColumn = "name";
+ try {
+ testSqlServerParallelSource(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ new String[] {"dbo.customers"},
+ false,
+ RestartStrategies.noRestart(),
+ chunkColumn);
+ } catch (Exception e) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ e,
+ String.format(
+ "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
+ chunkColumn, "id", "customer.dbo.customers"))
+ .isPresent());
+ }
+ }
+
    private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
@@ -360,7 +387,13 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
String[] captureCustomerTables)
throws Exception {
testSqlServerParallelSource(
- parallelism, failoverType, failoverPhase, captureCustomerTables, false);
+ parallelism,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ false,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ null);
}
private void testSqlServerParallelSource(
@@ -368,7 +401,9 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables,
- boolean skipSnapshotBackfill)
+ boolean skipSnapshotBackfill,
+ RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+ String chunkColumn)
throws Exception {
String databaseName = "customer";
@@ -379,7 +414,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ env.setRestartStrategy(restartStrategyConfiguration);
String sourceDDL =
format(
@@ -400,6 +435,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '4',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ + "%s"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
@@ -407,7 +443,12 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
MSSQL_SERVER_CONTAINER.getPassword(),
databaseName,
getTableNameRegex(captureCustomerTables),
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
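+                        // append the chunk key option to the DDL only when a chunk key column is specified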
+ chunkColumn == null
+ ? ""
+ : ",'scan.incremental.snapshot.chunk.key-column'='"
+ + chunkColumn
+ + "'");
// first step: check the snapshot data
String[] snapshotForSingleTable =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
index 891021376..bfbc75339 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java
@@ -472,4 +472,67 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
+
+ @Test
+ public void testCompositePkTableSplitsUnevenlyWithChunkKeyColumn()
+ throws InterruptedException, ExecutionException {
+ if (parallelismSnapshot) {
+ testUseChunkColumn("product_kind");
+ }
+ }
+
+ @Test
+ public void testCompositePkTableSplitsEvenlyWithChunkKeyColumn()
+ throws ExecutionException, InterruptedException {
+ if (parallelismSnapshot) {
+ testUseChunkColumn("product_no");
+ }
+ }
+
+ private void testUseChunkColumn(String chunkColumn)
+ throws InterruptedException, ExecutionException {
+ initializeSqlServerTable("customer");
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE evenly_shopping_cart (\n"
+ + " product_no INT NOT NULL,\n"
+ + " product_kind VARCHAR(255),\n"
+ + " user_id VARCHAR(255) NOT NULL,\n"
+ + " description VARCHAR(255) NOT NULL\n"
+ + ") WITH ("
+ + " 'connector' = 'sqlserver-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.key-column' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s'"
+ + ")",
+ MSSQL_SERVER_CONTAINER.getHost(),
+ MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
+ MSSQL_SERVER_CONTAINER.getUsername(),
+ MSSQL_SERVER_CONTAINER.getPassword(),
+ parallelismSnapshot,
+ chunkColumn,
+ 4,
+ "customer",
+ "dbo.evenly_shopping_cart");
+ String sinkDDL =
+ "CREATE TABLE sink "
+ + " WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ") LIKE evenly_shopping_cart (EXCLUDING OPTIONS)";
+
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");
+ waitForSinkSize("sink", 12);
+ result.getJobClient().get().cancel().get();
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/customer.sql
index 70f10fc85..b7569d873 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/customer.sql
@@ -83,3 +83,28 @@ VALUES (101,'user_1','Shanghai','123567891234'),
(1019,'user_20','Shanghai','123567891234'),
(2000,'user_21','Shanghai','123567891234');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers_1', @role_name = NULL, @supports_net_changes = 0;
+
+-- table with a composite primary key, one column of which is evenly distributed
+CREATE TABLE evenly_shopping_cart (
+ product_no INT NOT NULL,
+ product_kind VARCHAR(255),
+ user_id VARCHAR(255) NOT NULL,
+ description VARCHAR(255) NOT NULL,
+ PRIMARY KEY(product_kind, product_no, user_id)
+);
+
+insert into evenly_shopping_cart
+VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
+ (102, 'KIND_002', 'user_1', 'my shopping cart'),
+ (103, 'KIND_007', 'user_1', 'my shopping cart'),
+ (104, 'KIND_008', 'user_1', 'my shopping cart'),
+ (105, 'KIND_100', 'user_2', 'my shopping list'),
+ (105, 'KIND_999', 'user_3', 'my shopping list'),
+ (107, 'KIND_010', 'user_4', 'my shopping list'),
+ (108, 'KIND_009', 'user_4', 'my shopping list'),
+ (109, 'KIND_002', 'user_5', 'leo list'),
+ (111, 'KIND_007', 'user_5', 'leo list'),
+ (111, 'KIND_008', 'user_5', 'leo list'),
+ (112, 'KIND_009', 'user_6', 'my shopping cart');
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'evenly_shopping_cart', @role_name = NULL, @supports_net_changes = 0;
+