|
|
|
@ -16,19 +16,23 @@
|
|
|
|
|
|
|
|
|
|
package com.ververica.cdc.connectors.mysql;
|
|
|
|
|
|
|
|
|
|
import org.apache.flink.api.common.JobStatus;
|
|
|
|
|
import org.apache.flink.api.common.state.BroadcastState;
|
|
|
|
|
import org.apache.flink.api.common.state.KeyedStateStore;
|
|
|
|
|
import org.apache.flink.api.common.state.ListState;
|
|
|
|
|
import org.apache.flink.api.common.state.ListStateDescriptor;
|
|
|
|
|
import org.apache.flink.api.common.state.MapStateDescriptor;
|
|
|
|
|
import org.apache.flink.api.common.state.OperatorStateStore;
|
|
|
|
|
import org.apache.flink.api.common.time.Deadline;
|
|
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
|
|
|
|
import org.apache.flink.configuration.Configuration;
|
|
|
|
|
import org.apache.flink.core.execution.JobClient;
|
|
|
|
|
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
|
|
|
|
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
|
|
|
|
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
|
|
|
|
|
import org.apache.flink.util.Collector;
|
|
|
|
|
import org.apache.flink.util.Preconditions;
|
|
|
|
|
import org.apache.flink.util.function.SupplierWithException;
|
|
|
|
|
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
|
|
|
|
|
import com.ververica.cdc.connectors.utils.TestSourceContext;
|
|
|
|
@ -43,6 +47,7 @@ import java.util.Properties;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
|
|
|
|
/** Utils to help test. */
|
|
|
|
|
public class MySqlTestUtils {
|
|
|
|
@ -108,6 +113,53 @@ public class MySqlTestUtils {
|
|
|
|
|
return allRecords;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static void waitUntilCondition(
|
|
|
|
|
SupplierWithException<Boolean, Exception> condition,
|
|
|
|
|
Deadline timeout,
|
|
|
|
|
long retryIntervalMillis,
|
|
|
|
|
String errorMsg)
|
|
|
|
|
throws Exception {
|
|
|
|
|
while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
|
|
|
|
|
long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
|
|
|
|
|
Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!timeout.hasTimeLeft()) {
|
|
|
|
|
throw new TimeoutException(errorMsg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static void waitForJobStatus(
|
|
|
|
|
JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
|
|
|
|
|
waitUntilCondition(
|
|
|
|
|
() -> {
|
|
|
|
|
JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
|
|
|
|
|
if (expectedStatus.contains(currentStatus)) {
|
|
|
|
|
return true;
|
|
|
|
|
} else if (currentStatus.isTerminalState()) {
|
|
|
|
|
try {
|
|
|
|
|
client.getJobExecutionResult().get();
|
|
|
|
|
} catch (Exception var4) {
|
|
|
|
|
throw new IllegalStateException(
|
|
|
|
|
String.format(
|
|
|
|
|
"Job has entered %s state, but expecting %s",
|
|
|
|
|
currentStatus, expectedStatus),
|
|
|
|
|
var4);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
throw new IllegalStateException(
|
|
|
|
|
String.format(
|
|
|
|
|
"Job has entered a terminal state %s, but expecting %s",
|
|
|
|
|
currentStatus, expectedStatus));
|
|
|
|
|
} else {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
deadline,
|
|
|
|
|
100L,
|
|
|
|
|
"Condition was not met in given timeout.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Properties createDebeziumProperties(boolean useLegacyImplementation) {
|
|
|
|
|
Properties debeziumProps = new Properties();
|
|
|
|
|
if (useLegacyImplementation) {
|
|
|
|
|