[FLINK-35243][cdc-common] Extends more schema change event types support

This closes #3521.
yuxiqian 5 months ago committed by GitHub
parent d3473de4db
commit 78fda8b5ab

@ -17,16 +17,23 @@
package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain
* lenient column type changes.
*/
public class AlterColumnTypeEvent implements SchemaChangeEvent {
@PublicEvolving
public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {
private static final long serialVersionUID = 1L;
@ -35,9 +42,21 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;
private final Map<String, DataType> oldTypeMapping;
public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = new HashMap<>();
}
public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = oldTypeMapping;
}
/** Returns the type mapping. */
@ -55,22 +74,34 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
}
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping);
&& Objects.equals(typeMapping, that.typeMapping)
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
}
@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping);
return Objects.hash(tableId, typeMapping, oldTypeMapping);
}
@Override
public String toString() {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", nameMapping="
+ typeMapping
+ '}';
if (hasPreSchema()) {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ ", oldTypeMapping="
+ oldTypeMapping
+ '}';
} else {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ '}';
}
}
@Override
@ -78,6 +109,39 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
return tableId;
}
public Map<String, DataType> getOldTypeMapping() {
return oldTypeMapping;
}
@Override
public boolean hasPreSchema() {
return !oldTypeMapping.isEmpty();
}
@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldTypeMapping.clear();
oldTypeMapping.putAll(
oldTypeSchema.getColumns().stream()
.filter(e -> typeMapping.containsKey(e.getName()) && e.getType() != null)
.collect(Collectors.toMap(Column::getName, Column::getType)));
}
@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
typeMapping.keySet().stream()
.filter(e -> Objects.equals(typeMapping.get(e), oldTypeMapping.get(e)))
.collect(Collectors.toSet());
// Remove redundant alter column type records that don't really change the type
typeMapping.keySet().removeAll(redundantlyChangedColumns);
oldTypeMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !typeMapping.isEmpty();
}
@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
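
Taken together, fillPreSchema() and trimRedundantChanges() let the framework enrich an AlterColumnTypeEvent with the previous schema and then discard no-op type changes. Below is a minimal sketch of the expected behavior; it is not part of this commit, and the table id and column names are made up:

import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.HashMap;
import java.util.Map;

public class PreSchemaDemo {
    public static void main(String[] args) {
        Map<String, DataType> newTypes = new HashMap<>();
        newTypes.put("age", DataTypes.BIGINT()); // actually widened from INT
        newTypes.put("id", DataTypes.INT());     // unchanged, hence redundant
        AlterColumnTypeEvent event =
                new AlterColumnTypeEvent(TableId.parse("db.schema.tbl"), newTypes);

        Schema oldSchema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("age", DataTypes.INT())
                        .build();
        event.fillPreSchema(oldSchema);  // oldTypeMapping becomes {age=INT, id=INT}
        event.trimRedundantChanges();    // drops "id" from both maps; returns true

        System.out.println(event.getTypeMapping());    // {age=BIGINT}
        System.out.println(event.getOldTypeMapping()); // {age=INT}
    }
}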

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.source.DataSource;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents a {@code DROP TABLE} DDL. This will be sent by
* {@link DataSource} before all {@link DataChangeEvent}s with the same tableId.
*/
@PublicEvolving
public class DropTableEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
public DropTableEvent(TableId tableId) {
this.tableId = tableId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DropTableEvent)) {
return false;
}
DropTableEvent that = (DropTableEvent) o;
return Objects.equals(tableId, that.tableId);
}
@Override
public int hashCode() {
return Objects.hash(tableId);
}
@Override
public String toString() {
return "DropTableEvent{" + "tableId=" + tableId + '}';
}
@Override
public TableId tableId() {
return tableId;
}
@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_TABLE;
}
}
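
DropTableEvent is a plain value object keyed by the table id, so constructing and inspecting one is straightforward. A small sketch, assuming the imports of this file and a hypothetical mydb.orders table:

DropTableEvent event = new DropTableEvent(TableId.tableId("mydb", "orders"));
// event.getType() => SchemaChangeEventType.DROP_TABLE
// event.tableId() => the "mydb.orders" table id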

@ -17,6 +17,8 @@
package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import java.util.Map;
import java.util.Objects;
@ -24,6 +26,7 @@ import java.util.Objects;
* A {@link SchemaChangeEvent} that represents a {@code RENAME COLUMN} DDL, which may contain
* lenient column type changes.
*/
@PublicEvolving
public class RenameColumnEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;

@ -26,7 +26,9 @@ public enum SchemaChangeEventType {
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN;
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE;
public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
@ -37,8 +39,12 @@ public enum SchemaChangeEventType {
return CREATE_TABLE;
} else if (event instanceof DropColumnEvent) {
return DROP_COLUMN;
} else if (event instanceof DropTableEvent) {
return DROP_TABLE;
} else if (event instanceof RenameColumnEvent) {
return RENAME_COLUMN;
} else if (event instanceof TruncateTableEvent) {
return TRUNCATE_TABLE;
} else {
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
}
@ -54,8 +60,12 @@ public enum SchemaChangeEventType {
return CREATE_TABLE;
case "drop.column":
return DROP_COLUMN;
case "drop.table":
return DROP_TABLE;
case "rename.column":
return RENAME_COLUMN;
case "truncate.table":
return TRUNCATE_TABLE;
default:
throw new RuntimeException("Unknown schema change event type: " + tag);
}
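
Both lookup paths now recognize the table-level events. An illustrative sketch (tableId stands for any TableId instance; the tag strings are the ones matched by the switch above):

// By event instance:
SchemaChangeEventType.ofEvent(new DropTableEvent(tableId));     // => DROP_TABLE
SchemaChangeEventType.ofEvent(new TruncateTableEvent(tableId)); // => TRUNCATE_TABLE
// By tag: "drop.table" => DROP_TABLE, "truncate.table" => TRUNCATE_TABLE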

@ -19,6 +19,14 @@ package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
/**
* An enumeration of schema change event families for clustering {@link SchemaChangeEvent}s into
* categories.
@ -26,31 +34,30 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving;
@PublicEvolving
public class SchemaChangeEventTypeFamily {
public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN};
public static final SchemaChangeEventType[] ADD = {ADD_COLUMN};
public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE};
public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE};
public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE};
public static final SchemaChangeEventType[] CREATE = {CREATE_TABLE};
public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN};
public static final SchemaChangeEventType[] DROP = {DROP_COLUMN, DROP_TABLE};
public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN};
public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN};
public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE};
public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE};
public static final SchemaChangeEventType[] COLUMN = {
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.ALTER_COLUMN_TYPE,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN
ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN
};
public static final SchemaChangeEventType[] ALL = {
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.CREATE_TABLE,
SchemaChangeEventType.ALTER_COLUMN_TYPE,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE
};
public static final SchemaChangeEventType[] NONE = {};
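
Because the DROP and TABLE families were widened, family expansion now includes the new event types. An illustrative check, not part of this commit, assuming the static imports above plus java.util.Arrays:

// The DROP family covers both column- and table-level drops:
Arrays.equals(
        SchemaChangeEventTypeFamily.DROP,
        new SchemaChangeEventType[] {DROP_COLUMN, DROP_TABLE}); // true
// And TABLE spans create, drop, and truncate:
Arrays.equals(
        SchemaChangeEventTypeFamily.TABLE,
        new SchemaChangeEventType[] {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE}); // true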

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.schema.Schema;
/** A {@link SchemaChangeEvent} that supports carrying the schema as it was before the change. */
@PublicEvolving
public interface SchemaChangeEventWithPreSchema extends SchemaChangeEvent {
/** Checks whether this event already carries pre-change schema info. */
boolean hasPreSchema();
/** Appends pre-change schema info to this event. */
void fillPreSchema(Schema oldSchema);
/** Trims redundant schema change requests and returns whether this event still carries any meaningful change. */
default boolean trimRedundantChanges() {
return false;
}
}
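
A consumer-side sketch of how the framework might use this interface, assuming a hypothetical schemaRegistry that can look up the current schema of a table (the registry is made up; the event methods are the ones declared above):

if (event instanceof SchemaChangeEventWithPreSchema) {
    SchemaChangeEventWithPreSchema enriched = (SchemaChangeEventWithPreSchema) event;
    if (!enriched.hasPreSchema()) {
        // Attach the schema as it was before this change.
        enriched.fillPreSchema(schemaRegistry.getSchema(event.tableId()));
    }
    if (!enriched.trimRedundantChanges()) {
        return; // every requested change was a no-op; nothing to apply
    }
}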

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.source.DataSource;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents a {@code TRUNCATE TABLE} DDL. This will be sent by
* {@link DataSource} before all {@link DataChangeEvent}s with the same tableId.
*/
@PublicEvolving
public class TruncateTableEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
public TruncateTableEvent(TableId tableId) {
this.tableId = tableId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TruncateTableEvent)) {
return false;
}
TruncateTableEvent that = (TruncateTableEvent) o;
return Objects.equals(tableId, that.tableId);
}
@Override
public int hashCode() {
return Objects.hash(tableId);
}
@Override
public String toString() {
return "TruncateTableEvent{" + "tableId=" + tableId + '}';
}
@Override
public TableId tableId() {
return tableId;
}
@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.TRUNCATE_TABLE;
}
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
/** Visitor for {@link AddColumnEvent}s. */
@Internal
@FunctionalInterface
public interface AddColumnEventVisitor<T, E extends Throwable> {
T visit(AddColumnEvent event) throws E;
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
/** Visitor for {@link AlterColumnTypeEvent}s. */
@Internal
@FunctionalInterface
public interface AlterColumnTypeEventVisitor<T, E extends Throwable> {
T visit(AlterColumnTypeEvent event) throws E;
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.CreateTableEvent;
/** Visitor for {@link CreateTableEvent}s. */
@Internal
@FunctionalInterface
public interface CreateTableEventVisitor<T, E extends Throwable> {
T visit(CreateTableEvent event) throws E;
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DropColumnEvent;
/** Visitor for {@link DropColumnEvent}s. */
@Internal
@FunctionalInterface
public interface DropColumnEventVisitor<T, E extends Throwable> {
T visit(DropColumnEvent event) throws E;
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DropTableEvent;
/** Visitor for {@link DropTableEvent}s. */
@Internal
@FunctionalInterface
public interface DropTableEventVisitor<T, E extends Throwable> {
T visit(DropTableEvent event) throws E;
}

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
/** Visitor for {@link RenameColumnEvent}s. */
@Internal
@FunctionalInterface
public interface RenameColumnEventVisitor<T, E extends Throwable> {
T visit(RenameColumnEvent event) throws E;
}

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
/** Visitor class for all {@link SchemaChangeEvent}s that returns a specifically typed object. */
@Internal
public class SchemaChangeEventVisitor {
public static <T, E extends Throwable> T visit(
SchemaChangeEvent event,
AddColumnEventVisitor<T, E> addColumnVisitor,
AlterColumnTypeEventVisitor<T, E> alterColumnTypeEventVisitor,
CreateTableEventVisitor<T, E> createTableEventVisitor,
DropColumnEventVisitor<T, E> dropColumnEventVisitor,
DropTableEventVisitor<T, E> dropTableEventVisitor,
RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
TruncateTableEventVisitor<T, E> truncateTableEventVisitor)
throws E {
if (event instanceof AddColumnEvent) {
if (addColumnVisitor == null) {
return null;
}
return addColumnVisitor.visit((AddColumnEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
if (alterColumnTypeEventVisitor == null) {
return null;
}
return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) event);
} else if (event instanceof CreateTableEvent) {
if (createTableEventVisitor == null) {
return null;
}
return createTableEventVisitor.visit((CreateTableEvent) event);
} else if (event instanceof DropColumnEvent) {
if (dropColumnEventVisitor == null) {
return null;
}
return dropColumnEventVisitor.visit((DropColumnEvent) event);
} else if (event instanceof DropTableEvent) {
if (dropTableEventVisitor == null) {
return null;
}
return dropTableEventVisitor.visit((DropTableEvent) event);
} else if (event instanceof RenameColumnEvent) {
if (renameColumnEventVisitor == null) {
return null;
}
return renameColumnEventVisitor.visit((RenameColumnEvent) event);
} else if (event instanceof TruncateTableEvent) {
if (truncateTableEventVisitor == null) {
return null;
}
return truncateTableEventVisitor.visit((TruncateTableEvent) event);
} else {
throw new IllegalArgumentException(
"Unknown schema change event type " + event.getType());
}
}
}
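
Callers pass one lambda per concrete event type; a null visitor makes visit() return null for that type. A minimal usage sketch for some SchemaChangeEvent event, with an explicit type witness since these lambdas throw nothing:

String summary =
        SchemaChangeEventVisitor.<String, RuntimeException>visit(
                event,
                addColumn -> "add " + addColumn.getAddedColumns(),
                alterColumnType -> "alter " + alterColumnType.getTypeMapping(),
                createTable -> "create " + createTable.tableId(),
                dropColumn -> "drop columns " + dropColumn.getDroppedColumnNames(),
                dropTable -> "drop table " + dropTable.tableId(),
                renameColumn -> "rename " + renameColumn.getNameMapping(),
                truncateTable -> "truncate " + truncateTable.tableId());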

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.event.visitor;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
/** Visitor for {@link TruncateTableEvent}s. */
@Internal
@FunctionalInterface
public interface TruncateTableEventVisitor<T, E extends Throwable> {
T visit(TruncateTableEvent event) throws E;
}

@ -23,11 +23,14 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import java.util.ArrayList;
import java.util.Arrays;
@ -65,30 +68,22 @@ public class ChangeEventUtils {
public static SchemaChangeEvent recreateSchemaChangeEvent(
SchemaChangeEvent schemaChangeEvent, TableId tableId) {
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
return new CreateTableEvent(tableId, createTableEvent.getSchema());
}
if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) schemaChangeEvent;
return new AlterColumnTypeEvent(tableId, alterColumnTypeEvent.getTypeMapping());
}
if (schemaChangeEvent instanceof RenameColumnEvent) {
RenameColumnEvent renameColumnEvent = (RenameColumnEvent) schemaChangeEvent;
return new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping());
}
if (schemaChangeEvent instanceof DropColumnEvent) {
DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent;
return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames());
}
if (schemaChangeEvent instanceof AddColumnEvent) {
AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
return new AddColumnEvent(tableId, addColumnEvent.getAddedColumns());
}
throw new UnsupportedOperationException(
String.format(
"Unsupported schema change event with type \"%s\"",
schemaChangeEvent.getClass().getCanonicalName()));
return SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent -> new AddColumnEvent(tableId, addColumnEvent.getAddedColumns()),
alterColumnEvent ->
new AlterColumnTypeEvent(
tableId,
alterColumnEvent.getTypeMapping(),
alterColumnEvent.getOldTypeMapping()),
createTableEvent -> new CreateTableEvent(tableId, createTableEvent.getSchema()),
dropColumnEvent ->
new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()),
dropTableEvent -> new DropTableEvent(tableId),
renameColumnEvent ->
new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping()),
truncateTableEvent -> new TruncateTableEvent(tableId));
}
public static Set<SchemaChangeEventType> resolveSchemaEvolutionOptions(

@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@ -244,20 +245,15 @@ public class SchemaUtils {
/** Applies a SchemaChangeEvent to the old schema and returns the schema after the change. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
return applyAddColumnEvent((AddColumnEvent) event, schema);
} else if (event instanceof DropColumnEvent) {
return applyDropColumnEvent((DropColumnEvent) event, schema);
} else if (event instanceof RenameColumnEvent) {
return applyRenameColumnEvent((RenameColumnEvent) event, schema);
} else if (event instanceof AlterColumnTypeEvent) {
return applyAlterColumnTypeEvent((AlterColumnTypeEvent) event, schema);
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported schema change event type \"%s\"",
event.getClass().getCanonicalName()));
}
return SchemaChangeEventVisitor.visit(
event,
addColumnEvent -> applyAddColumnEvent(addColumnEvent, schema),
alterColumnTypeEvent -> applyAlterColumnTypeEvent(alterColumnTypeEvent, schema),
createTableEvent -> createTableEvent.getSchema(),
dropColumnEvent -> applyDropColumnEvent(dropColumnEvent, schema),
dropTableEvent -> schema,
renameColumnEvent -> applyRenameColumnEvent(renameColumnEvent, schema),
truncateTableEvent -> schema);
}
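
Note that the table-level events are identity transforms here: DropTableEvent and TruncateTableEvent return the schema unchanged, and CreateTableEvent yields the event's own schema. A hedged example with made-up names, assuming this class's imports plus java.util.Collections and org.apache.flink.cdc.common.types.DataTypes:

Schema before =
        Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();
Schema after =
        SchemaUtils.applySchemaChangeEvent(
                before,
                new AlterColumnTypeEvent(
                        TableId.parse("db.schema.tbl"),
                        Collections.singletonMap("id", DataTypes.BIGINT())));
// after: column "id" is now BIGINT.
// Applying a DropTableEvent or TruncateTableEvent would return "before" as-is.
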
private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {

@ -17,7 +17,6 @@
package org.apache.flink.cdc.common.utils;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Test;
@ -28,91 +27,108 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
/** A test for the {@link org.apache.flink.cdc.common.utils.ChangeEventUtils}. */
public class ChangeEventUtilsTest {
@Test
public void testResolveSchemaEvolutionOptions() {
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.emptyList()))
.isEqualTo(
Sets.set(
TRUNCATE_TABLE,
RENAME_COLUMN,
CREATE_TABLE,
ADD_COLUMN,
DROP_TABLE,
ALTER_COLUMN_TYPE,
DROP_COLUMN,
RENAME_COLUMN));
ADD_COLUMN,
DROP_COLUMN));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop")))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
.isEqualTo(
Sets.set(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
RENAME_COLUMN,
CREATE_TABLE,
TRUNCATE_TABLE));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Arrays.asList("create", "add"), Collections.emptyList()))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN));
.isEqualTo(Sets.set(ADD_COLUMN, CREATE_TABLE));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.singletonList("column"),
Collections.singletonList("drop.column")))
.isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop.column")))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
.isEqualTo(
Sets.set(
ADD_COLUMN,
DROP_TABLE,
TRUNCATE_TABLE,
RENAME_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE));
}
@Test
public void testResolveSchemaEvolutionTag() {
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
.isEqualTo(
Arrays.asList(
ADD_COLUMN,
CREATE_TABLE,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN));
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
.isEqualTo(
Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename"))
.isEqualTo(Collections.singletonList(RENAME_COLUMN));
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
.isEqualTo(Arrays.asList(CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
.isEqualTo(Collections.singletonList(RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
.isEqualTo(Collections.singletonList(DROP_COLUMN));
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
.isEqualTo(Arrays.asList(DROP_COLUMN, DROP_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
.isEqualTo(Collections.singletonList(DROP_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
.isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
.isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
.isEqualTo(Collections.singletonList(ADD_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
.isEqualTo(Collections.singletonList(ADD_COLUMN));
}
}

@ -40,7 +40,7 @@ import java.util.Map;
public class SchemaUtilsTest {
@Test
public void testApplySchemaChangeEvent() {
public void testApplyColumnSchemaChangeEvent() {
TableId tableId = TableId.parse("default.default.table1");
Schema schema =
Schema.newBuilder()

@ -798,7 +798,7 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={age=BIGINT, id=BIGINT}, oldTypeMapping={age=INT, id=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}",
@ -1004,7 +1004,7 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={age=BIGINT, id=BIGINT}, oldTypeMapping={age=INT, id=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",

@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
@ -46,12 +47,10 @@ import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -60,6 +59,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
@ -93,52 +93,66 @@ public class DorisMetadataApplier implements MetadataApplier {
@Override
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
return Sets.newHashSet(ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN);
return Sets.newHashSet(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN);
}
@Override
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
public void applySchemaChange(SchemaChangeEvent event) {
SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(
event,
addColumnEvent -> {
applyAddColumnEvent(addColumnEvent);
return null;
},
alterColumnTypeEvent -> {
applyAlterColumnTypeEvent(alterColumnTypeEvent);
return null;
},
createTableEvent -> {
applyCreateTableEvent(createTableEvent);
return null;
},
dropColumnEvent -> {
applyDropColumnEvent(dropColumnEvent);
return null;
},
dropTableEvent -> {
throw new UnsupportedSchemaChangeEventException(event);
},
renameColumnEvent -> {
applyRenameColumnEvent(renameColumnEvent);
return null;
},
truncateTableEvent -> {
throw new UnsupportedSchemaChangeEventException(event);
});
}
private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveException {
try {
// send schema change op to doris
if (event instanceof CreateTableEvent) {
applyCreateTableEvent((CreateTableEvent) event);
} else if (event instanceof AddColumnEvent) {
applyAddColumnEvent((AddColumnEvent) event);
} else if (event instanceof DropColumnEvent) {
applyDropColumnEvent((DropColumnEvent) event);
} else if (event instanceof RenameColumnEvent) {
applyRenameColumnEvent((RenameColumnEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
Schema schema = event.getSchema();
TableId tableId = event.tableId();
TableSchema tableSchema = new TableSchema();
tableSchema.setTable(tableId.getTableName());
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
} else {
throw new UnsupportedSchemaChangeEventException(event);
tableSchema.setKeys(schema.primaryKeys());
tableSchema.setModel(DataModel.UNIQUE);
}
} catch (Exception ex) {
throw new SchemaEvolveException(event, ex.getMessage(), null);
}
}
private void applyCreateTableEvent(CreateTableEvent event)
throws IOException, IllegalArgumentException {
Schema schema = event.getSchema();
TableId tableId = event.tableId();
TableSchema tableSchema = new TableSchema();
tableSchema.setTable(tableId.getTableName());
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
} else {
tableSchema.setKeys(schema.primaryKeys());
tableSchema.setModel(DataModel.UNIQUE);
Map<String, String> tableProperties =
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);
schemaChangeManager.createTable(tableSchema);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
Map<String, String> tableProperties =
DorisDataSinkOptions.getPropertiesByPrefix(config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);
schemaChangeManager.createTable(tableSchema);
}
private Map<String, FieldSchema> buildFields(Schema schema) {
@ -191,59 +205,74 @@ public class DorisMetadataApplier implements MetadataApplier {
}
}
private void applyAddColumnEvent(AddColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
Column column = col.getAddColumn();
FieldSchema addFieldSchema =
new FieldSchema(
column.getName(),
buildTypeString(column.getType()),
column.getDefaultValueExpression(),
column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveException {
try {
TableId tableId = event.tableId();
List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
Column column = col.getAddColumn();
FieldSchema addFieldSchema =
new FieldSchema(
column.getName(),
buildTypeString(column.getType()),
column.getDefaultValueExpression(),
column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyDropColumnEvent(DropColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<String> droppedColumns = event.getDroppedColumnNames();
for (String col : droppedColumns) {
schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col);
private void applyDropColumnEvent(DropColumnEvent event) throws SchemaEvolveException {
try {
TableId tableId = event.tableId();
List<String> droppedColumns = event.getDroppedColumnNames();
for (String col : droppedColumns) {
schemaChangeManager.dropColumn(
tableId.getSchemaName(), tableId.getTableName(), col);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyRenameColumnEvent(RenameColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
Map<String, String> nameMapping = event.getNameMapping();
for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
schemaChangeManager.renameColumn(
tableId.getSchemaName(),
tableId.getTableName(),
entry.getKey(),
entry.getValue());
private void applyRenameColumnEvent(RenameColumnEvent event) throws SchemaEvolveException {
try {
TableId tableId = event.tableId();
Map<String, String> nameMapping = event.getNameMapping();
for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
schemaChangeManager.renameColumn(
tableId.getSchemaName(),
tableId.getTableName(),
entry.getKey(),
entry.getValue());
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
Map<String, DataType> typeMapping = event.getTypeMapping();
for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
schemaChangeManager.modifyColumnDataType(
tableId.getSchemaName(),
tableId.getTableName(),
new FieldSchema(
entry.getKey(),
buildTypeString(entry.getValue()),
null)); // Currently, AlterColumnTypeEvent carries no comment info. This
// will be fixed after FLINK-35243 got merged.
throws SchemaEvolveException {
try {
TableId tableId = event.tableId();
Map<String, DataType> typeMapping = event.getTypeMapping();
for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
schemaChangeManager.modifyColumnDataType(
tableId.getSchemaName(),
tableId.getTableName(),
new FieldSchema(
entry.getKey(),
buildTypeString(entry.getValue()),
null)); // Currently, AlterColumnTypeEvent carries no comment info.
// This will be fixed after FLINK-35243 is merged.
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
}

@ -20,8 +20,10 @@ package org.apache.flink.cdc.connectors.mysql.source.parser;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.types.DataType;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@ -277,6 +279,25 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
super.exitAlterByRenameColumn(ctx);
}
@Override
public void exitTruncateTable(MySqlParser.TruncateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
changes.add(new TruncateTableEvent(toCdcTableId(tableId)));
super.exitTruncateTable(ctx);
}
@Override
public void exitDropTable(MySqlParser.DropTableContext ctx) {
ctx.tables()
.tableName()
.forEach(
evt -> {
TableId tableId = parser.parseQualifiedTableId(evt.fullId());
changes.add(new DropTableEvent(toCdcTableId(tableId)));
});
super.exitDropTable(ctx);
}
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),

@ -25,9 +25,11 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
@ -61,6 +63,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
@ -345,11 +348,144 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
// Drop orders table first to remove foreign key constraints
statement.execute(
String.format(
"DROP TABLE `%s`.`orders`;", inventoryDatabase.getDatabaseName()));
statement.execute(
String.format(
"TRUNCATE TABLE `%s`.`products`;",
inventoryDatabase.getDatabaseName()));
expected.add(new TruncateTableEvent(tableId));
statement.execute(
String.format(
"DROP TABLE `%s`.`products`;", inventoryDatabase.getDatabaseName()));
expected.add(new DropTableEvent(tableId));
}
List<Event> actual = fetchResults(events, expected.size());
assertThat(actual).isEqualTo(expected);
}
@Test
public void testSchemaChangeEvents() throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + ".*")
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);
List<Event> expected =
new ArrayList<>(
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1` INT NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("newcol1", DataTypes.INT())))));
// Test MODIFY COLUMN DDL
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` MODIFY COLUMN `newcol1` DOUBLE;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AlterColumnTypeEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol1", DataTypes.DOUBLE())));
// Test CHANGE COLUMN DDL
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol1` `newcol2` INT;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AlterColumnTypeEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol1", DataTypes.INT())));
expected.add(
new RenameColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol1", "newcol2")));
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol2` `newcol1` DOUBLE;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AlterColumnTypeEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol2", DataTypes.DOUBLE())));
expected.add(
new RenameColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol2", "newcol1")));
// Test truncate table DDL
statement.execute(
String.format(
"TRUNCATE TABLE `%s`.`orders`;", inventoryDatabase.getDatabaseName()));
expected.add(
new TruncateTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders")));
// Test drop table DDL
statement.execute(
String.format(
"DROP TABLE `%s`.`orders`, `%s`.`customers`;",
inventoryDatabase.getDatabaseName(),
inventoryDatabase.getDatabaseName()));
expected.add(
new DropTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "orders")));
expected.add(
new DropTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers")));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
expected.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
@ -362,6 +498,46 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
.build());
}
private List<CreateTableEvent> getInventoryCreateAllTableEvents(String databaseName) {
return Arrays.asList(
new CreateTableEvent(
TableId.tableId(databaseName, "products"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), "flink")
.physicalColumn("description", DataTypes.VARCHAR(512))
.physicalColumn("weight", DataTypes.FLOAT())
.primaryKey(Collections.singletonList("id"))
.build()),
new CreateTableEvent(
TableId.tableId(databaseName, "customers"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("first_name", DataTypes.VARCHAR(255).notNull())
.physicalColumn("last_name", DataTypes.VARCHAR(255).notNull())
.physicalColumn("email", DataTypes.VARCHAR(255).notNull())
.primaryKey(Collections.singletonList("id"))
.build()),
new CreateTableEvent(
TableId.tableId(databaseName, "orders"),
Schema.newBuilder()
.physicalColumn("order_number", DataTypes.INT().notNull())
.physicalColumn("order_date", DataTypes.DATE().notNull())
.physicalColumn("purchaser", DataTypes.INT().notNull())
.physicalColumn("quantity", DataTypes.INT().notNull())
.physicalColumn("product_id", DataTypes.INT().notNull())
.primaryKey(Collections.singletonList("order_number"))
.build()),
new CreateTableEvent(
TableId.tableId(databaseName, "multi_max_table"),
Schema.newBuilder()
.physicalColumn("order_id", DataTypes.VARCHAR(128).notNull())
.physicalColumn("index", DataTypes.INT().notNull())
.physicalColumn("desc", DataTypes.VARCHAR(512).notNull())
.primaryKey(Arrays.asList("order_id", "index"))
.build()));
}
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(

@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Schema;
@ -117,110 +118,134 @@ public class PaimonMetadataApplier implements MetadataApplier {
if (catalog == null) {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent -> {
applyAddColumn(addColumnEvent);
return null;
},
alterColumnTypeEvent -> {
applyAlterColumnType(alterColumnTypeEvent);
return null;
},
createTableEvent -> {
applyCreateTable(createTableEvent);
return null;
},
dropColumnEvent -> {
applyDropColumn(dropColumnEvent);
return null;
},
dropTableEvent -> {
throw new UnsupportedSchemaChangeEventException(dropTableEvent);
},
renameColumnEvent -> {
applyRenameColumn(renameColumnEvent);
return null;
},
truncateTableEvent -> {
throw new UnsupportedSchemaChangeEventException(truncateTableEvent);
});
}
private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
try {
if (schemaChangeEvent instanceof CreateTableEvent) {
applyCreateTable((CreateTableEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AddColumnEvent) {
applyAddColumn((AddColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof DropColumnEvent) {
applyDropColumn((DropColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof RenameColumnEvent) {
applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
} else {
throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
if (!catalog.databaseExists(event.tableId().getSchemaName())) {
catalog.createDatabase(event.tableId().getSchemaName(), true);
}
} catch (Exception e) {
throw new SchemaEvolveException(schemaChangeEvent, "schema change applying failure", e);
Schema schema = event.getSchema();
org.apache.paimon.schema.Schema.Builder builder =
new org.apache.paimon.schema.Schema.Builder();
schema.getColumns()
.forEach(
(column) ->
builder.column(
column.getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(column.getType())
.getLogicalType())));
builder.primaryKey(schema.primaryKeys().toArray(new String[0]));
if (partitionMaps.containsKey(event.tableId())) {
builder.partitionKeys(partitionMaps.get(event.tableId()));
} else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
builder.partitionKeys(schema.partitionKeys());
}
builder.options(tableOptions);
builder.options(schema.options());
catalog.createTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
builder.build(),
true);
} catch (Catalog.TableAlreadyExistException
| Catalog.DatabaseNotExistException
| Catalog.DatabaseAlreadyExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyCreateTable(CreateTableEvent event)
throws Catalog.DatabaseAlreadyExistException, Catalog.TableAlreadyExistException,
Catalog.DatabaseNotExistException {
if (!catalog.databaseExists(event.tableId().getSchemaName())) {
catalog.createDatabase(event.tableId().getSchemaName(), true);
}
Schema schema = event.getSchema();
org.apache.paimon.schema.Schema.Builder builder =
new org.apache.paimon.schema.Schema.Builder();
schema.getColumns()
.forEach(
(column) ->
builder.column(
column.getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(column.getType())
.getLogicalType())));
builder.primaryKey(schema.primaryKeys().toArray(new String[0]));
if (partitionMaps.containsKey(event.tableId())) {
builder.partitionKeys(partitionMaps.get(event.tableId()));
} else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
builder.partitionKeys(schema.partitionKeys());
private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException {
try {
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
builder.options(tableOptions);
builder.options(schema.options());
catalog.createTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
builder.build(),
true);
}
private void applyAddColumn(AddColumnEvent event)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
}
private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
throws Catalog.TableNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
break;
default:
throw new IllegalArgumentException(
"Unknown column position: " + columnWithPosition.getPosition());
throws SchemaEvolveException {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
break;
default:
throw new SchemaEvolveException(
event,
"Unknown column position: " + columnWithPosition.getPosition());
}
}
return tableChangeList;
} catch (Catalog.TableNotExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
return tableChangeList;
}
private SchemaChange applyAddColumnWithBeforePosition(
@ -248,44 +273,58 @@ public class PaimonMetadataApplier implements MetadataApplier {
return index;
}
private void applyDropColumn(DropColumnEvent event)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyRenameColumn(RenameColumnEvent event)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) ->
tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) ->
tableChangeList.add(
SchemaChangeProvider.rename(oldName, newName)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyAlterColumn(AlterColumnTypeEvent event)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getTypeMapping()
.forEach(
(oldName, newType) ->
tableChangeList.add(
SchemaChangeProvider.updateColumnType(oldName, newType)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getTypeMapping()
.forEach(
(oldName, newType) ->
tableChangeList.add(
SchemaChangeProvider.updateColumnType(
oldName, newType)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
}
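For readers unfamiliar with the dispatch style above: `SchemaChangeEventVisitor.visit` replaces the earlier instanceof chains with one callback per event type, passed in alphabetical order. A minimal sketch of what such a dispatcher could look like, assuming it is a plain static helper (the actual functional interfaces and generics in the project may differ):

import java.util.function.Function;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;

final class SchemaChangeEventVisitorSketch {
    // Dispatches to exactly one handler; handlers are listed alphabetically
    // by event type, mirroring the call sites in the appliers above.
    static <T> T visit(
            SchemaChangeEvent event,
            Function<AddColumnEvent, T> addColumn,
            Function<AlterColumnTypeEvent, T> alterColumnType,
            Function<CreateTableEvent, T> createTable,
            Function<DropColumnEvent, T> dropColumn,
            Function<DropTableEvent, T> dropTable,
            Function<RenameColumnEvent, T> renameColumn,
            Function<TruncateTableEvent, T> truncateTable) {
        if (event instanceof AddColumnEvent) {
            return addColumn.apply((AddColumnEvent) event);
        }
        if (event instanceof AlterColumnTypeEvent) {
            return alterColumnType.apply((AlterColumnTypeEvent) event);
        }
        if (event instanceof CreateTableEvent) {
            return createTable.apply((CreateTableEvent) event);
        }
        if (event instanceof DropColumnEvent) {
            return dropColumn.apply((DropColumnEvent) event);
        }
        if (event instanceof DropTableEvent) {
            return dropTable.apply((DropTableEvent) event);
        }
        if (event instanceof RenameColumnEvent) {
            return renameColumn.apply((RenameColumnEvent) event);
        }
        if (event instanceof TruncateTableEvent) {
            return truncateTable.apply((TruncateTableEvent) event);
        }
        throw new IllegalArgumentException("Unknown schema change event: " + event);
    }
}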

@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
@ -97,19 +98,34 @@ public class StarRocksMetadataApplier implements MetadataApplier {
catalog.open();
}
if (schemaChangeEvent instanceof CreateTableEvent) {
applyCreateTable((CreateTableEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AddColumnEvent) {
applyAddColumn((AddColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof DropColumnEvent) {
applyDropColumn((DropColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof RenameColumnEvent) {
applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
} else {
throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent -> {
applyAddColumn(addColumnEvent);
return null;
},
alterColumnTypeEvent -> {
applyAlterColumnType(alterColumnTypeEvent);
return null;
},
createTableEvent -> {
applyCreateTable(createTableEvent);
return null;
},
dropColumnEvent -> {
applyDropColumn(dropColumnEvent);
return null;
},
dropTableEvent -> {
throw new UnsupportedSchemaChangeEventException(dropTableEvent);
},
renameColumnEvent -> {
applyRenameColumn(renameColumnEvent);
return null;
},
truncateTableEvent -> {
throw new UnsupportedSchemaChangeEventException(truncateTableEvent);
});
}
private void applyCreateTable(CreateTableEvent createTableEvent) throws SchemaEvolveException {
@ -289,7 +305,7 @@ public class StarRocksMetadataApplier implements MetadataApplier {
throw new UnsupportedSchemaChangeEventException(renameColumnEvent);
}
private void applyAlterColumn(AlterColumnTypeEvent alterColumnTypeEvent)
private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent)
throws SchemaEvolveException {
// TODO There are limitations on data type conversions. We need to know the data types
// before and after the change so that we can validate it. But the event only contains
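The TODO above (truncated at the hunk boundary) is exactly what the new `oldTypeMapping` on `AlterColumnTypeEvent` enables: with both sides of the change available, an applier could validate conversions up front. A hedged sketch of such a check; `isSupportedConversion` is a hypothetical helper, not part of this change:

// Sketch only: reject unsupported type changes before applying them.
for (Map.Entry<String, DataType> entry : event.getTypeMapping().entrySet()) {
    DataType oldType = event.getOldTypeMapping().get(entry.getKey());
    DataType newType = entry.getValue();
    if (oldType != null && !isSupportedConversion(oldType, newType)) { // assumed helper
        throw new SchemaEvolveException(
                event,
                "Cannot convert column " + entry.getKey() + " from " + oldType + " to " + newType,
                null);
    }
}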

@ -24,10 +24,12 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@ -223,6 +225,13 @@ public class ValuesDatabase {
return builder.primaryKey(table.primaryKeys).build();
}
public static void applyTruncateTableEvent(TruncateTableEvent event) {
ValuesTable table = globalTables.get(event.tableId());
Preconditions.checkNotNull(table, event.tableId() + " does not exist");
table.applyTruncateTableEvent(event);
LOG.info("Applied TruncateTableEvent: {}", event);
}
public static void applyDataChangeEvent(DataChangeEvent event) {
ValuesTable table = globalTables.get(event.tableId());
Preconditions.checkNotNull(table, event.tableId() + " does not exist");
@ -237,6 +246,13 @@ public class ValuesDatabase {
globalTables.put(
tableId, new ValuesTable(tableId, ((CreateTableEvent) event).getSchema()));
}
} else if (event instanceof DropTableEvent) {
globalTables.remove(tableId);
} else if (event instanceof TruncateTableEvent) {
if (globalTables.containsKey(tableId)) {
ValuesTable table = globalTables.get(event.tableId());
table.applyTruncateTableEvent((TruncateTableEvent) event);
}
} else {
ValuesTable table = globalTables.get(event.tableId());
Preconditions.checkNotNull(table, event.tableId() + " does not exist");
@ -489,5 +505,9 @@ public class ValuesDatabase {
});
});
}
private void applyTruncateTableEvent(TruncateTableEvent event) {
records.clear();
}
}
}
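To make the intended semantics of the two new events concrete: truncation clears a table's records but keeps its schema and registration, while dropping removes the table entirely. A hedged usage sketch against the `ValuesDatabase` statics shown above (the `applySchemaChangeEvent` entry point name is inferred from the surrounding diff):

TableId tableId = TableId.tableId("default_db", "products");

// TRUNCATE: the table survives, but its records are cleared (records.clear() above).
ValuesDatabase.applySchemaChangeEvent(new TruncateTableEvent(tableId));

// DROP: the table is removed from globalTables; subsequent data change
// events on this tableId would fail the Preconditions.checkNotNull guard.
ValuesDatabase.applySchemaChangeEvent(new DropTableEvent(tableId));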

@ -39,8 +39,6 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
/** End-to-end tests for the MySQL CDC pipeline job. */
@ -123,54 +121,24 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
List<String> expectedEvents =
Arrays.asList(
String.format(
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(expectedEvents);
validateResult(
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}");
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
@ -212,38 +180,159 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
expectedEvents =
Arrays.asList(
String.format(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(expectedEvents);
validateResult(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}");
}
@Test
public void testSchemaChangeEvents() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}");
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
// Perform DDL changes after the binlog is generated
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
LOG.info("Begin schema evolution stage.");
// Test AddColumnEvent
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
stat.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
stat.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
stat.execute("DELETE FROM products WHERE id=111;");
// Test AlterColumnTypeEvent
stat.execute("ALTER TABLE products MODIFY COLUMN new_col BIGINT;");
stat.execute(
"INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);"); // 112
// Test RenameColumnEvent
stat.execute("ALTER TABLE products RENAME COLUMN new_col TO new_column;");
stat.execute(
"INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);"); // 113
// Test DropColumnEvent
stat.execute("ALTER TABLE products DROP COLUMN new_column;");
stat.execute(
"INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114
// Test TruncateTableEvent
stat.execute("TRUNCATE TABLE products;");
// Test DropTableEvent. No further events follow for this table.
stat.execute("DROP TABLE products;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
waitUntilSpecificEvent(
String.format(
"DropTableEvent{tableId=%s.products}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
"AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}",
"DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}",
"DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}",
"DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.products}",
"DropTableEvent{tableId=%s.products}");
}
private void validateResult(List<String> expectedEvents) throws Exception {
private void validateResult(String... expectedEvents) throws Exception {
String dbName = mysqlInventoryDatabase.getDatabaseName();
for (String event : expectedEvents) {
waitUntilSpecificEvent(event);
waitUntilSpecificEvent(String.format(event, dbName, dbName));
}
}
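The refactored `validateResult` relies on a small `String.format` property: surplus arguments are ignored, so passing the database name twice works for templates containing either one or two `%s` placeholders. A two-line illustration:

// Both calls succeed; extra format arguments are silently ignored.
String.format("DropTableEvent{tableId=%s.products}", "db1", "db1");
String.format("Ignored schema change ... %s.members ... %s.members.", "db1", "db1");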

@ -221,7 +221,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}");
@ -305,7 +305,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.ALL, nameMapping={VERSION=STRING}}",
"AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, null, Fluorite], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, null, null], op=INSERT, meta=()}");
}
@ -398,7 +398,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}",
@ -504,7 +504,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"AddColumnEvent{tableId=NEW_%s.BETAGAMM, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}",
"DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10002, null, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, nameMapping={VERSION=STRING}}",
"AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, null, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}",
"DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}");
@ -616,7 +616,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=NEW_%s.TABLEC, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}",
"AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}",
@ -706,7 +706,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida, extras], op=INSERT, meta=()}",
"AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, extras, null, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.ALL, nameMapping={VERSION=STRING}}",
"AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, extras, null, Fluorite], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, extras, null, null], op=INSERT, meta=()}");
}
@ -800,7 +800,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}",

@ -96,11 +96,14 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}",
"DropTableEvent{tableId=%s.members}"));
}
@Test
@ -157,7 +160,8 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
false,
Arrays.asList(
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null], op=INSERT, meta=()}"));
}
@Test
@ -183,10 +187,17 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}"));
assertNotExists(
Collections.singletonList(
"Applied schema change event DropTableEvent{tableId=%s.members}"),
taskManagerConsumer);
}
@Test
@ -200,12 +211,15 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}"),
Collections.singletonList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members."));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0, null], op=INSERT, meta=()}"),
Arrays.asList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.",
"Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members."));
}
@Test
@ -337,6 +351,13 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
// triggers TruncateTableEvent
stmt.execute("TRUNCATE TABLE members;");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
// triggers DropTableEvent
stmt.execute("DROP TABLE members;");
}
List<String> expectedTmEvents =
@ -387,6 +408,15 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
}
}
private void assertNotExists(List<String> unexpectedEvents, ToStringConsumer consumer) {
String consumerLog = consumer.toUtf8String();
for (String event : unexpectedEvents) {
Assert.assertFalse(
consumerLog.contains(
String.format(event, schemaEvolveDatabase.getDatabaseName())));
}
}
private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
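The `waitUntilSpecificEvent` helper above (truncated by the hunk) follows a common poll-until-deadline pattern. For reference, a self-contained sketch of that pattern; the names and sleep interval are placeholders, not the project's actual code:

// Generic sketch: poll a condition until it holds or a deadline passes.
static void waitUntil(java.util.function.BooleanSupplier condition, long timeoutMillis)
        throws java.util.concurrent.TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        if (condition.getAsBoolean()) {
            return; // observed the expected output in time
        }
        Thread.sleep(1000); // back off between reads of the consumer log
    }
    throw new java.util.concurrent.TimeoutException("Expected event never appeared");
}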

@ -21,10 +21,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -111,37 +112,43 @@ public class SchemaDerivation {
// multiple source mapping (merging tables)
Schema derivedTableSchema =
schemaManager.getLatestEvolvedSchema(derivedTable).get();
if (schemaChangeEvent instanceof CreateTableEvent) {
events.addAll(
handleCreateTableEvent(
(CreateTableEvent) schemaChangeEvent,
derivedTableSchema,
derivedTable));
} else if (schemaChangeEvent instanceof AddColumnEvent) {
events.addAll(
handleAddColumnEvent(
(AddColumnEvent) schemaChangeEvent,
derivedTableSchema,
derivedTable));
} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
events.addAll(
handleAlterColumnTypeEvent(
(AlterColumnTypeEvent) schemaChangeEvent,
derivedTableSchema,
derivedTable));
} else if (schemaChangeEvent instanceof DropColumnEvent) {
// Do nothing: drop column event should not be sent to downstream
} else if (schemaChangeEvent instanceof RenameColumnEvent) {
events.addAll(
handleRenameColumnEvent(
(RenameColumnEvent) schemaChangeEvent,
derivedTableSchema,
derivedTable));
} else {
throw new IllegalStateException(
String.format(
"Unrecognized SchemaChangeEvent type: %s", schemaChangeEvent));
}
events.addAll(
Objects.requireNonNull(
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent ->
handleAddColumnEvent(
addColumnEvent,
derivedTableSchema,
derivedTable),
alterColumnTypeEvent ->
handleAlterColumnTypeEvent(
alterColumnTypeEvent,
derivedTableSchema,
derivedTable),
createTableEvent ->
handleCreateTableEvent(
createTableEvent,
derivedTableSchema,
derivedTable),
dropColumnEvent ->
Collections.emptyList(), // Column drop shouldn't be
// spread to route
// destination.
dropTableEvent ->
Collections.emptyList(), // Table drop shouldn't be
// spread to route
// destination.
renameColumnEvent ->
handleRenameColumnEvent(
renameColumnEvent,
derivedTableSchema,
derivedTable),
truncateTableEvent ->
Collections.emptyList() // Table truncation
// shouldn't be spread to route
// destination.
)));
}
}
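The empty-list branches above encode a routing rule worth spelling out: when several source tables merge into one destination, dropping or truncating a single source must not drop or truncate the merged table, because the other sources' rows live there too. A hedged illustration; `deriveForRoute` is a hypothetical wrapper around the visitor call above:

// TABLEALPHA and TABLEBETA both route into ALL; truncating TABLEALPHA
// must not clear ALL, so no event is derived for the destination.
TableId alpha = TableId.tableId("db", "TABLEALPHA");
List<SchemaChangeEvent> derived =
        deriveForRoute(new TruncateTableEvent(alpha)); // assumed helper
assert derived.isEmpty();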

@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@ -175,14 +176,26 @@ public class SchemaRegistryRequestHandler implements Closeable {
LOG.info(
"Received schema change event request from table {}. Start to buffer requests for others.",
request.getTableId().toString());
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
SchemaChangeEvent event = request.getSchemaChangeEvent();
if (event instanceof CreateTableEvent
&& schemaManager.originalSchemaExists(request.getTableId())) {
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(Collections.emptyList())));
}
schemaManager.applyOriginalSchemaChange(request.getSchemaChangeEvent());
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@ -415,6 +428,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
return events;
}
case DROP_TABLE:
// We don't drop any tables in Lenient mode.
LOG.info("A drop table event {} has been ignored in Lenient mode.", event);
return Collections.emptyList();
default:
return Collections.singletonList(event);
}
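After the registry fills the pre-schema, downstream consumers see both the old and new column types on the same event. A hedged sketch of the resulting event shape (the three-argument constructor is visible in the serializer changes below; imports are elided):

Map<String, DataType> newTypes = new HashMap<>();
newTypes.put("new_col", DataTypes.BIGINT());
Map<String, DataType> oldTypes = new HashMap<>();
oldTypes.put("new_col", DataTypes.INT());

AlterColumnTypeEvent enriched =
        new AlterColumnTypeEvent(TableId.tableId("db", "products"), newTypes, oldTypes);
// hasPreSchema() is now true, and toString() matches the e2e expectation:
// AlterColumnTypeEvent{tableId=db.products, typeMapping={new_col=BIGINT},
// oldTypeMapping={new_col=INT}}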

@ -24,9 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@ -223,16 +225,18 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
preTransformProcessorMap.remove(createTableEvent.tableId());
event = cacheCreateTable(createTableEvent);
output.collect(new StreamRecord<>(cacheCreateTable(createTableEvent)));
} else if (event instanceof DropTableEvent) {
output.collect(new StreamRecord<>(event));
} else if (event instanceof TruncateTableEvent) {
output.collect(new StreamRecord<>(event));
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
preTransformProcessorMap.remove(schemaChangeEvent.tableId());
event = cacheChangeSchema(schemaChangeEvent);
cacheChangeSchema(schemaChangeEvent);
output.collect(new StreamRecord<>(event));
} else if (event instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = processDataChangeEvent(((DataChangeEvent) event));
output.collect(new StreamRecord<>(dataChangeEvent));
output.collect(new StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event))));
}
}
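One subtlety in the dispatch above: `DropTableEvent` and `TruncateTableEvent` are themselves `SchemaChangeEvent`s, so their branches must come before the generic `SchemaChangeEvent` branch, otherwise they would be routed through the schema-caching path instead of passing straight through. A one-line check makes the subtype relationship explicit:

// Both table-level events satisfy the generic branch's instanceof test.
Event event = new DropTableEvent(TableId.tableId("db", "t"));
assert event instanceof SchemaChangeEvent; // same for TruncateTableEvent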

@ -60,7 +60,9 @@ public class AlterColumnTypeEventSerializer extends TypeSerializerSingleton<Alte
@Override
public AlterColumnTypeEvent copy(AlterColumnTypeEvent from) {
return new AlterColumnTypeEvent(
from.tableId(), typeMapSerializer.copy(from.getTypeMapping()));
from.tableId(),
typeMapSerializer.copy(from.getTypeMapping()),
typeMapSerializer.copy(from.getOldTypeMapping()));
}
@Override
@ -77,12 +79,15 @@ public class AlterColumnTypeEventSerializer extends TypeSerializerSingleton<Alte
public void serialize(AlterColumnTypeEvent record, DataOutputView target) throws IOException {
tableIdSerializer.serialize(record.tableId(), target);
typeMapSerializer.serialize(record.getTypeMapping(), target);
typeMapSerializer.serialize(record.getOldTypeMapping(), target);
}
@Override
public AlterColumnTypeEvent deserialize(DataInputView source) throws IOException {
return new AlterColumnTypeEvent(
tableIdSerializer.deserialize(source), typeMapSerializer.deserialize(source));
tableIdSerializer.deserialize(source),
typeMapSerializer.deserialize(source),
typeMapSerializer.deserialize(source));
}
@Override
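Because `serialize` now writes three fields (table id, new type mapping, old type mapping), `deserialize` must read them back in the same order, as it does above. A hedged round-trip sketch using Flink's in-memory views, assuming `event` is an `AlterColumnTypeEvent` and that the event implements value equality:

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

DataOutputSerializer out = new DataOutputSerializer(256);
AlterColumnTypeEventSerializer.INSTANCE.serialize(event, out);

DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
AlterColumnTypeEvent roundTripped = AlterColumnTypeEventSerializer.INSTANCE.deserialize(in);
assert roundTripped.equals(event); // field order matched on both sides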

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
/** A {@link TypeSerializer} for {@link DropTableEvent}. */
public class DropTableEventSerializer extends TypeSerializerSingleton<DropTableEvent> {
private static final long serialVersionUID = 1L;
/** Sharable instance of the DropTableEventSerializer. */
public static final DropTableEventSerializer INSTANCE = new DropTableEventSerializer();
private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public DropTableEvent createInstance() {
return new DropTableEvent(TableId.tableId("unknown"));
}
@Override
public DropTableEvent copy(DropTableEvent from) {
return new DropTableEvent(from.tableId());
}
@Override
public DropTableEvent copy(DropTableEvent from, DropTableEvent reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(DropTableEvent record, DataOutputView target) throws IOException {
tableIdSerializer.serialize(record.tableId(), target);
}
@Override
public DropTableEvent deserialize(DataInputView source) throws IOException {
return new DropTableEvent(tableIdSerializer.deserialize(source));
}
@Override
public DropTableEvent deserialize(DropTableEvent reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
serialize(deserialize(source), target);
}
@Override
public TypeSerializerSnapshot<DropTableEvent> snapshotConfiguration() {
return new DropTableEventSerializer.DropTableEventSerializerSnapshot();
}
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class DropTableEventSerializerSnapshot
extends SimpleTypeSerializerSnapshot<DropTableEvent> {
public DropTableEventSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}
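Since `DropTableEvent` carries nothing but a `TableId`, its wire format is just the serialized table identifier, and `copy` can simply rebuild the event. A short sketch, assuming value equality consistent with the other event types:

DropTableEvent original = new DropTableEvent(TableId.tableId("db", "products"));
DropTableEvent copied = DropTableEventSerializer.INSTANCE.copy(original);
assert copied.equals(original); // same table id, distinct instance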

@ -20,14 +20,10 @@ package org.apache.flink.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
@ -35,6 +31,14 @@ import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
/** A {@link TypeSerializer} for {@link SchemaChangeEvent}. */
public final class SchemaChangeEventSerializer extends TypeSerializerSingleton<SchemaChangeEvent> {
@ -55,32 +59,28 @@ public final class SchemaChangeEventSerializer extends TypeSerializerSingleton<S
public SchemaChangeEvent createInstance() {
return new SchemaChangeEvent() {
@Override
public SchemaChangeEventType getType() {
return null;
}
@Override
public TableId tableId() {
return TableId.tableId("unknown", "unknown", "unknown");
}
};
}
@Override
public SchemaChangeEvent copy(SchemaChangeEvent from) {
return SchemaChangeEventVisitor.visit(
from,
AddColumnEventSerializer.INSTANCE::copy,
AlterColumnTypeEventSerializer.INSTANCE::copy,
CreateTableEventSerializer.INSTANCE::copy,
DropColumnEventSerializer.INSTANCE::copy,
DropTableEventSerializer.INSTANCE::copy,
RenameColumnEventSerializer.INSTANCE::copy,
TruncateTableEventSerializer.INSTANCE::copy);
}
@Override
@ -95,25 +95,44 @@ public final class SchemaChangeEventSerializer extends TypeSerializerSingleton<S
@Override
public void serialize(SchemaChangeEvent record, DataOutputView target) throws IOException {
SchemaChangeEventVisitor.<Void, IOException>visit(
record,
addColumnEvent -> {
enumSerializer.serialize(ADD_COLUMN, target);
AddColumnEventSerializer.INSTANCE.serialize(addColumnEvent, target);
return null;
},
alterColumnTypeEvent -> {
enumSerializer.serialize(ALTER_COLUMN_TYPE, target);
AlterColumnTypeEventSerializer.INSTANCE.serialize(alterColumnTypeEvent, target);
return null;
},
createTableEvent -> {
enumSerializer.serialize(CREATE_TABLE, target);
CreateTableEventSerializer.INSTANCE.serialize(createTableEvent, target);
return null;
},
dropColumnEvent -> {
enumSerializer.serialize(DROP_COLUMN, target);
DropColumnEventSerializer.INSTANCE.serialize(dropColumnEvent, target);
return null;
},
dropTableEvent -> {
enumSerializer.serialize(DROP_TABLE, target);
DropTableEventSerializer.INSTANCE.serialize(dropTableEvent, target);
return null;
},
renameColumnEvent -> {
enumSerializer.serialize(RENAME_COLUMN, target);
RenameColumnEventSerializer.INSTANCE.serialize(renameColumnEvent, target);
return null;
},
truncateTableEvent -> {
enumSerializer.serialize(TRUNCATE_TABLE, target);
TruncateTableEventSerializer.INSTANCE.serialize(truncateTableEvent, target);
return null;
});
}
@Override
@ -130,6 +149,10 @@ public final class SchemaChangeEventSerializer extends TypeSerializerSingleton<S
return RenameColumnEventSerializer.INSTANCE.deserialize(source);
case ALTER_COLUMN_TYPE:
return AlterColumnTypeEventSerializer.INSTANCE.deserialize(source);
case DROP_TABLE:
return DropTableEventSerializer.INSTANCE.deserialize(source);
case TRUNCATE_TABLE:
return TruncateTableEventSerializer.INSTANCE.deserialize(source);
default:
throw new IllegalArgumentException(
"Unknown schema change event class: " + schemaChangeEventType);

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
/** A {@link TypeSerializer} for {@link TruncateTableEvent}. */
public class TruncateTableEventSerializer extends TypeSerializerSingleton<TruncateTableEvent> {
private static final long serialVersionUID = 1L;
/** Sharable instance of the TruncateTableEventSerializer. */
public static final TruncateTableEventSerializer INSTANCE = new TruncateTableEventSerializer();
private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public TruncateTableEvent createInstance() {
return new TruncateTableEvent(TableId.tableId("unknown"));
}
@Override
public TruncateTableEvent copy(TruncateTableEvent from) {
return new TruncateTableEvent(from.tableId());
}
@Override
public TruncateTableEvent copy(TruncateTableEvent from, TruncateTableEvent reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(TruncateTableEvent record, DataOutputView target) throws IOException {
tableIdSerializer.serialize(record.tableId(), target);
}
@Override
public TruncateTableEvent deserialize(DataInputView source) throws IOException {
return new TruncateTableEvent(tableIdSerializer.deserialize(source));
}
@Override
public TruncateTableEvent deserialize(TruncateTableEvent reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
serialize(deserialize(source), target);
}
@Override
public TypeSerializerSnapshot<TruncateTableEvent> snapshotConfiguration() {
return new TruncateTableEventSerializer.TruncateTableEventSerializerSnapshot();
}
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class TruncateTableEventSerializerSnapshot
extends SimpleTypeSerializerSnapshot<TruncateTableEvent> {
public TruncateTableEventSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}
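As with the drop-table serializer, getLength() returns -1 to mark records as variable-length, and the nested snapshot class is what lets Flink restore state written by an earlier job version. A minimal sketch of the compatibility check (the resolveSchemaCompatibility signature shown here is the pre-1.19 TypeSerializerSnapshot API and is an assumption about the targeted Flink version):
TypeSerializerSnapshot<TruncateTableEvent> snapshot =
        TruncateTableEventSerializer.INSTANCE.snapshotConfiguration();
// Expected to be compatible as-is, since the singleton carries no configurable state.
TypeSerializerSchemaCompatibility<TruncateTableEvent> compatibility =
        snapshot.resolveSchemaCompatibility(TruncateTableEventSerializer.INSTANCE);
System.out.println(compatibility.isCompatibleAsIs());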

@ -249,7 +249,9 @@ public class SchemaEvolveTest {
List<Event> alterColumnTypeEvents =
Arrays.asList(
new AlterColumnTypeEvent(
tableId,
ImmutableMap.of("score", BIGINT, "toshi", FLOAT),
ImmutableMap.of("score", INT, "toshi", SMALLINT)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -504,7 +506,9 @@ public class SchemaEvolveTest {
List<Event> alterColumnTypeEvents =
Arrays.asList(
new AlterColumnTypeEvent(
tableId,
ImmutableMap.of("score", BIGINT, "toshi", FLOAT),
ImmutableMap.of("score", INT, "toshi", SMALLINT)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -1828,7 +1832,9 @@ public class SchemaEvolveTest {
Column.physicalColumn(
"toshi", SMALLINT, null)))),
new AlterColumnTypeEvent(
tableId,
Collections.singletonMap("name", STRING),
Collections.singletonMap("name", STRING.notNull())),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -1906,7 +1912,9 @@ public class SchemaEvolveTest {
List<Event> alterColumnTypeEvents =
Arrays.asList(
new AlterColumnTypeEvent(
tableId,
ImmutableMap.of("score", BIGINT, "toshi", FLOAT),
ImmutableMap.of("score", INT, "toshi", SMALLINT)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -1923,7 +1931,9 @@ public class SchemaEvolveTest {
List<Event> lenientAlterColumnTypeEvents =
Arrays.asList(
new AlterColumnTypeEvent(
tableId,
ImmutableMap.of("score", BIGINT, "toshi", FLOAT),
ImmutableMap.of("score", INT, "toshi", SMALLINT)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -2106,7 +2116,9 @@ public class SchemaEvolveTest {
List<Event> lenientDropColumnEvents =
Arrays.asList(
new AlterColumnTypeEvent(
tableId,
Collections.singletonMap("name", STRING),
Collections.singletonMap("name", STRING.notNull())),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@ -2311,7 +2323,9 @@ public class SchemaEvolveTest {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("yina", INT)))),
new AlterColumnTypeEvent(
tableId,
Collections.singletonMap("iina", INT),
Collections.singletonMap("iina", INT.notNull())),
DataChangeEvent.insertEvent(
tableId,
buildRecord(

@ -49,10 +49,17 @@ public class AlterColumnTypeEventSerializerTest extends SerializerTestBase<Alter
Map<String, DataType> map = new HashMap<>();
map.put("col1", DataTypes.BYTES());
map.put("col2", DataTypes.TIME());
Map<String, DataType> oldMap = new HashMap<>();
oldMap.put("col1", DataTypes.TIME());
oldMap.put("col2", DataTypes.BYTES());
return new AlterColumnTypeEvent[] {
new AlterColumnTypeEvent(TableId.tableId("table"), map),
new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map),
new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map),
new AlterColumnTypeEvent(TableId.tableId("table"), map, oldMap),
new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map, oldMap),
new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map, oldMap)
};
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
/** A test for the {@link DropTableEventSerializer}. */
public class DropTableEventSerializerTest extends SerializerTestBase<DropTableEvent> {
@Override
protected TypeSerializer<DropTableEvent> createSerializer() {
return DropTableEventSerializer.INSTANCE;
}
@Override
protected int getLength() {
return -1;
}
@Override
protected Class<DropTableEvent> getTypeClass() {
return DropTableEvent.class;
}
@Override
protected DropTableEvent[] getTestData() {
return new DropTableEvent[] {
new DropTableEvent(TableId.tableId("table")),
new DropTableEvent(TableId.tableId("schema", "table")),
new DropTableEvent(TableId.tableId("namespace", "schema", "table"))
};
}
}

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
/** A test for the {@link TruncateTableEventSerializer}. */
public class TruncateTableEventSerializerTest extends SerializerTestBase<TruncateTableEvent> {
@Override
protected TypeSerializer<TruncateTableEvent> createSerializer() {
return TruncateTableEventSerializer.INSTANCE;
}
@Override
protected int getLength() {
return -1;
}
@Override
protected Class<TruncateTableEvent> getTypeClass() {
return TruncateTableEvent.class;
}
@Override
protected TruncateTableEvent[] getTestData() {
return new TruncateTableEvent[] {
new TruncateTableEvent(TableId.tableId("table")),
new TruncateTableEvent(TableId.tableId("schema", "table")),
new TruncateTableEvent(TableId.tableId("namespace", "schema", "table"))
};
}
}
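Both test classes lean on SerializerTestBase, which exercises serialize/deserialize/copy round-trips over every element of getTestData(). Condensed, the core check looks roughly like this (a sketch of the idea, not the actual base-class code; assertEquals stands in for the JUnit assertion):
for (TruncateTableEvent event : getTestData()) {
    DataOutputSerializer out = new DataOutputSerializer(64);
    createSerializer().serialize(event, out);
    TruncateTableEvent restored =
            createSerializer().deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
    // Each event must survive the round trip unchanged.
    assertEquals(event, restored);
}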