[FLINK-36700][pipeline-connector][elasticsearch] Elasticsearch pipeline sink supports authentication

This closes  #3728
pull/3823/head
Junbo wang 2 weeks ago committed by GitHub
parent 56dbb1fd53
commit 0f675061e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -31,6 +31,7 @@ import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import org.apache.http.HttpHost;
import java.io.Serializable; import java.io.Serializable;
import java.time.ZoneId; import java.time.ZoneId;
@ -46,7 +47,7 @@ import java.time.ZoneId;
public class ElasticsearchDataSink<InputT> implements DataSink, Serializable { public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
/** The Elasticsearch sink options. */ /** The Elasticsearch sink options. */
private final ElasticsearchSinkOptions elasticsearchOptions; private final ElasticsearchSinkOptions esOptions;
/** The time zone ID for handling time-related operations. */ /** The time zone ID for handling time-related operations. */
private final ZoneId zoneId; private final ZoneId zoneId;
@ -58,13 +59,13 @@ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
* @param zoneId The time zone ID for handling time-related operations. * @param zoneId The time zone ID for handling time-related operations.
*/ */
public ElasticsearchDataSink(ElasticsearchSinkOptions elasticsearchOptions, ZoneId zoneId) { public ElasticsearchDataSink(ElasticsearchSinkOptions elasticsearchOptions, ZoneId zoneId) {
this.elasticsearchOptions = elasticsearchOptions; this.esOptions = elasticsearchOptions;
this.zoneId = zoneId; this.zoneId = zoneId;
} }
@Override @Override
public EventSinkProvider getEventSinkProvider() { public EventSinkProvider getEventSinkProvider() {
switch (elasticsearchOptions.getVersion()) { switch (esOptions.getVersion()) {
case 6: case 6:
return getElasticsearch6SinkProvider(); return getElasticsearch6SinkProvider();
case 7: case 7:
@ -73,14 +74,14 @@ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
return getElasticsearch8SinkProvider(); return getElasticsearch8SinkProvider();
default: default:
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Unsupported Elasticsearch version: " + elasticsearchOptions.getVersion()); "Unsupported Elasticsearch version: " + esOptions.getVersion());
} }
} }
private EventSinkProvider getElasticsearch6SinkProvider() { private EventSinkProvider getElasticsearch6SinkProvider() {
ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId);
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts =
elasticsearchOptions.getHosts().stream() esOptions.getHosts().stream()
.map( .map(
host -> host ->
new org.apache.flink.elasticsearch6.shaded.org.apache.http new org.apache.flink.elasticsearch6.shaded.org.apache.http
@ -91,8 +92,7 @@ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
.toArray( .toArray(
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[]
::new); ::new);
Elasticsearch6SinkBuilder<Event> sinkBuilder =
return FlinkSinkProvider.of(
new Elasticsearch6SinkBuilder<Event>() new Elasticsearch6SinkBuilder<Event>()
.setHosts(hosts) .setHosts(hosts)
.setEmitter( .setEmitter(
@ -109,63 +109,36 @@ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
(DeleteOperation) operation)); (DeleteOperation) operation));
} }
}) })
.setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) .setBulkFlushMaxActions(esOptions.getMaxBatchSize())
.setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) .setBulkFlushInterval(esOptions.getMaxTimeInBufferMS());
.build()); if (esOptions.getUsername() != null) {
sinkBuilder
.setConnectionUsername(esOptions.getUsername())
.setConnectionPassword(esOptions.getPassword());
} }
private EventSinkProvider getElasticsearch7SinkProvider() { return FlinkSinkProvider.of(sinkBuilder.build());
ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId);
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts =
elasticsearchOptions.getHosts().stream()
.map(
host ->
new org.apache.flink.elasticsearch6.shaded.org.apache.http
.HttpHost(
host.getHostName(),
host.getPort(),
host.getSchemeName()))
.toArray(
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[]
::new);
return FlinkSinkProvider.of(
new Elasticsearch6SinkBuilder<Event>()
.setHosts(hosts)
.setEmitter(
(element, context, indexer) -> {
BulkOperationVariant operation =
serializer.apply(element, context);
if (operation instanceof IndexOperation) {
indexer.add(
Elasticsearch6RequestCreator.createIndexRequest(
(IndexOperation<?>) operation));
} else if (operation instanceof DeleteOperation) {
indexer.add(
Elasticsearch6RequestCreator.createDeleteRequest(
(DeleteOperation) operation));
} }
})
.setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) private EventSinkProvider getElasticsearch7SinkProvider() {
.setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) return getElasticsearch6SinkProvider();
.build());
} }
private EventSinkProvider getElasticsearch8SinkProvider() { private EventSinkProvider getElasticsearch8SinkProvider() {
return FlinkSinkProvider.of( Elasticsearch8AsyncSinkBuilder<Event> sinkBuilder =
new Elasticsearch8AsyncSinkBuilder<Event>() new Elasticsearch8AsyncSinkBuilder<Event>()
.setHosts( .setHosts(esOptions.getHosts().toArray(new HttpHost[0]))
elasticsearchOptions
.getHosts()
.toArray(new org.apache.http.HttpHost[0]))
.setElementConverter(new ElasticsearchEventSerializer(zoneId)) .setElementConverter(new ElasticsearchEventSerializer(zoneId))
.setMaxBatchSize(elasticsearchOptions.getMaxBatchSize()) .setMaxBatchSize(esOptions.getMaxBatchSize())
.setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests()) .setMaxInFlightRequests(esOptions.getMaxInFlightRequests())
.setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests()) .setMaxBufferedRequests(esOptions.getMaxBufferedRequests())
.setMaxBatchSizeInBytes(elasticsearchOptions.getMaxBatchSizeInBytes()) .setMaxBatchSizeInBytes(esOptions.getMaxBatchSizeInBytes())
.setMaxTimeInBufferMS(elasticsearchOptions.getMaxTimeInBufferMS()) .setMaxTimeInBufferMS(esOptions.getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(elasticsearchOptions.getMaxRecordSizeInBytes()) .setMaxRecordSizeInBytes(esOptions.getMaxRecordSizeInBytes());
.build()); if (esOptions.getUsername() != null) {
sinkBuilder.setUsername(esOptions.getUsername()).setPassword(esOptions.getPassword());
}
return FlinkSinkProvider.of(sinkBuilder.build());
} }
@Override @Override

@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost; import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost;
@ -43,14 +44,10 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -67,11 +64,6 @@ public class Elasticsearch6DataSinkITCaseTest {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
private static final String ELASTICSEARCH_VERSION = "6.8.20"; private static final String ELASTICSEARCH_VERSION = "6.8.20";
private static final DockerImageName ELASTICSEARCH_IMAGE =
DockerImageName.parse(
"docker.elastic.co/elasticsearch/elasticsearch:"
+ ELASTICSEARCH_VERSION)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
@Container @Container
private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
@ -147,13 +139,9 @@ public class Elasticsearch6DataSinkITCaseTest {
} }
private static ElasticsearchContainer createElasticsearchContainer() { private static ElasticsearchContainer createElasticsearchContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION);
.withEnv("discovery.type", "single-node") esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));
.withEnv("xpack.security.enabled", "false") return esContainer;
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
.withEnv("logger.org.elasticsearch", "ERROR")
.withLogConsumer(new Slf4jLogConsumer(LOG))
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
} }
private RestHighLevelClient createElasticsearchClient() { private RestHighLevelClient createElasticsearchClient() {

@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost; import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
@ -43,14 +44,11 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerImageName;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -147,13 +145,9 @@ public class Elasticsearch7DataSinkITCaseTest {
} }
private static ElasticsearchContainer createElasticsearchContainer() { private static ElasticsearchContainer createElasticsearchContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION);
.withEnv("discovery.type", "single-node") esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));
.withEnv("xpack.security.enabled", "false") return esContainer;
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
.withEnv("logger.org.elasticsearch", "ERROR")
.withLogConsumer(new Slf4jLogConsumer(LOG))
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
} }
private RestHighLevelClient createElasticsearchClient() { private RestHighLevelClient createElasticsearchClient() {

@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer;
import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
@ -35,21 +36,23 @@ import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport; import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -67,11 +70,8 @@ public class ElasticsearchDataSinkITCaseTest {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
private static final String ELASTICSEARCH_VERSION = "8.12.1"; private static final String ELASTICSEARCH_VERSION = "8.12.1";
private static final DockerImageName ELASTICSEARCH_IMAGE = private static final String DEFAULT_USERNAME = "elastic";
DockerImageName.parse( private static final String DEFAULT_PASSWORD = "123456";
"docker.elastic.co/elasticsearch/elasticsearch:"
+ ELASTICSEARCH_VERSION)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
@Container @Container
private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
@ -147,16 +147,17 @@ public class ElasticsearchDataSinkITCaseTest {
} }
private static ElasticsearchContainer createElasticsearchContainer() { private static ElasticsearchContainer createElasticsearchContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION);
.withEnv("discovery.type", "single-node") esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));
.withEnv("xpack.security.enabled", "false") esContainer.withPassword(DEFAULT_PASSWORD);
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") esContainer.withEnv("xpack.security.enabled", "true");
.withEnv("logger.org.elasticsearch", "ERROR") return esContainer;
.withLogConsumer(new Slf4jLogConsumer(LOG))
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
} }
private ElasticsearchClient createElasticsearchClient() { private ElasticsearchClient createElasticsearchClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(DEFAULT_USERNAME, DEFAULT_PASSWORD));
RestClientTransport transport = RestClientTransport transport =
new RestClientTransport( new RestClientTransport(
RestClient.builder( RestClient.builder(
@ -164,6 +165,16 @@ public class ElasticsearchDataSinkITCaseTest {
ELASTICSEARCH_CONTAINER.getHost(), ELASTICSEARCH_CONTAINER.getHost(),
ELASTICSEARCH_CONTAINER.getFirstMappedPort(), ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
"http")) "http"))
.setHttpClientConfigCallback(
new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpAsyncClientBuilder) {
return httpAsyncClientBuilder
.setDefaultCredentialsProvider(
credentialsProvider);
}
})
.build(), .build(),
new JacksonJsonpMapper()); new JacksonJsonpMapper());
return new ElasticsearchClient(transport); return new ElasticsearchClient(transport);
@ -196,7 +207,16 @@ public class ElasticsearchDataSinkITCaseTest {
null); null);
return new ElasticsearchSinkOptions( return new ElasticsearchSinkOptions(
5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig, 8, null, null); 5,
1,
10,
50 * 1024 * 1024,
1000,
10 * 1024 * 1024,
networkConfig,
8,
DEFAULT_USERNAME,
DEFAULT_PASSWORD);
} }
private StreamExecutionEnvironment createStreamExecutionEnvironment() { private StreamExecutionEnvironment createStreamExecutionEnvironment() {

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.elasticsearch.sink.utils;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
/** Docker container for Elasticsearch. */
public class ElasticsearchContainer
extends org.testcontainers.elasticsearch.ElasticsearchContainer {
private static final String ELASTICSEARCH_IMAGE =
"docker.elastic.co/elasticsearch/elasticsearch";
public ElasticsearchContainer(String version) {
this(DockerImageName.parse(String.format("%s:%s", ELASTICSEARCH_IMAGE, version)));
}
public ElasticsearchContainer(DockerImageName imageName) {
super(imageName);
withEnv("discovery.type", "single-node");
withEnv("xpack.security.enabled", "false");
withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g");
withEnv("logger.org.elasticsearch", "ERROR");
waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
}
}
Loading…
Cancel
Save