[mongodb] Support specific timestamp startup mode. (#2207)

pull/2179/head
Jiabao Sun 2 years ago committed by GitHub
parent 6f6c978e3d
commit aff5b0566d

@ -201,30 +201,32 @@ An upsert stream requires a unique key, so we must declare `_id` as
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The ampersand-separated <a href="https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options">connection options</a> of MongoDB. e.g. <br>
<td>The <a href="https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options">MongoDB connection options</a>. For example: <br>
<code>replicaSet=test&connectTimeoutMS=300000</code>
</td>
</tr>
<tr>
<td>copy.existing</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to copy existing data from the source collections.</td>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for the MongoDB CDC consumer;
valid enumerations are "initial", "latest-offset" and "timestamp".
Please see the <a href="#a-name-id-002-a">Startup Mode</a> section for more details.</td>
</tr>
<tr>
<td>copy.existing.queue.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">10240</td>
<td>Integer</td>
<td>The maximum size of the queue to use when copying data.</td>
<td>scan.startup.timestamp-millis</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Timestamp in milliseconds of the start point; only applies to the <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
<td>batch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>Cursor batch size.</td>
</tr>
<tr>
<td>poll.max.batch.size</td>
@ -245,7 +247,7 @@ An upsert stream requires a unique key, so we must declare `_id` as
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The length of time, in milliseconds, between heartbeat messages. Use 0 to disable.</td>
<td>Heartbeat interval in milliseconds. Use 0 to disable.</td>
</tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
@ -337,20 +339,38 @@ CREATE TABLE products (
The MongoDB CDC connector is a Flink Source connector which first reads a database snapshot and then continues to read change stream events with **exactly-once processing**, even when failures happen.
### Snapshot When Startup Or Not
### Startup Mode<a name="启动模式" id="002" ></a>
The config option `copy.existing` specifies whether to perform a snapshot when the MongoDB CDC consumer starts up. <br>Defaults to `true`.
The config option `scan.startup.mode` specifies the startup mode for the MongoDB CDC consumer. The valid enumerations are:
### Snapshot Data Filters
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read the latest oplog.
- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; the connector reads only from the end of the oplog, which means it only captures changes made after the connector was started.
- `timestamp`: Skips the snapshot phase and starts reading oplog events from the specified timestamp.
The config option `copy.existing.pipeline` describes the filters applied when copying existing data.<br>
It can restrict copying to only the required data and improve the copy manager's use of indexes.
For example, using the DataStream API:
```java
MongoDBSource.builder()
.startupOptions(StartupOptions.latest()) // Start from latest offset
.startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
.build()
```
In the following example, the `$match` aggregation operator ensures that only documents whose closed field is set to false are copied.
and with SQL:
```SQL
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.startup.mode' = 'latest-offset', -- Start from the latest offset
...
'scan.incremental.snapshot.enabled' = 'true', -- The timestamp startup mode requires incremental snapshot reading to be enabled
'scan.startup.mode' = 'timestamp', -- Use the timestamp startup mode
'scan.startup.timestamp-millis' = '1667232000000' -- Startup timestamp in milliseconds
...
)
```
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
```
**Notes:**
- The 'timestamp' startup mode requires incremental snapshot reading to be enabled.
### Change Streams

@ -206,11 +206,19 @@ Connector Options
</td>
</tr>
<tr>
<td>copy.existing</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to copy existing data from source collections.</td>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for MongoDB CDC consumer, valid enumerations are "initial", "latest-offset" and "timestamp".
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
<td>scan.startup.timestamp-millis</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Timestamp in millis of the start point, only used for <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
<td>copy.existing.queue.size</td>
@ -337,20 +345,40 @@ Features
The MongoDB CDC connector is a Flink Source connector which first reads a database snapshot and then continues to read change stream events with **exactly-once processing**, even when failures happen.
### Snapshot When Startup Or Not
### Startup Reading Position
The config option `copy.existing` specifies whether to do a snapshot when the MongoDB CDC consumer starts up. <br>Defaults to `true`.
The config option `scan.startup.mode` specifies the startup mode for MongoDB CDC consumer. The valid enumerations are:
### Snapshot Data Filters
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read the latest oplog.
- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; it just reads from
the end of the oplog, which means it only captures changes made after the connector was started.
- `timestamp`: Skips the snapshot phase and starts reading oplog events from a specific timestamp.
The config option `copy.existing.pipeline` describes the filters applied when copying existing data.<br>
This can filter only required data and improve the use of indexes by the copying manager.
For example in DataStream API:
```java
MongoDBSource.builder()
.startupOptions(StartupOptions.latest()) // Start from latest offset
.startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
.build()
```
In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to false are copied.
and with SQL:
```SQL
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.startup.mode' = 'latest-offset', -- Start from latest offset
...
'scan.incremental.snapshot.enabled' = 'true', -- The timestamp startup mode requires incremental snapshot to be enabled
'scan.startup.mode' = 'timestamp', -- Start from timestamp
'scan.startup.timestamp-millis' = '1667232000000' -- Startup timestamp in milliseconds
...
)
```
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
```
**Notes:**
- The 'timestamp' startup mode is not supported by the legacy source. To use the timestamp startup mode, you need to enable incremental snapshot.
### Change Streams

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.base.source.assigner;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
@ -128,10 +129,34 @@ public class StreamSplitAssigner implements SplitAssigner {
// ------------------------------------------------------------------------------------------
public StreamSplit createStreamSplit() {
StartupOptions startupOptions = sourceConfig.getStartupOptions();
Offset startingOffset;
switch (startupOptions.startupMode) {
case LATEST_OFFSET:
startingOffset = dialect.displayCurrentOffset(sourceConfig);
break;
case EARLIEST_OFFSET:
startingOffset = offsetFactory.createInitialOffset();
break;
case TIMESTAMP:
startingOffset =
offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
break;
case SPECIFIC_OFFSETS:
startingOffset =
offsetFactory.newOffset(
startupOptions.specificOffsetFile,
startupOptions.specificOffsetPos.longValue());
break;
default:
throw new IllegalStateException(
"Unsupported startup mode " + startupOptions.startupMode);
}
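// Build an unbounded stream split: start from the resolved offset and never stop.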
return new StreamSplit(
STREAM_SPLIT_ID,
dialect.displayCurrentOffset(sourceConfig),
startingOffset,
offsetFactory.createNoStoppingOffset(),
new ArrayList<>(),
new HashMap<>(),

@ -31,6 +31,8 @@ public abstract class OffsetFactory implements Serializable {
public abstract Offset newOffset(Long position);
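// Creates an offset positioned at the given epoch-millisecond timestamp (used by the 'timestamp' startup mode).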
public abstract Offset createTimestampOffset(long timestampMillis);
public abstract Offset createInitialOffset();
public abstract Offset createNoStoppingOffset();

@ -57,6 +57,10 @@ public class BinlogOffset extends Offset {
this(filename, position, 0L, 0L, 0L, null, null);
}
public BinlogOffset(long binlogEpochSecs) {
this(null, 0L, 0L, 0L, binlogEpochSecs, null, null);
}
public BinlogOffset(
String filename,
long position,

@ -43,6 +43,11 @@ public class BinlogOffsetFactory extends OffsetFactory {
throw new FlinkRuntimeException("not supported create new Offset by Long position.");
}
@Override
public Offset createTimestampOffset(long timestampMillis) {
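// Binlog offsets track the start point at second granularity, so convert epoch milliseconds to seconds.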
return new BinlogOffset(timestampMillis / 1000);
}
@Override
public Offset createInitialOffset() {
return BinlogOffset.INITIAL_OFFSET;

@ -144,6 +144,12 @@ under the License.
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

@ -40,7 +40,6 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONG
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
@ -77,7 +76,7 @@ public class MongoDBSource {
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
private Boolean updateLookup = true;
private Boolean copyExisting = COPY_EXISTING.defaultValue();
private Boolean copyExisting = true;
private Integer copyExistingMaxThreads;
private Integer copyExistingQueueSize;
private String copyExistingPipeline;

@ -136,18 +136,13 @@ public class MongoDBSourceBuilder<T> {
}
/**
* copy.existing
* scan.startup.mode
*
* <p>Copy existing data from source collections and convert them to Change Stream events on
* their respective topics. Any changes to the data that occur during the copy process are
* applied once the copy is completed.
* <p>Optional startup mode for MongoDB CDC consumer, valid enumerations are initial,
* latest-offset, timestamp. Default: initial
*/
public MongoDBSourceBuilder<T> copyExisting(boolean copyExisting) {
if (copyExisting) {
this.configFactory.startupOptions(StartupOptions.initial());
} else {
this.configFactory.startupOptions(StartupOptions.latest());
}
public MongoDBSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
this.configFactory.startupOptions(startupOptions);
return this;
}
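For illustration, here is a minimal usage sketch of the new `startupOptions` entry point on the incremental `MongoDBSource` builder. The class name, host, database, collection, and deserializer choices below are placeholders for illustration only, assuming the builder's usual options (`hosts`, `databaseList`, `collectionList`, `deserializer`); they are not part of this change:
```java
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class TimestampStartupExample {
    public static void main(String[] args) {
        // Placeholder connection settings; the timestamp reuses the epoch-millisecond value from the docs above.
        MongoDBSource<String> source =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .databaseList("inventory")
                        .collectionList("inventory.products")
                        .startupOptions(StartupOptions.timestamp(1667232000000L))
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
    }
}
```
Passing `StartupOptions.initial()` or `StartupOptions.latest()` instead reproduces the behavior previously selected through `copyExisting(true)` / `copyExisting(false)`.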

@ -78,15 +78,6 @@ public class MongoDBSourceOptions {
"The ampersand-separated MongoDB connection options. "
+ "eg. replicaSet=test&connectTimeoutMS=300000");
public static final ConfigOption<Boolean> COPY_EXISTING =
ConfigOptions.key("copy.existing")
.booleanType()
.defaultValue(Boolean.TRUE)
.withDescription(
"Copy existing data from source collections and convert them "
+ "to Change Stream events on their respective topics. Any changes to the data "
+ "that occur during the copy process are applied once the copy is completed.");
public static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
ConfigOptions.key("copy.existing.queue.size")
.intType()

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.mongodb.source.offset;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import java.util.Map;
@ -39,7 +40,12 @@ public class ChangeStreamOffsetFactory extends OffsetFactory {
@Override
public ChangeStreamOffset newOffset(Long position) {
return new ChangeStreamOffset(bsonTimestampFromEpochMillis(position));
throw new UnsupportedOperationException("not supported create new Offset by position.");
}
@Override
public Offset createTimestampOffset(long timestampMillis) {
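// Wrap the epoch-millisecond start point in a BSON timestamp so the change stream can resume from it.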
return new ChangeStreamOffset(bsonTimestampFromEpochMillis(timestampMillis));
}
@Override

@ -17,6 +17,7 @@
package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
@ -29,6 +30,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
@ -69,7 +71,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String password;
private final String database;
private final String collection;
private final Boolean copyExisting;
private final StartupOptions startupOptions;
private final Integer copyExistingQueueSize;
private final Integer batchSize;
private final Integer pollMaxBatchSize;
@ -100,7 +102,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable String database,
@Nullable String collection,
@Nullable String connectionOptions,
@Nullable Boolean copyExisting,
StartupOptions startupOptions,
@Nullable Integer copyExistingQueueSize,
@Nullable Integer batchSize,
@Nullable Integer pollMaxBatchSize,
@ -119,7 +121,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.database = database;
this.collection = collection;
this.connectionOptions = connectionOptions;
this.copyExisting = copyExisting;
this.startupOptions = checkNotNull(startupOptions);
this.copyExistingQueueSize = copyExistingQueueSize;
this.batchSize = batchSize;
this.pollMaxBatchSize = pollMaxBatchSize;
@ -181,6 +183,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.scheme(scheme)
.hosts(hosts)
.closeIdleReaders(closeIdlerReaders)
.startupOptions(startupOptions)
.deserializer(deserializer);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
@ -188,7 +191,6 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@ -205,12 +207,24 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.hosts(hosts)
.deserializer(deserializer);
switch (startupOptions.startupMode) {
case INITIAL:
builder.copyExisting(true);
break;
case LATEST_OFFSET:
builder.copyExisting(false);
break;
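// Other startup modes (e.g. timestamp) are rejected: the legacy source only supports copy-existing semantics.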
default:
throw new ValidationException(
startupOptions.startupMode
+ " is not supported by legacy source. To use this feature, 'scan.incremental.snapshot.enabled' needs to be set to true.");
}
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
@ -265,7 +279,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
database,
collection,
connectionOptions,
copyExisting,
startupOptions,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,
@ -298,7 +312,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(database, that.database)
&& Objects.equals(collection, that.collection)
&& Objects.equals(connectionOptions, that.connectionOptions)
&& Objects.equals(copyExisting, that.copyExisting)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
&& Objects.equals(batchSize, that.batchSize)
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
@ -324,7 +338,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
database,
collection,
connectionOptions,
copyExisting,
startupOptions,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,

@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
@ -26,6 +27,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.utils.OptionUtils;
import java.time.ZoneId;
@ -34,10 +36,11 @@ import java.util.Set;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
@ -51,6 +54,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOp
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Factory for creating configured instance of {@link MongoDBTableSource}. */
public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
@ -83,7 +87,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS);
Boolean copyExisting = config.get(COPY_EXISTING);
StartupOptions startupOptions = getStartupOptions(config);
Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
@ -114,7 +118,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
database,
collection,
connectionOptions,
copyExisting,
startupOptions,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,
@ -133,6 +137,37 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
message);
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
private static StartupOptions getStartupOptions(ReadableConfig config) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
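// The timestamp mode additionally requires 'scan.startup.timestamp-millis' to be present.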
return StartupOptions.timestamp(
checkNotNull(
config.get(SCAN_STARTUP_TIMESTAMP_MILLIS),
String.format(
"To use timestamp startup mode, the startup timestamp millis '%s' must be set.",
SCAN_STARTUP_TIMESTAMP_MILLIS.key())));
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_TIMESTAMP,
modeString));
}
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
@ -154,7 +189,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(CONNECTION_OPTIONS);
options.add(DATABASE);
options.add(COLLECTION);
options.add(COPY_EXISTING);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(COPY_EXISTING_QUEUE_SIZE);
options.add(BATCH_SIZE);
options.add(POLL_MAX_BATCH_SIZE);

@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
@ -52,6 +53,7 @@ import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitFo
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
/** Integration tests for MongoDB change stream event SQL source. */
@RunWith(Parameterized.class)
@ -220,6 +222,94 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromTimestamp() throws Exception {
String database = ROUTER.executeCommandFileInSeparateDatabase("inventory");
// Unfortunately we have to sleep here to distinguish the initial data from later-generated
// changes in the oplog by timestamp
Thread.sleep(5000L);
String sourceDDL =
String.format(
"CREATE TABLE mongodb_source ("
+ " _id STRING NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
+ " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',"
+ " 'hosts' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'scan.startup.mode' = 'timestamp',"
+ " 'scan.startup.timestamp-millis' = '"
+ System.currentTimeMillis()
+ "',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
ROUTER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
"products",
parallelismSnapshot);
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
+ " weightSum DECIMAL(10,3),"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
if (!parallelismSnapshot) {
assertThrows(
ValidationException.class,
() ->
tEnv.executeSql(
"INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name"));
return;
}
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name");
MongoCollection<Document> products =
mongodbClient.getDatabase(database).getCollection("products");
products.insertOne(
productDocOf(
"100000000000000000000110",
"jacket",
"water resistent white wind breaker",
0.2));
products.insertOne(
productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
waitForSinkSize("sink", 2);
String[] expected = new String[] {"jacket,0.200", "scooter,5.180"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testAllTypes() throws Throwable {
String database = ROUTER.executeCommandFileInSeparateDatabase("column_type_test");

@ -33,6 +33,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
import org.junit.Test;
@ -48,7 +49,6 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
@ -93,7 +93,6 @@ public class MongoDBTableFactoryTest {
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final ZoneId LOCAL_TIME_ZONE = ZoneId.systemDefault();
private static final Boolean COPY_EXISTING_DEFAULT = COPY_EXISTING.defaultValue();
private static final int BATCH_SIZE_DEFAULT = BATCH_SIZE.defaultValue();
private static final int POLL_MAX_BATCH_SIZE_DEFAULT = POLL_MAX_BATCH_SIZE.defaultValue();
private static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = POLL_AWAIT_TIME_MILLIS.defaultValue();
@ -123,7 +122,7 @@ public class MongoDBTableFactoryTest {
MY_DATABASE,
MY_TABLE,
null,
COPY_EXISTING_DEFAULT,
StartupOptions.initial(),
null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,
@ -142,7 +141,8 @@ public class MongoDBTableFactoryTest {
Map<String, String> options = getAllOptions();
options.put("scheme", MONGODB_SRV_SCHEME);
options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000");
options.put("copy.existing", "false");
options.put("scan.startup.mode", "timestamp");
options.put("scan.startup.timestamp-millis", "1667232000000");
options.put("copy.existing.queue.size", "100");
options.put("batch.size", "101");
options.put("poll.max.batch.size", "102");
@ -164,7 +164,7 @@ public class MongoDBTableFactoryTest {
MY_DATABASE,
MY_TABLE,
"replicaSet=test&connectTimeoutMS=300000",
false,
StartupOptions.timestamp(1667232000000L),
100,
101,
102,
@ -200,7 +200,7 @@ public class MongoDBTableFactoryTest {
MY_DATABASE,
MY_TABLE,
null,
COPY_EXISTING_DEFAULT,
StartupOptions.initial(),
null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,

@ -45,6 +45,12 @@ public class RedoLogOffsetFactory extends OffsetFactory {
"Do not support to create RedoLogOffset by position.");
}
@Override
public Offset createTimestampOffset(long timestampMillis) {
throw new UnsupportedOperationException(
"Do not support to create RedoLogOffset by timestamp.");
}
@Override
public Offset createInitialOffset() {
return RedoLogOffset.INITIAL_OFFSET;

@ -44,6 +44,11 @@ public class LsnFactory extends OffsetFactory {
"not supported create new Offset by filename and position.");
}
@Override
public Offset createTimestampOffset(long timestampMillis) {
throw new UnsupportedOperationException("not supported create new Offset by timestamp.");
}
@Override
public Offset createInitialOffset() {
return LsnOffset.INITIAL_OFFSET;
