diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
index f9642491e..87468df59 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
@@ -20,6 +20,7 @@ package com.ververica.cdc.connectors.mongodb;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import com.mongodb.ConnectionString;
 import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;
 import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
@@ -29,10 +30,9 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.Validator;
 import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Locale;
@@ -321,19 +321,20 @@ public class MongoDBSource {
         }
 
         /** Build connection uri. */
-        private URI buildConnectionUri() {
-            String authority = checkNotNull(hosts);
+        private ConnectionString buildConnectionUri() {
+            StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
+
             if (username != null && password != null) {
-                authority =
-                        String.format(
-                                "%s:%s@%s", encodeValue(username), encodeValue(password), hosts);
+                sb.append(encodeValue(username)).append(":").append(encodeValue(password));
             }
 
-            try {
-                return new URI(MONGODB_SCHEME, authority, "/", connectionOptions, null);
-            } catch (URISyntaxException e) {
-                throw new IllegalArgumentException("Cannot build mongo connection uri");
+            sb.append("@").append(checkNotNull(hosts));
+
+            if (StringUtils.isNotEmpty(connectionOptions)) {
+                sb.append("/?").append(connectionOptions);
             }
+
+            return new ConnectionString(sb.toString());
         }
 
         /**
diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java
index e28ec606d..f8cfd3aa0 100644
--- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java
+++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBTestBase.java
@@ -52,7 +52,7 @@ public class MongoDBTestBase extends AbstractTestBase {
     private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");
 
     protected static final String FLINK_USER = "flinkuser";
-    protected static final String FLINK_USER_PASSWORD = "flinkpw";
+    protected static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
 
     protected static final String MONGO_SUPER_USER = "superuser";
     protected static final String MONGO_SUPER_PASSWORD = "superpw";
diff --git a/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js b/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js
index db3a44f1b..dd3828308 100644
--- a/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js
+++ b/flink-connector-mongodb-cdc/src/test/resources/ddl/setup.js
@@ -36,7 +36,7 @@ if (db.system.users.find({user:'flinkuser'}).count() == 0) {
   db.createUser(
    {
      user: 'flinkuser',
-     pwd: 'flinkpw',
+     pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
      roles: [
         { role: 'read', db: 'admin' },
         { role: 'readAnyDatabase', db: 'admin' }