[mysql] Release the debezium reader thread resources after reading finished (#1358)

This closes #1350.
pull/1449/head
Matrix42 3 years ago committed by GitHub
parent 2de4903261
commit c96926b6ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
@ -51,6 +52,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
@ -65,7 +67,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private static final Logger LOG = LoggerFactory.getLogger(BinlogSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executor;
private final ExecutorService executorService;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
@ -79,11 +81,13 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private final Set<TableId> pureBinlogPhaseTables;
private Tables.TableFilter capturedTableFilter;
private static final long READER_CLOSE_TIMEOUT = 30L;
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true;
this.pureBinlogPhaseTables = new HashSet<>();
}
@ -108,7 +112,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit);
executor.submit(
executorService.submit(
() -> {
try {
binlogSplitReadTask.execute(
@ -176,6 +180,17 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
// set currentTaskRunning to false to terminate the
// while loop in MySqlStreamingChangeEventSource's execute method
currentTaskRunning = false;
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the binlog split reader in {} seconds.",
READER_CLOSE_TIMEOUT);
}
}
} catch (Exception e) {
LOG.error("Close binlog reader error", e);
}
@ -284,4 +299,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}
@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}
}

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -54,6 +55,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp;
@ -73,7 +75,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executor;
private final ExecutorService executorService;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
@ -86,11 +88,13 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
public AtomicBoolean hasNextElement;
public AtomicBoolean reachEnd;
private static final long READER_CLOSE_TIMEOUT = 30L;
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subtaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false);
this.reachEnd = new AtomicBoolean(false);
@ -114,7 +118,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
executor.submit(
executorService.submit(
() -> {
try {
currentTaskRunning = true;
@ -328,11 +332,24 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the snapshot split reader in {} seconds.",
READER_CLOSE_TIMEOUT);
}
}
} catch (Exception e) {
LOG.error("Close snapshot reader error", e);
}
}
@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}
/**
* {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high
* watermark for each {@link MySqlSnapshotSplit}.

@ -65,6 +65,8 @@ import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlSourceTestBase {
@ -441,6 +443,10 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
}
}
}
snapshotSplitReader.close();
assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
// step-2: create binlog split according the finished snapshot splits
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
@ -483,6 +489,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord),
dataType));
}
binlogReader.close();
assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
return actual;
}

@ -53,6 +53,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -411,6 +412,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
if (binaryLogClient != null) {
binaryLogClient.disconnect();
}
snapshotSplitReader.close();
assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
return formatResult(result, dataType);
}

Loading…
Cancel
Save