@ -18,6 +18,7 @@ package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving ;
import java.io.Serializable ;
import java.util.Map ;
/ * *
@ -25,17 +26,108 @@ import java.util.Map;
* INSERT , UPDATE , DELETE and so on .
* /
@PublicEvolving
public interface DataChangeEvent extends ChangeEvent {
public class DataChangeEvent implements ChangeEvent , Serializable {
private static final long serialVersionUID = 1L ;
private DataChangeEvent (
TableId tableId ,
RecordData before ,
RecordData after ,
OperationType op ,
Map < String , String > meta ) {
this . tableId = tableId ;
this . before = before ;
this . after = after ;
this . op = op ;
this . meta = meta ;
}
private final TableId tableId ;
/** Describes the record of data before change. */
RecordData before ( ) ;
private final RecordData before ;
/** Describes the record of data after change. */
RecordData after ( ) ;
private final RecordData after ;
/** Describes the operation type of the change event. e.g. INSERT, UPDATE, REPLACE, DELETE. */
OperationType op ( ) ;
private final OperationType op ;
/** Optional, describes the metadata of the change event. e.g. MySQL binlog file name, pos. */
Map < String , String > meta ( ) ;
private final Map < String , String > meta ;
@Override
public TableId tableId ( ) {
return tableId ;
}
public RecordData before ( ) {
return before ;
}
public RecordData after ( ) {
return after ;
}
public OperationType op ( ) {
return op ;
}
public Map < String , String > meta ( ) {
return meta ;
}
/** Creates a {@link DataChangeEvent} instance that describes the insert event. */
public static DataChangeEvent insertEvent ( TableId tableId , RecordData after ) {
return new DataChangeEvent ( tableId , null , after , OperationType . INSERT , null ) ;
}
/ * *
* Creates a { @link DataChangeEvent } instance that describes the insert event with meta info .
* /
public static DataChangeEvent insertEvent (
TableId tableId , RecordData after , Map < String , String > meta ) {
return new DataChangeEvent ( tableId , null , after , OperationType . INSERT , meta ) ;
}
/** Creates a {@link DataChangeEvent} instance that describes the delete event. */
public static DataChangeEvent deleteEvent ( TableId tableId , RecordData before ) {
return new DataChangeEvent ( tableId , before , null , OperationType . DELETE , null ) ;
}
/ * *
* Creates a { @link DataChangeEvent } instance that describes the delete event with meta info .
* /
public static DataChangeEvent deleteEvent (
TableId tableId , RecordData before , Map < String , String > meta ) {
return new DataChangeEvent ( tableId , before , null , OperationType . DELETE , meta ) ;
}
/** Creates a {@link DataChangeEvent} instance that describes the update event. */
public static DataChangeEvent updateEvent (
TableId tableId , RecordData before , RecordData after ) {
return new DataChangeEvent ( tableId , before , after , OperationType . UPDATE , null ) ;
}
/ * *
* Creates a { @link DataChangeEvent } instance that describes the update event with meta info .
* /
public static DataChangeEvent updateEvent (
TableId tableId , RecordData before , RecordData after , Map < String , String > meta ) {
return new DataChangeEvent ( tableId , before , after , OperationType . UPDATE , meta ) ;
}
/** Creates a {@link DataChangeEvent} instance that describes the replace event. */
public static DataChangeEvent replaceEvent ( TableId tableId , RecordData after ) {
return new DataChangeEvent ( tableId , null , after , OperationType . REPLACE , null ) ;
}
/ * *
* Creates a { @link DataChangeEvent } instance that describes the replace event with meta info .
* /
public static DataChangeEvent replaceEvent (
TableId tableId , RecordData after , Map < String , String > meta ) {
return new DataChangeEvent ( tableId , null , after , OperationType . REPLACE , meta ) ;
}
}