pull/3898/head
wangqh 2 days ago
parent 44bff3a993
commit cc79b4d062

@ -189,12 +189,13 @@ limitations under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>test</scope>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
<build>

@ -24,9 +24,7 @@ import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
@ -55,6 +53,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -97,8 +96,7 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
@Parameterized.Parameters(name = "flinkVersion: {0}, elasticsearchVersion: {1}")
public static Collection<Object[]> getTestParameters() {
List<Object[]> parameters = new ArrayList<>();
for (String flinkVersion : Arrays.asList("1.19.1", "1.20.0")) {
for (String flinkVersion : getFlinkVersion()) {
parameters.add(new Object[] {flinkVersion, "6.8.20"});
parameters.add(new Object[] {flinkVersion, "7.10.2"});
parameters.add(new Object[] {flinkVersion, "8.12.1"});
@ -117,8 +115,7 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
.withNetworkAliases("elasticsearch")
.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(MYSQL)).join();
Startables.deepStart(Stream.of(elasticsearchContainer)).join();
Startables.deepStart(Stream.of(MYSQL, elasticsearchContainer)).join();
}
@After
@ -159,12 +156,12 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+ " record.size.max.bytes: 5242880\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
databaseName,
elasticsearchVersion.split("\\.")[0] // Use major version number
);
elasticsearchVersion.split("\\.")[0],
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path elasticsearchCdcConnector =
TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
@ -336,15 +333,17 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
Thread.sleep(1000);
throw new Exception();
}
JSONObject jsonObject = JSON.parseObject(responseBody);
JSONObject hits = jsonObject.getJSONObject("hits");
JSONArray jsonArray = hits.getJSONArray("hits");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = jsonArray.getJSONObject(i);
JSONObject sourceObject = json.getJSONObject("_source");
ObjectMapper objectMapper = new ObjectMapper();
Map responseMap = objectMapper.readValue(responseBody, Map.class);
Map<String, Object> hitsMap = (Map<String, Object>) responseMap.get("hits");
List<Map<String, Object>> hitsArr = (List<Map<String, Object>>) hitsMap.get("hits");
for (int i = 0; i < hitsArr.size(); i++) {
Map<String, Object> hit = hitsArr.get(i);
Map<String, Object> source = (Map<String, Object>) hit.get("_source");
List<String> doc = new ArrayList<>();
for (String column : columns) {
doc.add(String.valueOf(sourceObject.get(column)));
doc.add(String.valueOf(source.get(column)));
}
results.add(String.join(" | ", doc));
}

Loading…
Cancel
Save