diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java index 53227ef84..14d7d4ebe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java @@ -31,6 +31,7 @@ import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import org.apache.http.HttpHost; import java.io.Serializable; import java.time.ZoneId; @@ -46,7 +47,7 @@ import java.time.ZoneId; public class ElasticsearchDataSink implements DataSink, Serializable { /** The Elasticsearch sink options. */ - private final ElasticsearchSinkOptions elasticsearchOptions; + private final ElasticsearchSinkOptions esOptions; /** The time zone ID for handling time-related operations. */ private final ZoneId zoneId; @@ -58,13 +59,13 @@ public class ElasticsearchDataSink implements DataSink, Serializable { * @param zoneId The time zone ID for handling time-related operations. */ public ElasticsearchDataSink(ElasticsearchSinkOptions elasticsearchOptions, ZoneId zoneId) { - this.elasticsearchOptions = elasticsearchOptions; + this.esOptions = elasticsearchOptions; this.zoneId = zoneId; } @Override public EventSinkProvider getEventSinkProvider() { - switch (elasticsearchOptions.getVersion()) { + switch (esOptions.getVersion()) { case 6: return getElasticsearch6SinkProvider(); case 7: @@ -73,14 +74,14 @@ public class ElasticsearchDataSink implements DataSink, Serializable { return getElasticsearch8SinkProvider(); default: throw new IllegalArgumentException( - "Unsupported Elasticsearch version: " + elasticsearchOptions.getVersion()); + "Unsupported Elasticsearch version: " + esOptions.getVersion()); } } private EventSinkProvider getElasticsearch6SinkProvider() { ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = - elasticsearchOptions.getHosts().stream() + esOptions.getHosts().stream() .map( host -> new org.apache.flink.elasticsearch6.shaded.org.apache.http @@ -91,8 +92,7 @@ public class ElasticsearchDataSink implements DataSink, Serializable { .toArray( org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] ::new); - - return FlinkSinkProvider.of( + Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder() .setHosts(hosts) .setEmitter( @@ -109,63 +109,36 @@ public class ElasticsearchDataSink implements DataSink, Serializable { (DeleteOperation) operation)); } }) - .setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) - .setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) - .build()); + .setBulkFlushMaxActions(esOptions.getMaxBatchSize()) + .setBulkFlushInterval(esOptions.getMaxTimeInBufferMS()); + if (esOptions.getUsername() != null) { + sinkBuilder + .setConnectionUsername(esOptions.getUsername()) + .setConnectionPassword(esOptions.getPassword()); + } + + return FlinkSinkProvider.of(sinkBuilder.build()); } private EventSinkProvider getElasticsearch7SinkProvider() { - ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); - org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = - elasticsearchOptions.getHosts().stream() - .map( - host -> - new org.apache.flink.elasticsearch6.shaded.org.apache.http - .HttpHost( - host.getHostName(), - host.getPort(), - host.getSchemeName())) - .toArray( - org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] - ::new); - - return FlinkSinkProvider.of( - new Elasticsearch6SinkBuilder() - .setHosts(hosts) - .setEmitter( - (element, context, indexer) -> { - BulkOperationVariant operation = - serializer.apply(element, context); - if (operation instanceof IndexOperation) { - indexer.add( - Elasticsearch6RequestCreator.createIndexRequest( - (IndexOperation) operation)); - } else if (operation instanceof DeleteOperation) { - indexer.add( - Elasticsearch6RequestCreator.createDeleteRequest( - (DeleteOperation) operation)); - } - }) - .setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) - .setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) - .build()); + return getElasticsearch6SinkProvider(); } private EventSinkProvider getElasticsearch8SinkProvider() { - return FlinkSinkProvider.of( + Elasticsearch8AsyncSinkBuilder sinkBuilder = new Elasticsearch8AsyncSinkBuilder() - .setHosts( - elasticsearchOptions - .getHosts() - .toArray(new org.apache.http.HttpHost[0])) + .setHosts(esOptions.getHosts().toArray(new HttpHost[0])) .setElementConverter(new ElasticsearchEventSerializer(zoneId)) - .setMaxBatchSize(elasticsearchOptions.getMaxBatchSize()) - .setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests()) - .setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests()) - .setMaxBatchSizeInBytes(elasticsearchOptions.getMaxBatchSizeInBytes()) - .setMaxTimeInBufferMS(elasticsearchOptions.getMaxTimeInBufferMS()) - .setMaxRecordSizeInBytes(elasticsearchOptions.getMaxRecordSizeInBytes()) - .build()); + .setMaxBatchSize(esOptions.getMaxBatchSize()) + .setMaxInFlightRequests(esOptions.getMaxInFlightRequests()) + .setMaxBufferedRequests(esOptions.getMaxBufferedRequests()) + .setMaxBatchSizeInBytes(esOptions.getMaxBatchSizeInBytes()) + .setMaxTimeInBufferMS(esOptions.getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(esOptions.getMaxRecordSizeInBytes()); + if (esOptions.getUsername() != null) { + sinkBuilder.setUsername(esOptions.getUsername()).setPassword(esOptions.getPassword()); + } + return FlinkSinkProvider.of(sinkBuilder.build()); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java index 54eb4c54a..b8cb39ddf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost; @@ -43,14 +44,10 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.math.BigDecimal; -import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; @@ -67,11 +64,6 @@ public class Elasticsearch6DataSinkITCaseTest { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); private static final String ELASTICSEARCH_VERSION = "6.8.20"; - private static final DockerImageName ELASTICSEARCH_IMAGE = - DockerImageName.parse( - "docker.elastic.co/elasticsearch/elasticsearch:" - + ELASTICSEARCH_VERSION) - .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); @Container private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = @@ -147,13 +139,9 @@ public class Elasticsearch6DataSinkITCaseTest { } private static ElasticsearchContainer createElasticsearchContainer() { - return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withEnv("discovery.type", "single-node") - .withEnv("xpack.security.enabled", "false") - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withEnv("logger.org.elasticsearch", "ERROR") - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION); + esContainer.withLogConsumer(new Slf4jLogConsumer(LOG)); + return esContainer; } private RestHighLevelClient createElasticsearchClient() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java index fdc79aaf8..8644a6af9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost; @@ -43,14 +44,11 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.math.BigDecimal; -import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; @@ -147,13 +145,9 @@ public class Elasticsearch7DataSinkITCaseTest { } private static ElasticsearchContainer createElasticsearchContainer() { - return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withEnv("discovery.type", "single-node") - .withEnv("xpack.security.enabled", "false") - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withEnv("logger.org.elasticsearch", "ERROR") - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION); + esContainer.withLogConsumer(new Slf4jLogConsumer(LOG)); + return esContainer; } private RestHighLevelClient createElasticsearchClient() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java index 8772d3fbc..ce8a25d49 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchContainer; import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; import org.apache.flink.streaming.api.datastream.DataStream; @@ -35,21 +36,23 @@ import co.elastic.clients.elasticsearch.core.GetResponse; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.math.BigDecimal; -import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; @@ -67,11 +70,8 @@ public class ElasticsearchDataSinkITCaseTest { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); private static final String ELASTICSEARCH_VERSION = "8.12.1"; - private static final DockerImageName ELASTICSEARCH_IMAGE = - DockerImageName.parse( - "docker.elastic.co/elasticsearch/elasticsearch:" - + ELASTICSEARCH_VERSION) - .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); + private static final String DEFAULT_USERNAME = "elastic"; + private static final String DEFAULT_PASSWORD = "123456"; @Container private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = @@ -147,16 +147,17 @@ public class ElasticsearchDataSinkITCaseTest { } private static ElasticsearchContainer createElasticsearchContainer() { - return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withEnv("discovery.type", "single-node") - .withEnv("xpack.security.enabled", "false") - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withEnv("logger.org.elasticsearch", "ERROR") - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION); + esContainer.withLogConsumer(new Slf4jLogConsumer(LOG)); + esContainer.withPassword(DEFAULT_PASSWORD); + esContainer.withEnv("xpack.security.enabled", "true"); + return esContainer; } private ElasticsearchClient createElasticsearchClient() { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(DEFAULT_USERNAME, DEFAULT_PASSWORD)); RestClientTransport transport = new RestClientTransport( RestClient.builder( @@ -164,6 +165,16 @@ public class ElasticsearchDataSinkITCaseTest { ELASTICSEARCH_CONTAINER.getHost(), ELASTICSEARCH_CONTAINER.getFirstMappedPort(), "http")) + .setHttpClientConfigCallback( + new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient( + HttpAsyncClientBuilder httpAsyncClientBuilder) { + return httpAsyncClientBuilder + .setDefaultCredentialsProvider( + credentialsProvider); + } + }) .build(), new JacksonJsonpMapper()); return new ElasticsearchClient(transport); @@ -196,7 +207,16 @@ public class ElasticsearchDataSinkITCaseTest { null); return new ElasticsearchSinkOptions( - 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig, 8, null, null); + 5, + 1, + 10, + 50 * 1024 * 1024, + 1000, + 10 * 1024 * 1024, + networkConfig, + 8, + DEFAULT_USERNAME, + DEFAULT_PASSWORD); } private StreamExecutionEnvironment createStreamExecutionEnvironment() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchContainer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchContainer.java new file mode 100644 index 000000000..a113ab884 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchContainer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink.utils; + +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; + +/** Docker container for Elasticsearch. */ +public class ElasticsearchContainer + extends org.testcontainers.elasticsearch.ElasticsearchContainer { + + private static final String ELASTICSEARCH_IMAGE = + "docker.elastic.co/elasticsearch/elasticsearch"; + + public ElasticsearchContainer(String version) { + this(DockerImageName.parse(String.format("%s:%s", ELASTICSEARCH_IMAGE, version))); + } + + public ElasticsearchContainer(DockerImageName imageName) { + super(imageName); + withEnv("discovery.type", "single-node"); + withEnv("xpack.security.enabled", "false"); + withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g"); + withEnv("logger.org.elasticsearch", "ERROR"); + waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + } +}