[oracle][test] Use prebuilt docker image in tests

pull/1089/head
gongzhongqiang 3 years ago committed by Leonard Xu
parent 59b310a24c
commit 4cf36ad22f

@ -157,7 +157,7 @@ public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
TableId dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
LOG.trace("Filtering schema change event for {}", dataCollectionId);
return;
}
@ -182,7 +182,7 @@ public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
}
}
if (!anyNonfilteredEvent) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
LOG.trace("Filtering schema change event for {}", dataCollectionIds);
return;
}

@ -130,7 +130,7 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
}
@Override
public boolean storeOnlyMonitoredTables() {
public boolean storeOnlyCapturedTables() {
return storeOnlyMonitoredTablesDdl;
}

@ -107,7 +107,9 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
split);
SnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SnapshotSplitChangeEventSourceContext();
SnapshotResult snapshotResult = snapshotSplitReadTask.execute(changeEventSourceContext);
SnapshotResult snapshotResult =
snapshotSplitReadTask.execute(
changeEventSourceContext, sourceFetchContext.getOffsetContext());
final StreamSplit backfillBinlogSplit = createBackfillBinlogSplit(changeEventSourceContext);
// optimization that skip the binlog read when the low watermark equals high
@ -128,7 +130,9 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
if (snapshotResult.isCompletedOrSkipped()) {
final MySqlBinlogSplitReadTask backfillBinlogReadTask =
createBackfillBinlogReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(new SnapshotBinlogSplitChangeEventSourceContext());
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
sourceFetchContext.getOffsetContext());
} else {
taskRunning = false;
throw new IllegalStateException(
@ -213,7 +217,7 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
MySqlConnection jdbcConnection,
JdbcSourceEventDispatcher dispatcher,
SnapshotSplit snapshotSplit) {
super(connectorConfig, previousOffset, snapshotProgressListener);
super(connectorConfig, snapshotProgressListener);
this.offsetContext = previousOffset;
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
@ -225,7 +229,8 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
}
@Override
public SnapshotResult execute(ChangeEventSourceContext context)
public SnapshotResult execute(
ChangeEventSourceContext context, OffsetContext previousOffset)
throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
final SnapshotContext ctx;
@ -236,7 +241,7 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
throw new RuntimeException(e);
}
try {
return doExecute(context, ctx, snapshottingTask);
return doExecute(context, previousOffset, ctx, snapshottingTask);
} catch (InterruptedException e) {
LOG.warn("Snapshot was interrupted before completion");
throw e;
@ -248,6 +253,7 @@ public class MySqlScanFetchTask implements FetchTask<SourceSplitBase> {
@Override
protected SnapshotResult doExecute(
ChangeEventSourceContext context,
OffsetContext previousOffset,
SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {

@ -69,7 +69,8 @@ public class MySqlStreamFetchTask implements FetchTask<SourceSplitBase> {
split);
BinlogSplitChangeEventSourceContext changeEventSourceContext =
new BinlogSplitChangeEventSourceContext();
binlogSplitReadTask.execute(changeEventSourceContext);
binlogSplitReadTask.execute(
changeEventSourceContext, sourceFetchContext.getOffsetContext());
}
@Override
@ -106,7 +107,6 @@ public class MySqlStreamFetchTask implements FetchTask<SourceSplitBase> {
StreamSplit binlogSplit) {
super(
connectorConfig,
offsetContext,
connection,
dispatcher,
errorHandler,
@ -120,14 +120,15 @@ public class MySqlStreamFetchTask implements FetchTask<SourceSplitBase> {
}
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
public void execute(ChangeEventSourceContext context, MySqlOffsetContext offsetContext)
throws InterruptedException {
this.context = context;
super.execute(context);
super.execute(context, offsetContext);
}
@Override
protected void handleEvent(Event event) {
super.handleEvent(event);
protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
super.handleEvent(offsetContext, event);
// check do we need to stop for fetch binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =

@ -96,6 +96,8 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
" 'password' = '" + ORACLE_TEST_PASSWORD + "',",
" 'database-name' = 'XE',",
" 'schema-name' = 'debezium',",
" 'debezium.log.mining.strategy' = 'online_catalog',",
" 'debezium.log.mining.continuous.mine' = 'true',",
" 'table-name' = 'products'",
");",
"CREATE TABLE products_sink (",
@ -169,7 +171,7 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
150000L);
}
private Connection getOracleJdbcConnection() throws SQLException {

@ -43,6 +43,7 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,6 +58,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -166,10 +168,10 @@ public class OracleSourceTest extends AbstractTestBase {
}
@Test
@Ignore("Enable the test once DBZ-4997 fixed")
public void testCheckpointAndRestore() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
int prevPos = 0;
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
@ -221,15 +223,10 @@ public class OracleSourceTest extends AbstractTestBase {
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
// assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
assertFalse(state.contains("row"));
assertFalse(state.contains("server_id"));
assertFalse(state.contains("event"));
// int pos = JsonPath.read(state, "$.sourceOffset.pos");
// assertTrue(pos > prevPos);
// prevPos = pos;
source.cancel();
source.close();
runThread.sync();
}
@ -282,7 +279,6 @@ public class OracleSourceTest extends AbstractTestBase {
}
// cancel the source
source2.cancel();
source2.close();
runThread2.sync();
}
@ -333,7 +329,6 @@ public class OracleSourceTest extends AbstractTestBase {
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
source3.cancel();
source3.close();
runThread3.sync();
}
@ -371,13 +366,13 @@ public class OracleSourceTest extends AbstractTestBase {
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
source4.cancel();
source4.close();
runThread4.sync();
}
}
@Test
@Ignore("Debezium Oracle connector don't monitor unknown tables since 1.6, see DBZ-3612")
public void testRecoverFromRenameOperation() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
@ -434,7 +429,6 @@ public class OracleSourceTest extends AbstractTestBase {
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
source.cancel();
source.close();
runThread.sync();
}
@ -465,7 +459,6 @@ public class OracleSourceTest extends AbstractTestBase {
assertEquals(1, records.size());
assertInsert(records.get(0), "ID", 113);
source2.cancel();
source2.close();
runThread2.sync();
}
@ -580,7 +573,8 @@ public class OracleSourceTest extends AbstractTestBase {
}
private OracleSource.Builder<SourceRecord> basicSourceBuilder(OracleContainer oracleContainer) {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("database.history.store.only.monitored.tables.ddl", "false");
return OracleSource.<SourceRecord>builder()
.hostname(oracleContainer.getHost())
.port(oracleContainer.getOraclePort())
@ -588,6 +582,7 @@ public class OracleSourceTest extends AbstractTestBase {
.tableList("debezium" + "." + "products") // monitor table "products"
.username(oracleContainer.getUsername())
.password(oracleContainer.getPassword())
.debeziumProperties(debeziumProperties)
.deserializer(new ForwardDeserializeSchema());
}
@ -597,6 +592,7 @@ public class OracleSourceTest extends AbstractTestBase {
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
while (allRecords.size() < expectedRecordCount) {
StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
System.out.println(record);
if (record != null) {
allRecords.add(record.getValue());
} else {

@ -19,7 +19,6 @@
package com.ververica.cdc.connectors.oracle.utils;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.sql.Connection;
import java.sql.DriverManager;
@ -27,16 +26,22 @@ import java.sql.SQLException;
/** Utility class for oracle tests. */
public class OracleTestUtils {
public static final OracleContainer ORACLE_CONTAINER =
new OracleContainer(
new ImageFromDockerfile("oracle-xe-11g-tmp")
.withFileFromClasspath(".", "docker")
.withFileFromClasspath(
"assets/activate-archivelog.sh",
"docker/assets/activate-archivelog.sh")
.withFileFromClasspath(
"assets/activate-archivelog.sql",
"docker/assets/activate-archivelog.sql"));
// You can build OracleContainer from official oracle docker image in following way, we use
// prebuilt image for time cost consideration
// ----------------- begin --------------------------
// new OracleContainer(new ImageFromDockerfile("oracle-xe-11g-tmp")
// .withFileFromClasspath(".", "docker")
// .withFileFromClasspath(
// "assets/activate-archivelog.sh",
// "docker/assets/activate-archivelog.sh")
// .withFileFromClasspath(
// "assets/activate-archivelog.sql",
// "docker/assets/activate-archivelog.sql")
// ----------------- end --------------------------
private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1";
public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(ORACLE_IMAGE);
public static final String CONNECTOR_USER = "dbzuser";

Loading…
Cancel
Save