From cc79b4d062eb5b5ef0be5b4bbb54d1db2a689943 Mon Sep 17 00:00:00 2001 From: wangqh <1095193290@qq.com> Date: Wed, 29 Jan 2025 14:20:22 +0800 Subject: [PATCH] fix --- .../flink-cdc-pipeline-e2e-tests/pom.xml | 9 ++--- .../tests/MySqlToElasticsearchE2eITCase.java | 33 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 8f4feb3dd..a9780e953 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -189,12 +189,13 @@ limitations under the License. ${testcontainers.version} test + - com.alibaba - fastjson - 1.2.83 - test + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java index a864efe27..1492859c2 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java @@ -24,9 +24,7 @@ import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; @@ -55,6 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -97,8 +96,7 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment { @Parameterized.Parameters(name = "flinkVersion: {0}, elasticsearchVersion: {1}") public static Collection getTestParameters() { List parameters = new ArrayList<>(); - - for (String flinkVersion : Arrays.asList("1.19.1", "1.20.0")) { + for (String flinkVersion : getFlinkVersion()) { parameters.add(new Object[] {flinkVersion, "6.8.20"}); parameters.add(new Object[] {flinkVersion, "7.10.2"}); parameters.add(new Object[] {flinkVersion, "8.12.1"}); @@ -117,8 +115,7 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment { .withNetworkAliases("elasticsearch") .withLogConsumer(new Slf4jLogConsumer(LOG)); - Startables.deepStart(Stream.of(MYSQL)).join(); - Startables.deepStart(Stream.of(elasticsearchContainer)).join(); + Startables.deepStart(Stream.of(MYSQL, elasticsearchContainer)).join(); } @After @@ -159,12 +156,12 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment { + " record.size.max.bytes: 5242880\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, databaseName, - elasticsearchVersion.split("\\.")[0] // Use major version number - ); + elasticsearchVersion.split("\\.")[0], + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path elasticsearchCdcConnector = TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar"); @@ -336,15 +333,17 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment { Thread.sleep(1000); throw new Exception(); } - JSONObject jsonObject = JSON.parseObject(responseBody); - JSONObject hits = jsonObject.getJSONObject("hits"); - JSONArray jsonArray = hits.getJSONArray("hits"); - for (int i = 0; i < jsonArray.size(); i++) { - JSONObject json = jsonArray.getJSONObject(i); - JSONObject sourceObject = json.getJSONObject("_source"); + ObjectMapper objectMapper = new ObjectMapper(); + Map responseMap = objectMapper.readValue(responseBody, Map.class); + Map hitsMap = (Map) responseMap.get("hits"); + List> hitsArr = (List>) hitsMap.get("hits"); + + for (int i = 0; i < hitsArr.size(); i++) { + Map hit = hitsArr.get(i); + Map source = (Map) hit.get("_source"); List doc = new ArrayList<>(); for (String column : columns) { - doc.add(String.valueOf(sourceObject.get(column))); + doc.add(String.valueOf(source.get(column))); } results.add(String.join(" | ", doc)); }