@ -1241,7 +1241,7 @@ class FlinkPipelineComposerITCase {
assertThat ( mergedTableSchema )
. isEqualTo (
Schema . newBuilder ( )
. physicalColumn ( "id" , DataTypes . BIGINT ( ) )
. physicalColumn ( "id" , DataTypes . BIGINT ( ) .notNull ( ) )
. physicalColumn ( "name" , DataTypes . STRING ( ) )
. physicalColumn ( "age" , DataTypes . INT ( ) )
. physicalColumn ( "last_name" , DataTypes . STRING ( ) )
@ -1252,9 +1252,9 @@ class FlinkPipelineComposerITCase {
String [ ] outputEvents = outCaptor . toString ( ) . trim ( ) . split ( "\n" ) ;
assertThat ( outputEvents )
. containsExactly (
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT ,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL ,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}" ,
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT }, oldTypeMapping={id=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL }, oldTypeMapping={id=INT NOT NULL }}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}" ,