[common] Fix schema history running out of memory
Fix schema history run out of memory, this close (#207) Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>pull/237/head
parent
5195647576
commit
fcea3d14f7
@ -0,0 +1,199 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.debezium.internal;
|
||||
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.StateUtils;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.ddl.DdlParser;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.relational.history.DatabaseHistoryException;
|
||||
import io.debezium.relational.history.DatabaseHistoryListener;
|
||||
import io.debezium.relational.history.HistoryRecord;
|
||||
import io.debezium.relational.history.HistoryRecordComparator;
|
||||
import io.debezium.relational.history.JsonTableChangeSerializer;
|
||||
import io.debezium.relational.history.TableChanges;
|
||||
import io.debezium.schema.DatabaseSchema;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static io.debezium.relational.history.TableChanges.TableChange;
|
||||
|
||||
/**
|
||||
* The {@link FlinkDatabaseSchemaHistory} only stores the latest schema of the monitored tables.
|
||||
* When recovering from the checkpoint, it should apply all the tables to the {@link
|
||||
* DatabaseSchema}, which doesn't need to replay the history anymore.
|
||||
*
|
||||
* <p>Considering the data structure maintained in the {@link FlinkDatabaseSchemaHistory} is much
|
||||
* different from the {@link FlinkDatabaseHistory}, it's not compatible with the {@link
|
||||
* FlinkDatabaseHistory}. Because it only maintains the latest schema of the table rather than all
|
||||
 * history DDLs, it's useful to prevent OOM when encountering massive history DDLs.
|
||||
*/
|
||||
public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
|
||||
|
||||
public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
|
||||
|
||||
private final JsonTableChangeSerializer tableChangesSerializer =
|
||||
new JsonTableChangeSerializer();
|
||||
|
||||
private ConcurrentMap<TableId, SchemaRecord> latestTables;
|
||||
private String instanceName;
|
||||
private DatabaseHistoryListener listener;
|
||||
private boolean storeOnlyMonitoredTablesDdl;
|
||||
private boolean skipUnparseableDDL;
|
||||
private boolean useCatalogBeforeSchema;
|
||||
|
||||
@Override
|
||||
public void configure(
|
||||
Configuration config,
|
||||
HistoryRecordComparator comparator,
|
||||
DatabaseHistoryListener listener,
|
||||
boolean useCatalogBeforeSchema) {
|
||||
this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
|
||||
this.listener = listener;
|
||||
this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
|
||||
this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
|
||||
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
|
||||
|
||||
// recover
|
||||
this.latestTables = new ConcurrentHashMap<>();
|
||||
for (SchemaRecord schemaRecord : StateUtils.retrieveHistory(instanceName)) {
|
||||
// validate here
|
||||
TableChange tableChange =
|
||||
JsonTableChangeSerializer.fromDocument(
|
||||
schemaRecord.toDocument(), useCatalogBeforeSchema);
|
||||
latestTables.put(tableChange.getId(), schemaRecord);
|
||||
}
|
||||
// register
|
||||
StateUtils.registerHistory(instanceName, latestTables.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
listener.started();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void record(
|
||||
Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
|
||||
throws DatabaseHistoryException {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format(
|
||||
"The %s cannot work with 'debezium.internal.implementation' = 'legacy',"
|
||||
+ "please use %s",
|
||||
FlinkDatabaseSchemaHistory.class.getCanonicalName(),
|
||||
FlinkDatabaseHistory.class.getCanonicalName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void record(
|
||||
Map<String, ?> source,
|
||||
Map<String, ?> position,
|
||||
String databaseName,
|
||||
String schemaName,
|
||||
String ddl,
|
||||
TableChanges changes)
|
||||
throws DatabaseHistoryException {
|
||||
for (TableChanges.TableChange change : changes) {
|
||||
switch (change.getType()) {
|
||||
case CREATE:
|
||||
case ALTER:
|
||||
latestTables.put(
|
||||
change.getId(),
|
||||
new SchemaRecord(tableChangesSerializer.toDocument(change)));
|
||||
break;
|
||||
case DROP:
|
||||
latestTables.remove(change.getId());
|
||||
break;
|
||||
default:
|
||||
// impossible
|
||||
throw new RuntimeException(
|
||||
String.format("Unknown change type: %s.", change.getType()));
|
||||
}
|
||||
}
|
||||
listener.onChangeApplied(
|
||||
new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(
|
||||
Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
|
||||
listener.recoveryStarted();
|
||||
for (SchemaRecord record : latestTables.values()) {
|
||||
TableChange tableChange =
|
||||
JsonTableChangeSerializer.fromDocument(
|
||||
record.getTableChangeDoc(), useCatalogBeforeSchema);
|
||||
schema.overwriteTable(tableChange.getTable());
|
||||
}
|
||||
listener.recoveryStopped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (instanceName != null) {
|
||||
DebeziumSourceFunction.StateUtils.removeHistory(instanceName);
|
||||
}
|
||||
listener.stopped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists() {
|
||||
return latestTables != null && !latestTables.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean storageExists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeStorage() {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean storeOnlyMonitoredTables() {
|
||||
return storeOnlyMonitoredTablesDdl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean skipUnparseableDdlStatements() {
|
||||
return skipUnparseableDDL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the {@link FlinkDatabaseSchemaHistory} is compatible with the specified
|
||||
* state.
|
||||
*/
|
||||
public static boolean isCompatible(Collection<SchemaRecord> records) {
|
||||
for (SchemaRecord record : records) {
|
||||
if (!record.isTableChangeRecord()) {
|
||||
return false;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.debezium.internal;
|
||||
|
||||
import io.debezium.document.Document;
|
||||
import io.debezium.relational.history.HistoryRecord;
|
||||
import io.debezium.relational.history.TableChanges.TableChange;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* The Record represents a schema change event, it contains either one {@link HistoryRecord} or
|
||||
* {@link TableChange}.
|
||||
*
|
||||
* <p>The {@link HistoryRecord} will be used by {@link FlinkDatabaseHistory} which keeps full
|
||||
* history of table change events for all tables, the {@link TableChange} will be used by {@link
|
||||
 * FlinkDatabaseSchemaHistory} which keeps the latest table change for each table.
|
||||
*/
|
||||
public class SchemaRecord {
|
||||
|
||||
@Nullable private final HistoryRecord historyRecord;
|
||||
|
||||
@Nullable private final Document tableChangeDoc;
|
||||
|
||||
public SchemaRecord(HistoryRecord historyRecord) {
|
||||
this.historyRecord = historyRecord;
|
||||
this.tableChangeDoc = null;
|
||||
}
|
||||
|
||||
public SchemaRecord(Document document) {
|
||||
if (isHistoryRecordDocument(document)) {
|
||||
this.historyRecord = new HistoryRecord(document);
|
||||
this.tableChangeDoc = null;
|
||||
} else {
|
||||
this.tableChangeDoc = document;
|
||||
this.historyRecord = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public HistoryRecord getHistoryRecord() {
|
||||
return historyRecord;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Document getTableChangeDoc() {
|
||||
return tableChangeDoc;
|
||||
}
|
||||
|
||||
public boolean isHistoryRecord() {
|
||||
return historyRecord != null;
|
||||
}
|
||||
|
||||
public boolean isTableChangeRecord() {
|
||||
return tableChangeDoc != null;
|
||||
}
|
||||
|
||||
public Document toDocument() {
|
||||
if (historyRecord != null) {
|
||||
return historyRecord.document();
|
||||
} else {
|
||||
return tableChangeDoc;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isHistoryRecordDocument(Document document) {
|
||||
return new HistoryRecord(document).isValid();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue