[mongodb] Match multiple database and collection names using a regular expression in MongoDB. (#940)

Co-authored-by: Bobby Richard <bobbyrlg@gmail.com>
pull/947/head
Jiabao Sun 3 years ago committed by GitHub
parent 580391b6ee
commit 473e36b9cd

@ -151,17 +151,19 @@ Connector Options
</tr>
<tr>
<td>database</td>
<td>required</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the database to watch for changes.</td>
<td>Name of the database to watch for changes. If not set, all databases will be captured. <br>
The database name also supports regular expressions to monitor multiple databases that match the regular expression.</td>
</tr>
<tr>
<td>collection</td>
<td>required</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the collection in the database to watch for changes.</td>
<td>Name of the collection in the database to watch for changes. If not set, all collections will be captured.<br>
The collection name also supports regular expressions to monitor multiple collections that match fully-qualified collection identifiers.</td>
</tr>
<tr>
<td>connection.options</td>
@ -324,7 +326,7 @@ This can filter only required data and improve the use of indexes by the copying
In the following example, the `$match` aggregation operator ensures that only documents in which the `closed` field is set to `false` are copied.
```
copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
```
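
For completeness, here is a minimal sketch of a table definition that exercises the regex-capable `database` and `collection` options documented above; the host, table schema, and the patterns `db[0-9]` / `db[0-9]\.coll_a\d?` are hypothetical placeholders, not values from this change.
```
// Minimal sketch (hypothetical names and host) of the regex-capable options.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoDBRegexOptionsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE mongodb_source ("
                        + " _id STRING NOT NULL,"
                        + " seq STRING,"
                        + " PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mongodb-cdc',"
                        + " 'hosts' = 'localhost:27017',"
                        // regex matching several databases, e.g. db0..db9
                        + " 'database' = 'db[0-9]',"
                        // regex on fully-qualified identifiers <databaseName>.<collectionName>
                        + " 'collection' = 'db[0-9]\\.coll_a\\d?'"
                        + ")");
    }
}
```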
### Change Streams

@ -30,6 +30,7 @@ import com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
@ -82,14 +83,12 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
// please check that your '/etc/hosts' file contains the line 'internet_ip (not 127.0.0.1)
// hostname', e.g. '30.225.0.87 leonard.machine'
mongodb =
new MongoDBContainer()
.withNetwork(NETWORK)
new MongoDBContainer(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(mongodb)).join();
executeCommandFileInMongoDB("mongo_setup", "admin");
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
@ -225,7 +224,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
String command1 =
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("//") && !x.isEmpty())
.filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//"))
.map(
x -> {
final Matcher m = COMMENT_PATTERN.matcher(x);

@ -1,48 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// In production you would almost certainly require the replication user to be on the follower (slave) machine,
// to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
// However, in this database we'll grant 2 users different privileges:
//
// 1) 'flinkuser' - all privileges required by the snapshot reader AND oplog reader (used for testing)
// 2) 'superuser' - all privileges
//
//use admin;
if (db.system.users.find({user:'superuser'}).count() == 0) {
db.createUser(
{
user: 'superuser',
pwd: 'superpw',
roles: [ { role: 'root', db: 'admin' } ]
}
);
}
if (db.system.users.find({user:'flinkuser'}).count() == 0) {
db.createUser(
{
user: 'flinkuser',
pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
roles: [
{ role: 'read', db: 'admin' },
{ role: 'readAnyDatabase', db: 'admin' }
]
}
);
}
rs.status()

@ -36,9 +36,13 @@ import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -123,10 +127,9 @@ public class MongoDBSource {
private String hosts;
private String username;
private String password;
private String database;
private String collection;
private List<String> databaseList;
private List<String> collectionList;
private String connectionOptions;
private String pipeline;
private Integer batchSize;
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
@ -167,24 +170,18 @@ public class MongoDBSource {
return this;
}
/** Name of the database to watch for changes. */
public Builder<T> database(String database) {
this.database = database;
return this;
}
/** Name of the collection in the database to watch for changes. */
public Builder<T> collection(String collection) {
this.collection = collection;
/** List of regular expressions that match database names to be monitored. */
public Builder<T> databaseList(String... databaseList) {
this.databaseList = Arrays.asList(databaseList);
return this;
}
/**
* An array of objects describing the pipeline operations to run. eg. [{"$match":
* {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]
* Regular expressions that match fully-qualified collection identifiers for collections to
* be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
*/
public Builder<T> pipeline(String pipeline) {
this.pipeline = pipeline;
public Builder<T> collectionList(String... collectionList) {
this.collectionList = Arrays.asList(collectionList);
return this;
}
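
For DataStream users, a minimal sketch of the new list-based builder methods might look like the following; the host, credentials, regex patterns, and the use of `StringDebeziumDeserializationSchema` are assumptions for illustration only.
```
// Minimal sketch: host, credentials, and regex patterns below are hypothetical examples.
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBRegexSourceExample {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .username("flinkuser")
                        .password("flinkpw")
                        // regex that matches multiple database names
                        .databaseList("db[0-9]")
                        // regex on fully-qualified identifiers <databaseName>.<collectionName>
                        .collectionList("db[0-9]\\.coll_a\\d?")
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("mongodb-regex-source-example");
    }
}
```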
@ -353,11 +350,17 @@ public class MongoDBSource {
"connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
props.setProperty("name", "mongodb_binlog_source");
ConnectionString connectionString = buildConnectionUri();
props.setProperty(
MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(buildConnectionUri()));
MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(connectionString));
if (databaseList != null) {
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
}
props.setProperty(MongoSourceConfig.DATABASE_CONFIG, checkNotNull(database));
props.setProperty(MongoSourceConfig.COLLECTION_CONFIG, checkNotNull(collection));
if (collectionList != null) {
props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
}
props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
props.setProperty(
@ -372,10 +375,6 @@ public class MongoDBSource {
props.setProperty(
MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
if (pipeline != null) {
props.setProperty(MongoSourceConfig.PIPELINE_CONFIG, pipeline);
}
if (batchSize != null) {
props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
}

@ -18,6 +18,11 @@
package com.ververica.cdc.connectors.mongodb.internal;
import com.mongodb.ConnectionString;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceTask;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SnapshotRecord;
@ -29,14 +34,34 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.bson.conversions.Bson;
import org.bson.json.JsonReader;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.regex;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.ADD_NS_FIELD;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.ADD_NS_FIELD_NAME;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.bsonListToJson;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.collectionNames;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.collectionsFilter;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.completionPattern;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.databaseFilter;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.databaseNames;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.includeListAsPatterns;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.isIncludeListExplicitlySpecified;
/**
* Source Task that proxies mongodb kafka connector's {@link MongoSourceTask} to adapt to {@link
@ -44,6 +69,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class MongoDBConnectorSourceTask extends SourceTask {
public static final String DATABASE_INCLUDE_LIST = "database.include.list";
public static final String COLLECTION_INCLUDE_LIST = "collection.include.list";
private static final String TRUE = "true";
private static final Schema HEARTBEAT_VALUE_SCHEMA =
@ -77,6 +106,7 @@ public class MongoDBConnectorSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
initCapturedCollections(props);
target.start(props);
isInSnapshotPhase = isCopying();
}
@ -226,4 +256,103 @@ public class MongoDBConnectorSourceTask extends SourceTask {
throw new IllegalStateException("Cannot access isCopying field of SourceTask", e);
}
}
private void initCapturedCollections(Map<String, String> props) {
ConnectionString connectionString =
new ConnectionString(props.get(MongoSourceConfig.CONNECTION_URI_CONFIG));
String databaseIncludeList = props.get(DATABASE_INCLUDE_LIST);
String collectionIncludeList = props.get(COLLECTION_INCLUDE_LIST);
List<String> databaseList =
Optional.ofNullable(databaseIncludeList)
.map(input -> Arrays.asList(input.split(",")))
.orElse(null);
List<String> collectionList =
Optional.ofNullable(collectionIncludeList)
.map(input -> Arrays.asList(input.split(",")))
.orElse(null);
if (collectionList != null) {
// Watching collections changes
List<String> discoveredDatabases;
List<String> discoveredCollections;
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
discoveredCollections =
collectionNames(
mongoClient,
discoveredDatabases,
collectionsFilter(collectionList));
}
// case: database = db0, collection = coll1
if (isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
MongoNamespace namespace = new MongoNamespace(discoveredCollections.get(0));
props.put(MongoSourceConfig.DATABASE_CONFIG, namespace.getDatabaseName());
props.put(MongoSourceConfig.COLLECTION_CONFIG, namespace.getCollectionName());
} else { // case: database = db0|db2, collection = (db0.coll[0-9])|(db1.coll[1-2])
String namespacesRegex =
includeListAsPatterns(collectionList).stream()
.map(Pattern::pattern)
.collect(Collectors.joining("|"));
List<Bson> pipeline = new ArrayList<>();
pipeline.add(ADD_NS_FIELD);
Bson nsFilter = regex(ADD_NS_FIELD_NAME, namespacesRegex);
if (databaseList != null) {
String databaseRegex =
includeListAsPatterns(databaseList).stream()
.map(Pattern::pattern)
.collect(Collectors.joining("|"));
Bson dbFilter = regex("ns.db", databaseRegex);
nsFilter = and(dbFilter, nsFilter);
}
pipeline.add(match(nsFilter));
props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));
String copyExistingNamespaceRegex =
discoveredCollections.stream()
.map(ns -> completionPattern(ns).pattern())
.collect(Collectors.joining("|"));
props.put(
MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
copyExistingNamespaceRegex);
}
} else if (databaseList != null) {
// Watching databases changes
List<String> discoveredDatabases;
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
}
if (isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
props.put(MongoSourceConfig.DATABASE_CONFIG, discoveredDatabases.get(0));
} else {
String databaseRegex =
includeListAsPatterns(databaseList).stream()
.map(Pattern::pattern)
.collect(Collectors.joining("|"));
List<Bson> pipeline = new ArrayList<>();
pipeline.add(match(regex("ns.db", databaseRegex)));
props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));
String copyExistingNamespaceRegex =
discoveredDatabases.stream()
.map(db -> completionPattern(db + "\\..*").pattern())
.collect(Collectors.joining("|"));
props.put(
MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
copyExistingNamespaceRegex);
}
} else {
// Watching all changes on the cluster by default; nothing to do here
}
}
}
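
To make the regex branch above easier to follow, the sketch below (the pattern is hypothetical) builds the same shape of `$addFields` + `$match` pipeline on the synthetic `_ns_` field and prints its JSON, which is what `bsonListToJson` ultimately writes into the connector's `pipeline` property.
```
// Sketch: shape of the change-stream pipeline generated for a regex include list,
// e.g. collectionList = ["db[0-9]\\.coll_a\\d?"] (hypothetical pattern).
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

public class NamespaceFilterSketch {
    public static void main(String[] args) {
        // Mirrors CollectionDiscoveryUtils.ADD_NS_FIELD: concat "<db>.<coll>" into "_ns_".
        Bson addNsField =
                BsonDocument.parse(
                        "{'$addFields': {'_ns_': {'$concat': ['$ns.db', '.', '$ns.coll']}}}");
        // Mirrors the anchored pattern produced by includeListAsPatterns/completionPattern.
        Bson nsFilter = Filters.regex("_ns_", "^(db[0-9]\\.coll_a\\d?)$");
        List<Bson> pipeline = Arrays.asList(addNsField, Aggregates.match(nsFilter));
        pipeline.forEach(stage -> System.out.println(stage.toBsonDocument().toJson()));
    }
}
```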

@ -34,6 +34,7 @@ import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
@ -46,6 +47,9 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@ -87,8 +91,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
String hosts,
@Nullable String username,
@Nullable String password,
String database,
String collection,
@Nullable String database,
@Nullable String collection,
@Nullable String connectionOptions,
@Nullable String errorsTolerance,
@Nullable Boolean errorsLogEnable,
@ -104,8 +108,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
this.database = checkNotNull(database);
this.collection = checkNotNull(collection);
this.database = database;
this.collection = collection;
this.connectionOptions = connectionOptions;
this.errorsTolerance = errorsTolerance;
this.errorsLogEnable = errorsLogEnable;
@ -142,11 +146,27 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
physicalDataType, metadataConverters, typeInfo, localTimeZone);
MongoDBSource.Builder<RowData> builder =
MongoDBSource.<RowData>builder()
.hosts(hosts)
.database(database)
.collection(collection)
.deserializer(deserializer);
MongoDBSource.<RowData>builder().hosts(hosts).deserializer(deserializer);
if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
// explicitly specified database and collection.
if (!containsRegexMetaCharacters(database)
&& !containsRegexMetaCharacters(collection)) {
checkDatabaseNameValidity(database);
checkCollectionNameValidity(collection);
builder.databaseList(database);
builder.collectionList(database + "." + collection);
} else {
builder.databaseList(database);
builder.collectionList(collection);
}
} else if (StringUtils.isNotEmpty(database)) {
builder.databaseList(database);
} else if (StringUtils.isNotEmpty(collection)) {
builder.collectionList(collection);
} else {
// Watching all changes on the cluster by default; nothing to do here
}
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
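
The branch chosen above hinges on `containsRegexMetaCharacters`; a quick sketch (the inputs are hypothetical) of how it classifies the table options:
```
// Sketch: how containsRegexMetaCharacters decides between exact names and regex include lists.
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;

public class RegexDetectionSketch {
    public static void main(String[] args) {
        // Plain names: validated and passed through as an exact database/collection pair.
        System.out.println(containsRegexMetaCharacters("coll_a1"));           // false
        // Contains regex metacharacters: passed through as regex include lists.
        System.out.println(containsRegexMetaCharacters("db[0-9]\\.coll.*"));  // true
    }
}
```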

@ -72,14 +72,21 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
ConfigOptions.key("database")
.stringType()
.noDefaultValue()
.withDescription("Name of the database to watch for changes.");
.withDescription(
"Name of the database to watch for changes. "
+ "The database name also supports regular expressions "
+ "to monitor multiple databases that match the regular expression, "
+ "e.g. db[0-9] .");
private static final ConfigOption<String> COLLECTION =
ConfigOptions.key("collection")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the collection in the database to watch for changes.");
"Name of the collection in the database to watch for changes."
+ "The collection also supports regular expression "
+ "to monitor multiple collections matches fully-qualified collection identifiers."
+ "e.g. db0\\.coll[0-9] .");
private static final ConfigOption<String> CONNECTION_OPTIONS =
ConfigOptions.key("connection.options")
@ -184,8 +191,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
String username = config.getOptional(USERNAME).orElse(null);
String password = config.getOptional(PASSWORD).orElse(null);
String database = config.get(DATABASE);
String collection = config.get(COLLECTION);
String database = config.getOptional(DATABASE).orElse(null);
String collection = config.getOptional(COLLECTION).orElse(null);
String errorsTolerance = config.get(ERRORS_TOLERANCE);
Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
@ -246,8 +253,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTS);
options.add(DATABASE);
options.add(COLLECTION);
return options;
}
@ -257,6 +262,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(USERNAME);
options.add(PASSWORD);
options.add(CONNECTION_OPTIONS);
options.add(DATABASE);
options.add(COLLECTION);
options.add(ERRORS_TOLERANCE);
options.add(ERRORS_LOG_ENABLE);
options.add(COPY_EXISTING);

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.utils;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/** Utilities to discover matched collections. */
public class CollectionDiscoveryUtils {
public static final String REGEX_META_CHARACTERS = ".$|()[]{}<>^?*+-=!\\";
public static final String ADD_NS_FIELD_NAME = "_ns_";
public static final Bson ADD_NS_FIELD =
BsonDocument.parse(
String.format(
"{'$addFields': {'%s': {'$concat': ['$ns.db', '.', '$ns.coll']}}}",
ADD_NS_FIELD_NAME));
private CollectionDiscoveryUtils() {}
public static List<String> databaseNames(
MongoClient mongoClient, Predicate<String> databaseFilter) {
List<String> databaseNames = new ArrayList<>();
mongoClient
.listDatabaseNames()
.forEach(
dbName -> {
if (databaseFilter.test(dbName)) {
databaseNames.add(dbName);
}
});
return databaseNames;
}
public static List<String> collectionNames(
MongoClient mongoClient,
List<String> databaseNames,
Predicate<String> collectionFilter) {
List<String> collectionNames = new ArrayList<>();
for (String dbName : databaseNames) {
MongoDatabase db = mongoClient.getDatabase(dbName);
db.listCollectionNames()
.map(collName -> dbName + "." + collName)
.forEach(
fullName -> {
if (collectionFilter.test(fullName)) {
collectionNames.add(fullName);
}
});
}
return collectionNames;
}
public static Predicate<String> databaseFilter(List<String> databaseList) {
Predicate<String> databaseFilter = CollectionDiscoveryUtils::isNotBuiltInDatabase;
if (databaseList != null && !databaseList.isEmpty()) {
List<Pattern> databasePatterns = includeListAsPatterns(databaseList);
databaseFilter = databaseFilter.and(anyMatch(databasePatterns));
}
return databaseFilter;
}
public static Predicate<String> collectionsFilter(List<String> collectionList) {
Predicate<String> collectionFilter = CollectionDiscoveryUtils::isNotBuiltInCollections;
if (collectionList != null && !collectionList.isEmpty()) {
List<Pattern> collectionPatterns = includeListAsPatterns(collectionList);
collectionFilter = collectionFilter.and(anyMatch(collectionPatterns));
}
return collectionFilter;
}
public static Predicate<String> anyMatch(List<Pattern> patterns) {
return s -> {
for (Pattern p : patterns) {
if (p.matcher(s).matches()) {
return true;
}
}
return false;
};
}
public static List<Pattern> includeListAsPatterns(List<String> includeList) {
if (includeList != null && !includeList.isEmpty()) {
return includeList.stream()
// Notice that MongoDB's database and collection names are case-sensitive.
// Please refer to https://docs.mongodb.com/manual/reference/limits/
// We use case-sensitive pattern here to avoid unexpected results.
.map(CollectionDiscoveryUtils::completionPattern)
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
public static boolean isIncludeListExplicitlySpecified(
List<String> includeList, List<String> discoveredList) {
if (includeList == null || includeList.size() != 1) {
return false;
}
if (discoveredList == null || discoveredList.size() != 1) {
return false;
}
String firstOfIncludeList = includeList.get(0);
String firstOfDiscoveredList = discoveredList.get(0);
return firstOfDiscoveredList.equals(firstOfIncludeList);
}
public static boolean isNotBuiltInCollections(String fullName) {
if (fullName == null) {
return false;
}
MongoNamespace namespace = new MongoNamespace(fullName);
return isNotBuiltInDatabase(namespace.getDatabaseName())
&& !namespace.getCollectionName().startsWith("system.");
}
public static boolean isNotBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;
}
return !"local".equals(databaseName)
&& !"admin".equals(databaseName)
&& !"config".equals(databaseName);
}
public static boolean containsRegexMetaCharacters(String literal) {
if (StringUtils.isEmpty(literal)) {
return false;
}
for (int i = 0; i < literal.length(); i++) {
if (REGEX_META_CHARACTERS.indexOf(literal.charAt(i)) != -1) {
return true;
}
}
return false;
}
public static Pattern completionPattern(String pattern) {
if (pattern.startsWith("^") && pattern.endsWith("$")) {
return Pattern.compile(pattern);
}
return Pattern.compile("^(" + pattern + ")$");
}
public static String bsonListToJson(List<Bson> bsonList) {
StringBuilder sb = new StringBuilder();
sb.append("[");
boolean first = true;
for (Bson bson : bsonList) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append(bson.toBsonDocument().toJson());
}
sb.append("]");
return sb.toString();
}
}
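
As a usage note for the helpers above, `completionPattern` anchors a pattern only if it is not already anchored; a small sketch with hypothetical inputs:
```
// Sketch: anchoring and matching behavior of completionPattern (hypothetical inputs).
import com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils;

public class CompletionPatternSketch {
    public static void main(String[] args) {
        // "db[0-9]" is wrapped as "^(db[0-9])$", so it must match the whole name.
        System.out.println(CollectionDiscoveryUtils.completionPattern("db[0-9]").pattern());
        // An already anchored pattern is compiled as-is.
        System.out.println(CollectionDiscoveryUtils.completionPattern("^db0\\.coll_a\\d?$").pattern());
        // Matching is case-sensitive and evaluated against the full name.
        System.out.println(
                CollectionDiscoveryUtils.completionPattern("db[0-9]").matcher("db1").matches());   // true
        System.out.println(
                CollectionDiscoveryUtils.completionPattern("db[0-9]").matcher("mydb1").matches()); // false
    }
}
```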

@ -382,8 +382,8 @@ public class MongoDBSourceTest extends MongoDBTestBase {
.hosts(MONGODB_CONTAINER.getHostAndPort())
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.database(database)
.collection("products")
.databaseList(database)
.collectionList(database + ".products")
.deserializer(new ForwardDeserializeSchema())
.build();
}

@ -26,9 +26,12 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@ -41,6 +44,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_USER;
import static org.junit.Assert.assertNotNull;
/**
@ -54,11 +59,10 @@ public class MongoDBTestBase extends AbstractTestBase {
protected static final String FLINK_USER = "flinkuser";
protected static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
protected static final String MONGO_SUPER_USER = "superuser";
protected static final String MONGO_SUPER_PASSWORD = "superpw";
@ClassRule public static final Network NETWORK = Network.newNetwork();
protected static final MongoDBContainer MONGODB_CONTAINER =
new MongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG));
new MongoDBContainer(NETWORK).withLogConsumer(new Slf4jLogConsumer(LOG));
protected static MongoClient mongodbClient;
@ -66,7 +70,6 @@ public class MongoDBTestBase extends AbstractTestBase {
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MONGODB_CONTAINER)).join();
executeCommandFileInDatabase("setup", "admin");
initialClient();
LOG.info("Containers are started.");
}
@ -111,8 +114,7 @@ public class MongoDBTestBase extends AbstractTestBase {
String command0 = String.format("db = db.getSiblingDB('%s');\n", dbName);
String command1 =
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("//") && !x.isEmpty())
.filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//"))
.map(
x -> {
final Matcher m = COMMENT_PATTERN.matcher(x);

@ -42,14 +42,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitForSinkSize;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitForSnapshotStarted;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** Integration tests for MongoDB change stream event SQL source. */
public class MongoDBConnectorITCase extends MongoDBTestBase {
@ -476,43 +476,6 @@ public class MongoDBConnectorITCase extends MongoDBTestBase {
result.getJobClient().get().cancel().get();
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
waitForSinkSize(sinkName, expectedSize, 10, TimeUnit.MINUTES);
}
private static void waitForSinkSize(
String sinkName, int expectedSize, long timeout, TimeUnit timeUnit)
throws InterruptedException {
long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
while (sinkSize(sinkName) < expectedSize) {
if (System.nanoTime() > deadline) {
fail(
"Wait for sink size timeout, raw results: \n"
+ String.join(
"\n", TestValuesTableFactory.getRawResults(sinkName)));
}
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
private Document productDocOf(String id, String name, String description, Double weight) {
Document document = new Document();
if (id != null) {

@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.MongoDBTestBase;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitForSinkSize;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitForSnapshotStarted;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** Integration tests to check that mongodb-cdc works well under namespace.regex. */
public class MongoDBRegexFilterITCase extends MongoDBTestBase {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(1);
}
/** match multiple databases and collections: collection = ^(db0|db1)\.coll_a\d?$ . */
@Test
public void testMatchMultipleDatabasesAndCollections() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection = ^(db0|db1)\.coll_a\d?$
String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0, db1);
TableResult result = submitTestCase(null, collectionRegex);
// 3. Wait snapshot finished
waitForSinkSize("mongodb_sink", 4);
// 4. Insert new records in database: [coll_a1.A102, coll_a2.A202, coll_b1.B102,
// coll_b2.B202]
insertRecordsInDatabase(db0);
insertRecordsInDatabase(db1);
// 5. Wait change stream records come
waitForSinkSize("mongodb_sink", 8);
// 6. Check results
String[] expected =
new String[] {
String.format("+I[%s, coll_a1, A101]", db0),
String.format("+I[%s, coll_a2, A201]", db0),
String.format("+I[%s, coll_a1, A101]", db1),
String.format("+I[%s, coll_a2, A201]", db1),
String.format("+I[%s, coll_a1, A102]", db0),
String.format("+I[%s, coll_a2, A202]", db0),
String.format("+I[%s, coll_a1, A102]", db1),
String.format("+I[%s, coll_a2, A202]", db1)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
/** match multiple databases: database = db0|db1 . */
@Test
public void testMatchMultipleDatabases() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = executeCommandFileInSeparateDatabase("ns_regex");
// db2: [coll_a1, coll_a2, coll_b1, coll_b2]
String db2 = executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match database: ^(db0|db1)$
String databaseRegex = String.format("%s|%s", db0, db1);
TableResult result = submitTestCase(databaseRegex, null);
// 3. Wait snapshot finished
waitForSinkSize("mongodb_sink", 8);
// 4. Insert new records in database: [coll_a1.A102, coll_a2.A202, coll_b1.B102,
// coll_b2.B202]
insertRecordsInDatabase(db0);
insertRecordsInDatabase(db1);
insertRecordsInDatabase(db2);
// 5. Wait change stream records come
waitForSinkSize("mongodb_sink", 16);
// 6. Check results
String[] expected =
new String[] {
String.format("+I[%s, coll_a1, A101]", db0),
String.format("+I[%s, coll_a2, A201]", db0),
String.format("+I[%s, coll_b1, B101]", db0),
String.format("+I[%s, coll_b2, B201]", db0),
String.format("+I[%s, coll_a1, A101]", db1),
String.format("+I[%s, coll_a2, A201]", db1),
String.format("+I[%s, coll_b1, B101]", db1),
String.format("+I[%s, coll_b2, B201]", db1),
String.format("+I[%s, coll_a1, A102]", db0),
String.format("+I[%s, coll_a2, A202]", db0),
String.format("+I[%s, coll_b1, B102]", db0),
String.format("+I[%s, coll_b2, B202]", db0),
String.format("+I[%s, coll_a1, A102]", db1),
String.format("+I[%s, coll_a2, A202]", db1),
String.format("+I[%s, coll_b1, B102]", db1),
String.format("+I[%s, coll_b2, B202]", db1),
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
/** match single database and multiple collections: collection = ^db0\.coll_b\d?$ . */
@Test
public void testMatchSingleQualifiedCollectionPattern() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection = ^db0\.coll_b\d?$
String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0);
TableResult result = submitTestCase(null, collectionRegex);
// 3. Wait snapshot finished
waitForSinkSize("mongodb_sink", 2);
// 4. Insert new records in database: [coll_a1.A102, coll_a2.A202, coll_b1.B102,
// coll_b2.B202]
insertRecordsInDatabase(db0);
insertRecordsInDatabase(db1);
// 5. Wait change stream records come
waitForSinkSize("mongodb_sink", 4);
// 6. Check results
String[] expected =
new String[] {
String.format("+I[%s, coll_b1, B101]", db0),
String.format("+I[%s, coll_b2, B201]", db0),
String.format("+I[%s, coll_b1, B102]", db0),
String.format("+I[%s, coll_b2, B202]", db0)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
/** match single database and multiple collections: database = db0, collection = .*coll_b\d? . */
@Test
public void testMatchSingleDatabaseWithCollectionPattern() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection .*coll_b\d?
String collectionRegex = ".*coll_b\\d?";
TableResult result = submitTestCase(db0, collectionRegex);
// 3. Wait snapshot finished
waitForSinkSize("mongodb_sink", 2);
// 4. Insert new records in database: [coll_a1.A102, coll_a2.A202, coll_b1.B102,
// coll_b2.B202]
insertRecordsInDatabase(db0);
insertRecordsInDatabase(db1);
// 5. Wait change stream records come
waitForSinkSize("mongodb_sink", 4);
// 6. Check results
String[] expected =
new String[] {
String.format("+I[%s, coll_b1, B101]", db0),
String.format("+I[%s, coll_b2, B201]", db0),
String.format("+I[%s, coll_b1, B102]", db0),
String.format("+I[%s, coll_b2, B202]", db0)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
private TableResult submitTestCase(String database, String collection) throws Exception {
String sourceDDL =
"CREATE TABLE mongodb_source ("
+ " _id STRING NOT NULL,"
+ " seq STRING,"
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " coll_name STRING METADATA FROM 'collection_name' VIRTUAL,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ ignoreIfNull("hosts", MONGODB_CONTAINER.getHostAndPort())
+ ignoreIfNull("username", FLINK_USER)
+ ignoreIfNull("password", FLINK_USER_PASSWORD)
+ ignoreIfNull("database", database)
+ ignoreIfNull("collection", collection)
+ " 'connector' = 'mongodb-cdc'"
+ ")";
String sinkDDL =
"CREATE TABLE mongodb_sink ("
+ " db_name STRING,"
+ " coll_name STRING,"
+ " seq STRING,"
+ " PRIMARY KEY (db_name, coll_name, seq) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO mongodb_sink SELECT db_name, coll_name, seq FROM mongodb_source");
waitForSnapshotStarted("mongodb_sink");
return result;
}
private String ignoreIfNull(String configName, String configValue) {
return configValue != null ? String.format(" '%s' = '%s',", configName, configValue) : "";
}
private void insertRecordsInDatabase(String database) {
MongoDatabase db = getMongoDatabase(database);
db.getCollection("coll_a1").insertOne(new Document("seq", "A102"));
db.getCollection("coll_a2").insertOne(new Document("seq", "A202"));
db.getCollection("coll_b1").insertOne(new Document("seq", "B102"));
db.getCollection("coll_b2").insertOne(new Document("seq", "B202"));
}
}

@ -22,12 +22,11 @@ import com.github.dockerjava.api.command.InspectContainerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
/** Mongodb test replica container. */
public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
@ -38,55 +37,61 @@ public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
private static final String DOCKER_IMAGE_NAME = "mongo:5.0.2";
public static final String MONGODB_HOST = "mongo0";
public static final int MONGODB_PORT = 27017;
public MongoDBContainer() {
super(DOCKER_IMAGE_NAME);
public static final String MONGO_SUPER_USER = "superuser";
public static final String MONGO_SUPER_PASSWORD = "superpw";
public MongoDBContainer(Network network) {
super(
new ImageFromDockerfile()
.withFileFromClasspath("random.key", "docker/random.key")
.withFileFromClasspath("setup.js", "docker/setup.js")
.withDockerfileFromBuilder(
builder ->
builder.from(DOCKER_IMAGE_NAME)
.copy(
"setup.js",
"/docker-entrypoint-initdb.d/setup.js")
.copy("random.key", "/data/keyfile/random.key")
.run("chown mongodb /data/keyfile/random.key")
.run("chmod 400 /data/keyfile/random.key")
.build()));
withNetwork(network);
withNetworkAliases(MONGODB_HOST);
withExposedPorts(MONGODB_PORT);
withCommand("--replSet", REPLICA_SET_NAME);
waitingFor(Wait.forLogMessage(".*[Ww]aiting for connections.*", 1));
}
public String getLocalHost() {
try {
Enumeration<NetworkInterface> networkInterfaces =
NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface ni = networkInterfaces.nextElement();
if (ni.isLoopback() || ni.isVirtual() || ni.isPointToPoint() || !ni.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = ni.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address.isSiteLocalAddress()) {
return address.getHostAddress();
}
}
}
return InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
throw new IllegalStateException("Cannot get local ip address", e);
}
withEnv("MONGO_INITDB_ROOT_USERNAME", MONGO_SUPER_USER);
withEnv("MONGO_INITDB_ROOT_PASSWORD", MONGO_SUPER_PASSWORD);
withEnv("MONGO_INITDB_DATABASE", "admin");
withCommand("--replSet", REPLICA_SET_NAME, "--keyFile", "/data/keyfile/random.key");
waitingFor(Wait.forLogMessage(".*Replication has not yet been configured.*", 1));
}
public String getConnectionString(String username, String password) {
return String.format(
"mongodb://%s:%s@%s:%d",
username, password, getLocalHost(), getMappedPort(MONGODB_PORT));
username, password, getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
public String getHostAndPort() {
return String.format("%s:%s", getLocalHost(), getMappedPort(MONGODB_PORT));
return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
public void executeCommand(String command) {
try {
LOG.info("Executing mongo command: {}", command);
ExecResult execResult = execInContainer("mongo", "--eval", command);
ExecResult execResult =
execInContainer(
"mongo",
"-u",
MONGO_SUPER_USER,
"-p",
MONGO_SUPER_PASSWORD,
"--eval",
command);
LOG.info(execResult.getStdout());
if (execResult.getExitCode() != 0) {
throw new IllegalStateException(
@ -107,7 +112,7 @@ public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
executeCommand(
String.format(
"rs.initiate({ _id : '%s', members: [{ _id: 0, host: '%s:%d'}]})",
REPLICA_SET_NAME, getLocalHost(), getMappedPort(MONGODB_PORT)));
REPLICA_SET_NAME, MONGODB_HOST, MONGODB_PORT));
LOG.info("Waiting for single node node replica set initialized...");
executeCommand(

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.utils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
/** MongoDB test utilities. */
public class MongoDBTestUtils {
public static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
public static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
waitForSinkSize(sinkName, expectedSize, 10, TimeUnit.MINUTES);
}
public static void waitForSinkSize(
String sinkName, int expectedSize, long timeout, TimeUnit timeUnit)
throws InterruptedException {
long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
while (sinkSize(sinkName) < expectedSize) {
if (System.nanoTime() > deadline) {
fail(
"Wait for sink size timeout, raw results: \n"
+ String.join(
"\n", TestValuesTableFactory.getRawResults(sinkName)));
}
Thread.sleep(100);
}
}
public static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}

@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
db.getCollection('coll_a1').insertOne({"seq": "A101"});
db.getCollection('coll_a2').insertOne({"seq": "A201"});
db.getCollection('coll_b1').insertOne({"seq": "B101"});
db.getCollection('coll_b2').insertOne({"seq": "B201"});

@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
XK8G9pNKhEPp/BlsKT7pHEc5i0oCpvNVZMALH5pD/6EHSuMzuyO1FpoeDwmWHXl0
+Gp+VOI89Xp7E6eqop+fFHtoM3Mnk2oTiI/442GvS0xISPTwFVY9nO3MfO8VcPVx
J3JCAb80GeXD5x55eAOi7NqXzpjk0OKqfPEwIn1lrjlkL2m5vq6kaKEd93i1+bMh
3LRd1jLbgwWWxqYVV92BTQNnJin+G1er7Y2FzLpeFIKqyy+I22qIE2XIC7yj3wSw
kxwKsPN5LjFsfVeKpf169R0KgBg4Nm0qlllVUGNKuEjaVoLOEBOJgoPnhC6L2avc
/iDeunZDlDDgYG6t6aJXJelP+W1uXp4JQj1j18Scn0lrvgWxdAVrAtK6ftxqutHc
RQBt6Ap63zojTraulm3aeo/w/yz0zjyYjxQ5t8cojIM/7TaNLe2GfVxwhqitUPL1
ct2YFXWwX1H/+8E7yTsnquKqe6+r0aGQqxS5x+wFMsDun/1mxv7jgjwzZc1rEk8H
DGdhnQ7MFPOE6Bp03zGpa6B6K4I5uDgUUeOC7zmAN63cPEumuuCjPVK42sMt5wwR
NPJyL4+sWHa9vb2sBJ1dk3thQ+wwz856BZ9ILgeMUutQgasSwctlI7t3rhM+BGYy
+naEhKWN9/cIDXtl3ZMhNWJIh/MqbluYazQ/97MZHeWc9CJXFU6yUrnJOdE0VvQd
tROQNDuEB0Tq9ITxSYpZTY49+1CQp5E14GIc8frieWPvcbNVknriFquQfsW/tMvk
V2Aj8sBYE+sW9sGQJlyfRrhTSN6aBG1em7ZkOAgcx2/5ftaEZTwBxNnJR9VZDYEi
CDbobs3hIX3qhS6J9YbTEPFF2L6MMTL3ADgS44cWtmlYQrb2HJT0YLmdCzk4lSa6
yWYLorduRtblgGo6v/nn7y41gn/l/aRdcDUsii/LgMco4ZPSRm0HixD8oA3agX9/
23M5UVNCBO4/RKFOnjWM/2tN1xjeQrS2Hn6j3BtoTOl6k4ho

@ -15,34 +15,19 @@
// In production you would almost certainly require the replication user to be on the follower (slave) machine,
// to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
// However, in this database we'll grant 2 users different privileges:
// However, in this database we'll grant the flink user the following privileges:
//
// 1) 'flinkuser' - all privileges required by the snapshot reader AND oplog reader (used for testing)
// 2) 'superuser' - all privileges
// 'flinkuser' - all privileges required by the snapshot reader AND oplog reader (used for testing)
//
//use admin;
if (db.system.users.find({user:'superuser'}).count() == 0) {
db.createUser(
{
user: 'superuser',
pwd: 'superpw',
roles: [ { role: 'root', db: 'admin' } ]
}
);
}
if (db.system.users.find({user:'flinkuser'}).count() == 0) {
db.createUser(
{
user: 'flinkuser',
pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
roles: [
{ role: 'read', db: 'admin' },
{ role: 'readAnyDatabase', db: 'admin' }
]
}
);
}
rs.status()
db.createUser(
{
user: 'flinkuser',
pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
roles: [
{ role: 'read', db: 'admin' },
{ role: 'readAnyDatabase', db: 'admin' }
]
}
);