From 5f3393c7f66d4608394a53d3717f33b64d53ad55 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Mon, 8 Nov 2021 10:37:25 +0800 Subject: [PATCH] [hotfix][mongodb] Correct the connection uri building of MongoDB (#568) --- .../cdc/connectors/mongodb/MongoDBSource.java | 23 ++++++++++--------- .../connectors/mongodb/MongoDBTestBase.java | 2 +- .../src/test/resources/ddl/setup.js | 2 +- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java index f9642491e..87468df59 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java @@ -20,6 +20,7 @@ package com.ververica.cdc.connectors.mongodb; import org.apache.flink.annotation.PublicEvolving; +import com.mongodb.ConnectionString; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.kafka.connect.source.MongoSourceConfig; import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance; @@ -29,10 +30,9 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.Validator; import io.debezium.heartbeat.Heartbeat; +import org.apache.commons.lang3.StringUtils; import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.Locale; @@ -321,19 +321,20 @@ public class MongoDBSource { } /** Build connection uri. */ - private URI buildConnectionUri() { - String authority = checkNotNull(hosts); + private ConnectionString buildConnectionUri() { + StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://"); + if (username != null && password != null) { - authority = - String.format( - "%s:%s@%s", encodeValue(username), encodeValue(password), hosts); + sb.append(encodeValue(username)).append(":").append(encodeValue(password)); } - try { - return new URI(MONGODB_SCHEME, authority, "/", connectionOptions, null); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Cannot build mongo connection uri"); + sb.append("@").append(checkNotNull(hosts)); + + if (StringUtils.isNotEmpty(connectionOptions)) { + sb.append("/?").append(connectionOptions); } + + return new ConnectionString(sb.toString()); } /** diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java index e28ec606d..f8cfd3aa0 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java @@ -52,7 +52,7 @@ public class MongoDBTestBase extends AbstractTestBase { private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$"); protected static final String FLINK_USER = "flinkuser"; - protected static final String FLINK_USER_PASSWORD = "flinkpw"; + protected static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;"; protected static final String MONGO_SUPER_USER = "superuser"; protected static final String MONGO_SUPER_PASSWORD = "superpw"; diff --git a/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js b/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js index db3a44f1b..dd3828308 100644 --- a/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js +++ b/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js @@ -36,7 +36,7 @@ if (db.system.users.find({user:'flinkuser'}).count() == 0) { db.createUser( { user: 'flinkuser', - pwd: 'flinkpw', + pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;', roles: [ { role: 'read', db: 'admin' }, { role: 'readAnyDatabase', db: 'admin' }