From c9d0091af4b459d2dc95dc8ea1843f818431707e Mon Sep 17 00:00:00 2001 From: Jiabao Sun <328226527@qq.com> Date: Wed, 1 Dec 2021 21:23:29 +0800 Subject: [PATCH] [mongodb] Support to connect MongoDB without authentication (#642) --- .../cdc/connectors/mongodb/MongoDBSource.java | 13 +++++++--- .../connectors/mongodb/MongoDBSourceTest.java | 26 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java index 87468df59..b12886589 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java @@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.mongodb; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; import com.mongodb.ConnectionString; import com.mongodb.client.model.changestream.FullDocument; @@ -321,14 +322,18 @@ public class MongoDBSource { } /** Build connection uri. */ - private ConnectionString buildConnectionUri() { + @VisibleForTesting + public ConnectionString buildConnectionUri() { StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://"); - if (username != null && password != null) { - sb.append(encodeValue(username)).append(":").append(encodeValue(password)); + if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { + sb.append(encodeValue(username)) + .append(":") + .append(encodeValue(password)) + .append("@"); } - sb.append("@").append(checkNotNull(hosts)); + sb.append(checkNotNull(hosts)); if (StringUtils.isNotEmpty(connectionOptions)) { sb.append("/?").append(connectionOptions); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBSourceTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBSourceTest.java index 8ee086bd4..a1705871e 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBSourceTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBSourceTest.java @@ -35,6 +35,7 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import com.jayway.jsonpath.JsonPath; +import com.mongodb.ConnectionString; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Filters; import com.mongodb.client.model.Updates; @@ -347,6 +348,31 @@ public class MongoDBSourceTest extends MongoDBTestBase { } } + @Test + public void testConnectionUri() { + String hosts = MONGODB_CONTAINER.getHostAndPort(); + + ConnectionString case0 = MongoDBSource.builder().hosts(hosts).buildConnectionUri(); + assertEquals(String.format("mongodb://%s", hosts), case0.toString()); + + ConnectionString case1 = + MongoDBSource.builder().username("").hosts(hosts).buildConnectionUri(); + assertEquals(String.format("mongodb://%s", hosts), case1.toString()); + + ConnectionString case2 = + MongoDBSource.builder().password("").hosts(hosts).buildConnectionUri(); + assertEquals(String.format("mongodb://%s", hosts), case2.toString()); + + ConnectionString case3 = + MongoDBSource.builder() + .username(FLINK_USER) + .password(FLINK_USER_PASSWORD) + .hosts(hosts) + .buildConnectionUri(); + assertEquals(FLINK_USER, case3.getUsername()); + assertEquals(FLINK_USER_PASSWORD, new String(case3.getPassword())); + } + // ------------------------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------------------------