[3.0][cdc-common] Add SchemaChangeEvent implementations (#2664)

pull/2674/head
Kunni 1 year ago committed by GitHub
parent 1329a48bcc
commit c52ccfa5c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,163 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Column;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents an {@code ADD COLUMN} DDL, which may contain the
* lenient column type changes.
*/
@PublicEvolving
public final class AddColumnEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
private final List<ColumnWithPosition> addedColumns;
public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
this.tableId = tableId;
this.addedColumns = addedColumns;
}
/** Returns the added columns. */
public List<ColumnWithPosition> getAddedColumns() {
return addedColumns;
}
/** relative Position of column. */
public enum ColumnPosition implements Serializable {
BEFORE,
AFTER,
FIRST,
LAST
}
/** represents result of an ADD COLUMN DDL that may change column sequence. */
public static class ColumnWithPosition implements Serializable {
/** The added column. */
private final Column addColumn;
/** The position of the added column. */
private final ColumnPosition position;
/** The added column lies in the position relative to this column. */
private final @Nullable Column existingColumn;
/** In the default scenario, we add fields at the end of the column. */
public ColumnWithPosition(Column addColumn) {
this.addColumn = addColumn;
position = ColumnPosition.LAST;
existingColumn = null;
}
public ColumnWithPosition(
Column addColumn, ColumnPosition position, Column existingColumn) {
this.addColumn = addColumn;
this.position = position;
this.existingColumn = existingColumn;
}
public Column getAddColumn() {
return addColumn;
}
public ColumnPosition getPosition() {
return position;
}
public Column getExistingColumn() {
return existingColumn;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ColumnWithPosition that = (ColumnWithPosition) o;
return Objects.equals(addColumn, that.addColumn)
&& Objects.equals(position, that.position)
&& Objects.equals(existingColumn, that.existingColumn);
}
@Override
public int hashCode() {
return Objects.hash(addColumn, position, existingColumn);
}
@Override
public String toString() {
return "ColumnWithPosition{"
+ "column="
+ addColumn
+ ", position="
+ position
+ ", existingColumn="
+ existingColumn
+ '}';
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
AddColumnEvent that = (AddColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(addedColumns, that.addedColumns);
}
@Override
public int hashCode() {
return Objects.hash(tableId, addedColumns);
}
@Override
public String toString() {
return "AddColumnEvent{" + "tableId=" + tableId + ", addedColumns=" + addedColumns + '}';
}
@Override
public TableId tableId() {
return tableId;
}
}

@ -0,0 +1,82 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import com.ververica.cdc.common.types.DataType;
import java.util.Map;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the
* lenient column type changes.
*/
public class AlterColumnTypeEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;
public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
}
/** Returns the type mapping. */
public Map<String, DataType> getTypeMapping() {
return typeMapping;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping);
}
@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping);
}
@Override
public String toString() {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", nameMapping="
+ typeMapping
+ '}';
}
@Override
public TableId tableId() {
return tableId;
}
}

@ -0,0 +1,78 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.source.DataSource;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents an {@code CREATE TABLE} DDL. this will be sent by
* {@link DataSource} before all {@link DataChangeEvent} with the same tableId
*/
@PublicEvolving
public class CreateTableEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
private final Schema schema;
public CreateTableEvent(TableId tableId, Schema schema) {
this.tableId = tableId;
this.schema = schema;
}
/** Returns the table schema. */
public Schema getSchema() {
return schema;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
CreateTableEvent that = (CreateTableEvent) o;
return Objects.equals(tableId, that.tableId) && Objects.equals(schema, that.schema);
}
@Override
public int hashCode() {
return Objects.hash(tableId, schema);
}
@Override
public String toString() {
return "CreateTableEvent{" + "tableId=" + tableId + ", schema=" + schema + '}';
}
@Override
public TableId tableId() {
return tableId;
}
}

@ -0,0 +1,84 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Column;
import java.util.List;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents an {@code DROP COLUMN} DDL, which may contain the
* lenient column type changes.
*/
@PublicEvolving
public class DropColumnEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
private final List<Column> droppedColumns;
public DropColumnEvent(TableId tableId, List<Column> droppedColumns) {
this.tableId = tableId;
this.droppedColumns = droppedColumns;
}
/** Returns the dropped columns. */
public List<Column> getDroppedColumns() {
return droppedColumns;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
DropColumnEvent that = (DropColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(droppedColumns, that.droppedColumns);
}
@Override
public int hashCode() {
return Objects.hash(tableId, droppedColumns);
}
@Override
public String toString() {
return "DropColumnEvent{"
+ "tableId="
+ tableId
+ ", droppedColumns="
+ droppedColumns
+ '}';
}
@Override
public TableId tableId() {
return tableId;
}
}

@ -0,0 +1,75 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import java.util.Map;
import java.util.Objects;
/**
* A {@link SchemaChangeEvent} that represents an {@code RENAME COLUMN} DDL, which may contain the
* lenient column type changes.
*/
public class RenameColumnEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;
private final TableId tableId;
/** key => column name before changing, value => column name after changing. */
private final Map<String, String> nameMapping;
public RenameColumnEvent(TableId tableId, Map<String, String> nameMapping) {
this.tableId = tableId;
this.nameMapping = nameMapping;
}
/** Returns the name mapping. */
public Map<String, String> getNameMapping() {
return nameMapping;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
RenameColumnEvent that = (RenameColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(nameMapping, that.nameMapping);
}
@Override
public int hashCode() {
return Objects.hash(tableId, nameMapping);
}
@Override
public String toString() {
return "RenameColumnEvent{" + "tableId=" + tableId + ", nameMapping=" + nameMapping + '}';
}
@Override
public TableId tableId() {
return tableId;
}
}

@ -18,9 +18,11 @@ package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
import java.io.Serializable;
/**
* Class {@code SchemaChangeEvent} represents the changes in the table structure of the external
* system, such as CREATE, DROP, RENAME and so on.
*/
@PublicEvolving
public interface SchemaChangeEvent extends ChangeEvent {}
public interface SchemaChangeEvent extends ChangeEvent, Serializable {}

@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Objects;
/**
@ -43,7 +44,7 @@ import java.util.Objects;
* </ul>
*/
@PublicEvolving
public class TableId {
public class TableId implements Serializable {
@Nullable private final String namespace;
@Nullable private final String schemaName;

Loading…
Cancel
Save