@@ -23,6 +23,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
@@ -31,9 +32,16 @@ import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
@@ -95,13 +103,16 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link MySqlSource}. */
@RunWith(Parameterized.class)
public class MySqlSourceITCase extends MySqlSourceTestBase {

    @Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
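    // Shared timeout constant: it bounds each test via the JUnit rule below and
    // doubles as a loose upper bound when sanity-checking the time-based gauges
    // in testSourceMetrics.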
    public static final Duration TIMEOUT = Duration.ofSeconds(300);

    @Rule public final Timeout timeoutPerTest = Timeout.seconds(TIMEOUT.getSeconds());

    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
    private final UniqueDatabase customDatabase =
@@ -686,6 +697,123 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
        }
    }

    @SuppressWarnings("unchecked")
    @Test
    public void testSourceMetrics() throws Exception {
        customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname(MYSQL_CONTAINER.getHost())
                        .port(MYSQL_CONTAINER.getDatabasePort())
                        .databaseList(customDatabase.getDatabaseName())
                        .tableList(customDatabase.getDatabaseName() + ".customers")
                        .username(customDatabase.getUsername())
                        .password(customDatabase.getPassword())
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .serverId(getServerId())
                        .build();
        DataStreamSource<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        CollectResultIterator<String> iterator = addCollector(env, stream);
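        // Submit the job asynchronously; the JobClient is handed to the collect
        // iterator so it can fetch results from the running job, and is used below
        // to look up the source operator's metric group.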
        JobClient jobClient = env.executeAsync();
        iterator.setJobClient(jobClient);

        // ---------------------------- Snapshot phase ------------------------------
        // Wait until we receive all 21 snapshot records
        int numSnapshotRecordsExpected = 21;
        int numSnapshotRecordsReceived = 0;
        while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numSnapshotRecordsReceived++;
        }

        // Check metrics
        List<OperatorMetricGroup> metricGroups =
                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MySQL CDC Source");
        // The source runs with parallelism 1, so it's safe to take the only metric group
        OperatorMetricGroup group = metricGroups.get(0);
        Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

        // numRecordsOut
        assertEquals(
                numSnapshotRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());
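
        // During the snapshot phase the reader has not yet seen any binlog event
        // timestamps, so both lag gauges below are expected to stay at their
        // UNDEFINED sentinels.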
        // currentEmitEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
        Gauge<Long> currentEmitEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
        assertEquals(
                InternalSourceReaderMetricGroup.UNDEFINED,
                (long) currentEmitEventTimeLag.getValue());

        // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
        assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
        Gauge<Long> currentFetchEventTimeLag =
                (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
        assertEquals(
                MySqlSourceReaderMetrics.UNDEFINED, (long) currentFetchEventTimeLag.getValue());

        // sourceIdleTime should be positive (we can't know the exact value)
        assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
        Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

        // --------------------------------- Binlog phase -----------------------------
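        // The helper below applies 4 row changes to the captured table, pushing the
        // source from the snapshot phase into reading the binlog.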
        makeFirstPartBinlogEvents(getConnection(), customDatabase.qualifiedTableName("customers"));
        // Wait until we receive the 4 changes made above
        int numBinlogRecordsExpected = 4;
        int numBinlogRecordsReceived = 0;
        while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
            iterator.next();
            numBinlogRecordsReceived++;
        }

        // Check metrics
        // numRecordsOut
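        // (the counter is cumulative, so the snapshot records are counted as well)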
        assertEquals(
                numSnapshotRecordsExpected + numBinlogRecordsExpected,
                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

        // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
        assertTrue(currentEmitEventTimeLag.getValue() > 0);
        assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

        // currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
        assertTrue(currentFetchEventTimeLag.getValue() > 0);
        assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

        // sourceIdleTime should be positive (we can't know the exact value)
        assertTrue(sourceIdleTime.getValue() > 0);
        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

        jobClient.cancel().get();
        iterator.close();
    }
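
    /**
     * Registers a collect sink on the given stream and returns an iterator over its
     * results. Unlike {@code DataStream#executeAndCollect()}, job submission is left
     * to the caller, so the test can keep the {@link JobClient} and inspect metrics
     * while the job is running.
     */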
    private <T> CollectResultIterator<T> addCollector(
            StreamExecutionEnvironment env, DataStream<T> stream) {
        TypeSerializer<T> serializer =
                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory<T> factory =
                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
        CollectResultIterator<T> iterator =
                new CollectResultIterator<>(
                        operator.getOperatorIdFuture(),
                        serializer,
                        accumulatorName,
                        env.getCheckpointConfig());
        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());
        return iterator;
    }

    private MySqlSource<RowData> buildSleepingSource() {
        ResolvedSchema physicalSchema =
                new ResolvedSchema(