|
|
|
@ -20,6 +20,7 @@ package com.ververica.cdc.connectors.mongodb;
|
|
|
|
|
|
|
|
|
|
import org.apache.flink.annotation.PublicEvolving;
|
|
|
|
|
|
|
|
|
|
import com.mongodb.ConnectionString;
|
|
|
|
|
import com.mongodb.client.model.changestream.FullDocument;
|
|
|
|
|
import com.mongodb.kafka.connect.source.MongoSourceConfig;
|
|
|
|
|
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
|
|
|
|
@ -29,10 +30,9 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
|
|
|
|
|
import com.ververica.cdc.debezium.DebeziumSourceFunction;
|
|
|
|
|
import com.ververica.cdc.debezium.Validator;
|
|
|
|
|
import io.debezium.heartbeat.Heartbeat;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
|
|
|
|
import java.io.UnsupportedEncodingException;
|
|
|
|
|
import java.net.URI;
|
|
|
|
|
import java.net.URISyntaxException;
|
|
|
|
|
import java.net.URLEncoder;
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.util.Locale;
|
|
|
|
@ -321,19 +321,20 @@ public class MongoDBSource {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Build connection uri. */
|
|
|
|
|
private URI buildConnectionUri() {
|
|
|
|
|
String authority = checkNotNull(hosts);
|
|
|
|
|
private ConnectionString buildConnectionUri() {
|
|
|
|
|
StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
|
|
|
|
|
|
|
|
|
|
if (username != null && password != null) {
|
|
|
|
|
authority =
|
|
|
|
|
String.format(
|
|
|
|
|
"%s:%s@%s", encodeValue(username), encodeValue(password), hosts);
|
|
|
|
|
sb.append(encodeValue(username)).append(":").append(encodeValue(password));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
return new URI(MONGODB_SCHEME, authority, "/", connectionOptions, null);
|
|
|
|
|
} catch (URISyntaxException e) {
|
|
|
|
|
throw new IllegalArgumentException("Cannot build mongo connection uri");
|
|
|
|
|
sb.append("@").append(checkNotNull(hosts));
|
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(connectionOptions)) {
|
|
|
|
|
sb.append("/?").append(connectionOptions);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return new ConnectionString(sb.toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|