[postgres] Flush LSN offset to PG even if no updates to the monitored tables to avoid infinite WAL segments (#111)

This closes #111
pull/334/head
赵万梓 4 years ago committed by Jark Wu
parent c1c96a6ac7
commit c34d9776e7
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -107,6 +107,11 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
if (isHeartbeatEvent(record)) {
// keep offset update
synchronized (checkpointLock) {
debeziumOffset.setSourcePartition(record.sourcePartition());
debeziumOffset.setSourceOffset(record.sourceOffset());
}
// drop heartbeat events
continue;
}

@ -82,7 +82,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
@Test
public void testConsumingAllEvents() throws Exception {
DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSource();
DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatDisabled();
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
@ -152,7 +152,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatDisabled();
// we use blocking context to block the source to emit before last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext = new BlockingSourceContext<>(8);
// setup source with empty state
@ -211,7 +211,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 = new CheckedThread() {
@ -266,7 +266,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-5: restore the source from checkpoint-2
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source3 = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source3 = createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
setupSource(source3, true, offsetState, historyState, true, 0, 1);
@ -320,7 +320,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-7: restore the source from checkpoint-3
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source4 = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source4 = createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext4 = new TestSourceContext<>();
setupSource(source4, true, offsetState, historyState, true, 0, 1);
@ -367,7 +367,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatEnabled();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
@ -416,7 +416,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSource();
final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSourceWithHeartbeatEnabled();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
// setup source with empty state
setupSource(source2, true, offsetState, historyState, true, 0, 1);
@ -432,12 +432,23 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
assertFalse(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(0, source2, sourceContext2, 401);
Thread.sleep(3_000); // waiting heartbeat events, we set 1s heartbeat interval
Thread.sleep(3_000); // waiting heartbeat events, we have set 1s heartbeat interval
// trigger checkpoint once again to make sure ChangeConsumer is initialized
batchInsertAndCheckpoint(0, source2, sourceContext2, 402);
source2.notifyCheckpointComplete(402);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
// verify LSN is advanced even if there is no changes on the table
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
// we have to do some transactions which is not related to the monitored table
statement.execute("CREATE TABLE dummy (a int)");
}
Thread.sleep(3_000);
batchInsertAndCheckpoint(0, source2, sourceContext2, 404);
source2.notifyCheckpointComplete(404);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(3, source2, sourceContext2, 501);
batchInsertAndCheckpoint(2, source2, sourceContext2, 502);
batchInsertAndCheckpoint(1, source2, sourceContext2, 503);
@ -454,7 +465,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
runThread.sync();
}
assertEquals(4, flushLsn.size());
assertEquals(5, flushLsn.size());
}
private void batchInsertAndCheckpoint(
@ -480,9 +491,17 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
// Utilities
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource() {
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled() {
return createPostgreSqlSource(0);
}
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatEnabled() {
return createPostgreSqlSource(1000);
}
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource(int heartbeatInterval) {
Properties properties = new Properties();
properties.setProperty("heartbeat.interval.ms", "1000");
properties.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval));
return PostgreSQLSource.<SourceRecord>builder()
.hostname(POSTGERS_CONTAINER.getHost())
.port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))

Loading…
Cancel
Save