diff --git a/.github/workflows/flink_cdc.yml b/.github/workflows/flink_cdc.yml index 10f0494d4..2ccb59c86 100644 --- a/.github/workflows/flink_cdc.yml +++ b/.github/workflows/flink_cdc.yml @@ -71,7 +71,7 @@ jobs: strategy: matrix: java-version: [ '8' ] - flink-version: ['1.17.2', '1.18.1', '1.19.1', '1.20.0'] + flink-version: ['1.19.1', '1.20.0'] module: [ 'pipeline_e2e' ] name: Pipeline E2E Tests uses: ./.github/workflows/flink_cdc_base.yml @@ -83,7 +83,7 @@ jobs: strategy: matrix: java-version: [ '8' ] - flink-version: ['1.16.3', '1.17.2', '1.18.1', '1.19.1', '1.20.0'] + flink-version: ['1.19.1', '1.20.0'] module: [ 'source_e2e' ] name: Source E2E Tests uses: ./.github/workflows/flink_cdc_base.yml diff --git a/.github/workflows/flink_cdc_migration_test.yml b/.github/workflows/flink_cdc_migration_test.yml index 704942560..5b4e598bc 100644 --- a/.github/workflows/flink_cdc_migration_test.yml +++ b/.github/workflows/flink_cdc_migration_test.yml @@ -39,7 +39,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - flink-version: [ '1.18.1', '1.19.1', '1.20.0' ] + flink-version: [ '1.19.1', '1.20.0' ] steps: - uses: actions/checkout@v4 @@ -78,7 +78,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - flink-version: [ '1.18.1', '1.19.1', '1.20.0' ] + flink-version: [ '1.19.1', '1.20.0' ] steps: - uses: actions/checkout@v4 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml index f634ec7a7..79a5d562b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml @@ -36,7 +36,7 @@ limitations under the License. UTF-8 8.12.1 - 1.18.0 + 1.19.1 4.0 2.13.2 --add-opens=java.base/java.util=ALL-UNNAMED diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml index 841271b4b..72065d657 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml @@ -35,7 +35,7 @@ limitations under the License. org.apache.flink flink-connector-kafka - 3.0.2-${flink.major.version} + 3.3.0-${flink.major.version} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index 1bd7fe19c..33f6edc8a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -72,6 +72,13 @@ limitations under the License. test + + org.apache.commons + commons-compress + 1.26.0 + test + + org.mockito mockito-inline diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index c337b8cce..c05ebc292 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -40,6 +42,8 @@ import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -572,7 +576,10 @@ public class PaimonSinkITCase { private static class MockCommitRequestImpl extends CommitRequestImpl { protected MockCommitRequestImpl(CommT committable) { - super(committable); + super( + committable, + InternalSinkCommitterMetricGroup.wrap( + UnregisteredMetricsGroup.createOperatorMetricGroup())); } } @@ -633,5 +640,15 @@ public class PaimonSinkITCase { public JobID getJobId() { return null; } + + @Override + public JobInfo getJobInfo() { + return null; + } + + @Override + public TaskInfo getTaskInfo() { + return null; + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml index 251b8d47c..5dfd222f0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml @@ -28,7 +28,7 @@ limitations under the License. flink-cdc-pipeline-connector-starrocks - 1.2.9_flink-${flink.major.version} + 1.2.10_flink-${flink.major.version} @@ -38,15 +38,6 @@ limitations under the License. ${starrocks.connector.version} - - - org.apache.commons - commons-compress - 1.21 - - org.apache.flink flink-cdc-composer diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java deleted file mode 100644 index d029943e3..000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/starrocks/shade/org/apache/commons/compress/utils/Lists.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.starrocks.shade.org.apache.commons.compress.utils; - -import java.util.ArrayList; - -/** - * Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but not package it. - * This package should be removed after upgrading to 1.2.10 which will not use commons-compress - * anymore. - */ -public class Lists { - public static ArrayList newArrayList() { - return new ArrayList<>(); - } -} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java index d6622e3e0..1db2819ed 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java @@ -18,6 +18,8 @@ package org.apache.flink.cdc.connectors.starrocks.sink; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -353,5 +355,15 @@ public class EventRecordSerializationSchemaTest { public JobID getJobId() { throw new UnsupportedOperationException(); } + + @Override + public JobInfo getJobInfo() { + throw new UnsupportedOperationException(); + } + + @Override + public TaskInfo getTaskInfo() { + throw new UnsupportedOperationException(); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java index 7bf504c1d..6cde68ee4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java @@ -209,7 +209,7 @@ public class Db2ConnectorITCase extends Db2TestBase { "spare tire,22.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); cancelJobIfRunning(result); @@ -302,7 +302,7 @@ public class Db2ConnectorITCase extends Db2TestBase { Arrays.asList( "+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)", "+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(expected); Collections.sort(actual); assertEquals(expected, actual); @@ -373,7 +373,7 @@ public class Db2ConnectorITCase extends Db2TestBase { String[] expected = new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); cancelJobIfRunning(result); @@ -470,7 +470,7 @@ public class Db2ConnectorITCase extends Db2TestBase { "+U(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)", "-D(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(expected); Collections.sort(actual); assertEquals(expected, actual); @@ -493,7 +493,7 @@ public class Db2ConnectorITCase extends Db2TestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java index 1a5943492..0f6b5a625 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java @@ -401,7 +401,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // first round's changelog data makeOplogForAddressTableInRound(database, collection0, 0); @@ -418,7 +418,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { collection0, 417022095255614380L, cityName0, cityName0))); MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); } @@ -466,7 +466,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { captureTableThisRound, cityName, cityName))); MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 4: make changelog data for all collections before this round(also includes this // round), @@ -512,7 +512,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressCollections.length - 1) { finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -590,7 +590,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { } MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); } @@ -629,7 +629,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 3: make oplog data for all collections List expectedOplogDataThisRound = new ArrayList<>(); @@ -665,7 +665,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { } if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM - && TestValuesTableFactory.getRawResults("sink").size() + && TestValuesTableFactory.getRawResultsAsStrings("sink").size() > fetchedDataList.size()) { MongoDBTestUtils.triggerFailover( failoverType, @@ -679,7 +679,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 5: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -798,7 +798,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { fetchedDataList.addAll(expectedSnapshotDataThisRound); waitForUpsertSinkSize("sink", fetchedDataList.size()); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getResults("sink")); + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 3: make some changelog data for this round makeFirstPartOplogForAddressCollection( @@ -843,7 +843,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { // checkpoint to wait retract old record and send new record Thread.sleep(1000); MongoDBAssertUtils.assertEqualsInAnyOrder( - fetchedDataList, TestValuesTableFactory.getResults("sink")); + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressCollections.length - 1) { @@ -1023,7 +1023,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { protected static int upsertSinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getResults(sinkName).size(); + return TestValuesTableFactory.getResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index 19d47856e..edb8c27d7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -221,7 +221,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase { "spare tire,22.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -300,7 +300,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase { String[] expected = new String[] {"jacket,0.200", "scooter,5.180"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -463,7 +463,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase { "+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "-U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,18:36:04,2021-09-03,1960-08-11,2021-09-03T18:36:04.123,2021-09-03T18:36:04.123Z,2021-09-03T18:36:04,2021-09-03T18:36:04Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); @@ -582,7 +582,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase { .sorted() .collect(Collectors.toList()); - List actual = TestValuesTableFactory.getRawResults("meta_sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink"); Collections.sort(actual); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java index 3352fe1ef..8610c29b8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java @@ -115,7 +115,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { String.format("+I[%s, coll_a2, A202]", db1) }; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -169,7 +169,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { String.format("+I[%s, coll_b2, B202]", db1), }; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -208,7 +208,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { String.format("+I[%s, coll_b2, B202]", db0) }; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -247,7 +247,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { String.format("+I[%s, coll_b2, B202]", db0) }; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -267,7 +267,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { // 3. Check results String[] expected = new String[] {String.format("+I[%s, coll-a1, A101]", db0)}; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -292,7 +292,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { String.format("+I[%s, coll.name, A103]", db) }; - List actual = TestValuesTableFactory.getResults("mongodb_sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java index 260270a20..7b826dd71 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java @@ -56,7 +56,8 @@ public class MongoDBTestUtils { fail( "Wait for sink size timeout, raw results: \n" + String.join( - "\n", TestValuesTableFactory.getRawResults(sinkName))); + "\n", + TestValuesTableFactory.getRawResultsAsStrings(sinkName))); } Thread.sleep(100); } @@ -65,7 +66,7 @@ public class MongoDBTestUtils { public static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index c93c1dc64..7be090a8f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -326,7 +326,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { operator.getOperatorIdFuture(), serializer, accumulatorName, - env.getCheckpointConfig()); + env.getCheckpointConfig(), + 10000L); CollectStreamSink sink = new CollectStreamSink(source, factory); sink.name("Data stream collect sink"); env.addOperator(sink.getTransformation()); @@ -807,7 +808,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { operator.getOperatorIdFuture(), serializer, accumulatorName, - env.getCheckpointConfig()); + env.getCheckpointConfig(), + 10000L); CollectStreamSink sink = new CollectStreamSink<>(stream, factory); sink.name("Data stream collect sink"); env.addOperator(sink.getTransformation()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 4ca1e26a4..215dc3ec7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -179,7 +179,7 @@ public abstract class MySqlSourceTestBase extends TestLogger { protected static int upsertSinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getResults(sinkName).size(); + return TestValuesTableFactory.getResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java index d1eba52ef..1f3fa0d1a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java @@ -535,7 +535,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { operator.getOperatorIdFuture(), serializer, accumulatorName, - stream.getExecutionEnvironment().getCheckpointConfig()); + stream.getExecutionEnvironment().getCheckpointConfig(), + 10000L); return iterator; } @@ -638,7 +639,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { () -> sleepMs(100)); } waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); } @@ -673,7 +675,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { JobClient jobClient = tableResult.getJobClient().get(); waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 3: make binlog data for all tables List expectedBinlogDataThisRound = new ArrayList<>(); @@ -701,7 +704,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { } if (failoverPhase == FailoverPhase.BINLOG - && TestValuesTableFactory.getRawResults("sink").size() + && TestValuesTableFactory.getRawResultsAsStrings("sink").size() > fetchedDataList.size()) { triggerFailover( failoverType, @@ -713,7 +716,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { fetchedDataList.addAll(expectedBinlogDataThisRound); // step 4: assert fetched binlog data in this round waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 5: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -827,7 +831,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { } fetchedDataList.addAll(expectedSnapshotDataThisRound); waitForUpsertSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 3: make some binlog data for this round makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable); @@ -869,7 +874,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { // the result size of sink may arrive fetchedDataList.size() with old data, wait one // checkpoint to wait retract old record and send new record Thread.sleep(1000); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressTables.length - 1) { @@ -1108,7 +1114,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; @@ -1176,7 +1182,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { newlyAddedTable, cityName, cityName)); fetchedDataList.addAll(expectedSnapshotDataThisRound); waitForUpsertSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 3: make some binlog data for this round makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable); makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable); @@ -1206,7 +1213,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { // the result size of sink may arrive fetchedDataList.size() with old data, wait one // checkpoint to wait retract old record and send new record Thread.sleep(1000); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressTables.length - 1) { finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java index 44d72a64b..7a178842e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java @@ -492,7 +492,8 @@ public class SpecificStartingOffsetITCase { operator.getOperatorIdFuture(), serializer, accumulatorName, - env.getCheckpointConfig()); + env.getCheckpointConfig(), + 10000L); CollectStreamSink sink = new CollectStreamSink<>(stream, factory); sink.name("Data stream collect sink"); env.addOperator(sink.getTransformation()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java index c6f66de05..1826448b2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -271,7 +271,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[spare tire, 22.200]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); } @@ -410,7 +410,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[110, jacket, new water resistent white wind breaker, 0.500]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); } @@ -919,7 +919,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { expected.addAll( Lists.newArrayList("+U[0, 1024]", "+U[1, 1025]", "+U[2, 2048]", "+U[3, 2049]")); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); Collections.sort(expected); assertEquals(expected, actual); @@ -1023,7 +1023,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { // TODO: we can't assert merged result for incremental-snapshot, because we can't add a // keyby shuffle before "values" upsert sink. We should assert merged result once // https://issues.apache.org/jira/browse/FLINK-24511 is fixed. - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); @@ -1744,7 +1744,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { String[] expected = new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -1845,7 +1845,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { String[] expected = new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -1925,7 +1925,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[110, jacket, new water resistent white wind breaker, 0.500]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -2000,7 +2000,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { String[] expected = new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -2319,7 +2319,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java index eaa8f9def..db87a98ac 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java @@ -127,7 +127,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { tableResult = tEnv.executeSql("insert into sink select * from orders_source"); waitForSinkSize("sink", realSnapshotData.size()); - assertEqualsInAnyOrder(expectedSnapshotData, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + expectedSnapshotData, TestValuesTableFactory.getRawResultsAsStrings("sink")); // third step: check dml events try (Connection connection = getJdbcConnection(); @@ -346,7 +347,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { waitForSinkSize("multi_key_sink", realSnapshotData.size()); assertEqualsInAnyOrder( - expectedSnapshotData, TestValuesTableFactory.getRawResults("multi_key_sink")); + expectedSnapshotData, + TestValuesTableFactory.getRawResultsAsStrings("multi_key_sink")); // third step: check dml events try (Connection connection = getJdbcConnection(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java index 1b49831a4..43c3c6783 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java @@ -200,7 +200,7 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase { protected static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java index 413b7494f..7ccd90e3a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseTestBase.java @@ -139,7 +139,7 @@ public abstract class OceanBaseTestBase extends AbstractTestBase { public static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java index 616ccd5e2..cffb7c203 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java @@ -180,7 +180,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { "+U(110,jacket,new water resistent white wind breaker,0.5000000000)", "+U(111,scooter,Big 2-wheel scooter ,5.1700000000)", "-D(111,scooter,Big 2-wheel scooter ,5.1700000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); @@ -276,7 +276,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { "+U(" + tenant + ",inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -456,7 +456,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { "+I(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})", "+U(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -536,7 +536,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { "+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)", "+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -591,7 +591,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { "+I(107,rocks,box of assorted rocks,5.3000000000)", "+I(108,jacket,water resistent black wind breaker,0.1000000000)", "+I(109,spare tire,24 inch spare tire,22.2000000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java index e7345baba..b9f9fa0e0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java @@ -220,7 +220,7 @@ public class OceanBaseOracleModeITCase extends OceanBaseTestBase { "+I(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.007890,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)", "+U(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.125450,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertContainsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java index 0cbbb8a57..a8c24963a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java @@ -394,7 +394,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", captureTableThisRound, cityName, cityName))); waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 2: make redo log data for all tables before this round(also includes this // round), @@ -419,7 +420,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { // step 3: assert fetched redo log data in this round waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 4: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); @@ -491,7 +493,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { () -> sleepMs(100)); } waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); } @@ -526,7 +529,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { JobClient jobClient = tableResult.getJobClient().get(); waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 3: make redo log data for all tables List expectedRedoLogDataThisRound = new ArrayList<>(); @@ -555,7 +559,7 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { } if (failoverPhase == FailoverPhase.REDO_LOG - && TestValuesTableFactory.getRawResults("sink").size() + && TestValuesTableFactory.getRawResultsAsStrings("sink").size() > fetchedDataList.size()) { triggerFailover( failoverType, @@ -567,7 +571,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { fetchedDataList.addAll(expectedRedoLogDataThisRound); // step 4: assert fetched redo log data in this round waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 5: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -681,7 +686,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { } fetchedDataList.addAll(expectedSnapshotDataThisRound); waitForUpsertSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 3: make some redo log data for this round makeFirstPartRedoLogForAddressTable(newlyAddedTable); @@ -722,7 +728,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase { // the result size of sink may arrive fetchedDataList.size() with old data, wait one // checkpoint to wait retract old record and send new record Thread.sleep(1000); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressTables.length - 1) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java index c15ce81e1..1e0907015 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -218,7 +218,7 @@ public class OracleConnectorITCase { "+I[spare tire, 22.200]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -350,7 +350,7 @@ public class OracleConnectorITCase { "+I[spare tire, 22.200]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -445,7 +445,7 @@ public class OracleConnectorITCase { "+I[spare tire, 22.200]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); LOG.info("actual:{}", actual); assertEqualsInAnyOrder(Arrays.asList(expected), actual); @@ -547,7 +547,7 @@ public class OracleConnectorITCase { "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]", "-D[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(expected); Collections.sort(actual); assertEquals(expected, actual); @@ -618,7 +618,7 @@ public class OracleConnectorITCase { String[] expected = new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -719,7 +719,7 @@ public class OracleConnectorITCase { "+I[11000000000, false, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]", "+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]"); - List actual = TestValuesTableFactory.getRawResults("test_numeric_sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("test_numeric_sink"); Collections.sort(actual); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); @@ -835,7 +835,7 @@ public class OracleConnectorITCase { + "\n]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); Collections.sort(actual); assertEquals(Arrays.asList(expected), actual); result.getJobClient().get().cancel().get(); @@ -894,7 +894,7 @@ public class OracleConnectorITCase { waitForSinkSize("sink", RECORDS_COUNT); List actual = - TestValuesTableFactory.getResults("sink").stream() + TestValuesTableFactory.getResultsAsStrings("sink").stream() .map(s -> s.replaceFirst("\\+I\\[(\\d+).+", "$1")) .map(Integer::parseInt) .sorted() @@ -989,7 +989,7 @@ public class OracleConnectorITCase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java index 2dd6e3a04..b43927712 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java @@ -86,7 +86,7 @@ public class OracleTestUtils { public static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; @@ -104,7 +104,7 @@ public class OracleTestUtils { public static int upsertSinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getResults(sinkName).size(); + return TestValuesTableFactory.getResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java index 5cc8ba5c5..c63dec1e7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java @@ -169,11 +169,11 @@ public abstract class PostgresTestBase extends AbstractTestBase { protected void waitForSinkResult(String sinkName, List expected) throws InterruptedException { - List actual = TestValuesTableFactory.getResults(sinkName); + List actual = TestValuesTableFactory.getResultsAsStrings(sinkName); actual = actual.stream().sorted().collect(Collectors.toList()); while (actual.size() != expected.size() || !actual.equals(expected)) { actual = - TestValuesTableFactory.getResults(sinkName).stream() + TestValuesTableFactory.getResultsAsStrings(sinkName).stream() .sorted() .collect(Collectors.toList()); Thread.sleep(1000); @@ -189,7 +189,7 @@ public abstract class PostgresTestBase extends AbstractTestBase { protected int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java index c28395f9d..e97ce65ab 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java @@ -422,7 +422,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", captureTableThisRound, cityName, cityName))); waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 2: make wal log data for all tables before this round(also includes this round), // test whether only this round table's data is captured. @@ -449,7 +450,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { // step 3: assert fetched wal log data in this round waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 4: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); @@ -521,7 +523,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { () -> sleepMs(100)); } waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); } @@ -556,7 +559,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { JobClient jobClient = tableResult.getJobClient().get(); waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 3: make wal log data for all tables List expectedWalLogDataThisRound = new ArrayList<>(); @@ -590,7 +594,7 @@ public class NewlyAddedTableITCase extends PostgresTestBase { } if (failoverPhase == PostgresTestUtils.FailoverPhase.STREAM - && TestValuesTableFactory.getRawResults("sink").size() + && TestValuesTableFactory.getRawResultsAsStrings("sink").size() > fetchedDataList.size()) { PostgresTestUtils.triggerFailover( failoverType, @@ -602,7 +606,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { fetchedDataList.addAll(expectedWalLogDataThisRound); // step 4: assert fetched wal log data in this round waitForSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); // step 5: trigger savepoint finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -716,7 +721,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { } fetchedDataList.addAll(expectedSnapshotDataThisRound); PostgresTestUtils.waitForUpsertSinkSize("sink", fetchedDataList.size()); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 3: make some wal log data for this round makeFirstPartWalLogForAddressTable(getConnection(), newlyAddedTable); @@ -757,7 +763,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase { // the result size of sink may arrive fetchedDataList.size() with old data, wait one // checkpoint to wait retract old record and send new record Thread.sleep(1000); - assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + assertEqualsInAnyOrder( + fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink")); // step 6: trigger savepoint if (round != captureAddressTables.length - 1) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index bff6e43f2..c28646aa0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -193,7 +193,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "spare tire,22.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -266,7 +266,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { String[] expected = new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -454,7 +454,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", "-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", "+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); @@ -592,7 +592,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "-D(" + databaseName + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); Collections.sort(expected); assertEquals(expected, actual); @@ -707,7 +707,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "spare tire,22.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java index 8a6e13cc0..24b9ec8ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java @@ -179,7 +179,7 @@ public class PostgreSQLSavepointITCase extends PostgresTestBase { "+I[112, jacket, new water resistent white wind breaker, 0.500]" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); jobClient.cancel().get(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java index 0c3af2a74..fce9ece5e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/PostgresTestUtils.java @@ -96,7 +96,7 @@ public class PostgresTestUtils { public static int upsertSinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getResults(sinkName).size(); + return TestValuesTableFactory.getResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java index 93b736943..d107ea5f4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java @@ -215,7 +215,7 @@ public class SqlServerTestBase extends AbstractTestBase { protected static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java index 8f6ea0e92..cd670792e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceTestBase.java @@ -214,7 +214,7 @@ public abstract class SqlServerSourceTestBase extends TestLogger { protected static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java index 239756bb6..0b3465e0a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java @@ -199,7 +199,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { "spare tire,22.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -275,7 +275,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { "113,scooter,Big 3-wheel scooter,5.200" }; - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertThat(actual, containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); @@ -385,7 +385,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { Arrays.asList( "+I(0,cc ,vcc,tc,cč ,vcč,tč,1.123,2,3.323,4.323,5,6,true,22,333,4444,55555,2018-07-13,10:23:45.680,10:23:45.678,2018-07-13T11:23:45.340,2018-07-13T01:23:45.456Z,2018-07-13T13:23:45.780,2018-07-13T14:24,b)", "+U(0,cc ,vcc,tc,cč ,vcč,tč,1.123,2,3.323,4.323,5,6,true,22,333,8888,55555,2018-07-13,10:23:45.680,10:23:45.679,2018-07-13T11:23:45.340,2018-07-13T01:23:45.456Z,2018-07-13T13:23:45.780,2018-07-13T14:24,b)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEquals(expected, actual); result.getJobClient().get().cancel().get(); @@ -483,7 +483,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { "+U(inventory,dbo,products,110,jacket,new water resistent white wind breaker,0.500)", "+U(inventory,dbo,products,111,scooter,Big 2-wheel scooter ,5.170)", "-D(inventory,dbo,products,111,scooter,Big 2-wheel scooter ,5.170)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); Collections.sort(expected); assertEquals(expected, actual); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java index 561af86e6..c3d9f9855 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTimezoneITCase.java @@ -188,7 +188,7 @@ public class SqlServerTimezoneITCase extends SqlServerTestBase { waitForSnapshotStarted("sink"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); result.getJobClient().get().cancel().get(); return actual; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java index a492367a4..dcc488178 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java @@ -160,7 +160,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+U(110,jacket,new water resistent white wind breaker,0.5000000000)", "+U(111,scooter,Big 2-wheel scooter ,5.1700000000)", "-D(111,scooter,Big 2-wheel scooter ,5.1700000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -239,7 +239,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+U(110,jacket2,null,0.5000000000)", "+U(111,scooter,null,5.1700000000)", "-D(111,scooter,null,5.1700000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -324,7 +324,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+U(110,jacket,new water resistent white wind breaker,0.5000000000)", "+U(111,scooter,Big 2-wheel scooter ,5.1700000000)", "-D(111,scooter,Big 2-wheel scooter ,5.1700000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -397,7 +397,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+I(inventory,products,109,spare tire,24 inch spare tire,22.2000000000)", "+U(inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)", "-U(inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -534,7 +534,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+I(1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,2147483647,9223372036854775807,18446744073709551615,Hello World,abc,123.102,123.102,404.4443,123.4567,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2021,red,[a, b],{\"key1\":\"value1\"})", "+U(1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,2147483647,9223372036854775807,18446744073709551615,Hello World,abc,123.102,123.102,404.4443,123.4567,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2021,red,[a, b],{\"key1\":\"value1\"})"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -614,7 +614,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { "+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)", "+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)"); - List actual = TestValuesTableFactory.getRawResults("sink"); + List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -629,7 +629,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java index 766b4b77e..47c7ac366 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java @@ -139,7 +139,7 @@ public class TiDBConnectorRegionITCase extends TiDBTestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java index 4ddd5c1bc..c660fa67e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java @@ -141,7 +141,7 @@ public class VitessConnectorITCase extends VitessTestBase { "+I[jacket, 0.600]", "+I[spare tire, 22.200]"); - List actual = TestValuesTableFactory.getResults("sink"); + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); assertEqualsInAnyOrder(expected, actual); result.getJobClient().get().cancel().get(); } @@ -245,7 +245,7 @@ public class VitessConnectorITCase extends VitessTestBase { private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { - return TestValuesTableFactory.getRawResults(sinkName).size(); + return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index c9850f29d..31c4d2ec8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -28,12 +28,10 @@ limitations under the License. flink-cdc-pipeline-e2e-tests - 1.17.2 - 1.18.1 1.19.1 1.20.0 8.0.27 - 1.2.9_flink-${flink.major.version} + 1.2.10_flink-${flink.major.version} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index 093359642..a1c3c482b 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -95,7 +95,7 @@ public abstract class PipelineTestEnvironment extends TestLogger { if (flinkVersion != null) { return Collections.singletonList(flinkVersion); } else { - return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0"); + return Arrays.asList("1.19.1", "1.20.0"); } } @@ -104,7 +104,7 @@ public abstract class PipelineTestEnvironment extends TestLogger { LOG.info("Starting containers..."); jobManagerConsumer = new ToStringConsumer(); - String flinkProperties = getFlinkProperties(flinkVersion); + String flinkProperties = getFlinkProperties(); jobManager = new GenericContainer<>(getFlinkDockerImageTag()) @@ -257,19 +257,7 @@ public abstract class PipelineTestEnvironment extends TestLogger { versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null); } - private static String getFlinkProperties(String flinkVersion) { - // this is needed for oracle-cdc tests. - // see https://stackoverflow.com/a/47062742/4915129 - String javaOptsConfig; - Version version = parseVersion(flinkVersion); - if (version.compareTo(parseVersion("1.17.0")) >= 0) { - // Flink 1.17 renames `env.java.opts` to `env.java.opts.all` - javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"; - } else { - // Legacy Flink version, might drop their support in near future - javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"; - } - + private static String getFlinkProperties() { return String.join( "\n", Arrays.asList( @@ -278,6 +266,6 @@ public abstract class PipelineTestEnvironment extends TestLogger { "taskmanager.numberOfTaskSlots: 10", "parallelism.default: 4", "execution.checkpointing.interval: 300", - javaOptsConfig)); + "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false")); } } diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index cdb0b43e8..7f263c5e7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -28,13 +28,8 @@ limitations under the License. flink-cdc-source-e2e-tests - 1.16.3 - 1.17.2 - 1.18.1 1.19.1 1.20.0 - 3.1.2-1.17 - 3.2.0-1.18 3.2.0-1.19 3.2.0-1.19 8.0.27 @@ -246,36 +241,6 @@ limitations under the License. - - org.apache.flink - flink-connector-jdbc - ${flink-1.16} - jdbc-connector_${flink-1.16}.jar - jar - ${project.build.directory}/dependencies - - - - - org.apache.flink - flink-connector-jdbc - ${jdbc.version-1.17} - jdbc-connector_${flink-1.17}.jar - jar - ${project.build.directory}/dependencies - - - - - org.apache.flink - flink-connector-jdbc - ${jdbc.version-1.18} - jdbc-connector_${flink-1.18}.jar - jar - ${project.build.directory}/dependencies - - - org.apache.flink flink-connector-jdbc diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index 77aabc2df..4614a582e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -125,7 +125,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { if (flinkVersion != null) { return Collections.singletonList(flinkVersion); } else { - return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0"); + return Arrays.asList("1.19.1", "1.20.0"); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index d8b977029..edb64adcd 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; @@ -280,6 +281,9 @@ public class EventOperatorTestHarness, E ex throw new UnsupportedOperationException(); } + @Override + public void emitRecordAttributes(RecordAttributes recordAttributes) {} + @Override public void close() {} } diff --git a/pom.xml b/pom.xml index 72109d92a..314d7b1c5 100644 --- a/pom.xml +++ b/pom.xml @@ -69,8 +69,8 @@ limitations under the License. true - 1.18.1 - 1.18 + 1.19.1 + 1.19 17.0 1.9.8.Final 3.2.0 @@ -356,6 +356,13 @@ limitations under the License. ${assertj.version} test + + + commons-codec + commons-codec + 1.15 + test + @@ -568,6 +575,7 @@ limitations under the License. org.apache.maven.plugins maven-shade-plugin + 3.4.1 shade-flink diff --git a/tools/mig-test/datastream/datastream-3.3-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.3-SNAPSHOT/pom.xml index b67eb2082..17c57d623 100644 --- a/tools/mig-test/datastream/datastream-3.3-SNAPSHOT/pom.xml +++ b/tools/mig-test/datastream/datastream-3.3-SNAPSHOT/pom.xml @@ -27,7 +27,7 @@ limitations under the License. UTF-8 - 1.18.1 + 1.19.1 3.3-SNAPSHOT 1.9.7.Final 2.12