[mongodb] Support to connect MongoDB without authentication (#642)

pull/738/head
Jiabao Sun 3 years ago committed by Leonard Xu
parent 2b1e4a6ebe
commit c9d0091af4

@ -19,6 +19,7 @@
package com.ververica.cdc.connectors.mongodb;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import com.mongodb.ConnectionString;
import com.mongodb.client.model.changestream.FullDocument;
@ -321,14 +322,18 @@ public class MongoDBSource {
}
/** Build connection uri. */
private ConnectionString buildConnectionUri() {
@VisibleForTesting
public ConnectionString buildConnectionUri() {
StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
if (username != null && password != null) {
sb.append(encodeValue(username)).append(":").append(encodeValue(password));
if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
sb.append(encodeValue(username))
.append(":")
.append(encodeValue(password))
.append("@");
}
sb.append("@").append(checkNotNull(hosts));
sb.append(checkNotNull(hosts));
if (StringUtils.isNotEmpty(connectionOptions)) {
sb.append("/?").append(connectionOptions);

@ -35,6 +35,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
@ -347,6 +348,31 @@ public class MongoDBSourceTest extends MongoDBTestBase {
}
}
@Test
public void testConnectionUri() {
String hosts = MONGODB_CONTAINER.getHostAndPort();
ConnectionString case0 = MongoDBSource.builder().hosts(hosts).buildConnectionUri();
assertEquals(String.format("mongodb://%s", hosts), case0.toString());
ConnectionString case1 =
MongoDBSource.builder().username("").hosts(hosts).buildConnectionUri();
assertEquals(String.format("mongodb://%s", hosts), case1.toString());
ConnectionString case2 =
MongoDBSource.builder().password("").hosts(hosts).buildConnectionUri();
assertEquals(String.format("mongodb://%s", hosts), case2.toString());
ConnectionString case3 =
MongoDBSource.builder()
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.hosts(hosts)
.buildConnectionUri();
assertEquals(FLINK_USER, case3.getUsername());
assertEquals(FLINK_USER_PASSWORD, new String(case3.getPassword()));
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------

Loading…
Cancel
Save