[mysql] Simplify EmbeddedFlinkDatabaseHistory to not record schema changes

pull/270/head
Jark Wu 4 years ago
parent ae54f0b11a
commit a42f6ae61c
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -33,14 +33,11 @@ import io.debezium.relational.history.TableChanges.TableChange;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.SchemaStateUtils.registerHistory;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.SchemaStateUtils.removeHistory;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.SchemaStateUtils.retrieveHistory;
/**
* A {@link DatabaseHistory} implementation which store the latest table schema in Flink state.
*
@ -53,12 +50,10 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
public static final ConcurrentMap<String, Collection<TableChange>> TABLE_SCHEMAS =
new ConcurrentHashMap<>();
private ConcurrentMap<TableId, TableChange> tableSchemas;
private String instanceName;
private Map<TableId, TableChange> tableSchemas;
private DatabaseHistoryListener listener;
private boolean storeOnlyMonitoredTablesDdl;
private boolean skipUnparseableDDL;
private boolean useCatalogBeforeSchema;
@Override
public void configure(
@ -66,19 +61,16 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
HistoryRecordComparator comparator,
DatabaseHistoryListener listener,
boolean useCatalogBeforeSchema) {
this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
this.listener = listener;
this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
// recover
this.tableSchemas = new ConcurrentHashMap<>();
for (TableChange tableChange : retrieveHistory(instanceName)) {
String instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
this.tableSchemas = new HashMap<>();
for (TableChange tableChange : removeHistory(instanceName)) {
tableSchemas.put(tableChange.getId(), tableChange);
}
// register
registerHistory(instanceName, tableSchemas.values());
}
@Override
@ -104,21 +96,6 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
throws DatabaseHistoryException {
final HistoryRecord record =
new HistoryRecord(source, position, databaseName, schemaName, ddl, changes);
for (TableChange change : changes) {
switch (change.getType()) {
case CREATE:
case ALTER:
tableSchemas.put(change.getId(), change);
break;
case DROP:
tableSchemas.remove(change.getId());
break;
default:
// impossible
throw new RuntimeException(
String.format("Unknown change type: %s.", change.getType()));
}
}
listener.onChangeApplied(record);
}
@ -134,9 +111,6 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
@Override
public void stop() {
if (instanceName != null) {
removeHistory(instanceName);
}
listener.stopped();
}
@ -165,20 +139,12 @@ public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
return skipUnparseableDDL;
}
/** Utils to get/put/remove the table schema. */
public static final class SchemaStateUtils {
public static void registerHistory(
String engineName, Collection<TableChange> engineHistory) {
public static void registerHistory(String engineName, Collection<TableChange> engineHistory) {
TABLE_SCHEMAS.put(engineName, engineHistory);
}
public static Collection<TableChange> retrieveHistory(String engineName) {
return TABLE_SCHEMAS.getOrDefault(engineName, Collections.emptyList());
}
public static void removeHistory(String engineName) {
TABLE_SCHEMAS.remove(engineName);
}
public static Collection<TableChange> removeHistory(String engineName) {
Collection<TableChange> tableChanges = TABLE_SCHEMAS.remove(engineName);
return tableChanges != null ? tableChanges : Collections.emptyList();
}
}

@ -21,7 +21,6 @@ package com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context;
import org.apache.flink.configuration.Configuration;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.SchemaStateUtils;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
@ -109,7 +108,7 @@ public class StatefulTaskContext {
final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
SchemaStateUtils.registerHistory(
EmbeddedFlinkDatabaseHistory.registerHistory(
dezConf.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
mySqlSplit.getTableSchemas().values());
this.databaseSchema =

@ -38,18 +38,15 @@ public abstract class MySqlSplitState {
}
/** Casts this split state into a {@link MySqlSnapshotSplitState}. */
@SuppressWarnings("unchecked")
public final MySqlSnapshotSplitState asSnapshotSplitState() {
return (MySqlSnapshotSplitState) this;
}
/** Casts this split state into a {@link MySqlBinlogSplitState}. */
@SuppressWarnings("unchecked")
public final MySqlBinlogSplitState asBinlogSplitState() {
return (MySqlBinlogSplitState) this;
}
/** Use the current split state to create a new MySQLSplit. */
@SuppressWarnings("unchecked")
public abstract MySqlSplit toMySqlSplit();
}

@ -49,7 +49,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SPLIT_COLUMN;
@ -176,7 +175,6 @@ public class MySqlTableSource implements ScanTableSource {
private Configuration getParallelSourceConf(RowType pkRowType) {
Map<String, String> properties = new HashMap<>();
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
properties.put("database.history.instance.name", DATABASE_HISTORY_INSTANCE_NAME);
properties.put("database.hostname", checkNotNull(hostname));
properties.put("database.user", checkNotNull(username));
properties.put("database.password", checkNotNull(password));

Loading…
Cancel
Save