[FLINK-36586][build] Update flink version to 1.19 (#3660)

Co-authored-by: ConradJam <czy006@apache.com>
Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
pull/3744/head
ConradJam 2 months ago committed by GitHub
parent 8e6c361f96
commit dd79a02f0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -71,7 +71,7 @@ jobs:
strategy:
matrix:
java-version: [ '8' ]
flink-version: ['1.17.2', '1.18.1', '1.19.1', '1.20.0']
flink-version: ['1.19.1', '1.20.0']
module: [ 'pipeline_e2e' ]
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
@ -83,7 +83,7 @@ jobs:
strategy:
matrix:
java-version: [ '8' ]
flink-version: ['1.16.3', '1.17.2', '1.18.1', '1.19.1', '1.20.0']
flink-version: ['1.19.1', '1.20.0']
module: [ 'source_e2e' ]
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml

@ -39,7 +39,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink-version: [ '1.18.1', '1.19.1', '1.20.0' ]
flink-version: [ '1.19.1', '1.20.0' ]
steps:
- uses: actions/checkout@v4
@ -78,7 +78,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink-version: [ '1.18.1', '1.19.1', '1.20.0' ]
flink-version: [ '1.19.1', '1.20.0' ]
steps:
- uses: actions/checkout@v4

@ -36,7 +36,7 @@ limitations under the License.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>8.12.1</elasticsearch.version>
<flink.version>1.18.0</flink.version>
<flink.version>1.19.1</flink.version>
<scala.binary.version>4.0</scala.binary.version>
<jackson.version>2.13.2</jackson.version>
<surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>

@ -35,7 +35,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.2-${flink.major.version}</version>
<version>3.3.0-${flink.major.version}</version>
</dependency>
<dependency>

@ -72,6 +72,13 @@ limitations under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>

@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@ -40,6 +42,8 @@ import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@ -572,7 +576,10 @@ public class PaimonSinkITCase {
private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
protected MockCommitRequestImpl(CommT committable) {
super(committable);
super(
committable,
InternalSinkCommitterMetricGroup.wrap(
UnregisteredMetricsGroup.createOperatorMetricGroup()));
}
}
@ -633,5 +640,15 @@ public class PaimonSinkITCase {
public JobID getJobId() {
return null;
}
@Override
public JobInfo getJobInfo() {
return null;
}
@Override
public TaskInfo getTaskInfo() {
return null;
}
}
}

@ -28,7 +28,7 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<properties>
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
<starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
</properties>
<dependencies>
@ -38,15 +38,6 @@ limitations under the License.
<version>${starrocks.connector.version}</version>
</dependency>
<dependency>
<!-- TODO connector 1.2.9 depends on this, but not package it, so add this dependency here.
This dependency can be removed after upgrading connector to 1.2.10 which will not use
commons-compress anymore. -->
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.starrocks.shade.org.apache.commons.compress.utils;
import java.util.ArrayList;
/**
* Dummy class of shaded apache-commons since connector 1.2.9 depends on this, but not package it.
* This package should be removed after upgrading to 1.2.10 which will not use commons-compress
* anymore.
*/
public class Lists {
public static <E> ArrayList<E> newArrayList() {
return new ArrayList<>();
}
}

@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@ -353,5 +355,15 @@ public class EventRecordSerializationSchemaTest {
public JobID getJobId() {
throw new UnsupportedOperationException();
}
@Override
public JobInfo getJobInfo() {
throw new UnsupportedOperationException();
}
@Override
public TaskInfo getTaskInfo() {
throw new UnsupportedOperationException();
}
}
}

@ -209,7 +209,7 @@ public class Db2ConnectorITCase extends Db2TestBase {
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
cancelJobIfRunning(result);
@ -302,7 +302,7 @@ public class Db2ConnectorITCase extends Db2TestBase {
Arrays.asList(
"+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
"+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
@ -373,7 +373,7 @@ public class Db2ConnectorITCase extends Db2TestBase {
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
cancelJobIfRunning(result);
@ -470,7 +470,7 @@ public class Db2ConnectorITCase extends Db2TestBase {
"+U(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
@ -493,7 +493,7 @@ public class Db2ConnectorITCase extends Db2TestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -401,7 +401,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// first round's changelog data
makeOplogForAddressTableInRound(database, collection0, 0);
@ -418,7 +418,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
collection0, 417022095255614380L, cityName0, cityName0)));
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@ -466,7 +466,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
captureTableThisRound, cityName, cityName)));
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 4: make changelog data for all collections before this round(also includes this
// round),
@ -512,7 +512,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressCollections.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -590,7 +590,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
}
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@ -629,7 +629,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 3: make oplog data for all collections
List<String> expectedOplogDataThisRound = new ArrayList<>();
@ -665,7 +665,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
}
if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM
&& TestValuesTableFactory.getRawResults("sink").size()
&& TestValuesTableFactory.getRawResultsAsStrings("sink").size()
> fetchedDataList.size()) {
MongoDBTestUtils.triggerFailover(
failoverType,
@ -679,7 +679,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -798,7 +798,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResults("sink"));
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 3: make some changelog data for this round
makeFirstPartOplogForAddressCollection(
@ -843,7 +843,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
MongoDBAssertUtils.assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResults("sink"));
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressCollections.length - 1) {
@ -1023,7 +1023,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
protected static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getResults(sinkName).size();
return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -221,7 +221,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -300,7 +300,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
String[] expected = new String[] {"jacket,0.200", "scooter,5.180"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -463,7 +463,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
"+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)",
"-U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)",
"+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,18:36:04,2021-09-03,1960-08-11,2021-09-03T18:36:04.123,2021-09-03T18:36:04.123Z,2021-09-03T18:36:04,2021-09-03T18:36:04Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
@ -582,7 +582,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
.sorted()
.collect(Collectors.toList());
List<String> actual = TestValuesTableFactory.getRawResults("meta_sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();

@ -115,7 +115,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
String.format("+I[%s, coll_a2, A202]", db1)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -169,7 +169,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
String.format("+I[%s, coll_b2, B202]", db1),
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -208,7 +208,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
String.format("+I[%s, coll_b2, B202]", db0)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -247,7 +247,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
String.format("+I[%s, coll_b2, B202]", db0)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -267,7 +267,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
// 3. Check results
String[] expected = new String[] {String.format("+I[%s, coll-a1, A101]", db0)};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -292,7 +292,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
String.format("+I[%s, coll.name, A103]", db)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();

@ -56,7 +56,8 @@ public class MongoDBTestUtils {
fail(
"Wait for sink size timeout, raw results: \n"
+ String.join(
"\n", TestValuesTableFactory.getRawResults(sinkName)));
"\n",
TestValuesTableFactory.getRawResultsAsStrings(sinkName)));
}
Thread.sleep(100);
}
@ -65,7 +66,7 @@ public class MongoDBTestUtils {
public static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -326,7 +326,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig());
env.getCheckpointConfig(),
10000L);
CollectStreamSink<RowData> sink = new CollectStreamSink(source, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
@ -807,7 +808,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig());
env.getCheckpointConfig(),
10000L);
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());

@ -179,7 +179,7 @@ public abstract class MySqlSourceTestBase extends TestLogger {
protected static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getResults(sinkName).size();
return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -535,7 +535,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
stream.getExecutionEnvironment().getCheckpointConfig());
stream.getExecutionEnvironment().getCheckpointConfig(),
10000L);
return iterator;
}
@ -638,7 +639,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
() -> sleepMs(100));
}
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@ -673,7 +675,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
JobClient jobClient = tableResult.getJobClient().get();
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 3: make binlog data for all tables
List<String> expectedBinlogDataThisRound = new ArrayList<>();
@ -701,7 +704,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
}
if (failoverPhase == FailoverPhase.BINLOG
&& TestValuesTableFactory.getRawResults("sink").size()
&& TestValuesTableFactory.getRawResultsAsStrings("sink").size()
> fetchedDataList.size()) {
triggerFailover(
failoverType,
@ -713,7 +716,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
fetchedDataList.addAll(expectedBinlogDataThisRound);
// step 4: assert fetched binlog data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -827,7 +831,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
}
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
@ -869,7 +874,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
@ -1108,7 +1114,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
@ -1176,7 +1182,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
newlyAddedTable, cityName, cityName));
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
@ -1206,7 +1213,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);

@ -492,7 +492,8 @@ public class SpecificStartingOffsetITCase {
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig());
env.getCheckpointConfig(),
10000L);
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());

@ -271,7 +271,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[spare tire, 22.200]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
}
@ -410,7 +410,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[110, jacket, new water resistent white wind breaker, 0.500]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
}
@ -919,7 +919,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
expected.addAll(
Lists.newArrayList("+U[0, 1024]", "+U[1, 1025]", "+U[2, 2048]", "+U[3, 2049]"));
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual);
Collections.sort(expected);
assertEquals(expected, actual);
@ -1023,7 +1023,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
// TODO: we can't assert merged result for incremental-snapshot, because we can't add a
// keyby shuffle before "values" upsert sink. We should assert merged result once
// https://issues.apache.org/jira/browse/FLINK-24511 is fixed.
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
@ -1744,7 +1744,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
String[] expected =
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -1845,7 +1845,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
String[] expected =
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -1925,7 +1925,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[110, jacket, new water resistent white wind breaker, 0.500]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -2000,7 +2000,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
String[] expected =
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -2319,7 +2319,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -127,7 +127,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
tableResult = tEnv.executeSql("insert into sink select * from orders_source");
waitForSinkSize("sink", realSnapshotData.size());
assertEqualsInAnyOrder(expectedSnapshotData, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
expectedSnapshotData, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// third step: check dml events
try (Connection connection = getJdbcConnection();
@ -346,7 +347,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
waitForSinkSize("multi_key_sink", realSnapshotData.size());
assertEqualsInAnyOrder(
expectedSnapshotData, TestValuesTableFactory.getRawResults("multi_key_sink"));
expectedSnapshotData,
TestValuesTableFactory.getRawResultsAsStrings("multi_key_sink"));
// third step: check dml events
try (Connection connection = getJdbcConnection();

@ -200,7 +200,7 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase {
protected static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -139,7 +139,7 @@ public abstract class OceanBaseTestBase extends AbstractTestBase {
public static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -180,7 +180,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
"+U(110,jacket,new water resistent white wind breaker,0.5000000000)",
"+U(111,scooter,Big 2-wheel scooter ,5.1700000000)",
"-D(111,scooter,Big 2-wheel scooter ,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
@ -276,7 +276,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
"+U("
+ tenant
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -456,7 +456,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
"+I(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})",
"+U(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -536,7 +536,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
"+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)",
"+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -591,7 +591,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
"+I(107,rocks,box of assorted rocks,5.3000000000)",
"+I(108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(109,spare tire,24 inch spare tire,22.2000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
}
}

@ -220,7 +220,7 @@ public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
"+I(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.007890,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)",
"+U(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.125450,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}

@ -394,7 +394,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
captureTableThisRound, cityName, cityName)));
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 2: make redo log data for all tables before this round(also includes this
// round),
@ -419,7 +420,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
// step 3: assert fetched redo log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 4: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
@ -491,7 +493,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
() -> sleepMs(100));
}
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@ -526,7 +529,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
JobClient jobClient = tableResult.getJobClient().get();
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 3: make redo log data for all tables
List<String> expectedRedoLogDataThisRound = new ArrayList<>();
@ -555,7 +559,7 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
}
if (failoverPhase == FailoverPhase.REDO_LOG
&& TestValuesTableFactory.getRawResults("sink").size()
&& TestValuesTableFactory.getRawResultsAsStrings("sink").size()
> fetchedDataList.size()) {
triggerFailover(
failoverType,
@ -567,7 +571,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
fetchedDataList.addAll(expectedRedoLogDataThisRound);
// step 4: assert fetched redo log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -681,7 +686,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
}
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 3: make some redo log data for this round
makeFirstPartRedoLogForAddressTable(newlyAddedTable);
@ -722,7 +728,8 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {

@ -218,7 +218,7 @@ public class OracleConnectorITCase {
"+I[spare tire, 22.200]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -350,7 +350,7 @@ public class OracleConnectorITCase {
"+I[spare tire, 22.200]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -445,7 +445,7 @@ public class OracleConnectorITCase {
"+I[spare tire, 22.200]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
LOG.info("actual:{}", actual);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
@ -547,7 +547,7 @@ public class OracleConnectorITCase {
"+U[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
"-D[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
@ -618,7 +618,7 @@ public class OracleConnectorITCase {
String[] expected =
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -719,7 +719,7 @@ public class OracleConnectorITCase {
"+I[11000000000, false, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]",
"+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]");
List<String> actual = TestValuesTableFactory.getRawResults("test_numeric_sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("test_numeric_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
@ -835,7 +835,7 @@ public class OracleConnectorITCase {
+ "</name>\n]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
Collections.sort(actual);
assertEquals(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get();
@ -894,7 +894,7 @@ public class OracleConnectorITCase {
waitForSinkSize("sink", RECORDS_COUNT);
List<Integer> actual =
TestValuesTableFactory.getResults("sink").stream()
TestValuesTableFactory.getResultsAsStrings("sink").stream()
.map(s -> s.replaceFirst("\\+I\\[(\\d+).+", "$1"))
.map(Integer::parseInt)
.sorted()
@ -989,7 +989,7 @@ public class OracleConnectorITCase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -86,7 +86,7 @@ public class OracleTestUtils {
public static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
@ -104,7 +104,7 @@ public class OracleTestUtils {
public static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getResults(sinkName).size();
return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -169,11 +169,11 @@ public abstract class PostgresTestBase extends AbstractTestBase {
protected void waitForSinkResult(String sinkName, List<String> expected)
throws InterruptedException {
List<String> actual = TestValuesTableFactory.getResults(sinkName);
List<String> actual = TestValuesTableFactory.getResultsAsStrings(sinkName);
actual = actual.stream().sorted().collect(Collectors.toList());
while (actual.size() != expected.size() || !actual.equals(expected)) {
actual =
TestValuesTableFactory.getResults(sinkName).stream()
TestValuesTableFactory.getResultsAsStrings(sinkName).stream()
.sorted()
.collect(Collectors.toList());
Thread.sleep(1000);
@ -189,7 +189,7 @@ public abstract class PostgresTestBase extends AbstractTestBase {
protected int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -422,7 +422,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
captureTableThisRound, cityName, cityName)));
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 2: make wal log data for all tables before this round(also includes this round),
// test whether only this round table's data is captured.
@ -449,7 +450,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
// step 3: assert fetched wal log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 4: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
@ -521,7 +523,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
() -> sleepMs(100));
}
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@ -556,7 +559,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
JobClient jobClient = tableResult.getJobClient().get();
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 3: make wal log data for all tables
List<String> expectedWalLogDataThisRound = new ArrayList<>();
@ -590,7 +594,7 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
}
if (failoverPhase == PostgresTestUtils.FailoverPhase.STREAM
&& TestValuesTableFactory.getRawResults("sink").size()
&& TestValuesTableFactory.getRawResultsAsStrings("sink").size()
> fetchedDataList.size()) {
PostgresTestUtils.triggerFailover(
failoverType,
@ -602,7 +606,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
fetchedDataList.addAll(expectedWalLogDataThisRound);
// step 4: assert fetched wal log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -716,7 +721,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
}
fetchedDataList.addAll(expectedSnapshotDataThisRound);
PostgresTestUtils.waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 3: make some wal log data for this round
makeFirstPartWalLogForAddressTable(getConnection(), newlyAddedTable);
@ -757,7 +763,8 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {

@ -193,7 +193,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -266,7 +266,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -454,7 +454,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
"-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
"+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
@ -592,7 +592,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"-D("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual);
Collections.sort(expected);
assertEquals(expected, actual);
@ -707,7 +707,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();

@ -179,7 +179,7 @@ public class PostgreSQLSavepointITCase extends PostgresTestBase {
"+I[112, jacket, new water resistent white wind breaker, 0.500]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
jobClient.cancel().get();

@ -96,7 +96,7 @@ public class PostgresTestUtils {
public static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getResults(sinkName).size();
return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -215,7 +215,7 @@ public class SqlServerTestBase extends AbstractTestBase {
protected static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -214,7 +214,7 @@ public abstract class SqlServerSourceTestBase extends TestLogger {
protected static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -199,7 +199,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -275,7 +275,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
"113,scooter,Big 3-wheel scooter,5.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
@ -385,7 +385,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
Arrays.asList(
"+I(0,cc ,vcc,tc,cč ,vcč,tč,1.123,2,3.323,4.323,5,6,true,22,333,4444,55555,2018-07-13,10:23:45.680,10:23:45.678,2018-07-13T11:23:45.340,2018-07-13T01:23:45.456Z,2018-07-13T13:23:45.780,2018-07-13T14:24,<a>b</a>)",
"+U(0,cc ,vcc,tc,cč ,vcč,tč,1.123,2,3.323,4.323,5,6,true,22,333,8888,55555,2018-07-13,10:23:45.680,10:23:45.679,2018-07-13T11:23:45.340,2018-07-13T01:23:45.456Z,2018-07-13T13:23:45.780,2018-07-13T14:24,<a>b</a>)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
@ -483,7 +483,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
"+U(inventory,dbo,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U(inventory,dbo,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(inventory,dbo,products,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual);
Collections.sort(expected);
assertEquals(expected, actual);

@ -188,7 +188,7 @@ public class SqlServerTimezoneITCase extends SqlServerTestBase {
waitForSnapshotStarted("sink");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
result.getJobClient().get().cancel().get();
return actual;

@ -160,7 +160,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+U(110,jacket,new water resistent white wind breaker,0.5000000000)",
"+U(111,scooter,Big 2-wheel scooter ,5.1700000000)",
"-D(111,scooter,Big 2-wheel scooter ,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -239,7 +239,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+U(110,jacket2,null,0.5000000000)",
"+U(111,scooter,null,5.1700000000)",
"-D(111,scooter,null,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -324,7 +324,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+U(110,jacket,new water resistent white wind breaker,0.5000000000)",
"+U(111,scooter,Big 2-wheel scooter ,5.1700000000)",
"-D(111,scooter,Big 2-wheel scooter ,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -397,7 +397,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+I(inventory,products,109,spare tire,24 inch spare tire,22.2000000000)",
"+U(inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)",
"-U(inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -534,7 +534,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+I(1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,2147483647,9223372036854775807,18446744073709551615,Hello World,abc,123.102,123.102,404.4443,123.4567,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2021,red,[a, b],{\"key1\":\"value1\"})",
"+U(1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,2147483647,9223372036854775807,18446744073709551615,Hello World,abc,123.102,123.102,404.4443,123.4567,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2021,red,[a, b],{\"key1\":\"value1\"})");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -614,7 +614,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
"+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)",
"+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -629,7 +629,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -139,7 +139,7 @@ public class TiDBConnectorRegionITCase extends TiDBTestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -141,7 +141,7 @@ public class VitessConnectorITCase extends VitessTestBase {
"+I[jacket, 0.600]",
"+I[spare tire, 22.200]");
List<String> actual = TestValuesTableFactory.getResults("sink");
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@ -245,7 +245,7 @@ public class VitessConnectorITCase extends VitessTestBase {
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;

@ -28,12 +28,10 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
<properties>
<flink-1.17>1.17.2</flink-1.17>
<flink-1.18>1.18.1</flink-1.18>
<flink-1.19>1.19.1</flink-1.19>
<flink-1.20>1.20.0</flink-1.20>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
<starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
</properties>
<dependencies>

@ -95,7 +95,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
if (flinkVersion != null) {
return Collections.singletonList(flinkVersion);
} else {
return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0");
return Arrays.asList("1.19.1", "1.20.0");
}
}
@ -104,7 +104,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
String flinkProperties = getFlinkProperties(flinkVersion);
String flinkProperties = getFlinkProperties();
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
@ -257,19 +257,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null);
}
private static String getFlinkProperties(String flinkVersion) {
// this is needed for oracle-cdc tests.
// see https://stackoverflow.com/a/47062742/4915129
String javaOptsConfig;
Version version = parseVersion(flinkVersion);
if (version.compareTo(parseVersion("1.17.0")) >= 0) {
// Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false";
} else {
// Legacy Flink version, might drop their support in near future
javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false";
}
private static String getFlinkProperties() {
return String.join(
"\n",
Arrays.asList(
@ -278,6 +266,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
javaOptsConfig));
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"));
}
}

@ -28,13 +28,8 @@ limitations under the License.
<artifactId>flink-cdc-source-e2e-tests</artifactId>
<properties>
<flink-1.16>1.16.3</flink-1.16>
<flink-1.17>1.17.2</flink-1.17>
<flink-1.18>1.18.1</flink-1.18>
<flink-1.19>1.19.1</flink-1.19>
<flink-1.20>1.20.0</flink-1.20>
<jdbc.version-1.17>3.1.2-1.17</jdbc.version-1.17>
<jdbc.version-1.18>3.2.0-1.18</jdbc.version-1.18>
<jdbc.version-1.19>3.2.0-1.19</jdbc.version-1.19>
<jdbc.version-1.20>3.2.0-1.19</jdbc.version-1.20>
<mysql.driver.version>8.0.27</mysql.driver.version>
@ -246,36 +241,6 @@ limitations under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-1.16}</version>
<destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${jdbc.version-1.17}</version>
<destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${jdbc.version-1.18}</version>
<destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>

@ -125,7 +125,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
if (flinkVersion != null) {
return Collections.singletonList(flinkVersion);
} else {
return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0");
return Arrays.asList("1.19.1", "1.20.0");
}
}

@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@ -280,6 +281,9 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
throw new UnsupportedOperationException();
}
@Override
public void emitRecordAttributes(RecordAttributes recordAttributes) {}
@Override
public void close() {}
}

@ -69,8 +69,8 @@ limitations under the License.
<flink.reuseForks>true</flink.reuseForks>
<!-- dependencies versions -->
<flink.version>1.18.1</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.version>1.19.1</flink.version>
<flink.major.version>1.19</flink.major.version>
<flink.shaded.version>17.0</flink.shaded.version>
<debezium.version>1.9.8.Final</debezium.version>
<tikv.version>3.2.0</tikv.version>
@ -356,6 +356,13 @@ limitations under the License.
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -568,6 +575,7 @@ limitations under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade-flink</id>

@ -27,7 +27,7 @@ limitations under the License.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.version>1.19.1</flink.version>
<flink.cdc.version>3.3-SNAPSHOT</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>

Loading…
Cancel
Save