[mongodb] Support metadata columns for mongodb-cdc connector ()

pull/554/head
Jiabao Sun committed by GitHub
parent 51f5a90eb3
commit 65665ebb6a

@@ -250,6 +250,41 @@ Connector Options

Note: setting `heartbeat.interval.ms` to a proper value larger than 0 is highly recommended **if the collection changes slowly**.
The heartbeat event can push the `resumeToken` forward, which prevents the `resumeToken` from expiring when we recover the Flink job from a checkpoint or savepoint.
Available Metadata
----------------
The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 15%">Key</th>
<th class="text-left" style="width: 30%">DataType</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>Name of the database that contains the row.</td>
</tr>
<tr>
<td>collection_name</td>
<td>STRING NOT NULL</td>
<td>Name of the collection that contains the row.</td>
</tr>
<tr>
<td>op_ts</td>
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from the snapshot of the collection instead of the change stream, the value is always 0.</td>
</tr>
</tbody>
</table>
</div>
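
For example, metadata columns are declared with `METADATA [FROM ...] VIRTUAL` and then read like ordinary columns. Below is a minimal sketch in the style of the integration test added by this commit; the hosts, credentials, and database/collection names are placeholders, not values from this patch:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoDBMetadataColumnsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // db_name renames the 'database_name' metadata key via FROM; the other two metadata
        // columns reuse the metadata key as the column name, so FROM can be omitted.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  _id STRING NOT NULL,"
                        + "  name STRING,"
                        + "  weight DECIMAL(10,3),"
                        + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + "  collection_name STRING METADATA VIRTUAL,"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,"
                        + "  PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb-cdc',"
                        + "  'hosts' = 'localhost:27017',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database' = 'inventory',"
                        + "  'collection' = 'products',"
                        + "  'heartbeat.interval.ms' = '5000'"
                        + ")");

        tEnv.executeSql("SELECT db_name, collection_name, op_ts, _id, name FROM products").print();
    }
}

With this declaration, `op_ts` is 0 for rows emitted during the initial snapshot and the change's cluster time afterwards, as described in the table above.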
Features
--------

@@ -46,8 +46,6 @@ public class MongoDBConnectorSourceTask extends SourceTask {

    private static final String TRUE = "true";

-    private static final String CLUSTER_TIME_FIELD = "clusterTime";
-
    private final MongoSourceTask target;

    private final Field isCopyingField;
@@ -159,9 +157,11 @@ public class MongoDBConnectorSourceTask extends SourceTask {
    private void markRecordTimestamp(SourceRecord record) {
        final Struct value = (Struct) record.value();
        final Struct source = new Struct(value.schema().field(Envelope.FieldName.SOURCE).schema());
-        long timestamp = System.currentTimeMillis();
-        if (value.schema().field(CLUSTER_TIME_FIELD) != null) {
-            String clusterTime = value.getString(CLUSTER_TIME_FIELD);
+        // It indicates the time that the change was made in the database. If the record is read
+        // from the snapshot of the collection instead of the change stream, the value is always 0.
+        long timestamp = 0L;
+        if (value.schema().field(MongoDBEnvelope.CLUSTER_TIME_FIELD) != null) {
+            String clusterTime = value.getString(MongoDBEnvelope.CLUSTER_TIME_FIELD);
            if (clusterTime != null) {
                timestamp = new JsonReader(clusterTime).readTimestamp().getTime() * 1000L;
            }
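
For reference, a standalone sketch of how the `clusterTime` extended-JSON string is turned into epoch milliseconds by the call above; the sample `clusterTime` value is hypothetical, not taken from this patch:

import org.bson.BsonTimestamp;
import org.bson.json.JsonReader;

public class ClusterTimeToMillis {
    public static void main(String[] args) {
        // Hypothetical clusterTime value as carried in the change stream envelope:
        // a BSON timestamp in extended JSON whose 't' component is seconds since the epoch.
        String clusterTime = "{\"$timestamp\": {\"t\": 1640995200, \"i\": 1}}";

        BsonTimestamp ts = new JsonReader(clusterTime).readTimestamp();
        long timestampMs = ts.getTime() * 1000L; // getTime() returns seconds, so scale to millis

        System.out.println(timestampMs); // prints 1640995200000
    }
}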

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.internal;
/**
* An immutable descriptor for the structure of {@link
* com.mongodb.client.model.changestream.ChangeStreamDocument} envelopes.
*/
public class MongoDBEnvelope {
public static final String CLUSTER_TIME_FIELD = "clusterTime";
public static final String FULL_DOCUMENT_FIELD = "fullDocument";
public static final String DOCUMENT_KEY_FIELD = "documentKey";
public static final String OPERATION_TYPE_FIELD = "operationType";
public static final String NAMESPACE_FIELD = "ns";
public static final String NAMESPACE_DATABASE_FIELD = "db";
public static final String NAMESPACE_COLLECTION_FIELD = "coll";
}
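
For orientation, an illustrative sketch of how these constants map onto a raw change stream event; the sample event is hypothetical and the snippet assumes this new class is on the classpath:

import org.bson.BsonDocument;

import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;

public class MongoDBEnvelopeExample {
    public static void main(String[] args) {
        // Illustrative change stream event (extended JSON); field names match the constants above.
        BsonDocument event =
                BsonDocument.parse(
                        "{"
                                + "\"operationType\": \"insert\","
                                + "\"clusterTime\": {\"$timestamp\": {\"t\": 1640995200, \"i\": 1}},"
                                + "\"fullDocument\": {\"_id\": {\"$oid\": \"100000000000000000000101\"}, \"name\": \"scooter\"},"
                                + "\"documentKey\": {\"_id\": {\"$oid\": \"100000000000000000000101\"}},"
                                + "\"ns\": {\"db\": \"inventory\", \"coll\": \"products\"}"
                                + "}");

        BsonDocument ns = event.getDocument(MongoDBEnvelope.NAMESPACE_FIELD);
        System.out.println(event.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD).getValue()); // insert
        System.out.println(ns.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD).getValue()); // inventory
        System.out.println(ns.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD).getValue()); // products
    }
}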

@@ -35,7 +35,10 @@ import org.apache.flink.util.Collector;

 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.MetadataConverter;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -76,12 +79,6 @@ public class MongoDBConnectorDeserializationSchema

     private static final long serialVersionUID = 1750787080613035184L;

-    private static final String FULL_DOCUMENT_FIELD = "fullDocument";
-    private static final String DOCUMENT_KEY_FIELD = "documentKey";
-    private static final String OPERATION_TYPE_FIELD = "operationType";
-
     /** TypeInformation of the produced {@link RowData}. */
     private final TypeInformation<RowData> resultTypeInfo;
@@ -90,14 +87,27 @@ public class MongoDBConnectorDeserializationSchema

     /**
      * Runtime converter that converts {@link
-     * com.mongodb.client.model.changestream.ChangeStreamDocument}s into objects of Flink SQL
-     * internal data structures.
+     * com.mongodb.client.model.changestream.ChangeStreamDocument}s into {@link RowData} consisting
+     * of physical column values.
      */
-    private final DeserializationRuntimeConverter runtimeConverter;
+    protected final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    protected final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;

     public MongoDBConnectorDeserializationSchema(
-            RowType rowType, TypeInformation<RowData> resultTypeInfo, ZoneId localTimeZone) {
-        this.runtimeConverter = createConverter(rowType);
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+            ZoneId localTimeZone) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter = createConverter(physicalDataType);
         this.resultTypeInfo = resultTypeInfo;
         this.localTimeZone = localTimeZone;
     }
@@ -109,19 +119,22 @@ public class MongoDBConnectorDeserializationSchema
         OperationType op = operationTypeFor(record);
         BsonDocument documentKey =
-                checkNotNull(extractBsonDocument(value, valueSchema, DOCUMENT_KEY_FIELD));
-        BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT_FIELD);
+                checkNotNull(
+                        extractBsonDocument(
+                                value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
+        BsonDocument fullDocument =
+                extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);

         switch (op) {
             case INSERT:
                 GenericRowData insert = extractRowData(fullDocument);
                 insert.setRowKind(RowKind.INSERT);
-                out.collect(insert);
+                emit(record, insert, out);
                 break;
             case DELETE:
                 GenericRowData delete = extractRowData(documentKey);
                 delete.setRowKind(RowKind.DELETE);
-                out.collect(delete);
+                emit(record, delete, out);
                 break;
             case UPDATE:
                 // It's null if another operation deletes the document
@@ -131,12 +144,12 @@ public class MongoDBConnectorDeserializationSchema
                 }
                 GenericRowData updateAfter = extractRowData(fullDocument);
                 updateAfter.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(updateAfter);
+                emit(record, updateAfter, out);
                 break;
             case REPLACE:
                 GenericRowData replaceAfter = extractRowData(fullDocument);
                 replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(replaceAfter);
+                emit(record, replaceAfter, out);
                 break;
             case INVALIDATE:
             case DROP:
@@ -150,7 +163,7 @@ public class MongoDBConnectorDeserializationSchema

     private GenericRowData extractRowData(BsonDocument document) throws Exception {
         checkNotNull(document);
-        return (GenericRowData) runtimeConverter.convert(document);
+        return (GenericRowData) physicalConverter.convert(document);
     }

     private BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) {
@@ -170,7 +183,18 @@ public class MongoDBConnectorDeserializationSchema

     private OperationType operationTypeFor(SourceRecord record) {
         Struct value = (Struct) record.value();
-        return OperationType.fromString(value.getString(OPERATION_TYPE_FIELD));
+        return OperationType.fromString(value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD));
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
     }

     // -------------------------------------------------------------------------------------
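
The `emit(...)` helper above delegates to `AppendMetadataCollector` from the debezium table module. As a rough illustration of the idea only (a hypothetical sketch, not the actual `AppendMetadataCollector` implementation): each physical row is widened by one field per configured metadata converter, with the values read from the current `SourceRecord`.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

import com.ververica.cdc.debezium.table.MetadataConverter;
import org.apache.kafka.connect.source.SourceRecord;

/** Hypothetical sketch of a collector that appends metadata columns after the physical columns. */
public final class MetadataAppendingCollectorSketch implements Collector<RowData> {

    public SourceRecord inputRecord;           // set by the caller before collect(), as in emit()
    public Collector<RowData> outputCollector; // downstream collector that receives the widened row

    private final MetadataConverter[] metadataConverters;

    public MetadataAppendingCollectorSketch(MetadataConverter[] metadataConverters) {
        this.metadataConverters = metadataConverters;
    }

    @Override
    public void collect(RowData physicalRow) {
        GenericRowData physical = (GenericRowData) physicalRow;
        int physicalArity = physical.getArity();

        GenericRowData output = new GenericRowData(physicalArity + metadataConverters.length);
        output.setRowKind(physical.getRowKind());

        // copy the physical columns first ...
        for (int i = 0; i < physicalArity; i++) {
            output.setField(i, physical.getField(i));
        }
        // ... then append one value per requested metadata column, read from the source record
        for (int i = 0; i < metadataConverters.length; i++) {
            output.setField(physicalArity + i, metadataConverters[i].read(inputRecord));
        }

        outputCollector.collect(output);
    }

    @Override
    public void close() {}
}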

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link MongoDBTableSource}. */
public enum MongoDBReadableMetadata {
/** Name of the collection that contains the row. */
COLLECTION(
"collection_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
return StringData.fromString(
to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD));
}
}),
/** Name of the database that contains the row. */
DATABASE(
"database_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
return StringData.fromString(
to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD));
}
}),
/**
* It indicates the time that the change was made in the database. If the record is read from the
* snapshot of the collection instead of the change stream, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
return TimestampData.fromEpochMillis(
(Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
});
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
MongoDBReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
public String getKey() {
return key;
}
public DataType getDataType() {
return dataType;
}
public MetadataConverter getConverter() {
return converter;
}
}

@@ -24,19 +24,27 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;

 import com.ververica.cdc.connectors.mongodb.MongoDBSource;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;

 import javax.annotation.Nullable;

 import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 * A {@link DynamicTableSource} that describes how to create a MongoDB change stream events source
 * from a logical description.
 */
-public class MongoDBTableSource implements ScanTableSource {
+public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {

    private final TableSchema physicalSchema;

    private final String hosts;
@@ -64,6 +72,16 @@ public class MongoDBTableSource implements ScanTableSource {
    private final Integer heartbeatIntervalMillis;
    private final ZoneId localTimeZone;

+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
    public MongoDBTableSource(
            TableSchema physicalSchema,
            String hosts,
@@ -99,6 +117,8 @@ public class MongoDBTableSource implements ScanTableSource {
        this.pollAwaitTimeMillis = pollAwaitTimeMillis;
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.localTimeZone = localTimeZone;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
    }

    @Override
@@ -112,12 +132,15 @@ public class MongoDBTableSource implements ScanTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
        TypeInformation<RowData> typeInfo =
                scanContext.createTypeInformation(physicalSchema.toRowDataType());

        DebeziumDeserializationSchema<RowData> deserializer =
-                new MongoDBConnectorDeserializationSchema(rowType, typeInfo, localTimeZone);
+                new MongoDBConnectorDeserializationSchema(
+                        physicalDataType, metadataConverters, typeInfo, localTimeZone);

        MongoDBSource.Builder<RowData> builder =
                MongoDBSource.<RowData>builder()
@@ -144,26 +167,61 @@ public class MongoDBTableSource implements ScanTableSource {
        return SourceFunctionProvider.of(sourceFunction, false);
    }

protected MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(MongoDBReadableMetadata.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(MongoDBReadableMetadata::getConverter)
.toArray(MetadataConverter[]::new);
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(MongoDBReadableMetadata.values())
.collect(
Collectors.toMap(
MongoDBReadableMetadata::getKey,
MongoDBReadableMetadata::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
    @Override
    public DynamicTableSource copy() {
-        return new MongoDBTableSource(
-                physicalSchema,
-                hosts,
-                username,
-                password,
-                database,
-                collection,
-                connectionOptions,
-                errorsTolerance,
-                errorsLogEnable,
-                copyExisting,
-                copyExistingPipeline,
-                copyExistingMaxThreads,
-                copyExistingQueueSize,
-                pollMaxBatchSize,
-                pollAwaitTimeMillis,
-                heartbeatIntervalMillis,
-                localTimeZone);
+        MongoDBTableSource source =
+                new MongoDBTableSource(
+                        physicalSchema,
+                        hosts,
+                        username,
+                        password,
+                        database,
+                        collection,
+                        connectionOptions,
+                        errorsTolerance,
+                        errorsLogEnable,
+                        copyExisting,
+                        copyExistingPipeline,
+                        copyExistingMaxThreads,
+                        copyExistingQueueSize,
+                        pollMaxBatchSize,
+                        pollAwaitTimeMillis,
+                        heartbeatIntervalMillis,
+                        localTimeZone);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
    }

    @Override
@@ -191,7 +249,9 @@ public class MongoDBTableSource implements ScanTableSource {
                && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
                && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
                && Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
-                && Objects.equals(localTimeZone, that.localTimeZone);
+                && Objects.equals(localTimeZone, that.localTimeZone)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys);
    }

    @Override
@@ -213,7 +273,9 @@ public class MongoDBTableSource implements ScanTableSource {
                pollMaxBatchSize,
                pollAwaitTimeMillis,
                heartbeatIntervalMillis,
-                localTimeZone);
+                localTimeZone,
+                producedDataType,
+                metadataKeys);
    }

    @Override

@@ -39,8 +39,11 @@ import org.junit.Test;

 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -352,6 +355,119 @@ public class MongoDBConnectorITCase extends MongoDBTestBase {
        result.getJobClient().get().cancel().get();
    }

@Test
public void testMetadataColumns() throws Exception {
String database = executeCommandFileInSeparateDatabase("inventory");
String sourceDDL =
String.format(
"CREATE TABLE mongodb_source ("
+ " _id STRING NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " collection_name STRING METADATA VIRTUAL,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
+ " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',"
+ " 'hosts' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
MONGODB_CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
"products");
String sinkDDL =
"CREATE TABLE meta_sink ("
+ " _id STRING NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " database_name STRING,"
+ " collection_name STRING,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO meta_sink SELECT * FROM mongodb_source");
// wait for the initial snapshot to finish and the change stream to begin
waitForSinkSize("meta_sink", 9);
MongoCollection<Document> products = getMongoDatabase(database).getCollection("products");
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000106")),
Updates.set("description", "18oz carpenter hammer"));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000107")),
Updates.set("weight", 5.1));
products.insertOne(
productDocOf(
"100000000000000000000110",
"jacket",
"water resistent white wind breaker",
0.2));
products.insertOne(
productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000110")),
Updates.combine(
Updates.set("description", "new water resistent white wind breaker"),
Updates.set("weight", 0.5)));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000111")),
Updates.set("weight", 5.17));
products.deleteOne(Filters.eq("_id", new ObjectId("100000000000000000000111")));
waitForSinkSize("meta_sink", 16);
List<String> expected =
Stream.of(
"+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)",
"+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products)",
"+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products)",
"+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products)",
"+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products)",
"+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products)",
"+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products)",
"+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products)",
"+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products)",
"+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products)",
"+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products)",
"+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products)",
"+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products)",
"+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products)",
"+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)",
"-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)")
.map(s -> String.format(s, database))
.sorted()
.collect(Collectors.toList());
List<String> actual = TestValuesTableFactory.getRawResults("meta_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
        while (sinkSize(sinkName) == 0) {
            Thread.sleep(100);

@@ -36,6 +36,7 @@ import org.junit.Test;

 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,9 +58,23 @@ public class MongoDBTableFactoryTest {
                            Column.physical("ccc", DataTypes.DOUBLE()),
                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
-                    new ArrayList<>(),
+                    Collections.emptyList(),
                    UniqueConstraint.primaryKey("pk", Arrays.asList("_id")));
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("_id", DataTypes.STRING().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
Column.metadata(
"_database_name", DataTypes.STRING(), "database_name", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
    private static final String MY_HOSTS = "localhost:27017,localhost:27018";
    private static final String USER = "flinkuser";
    private static final String PASSWORD = "flinkpw";
@@ -75,7 +90,7 @@ public class MongoDBTableFactoryTest {
        Map<String, String> properties = getAllOptions();

        // validation for source
-        DynamicTableSource actualSource = createTableSource(properties);
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
        MongoDBTableSource expectedSource =
                new MongoDBTableSource(
                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
@@ -111,7 +126,7 @@ public class MongoDBTableFactoryTest {
        options.put("poll.max.batch.size", "102");
        options.put("poll.await.time.ms", "103");
        options.put("heartbeat.interval.ms", "104");
-        DynamicTableSource actualSource = createTableSource(options);
+        DynamicTableSource actualSource = createTableSource(SCHEMA, options);
        MongoDBTableSource expectedSource =
                new MongoDBTableSource(
@@ -135,6 +150,44 @@ public class MongoDBTableFactoryTest {
        assertEquals(expectedSource, actualSource);
    }

@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
mongoDBSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mongoDBSource.copy();
MongoDBTableSource expectedSource =
new MongoDBTableSource(
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
MY_HOSTS,
USER,
PASSWORD,
MY_DATABASE,
MY_TABLE,
null,
ERROR_TOLERANCE,
ERROR_LOGS_ENABLE,
COPY_EXISTING,
null,
null,
null,
POLL_MAX_BATCH_SIZE_DEFAULT,
POLL_AWAIT_TIME_MILLIS_DEFAULT,
null,
LOCAL_TIME_ZONE);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
assertEquals(expectedSource, actualSource);
}
    @Test
    public void testValidation() {
        // validate unsupported option
@@ -142,7 +195,7 @@ public class MongoDBTableFactoryTest {
            Map<String, String> properties = getAllOptions();
            properties.put("unknown", "abc");
-            createTableSource(properties);
+            createTableSource(SCHEMA, properties);
            fail("exception expected");
        } catch (Throwable t) {
            assertTrue(
@@ -162,17 +215,18 @@ public class MongoDBTableFactoryTest {
        return options;
    }

-    private static DynamicTableSource createTableSource(Map<String, String> options) {
+    private static DynamicTableSource createTableSource(
+            ResolvedSchema schema, Map<String, String> options) {
        return FactoryUtil.createTableSource(
                null,
                ObjectIdentifier.of("default", "default", "t1"),
                new ResolvedCatalogTable(
                        CatalogTable.of(
-                                fromResolvedSchema(SCHEMA).toSchema(),
+                                fromResolvedSchema(schema).toSchema(),
                                "mock source",
                                new ArrayList<>(),
                                options),
-                        SCHEMA),
+                        schema),
                new Configuration(),
                MongoDBTableFactoryTest.class.getClassLoader(),
                false);
