diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 561097286..6364b6981 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1979,7 +1979,7 @@ class FlinkPipelineTransformITCase { + "2147483648 AS greater_than_int_max, " + "-2147483648 AS int_min, " + "-2147483649 AS less_than_int_min, " - + "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal", + + "CAST(1234567890123456789 AS DECIMAL(19, 0)) AS really_big_decimal", "CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1 null, null, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index f3161105e..03edd9069 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -841,7 +841,7 @@ public class FlinkPipelineUdfITCase { @ParameterizedTest @MethodSource("testParams") @Disabled("For manual test as there is a limit for quota.") - void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { + void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); // Setup value source @@ -909,6 +909,112 @@ public class FlinkPipelineUdfITCase { .hasSize(9); } + /** Runs a values-source to values-sink pipeline whose transform on table1 appends eight UDF-computed columns (char, varchar, binary, varbinary, timestamp, local-zoned timestamp, decimal and non-null decimal) and asserts that the captured output contains the expected schema and data change events. The UDF classes are looked up in the package selected by {@code language}. */ @ParameterizedTest + @MethodSource("testParams") + void testComplicatedUdfReturnTypes(ValuesDataSink.SinkApi sinkApi, String language) + throws Exception { +
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*, get_char() AS char_col, get_varchar() AS varchar_col, get_binary() AS binary_col, get_varbinary() AS varbinary_col, get_ts() AS ts_col, get_ts_ltz() AS ts_ltz_col, get_decimal() AS decimal_col, get_non_null() AS non_null_col", + null, + "col1", + null, + "key1=value1", + "", + null); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList(transformDef), + Arrays.asList( + new UdfDef( + "get_char", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.CharTypeReturningClass", + language)), + new UdfDef( + "get_varchar", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.VarCharTypeReturningClass", + language)), + new UdfDef( + "get_binary", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.BinaryTypeReturningClass", + language)), + new UdfDef( +
"get_varbinary", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.VarBinaryTypeReturningClass", + language)), + new UdfDef( + "get_ts", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.TimestampTypeReturningClass", + language)), + new UdfDef( + "get_ts_ltz", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.LocalZonedTimestampTypeReturningClass", + language)), + new UdfDef( + "get_decimal", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeReturningClass", + language)), + new UdfDef( + "get_non_null", + String.format( + "org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeNonNullReturningClass", + language))), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + /* NOTE(review): contains(...) presumably does not verify ordering, despite the wording of the comment before this statement - confirm against the AssertJ documentation. */ String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .contains( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`char_col` STRING,`varchar_col` STRING,`binary_col` BINARY(17),`varbinary_col` VARBINARY(17),`ts_col` TIMESTAMP(2),`ts_ltz_col` TIMESTAMP_LTZ(2),`decimal_col` DECIMAL(10, 3),`non_null_col` DECIMAL(10, 3)}, primaryKeys=col1, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00,
1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", + "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[2, x, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=UPDATE, meta=()}"); + } + private static Stream testParams() { return Stream.of( arguments(ValuesDataSink.SinkApi.SINK_FUNCTION, "java"), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java index 1d93899f9..d1941c3eb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java @@ -67,10 +67,13 @@ public class ValuesDataSinkHelper { return fields.stream() .map( o -> { + if (o == null) { + return "null"; + } if (o
instanceof byte[]) { return BaseEncoding.base64().encode((byte[]) o); } else { - return o; + return o.toString(); /* o is non-null here: null fields were already mapped to the text null by the guard added earlier in this lambda */ } }) .collect(Collectors.toList()); diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java new file mode 100644 index 000000000..b1ab5e6ae --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class BinaryTypeReturningClass implements UserDefinedFunction { + /** Declares BINARY(17) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.BINARY(17); + } + + /** Returns the UTF-8 bytes of the literal "xyzzy". The charset is given explicitly (fully qualified, so no import line has to be added to this hunk) because the no-argument getBytes() depends on the platform default charset. */ public byte[] eval() { + return "xyzzy".getBytes(java.nio.charset.StandardCharsets.UTF_8); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java new file mode 100644 index 000000000..df3994899 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class CharTypeReturningClass implements UserDefinedFunction { + + /** Declares CHAR(17) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.CHAR(17); + } + + /** Always returns the same 17-character text, which matches the declared length of 17. */ public String eval() { + return "This is a string."; + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java new file mode 100644 index 000000000..6b27ef24e --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +import java.math.BigDecimal; + +/** This is an example UDF class for testing purposes only.
*/ +public class DecimalTypeNonNullReturningClass implements UserDefinedFunction { + /** Declares DECIMAL(10, 3) with notNull() applied as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.DECIMAL(10, 3).notNull(); + } + + /** Always returns 12.315 as DecimalData, built with the same 10 and 3 that appear in the declared DECIMAL(10, 3) type. */ public DecimalData eval() { + return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java new file mode 100644 index 000000000..8e5bcd39a --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +import java.math.BigDecimal; + +/** This is an example UDF class for testing purposes only.
*/ +public class DecimalTypeReturningClass implements UserDefinedFunction { + /** Declares DECIMAL(10, 3) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.DECIMAL(10, 3); + } + + /** Always returns 12.315 as DecimalData, built with the same 10 and 3 that appear in the declared DECIMAL(10, 3) type. */ public DecimalData eval() { + return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java new file mode 100644 index 000000000..9231c12e2 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class LocalZonedTimestampTypeReturningClass implements UserDefinedFunction { + /** Declares TIMESTAMP_LTZ(2) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.TIMESTAMP_LTZ(2); + } + + /** Always returns the value built from 24 * 60 * 60 * 1000 = 86,400,000 epoch milliseconds, the number of milliseconds in 24 hours. */ public LocalZonedTimestampData eval() { + return LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java new file mode 100644 index 000000000..22826c2e8 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class TimestampTypeReturningClass implements UserDefinedFunction { + /** Declares TIMESTAMP(2) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.TIMESTAMP(2); + } + + /** Always returns the value built from 24 * 60 * 60 * 1000 = 86,400,000 milliseconds, the number of milliseconds in 24 hours. */ public TimestampData eval() { + return TimestampData.fromMillis(24 * 60 * 60 * 1000); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java new file mode 100644 index 000000000..fdb3d54aa --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class VarBinaryTypeReturningClass implements UserDefinedFunction { + /** Declares VARBINARY(17) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.VARBINARY(17); + } + + /** Returns the UTF-8 bytes of the literal "xyzzy". The charset is given explicitly (fully qualified, so no import line has to be added to this hunk) because the no-argument getBytes() depends on the platform default charset. */ public byte[] eval() { + return "xyzzy".getBytes(java.nio.charset.StandardCharsets.UTF_8); + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java new file mode 100644 index 000000000..0e15fe80d --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java.precision; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for testing purposes only.
*/ +public class VarCharTypeReturningClass implements UserDefinedFunction { + /** Declares VARCHAR(17) as the return type of this UDF. */ @Override + public DataType getReturnType() { + return DataTypes.VARCHAR(17); + } + + /** Always returns the same 17-character text, which fits the declared maximum length of 17. */ public String eval() { + return "This is a string."; + } +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala new file mode 100644 index 000000000..6f76d0899 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only.
*/ +class BinaryTypeReturningClass extends UserDefinedFunction { + /** Declares BINARY(17) as the return type of this UDF. */ override def getReturnType: DataType = DataTypes.BINARY(17) + /** Returns the UTF-8 bytes of the literal "xyzzy"; the charset is given explicitly (fully qualified, so no import line is added to this hunk) because the no-argument getBytes depends on the platform default charset. */ def eval: Array[Byte] = "xyzzy".getBytes(java.nio.charset.StandardCharsets.UTF_8) +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala new file mode 100644 index 000000000..dcb73862c --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only. */ +class CharTypeReturningClass extends UserDefinedFunction { + /** Declares CHAR(17) as the return type of this UDF. */ override def getReturnType: DataType = DataTypes.CHAR(17) + /** Always returns the same 17-character text; the result type is now spelled out, as in the sibling example classes. */ def eval: String = "This is a string."
+} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala new file mode 100644 index 000000000..2940ce200 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.data.DecimalData +import org.apache.flink.cdc.common.types.{DataType, DataTypes} +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +import java.math.BigDecimal + +/** This is an example UDF class for testing purposes only.
*/ +class DecimalTypeNonNullReturningClass extends UserDefinedFunction { + /** Declares DECIMAL(10, 3) with notNull() applied as the return type of this UDF. */ override def getReturnType: DataType = DataTypes.DECIMAL(10, 3).notNull() + /** Always returns 12.315 as DecimalData, built with the same 10 and 3 that appear in the declared DECIMAL(10, 3) type. */ def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3) +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala new file mode 100644 index 000000000..46cbe0215 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.data.DecimalData +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +import java.math.BigDecimal + +/** This is an example UDF class for testing purposes only.
*/ +class DecimalTypeReturningClass extends UserDefinedFunction { + /** Declares DECIMAL(10, 3) as the return type of this UDF. */ override def getReturnType: DataType = DataTypes.DECIMAL(10, 3) + /** Always returns 12.315 as DecimalData, built with the same 10 and 3 that appear in the declared DECIMAL(10, 3) type. */ def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3) +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala new file mode 100644 index 000000000..dba554158 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.data.LocalZonedTimestampData +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only.
*/ +class LocalZonedTimestampTypeReturningClass extends UserDefinedFunction { + override def getReturnType: DataType = DataTypes.TIMESTAMP_LTZ(2) + def eval: LocalZonedTimestampData = LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000) +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala new file mode 100644 index 000000000..958f23e6d --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.data.TimestampData +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only. 
*/ +class TimestampTypeReturningClass extends UserDefinedFunction { + override def getReturnType: DataType = DataTypes.TIMESTAMP(2) + def eval: TimestampData = TimestampData.fromMillis(24 * 60 * 60 * 1000) +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala new file mode 100644 index 000000000..1f9a376d7 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only. 
*/ +class VarBinaryTypeReturningClass extends UserDefinedFunction { + override def getReturnType: DataType = DataTypes.VARBINARY(17) + def eval: Array[Byte] = "xyzzy".getBytes +} diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala new file mode 100644 index 000000000..ad8baaa29 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.scala.precision + +import org.apache.flink.cdc.common.types.DataType +import org.apache.flink.cdc.common.types.DataTypes +import org.apache.flink.cdc.common.udf.UserDefinedFunction + +/** This is an example UDF class for testing purposes only. */ +class VarCharTypeReturningClass extends UserDefinedFunction { + override def getReturnType: DataType = DataTypes.VARCHAR(17) + def eval = "This is a string." 
+} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 4a2674478..0a062d3e8 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.functions; +import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData; @@ -258,6 +259,14 @@ public class SystemFunctionUtils { return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0; } + public static boolean betweenAsymmetric( + DecimalData value, DecimalData minValue, DecimalData maxValue) { + if (value == null) { + return false; + } + return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0; + } + public static boolean notBetweenAsymmetric(String value, String minValue, String maxValue) { return !betweenAsymmetric(value, minValue, maxValue); } @@ -287,6 +296,11 @@ public class SystemFunctionUtils { return !betweenAsymmetric(value, minValue, maxValue); } + public static boolean notBetweenAsymmetric( + DecimalData value, DecimalData minValue, DecimalData maxValue) { + return !betweenAsymmetric(value, minValue, maxValue); + } + public static boolean in(String value, String... str) { return Arrays.stream(str).anyMatch(item -> value.equals(item)); } @@ -315,6 +329,10 @@ public class SystemFunctionUtils { return Arrays.stream(values).anyMatch(item -> value.equals(item)); } + public static boolean in(DecimalData value, DecimalData... values) { + return Arrays.stream(values).anyMatch(item -> value.equals(item)); + } + public static boolean notIn(String value, String... 
values) { return !in(value, values); } @@ -343,6 +361,10 @@ public class SystemFunctionUtils { return !in(value, values); } + public static boolean notIn(DecimalData value, DecimalData... values) { + return !in(value, values); + } + public static int charLength(String str) { return str.length(); } @@ -577,11 +599,27 @@ public class SystemFunctionUtils { return round(b0, 0); } + /** SQL ROUND operator applied to DecimalData values. */ + public static DecimalData round(DecimalData b0) { + return round(b0, 0); + } + /** SQL ROUND operator applied to BigDecimal values. */ public static BigDecimal round(BigDecimal b0, int b1) { return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1); } + /** SQL ROUND operator applied to DecimalData values. */ + public static DecimalData round(DecimalData b0, int b1) { + return DecimalData.fromBigDecimal( + b0.toBigDecimal() + .movePointRight(b1) + .setScale(0, RoundingMode.HALF_UP) + .movePointLeft(b1), + b0.precision(), + b0.scale()); + } + /** SQL ROUND operator applied to float values. 
*/ public static float round(float b0) { return round(b0, 0); @@ -649,6 +687,8 @@ public class SystemFunctionUtils { return !object.equals(0d); } else if (object instanceof BigDecimal) { return ((BigDecimal) object).compareTo(BigDecimal.ZERO) != 0; + } else if (object instanceof DecimalData) { + return ((DecimalData) object).compareTo(DecimalData.zero(1, 0)) != 0; } return Boolean.valueOf(castToString(object)); } @@ -663,6 +703,9 @@ public class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).byteValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().byteValue(); + } if (object instanceof Double) { return ((Double) object).byteValue(); } @@ -693,6 +736,9 @@ public class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).shortValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().shortValue(); + } if (object instanceof Double) { return ((Double) object).shortValue(); } @@ -723,6 +769,9 @@ public class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).intValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().intValue(); + } if (object instanceof Double) { return ((Double) object).intValue(); } @@ -753,6 +802,9 @@ public class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).longValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().longValue(); + } if (object instanceof Double) { return ((Double) object).longValue(); } @@ -783,6 +835,9 @@ public class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).floatValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().floatValue(); + } if (object instanceof Double) { return ((Double) object).floatValue(); } @@ -806,6 +861,9 @@ public 
class SystemFunctionUtils { if (object instanceof BigDecimal) { return ((BigDecimal) object).doubleValue(); } + if (object instanceof DecimalData) { + return ((DecimalData) object).toBigDecimal().doubleValue(); + } if (object instanceof Double) { return (Double) object; } @@ -843,6 +901,30 @@ public class SystemFunctionUtils { return bigDecimal; } + public static DecimalData castToDecimalData(Object object, int precision, int scale) { + if (object == null) { + return null; + } + if (object instanceof Boolean) { + object = (Boolean) object ? 1 : 0; + } + + BigDecimal bigDecimal; + try { + bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision)); + bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + } catch (NumberFormatException ignored) { + return null; + } + + // If the precision overflows, null will be returned. Otherwise, we may accidentally emit a + // non-serializable object into the pipeline that breaks downstream. + if (bigDecimal.precision() > precision) { + return null; + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + } + public static TimestampData castToTimestamp(Object object, String timezone) { if (object == null) { return null; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index b122aded8..14cb79026 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -470,7 +470,7 @@ public class JaninoCompiler { return new Java.MethodInvocation( Location.NOWHERE, null, - "castToBigDecimal", + "castToDecimalData", newAtoms.toArray(new Java.Rvalue[0])); case "CHAR": case "VARCHAR": @@ -496,7 +496,7 @@ public class JaninoCompiler { return String.format( "(%s) __instanceOf%s.eval", 
DataTypeConverter.convertOriginalClass(udfFunction.getReturnTypeHint()) - .getName(), + .getCanonicalName(), udfFunction.getClassName()); } else { return String.format("__instanceOf%s.eval", udfFunction.getClassName()); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 4dfcd09a5..5c8417633 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -176,8 +176,8 @@ public class TransformParser { new HepPlanner(new HepProgramBuilder().build()), new RexBuilder(factory)), StandardConvertletTable.INSTANCE, - SqlToRelConverter.config().withTrimUnusedFields(false)); - RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, true); + SqlToRelConverter.config().withTrimUnusedFields(true)); + RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, false); return relRoot.rel; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java index 814ed74e4..34fcc8f57 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java @@ -23,9 +23,7 @@ import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.impl.AbstractTable; -import org.apache.calcite.util.Pair; -import java.util.ArrayList; import java.util.List; /** TransformTable to generate the metadata of calcite. 
*/ @@ -46,14 +44,6 @@ public class TransformTable extends AbstractTable { @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { - List names = new ArrayList<>(); - List types = new ArrayList<>(); - for (Column column : columns) { - names.add(column.getName()); - RelDataType sqlType = - DataTypeConverter.convertCalciteType(relDataTypeFactory, column.getType()); - types.add(sqlType); - } - return relDataTypeFactory.createStructType(Pair.zip(names, types)); + return DataTypeConverter.convertCalciteRelDataType(relDataTypeFactory, columns); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java index 8da220293..cd2542e7e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java @@ -27,14 +27,27 @@ import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.ArrayType; +import org.apache.flink.cdc.common.types.BigIntType; import org.apache.flink.cdc.common.types.BinaryType; +import org.apache.flink.cdc.common.types.BooleanType; +import org.apache.flink.cdc.common.types.CharType; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DateType; import org.apache.flink.cdc.common.types.DecimalType; +import org.apache.flink.cdc.common.types.DoubleType; +import org.apache.flink.cdc.common.types.FloatType; +import org.apache.flink.cdc.common.types.IntType; +import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.common.types.MapType; import org.apache.flink.cdc.common.types.RowType; 
+import org.apache.flink.cdc.common.types.SmallIntType; +import org.apache.flink.cdc.common.types.TimeType; import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.TinyIntType; import org.apache.flink.cdc.common.types.VarBinaryType; +import org.apache.flink.cdc.common.types.VarCharType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -98,7 +111,7 @@ public class DataTypeConverter { case VARBINARY: return byte[].class; case DECIMAL: - return BigDecimal.class; + return DecimalData.class; case ROW: return Object.class; case ARRAY: @@ -110,6 +123,172 @@ public class DataTypeConverter { } } + public static RelDataType convertCalciteRelDataType( + RelDataTypeFactory typeFactory, List columns) { + RelDataTypeFactory.Builder fieldInfoBuilder = typeFactory.builder(); + for (Column column : columns) { + switch (column.getType().getTypeRoot()) { + case BOOLEAN: + BooleanType booleanType = (BooleanType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.BOOLEAN) + .nullable(booleanType.isNullable()); + break; + case TINYINT: + TinyIntType tinyIntType = (TinyIntType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.TINYINT) + .nullable(tinyIntType.isNullable()); + break; + case SMALLINT: + SmallIntType smallIntType = (SmallIntType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.SMALLINT) + .nullable(smallIntType.isNullable()); + break; + case INTEGER: + IntType intType = (IntType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.INTEGER) + .nullable(intType.isNullable()); + break; + case BIGINT: + BigIntType bigIntType = (BigIntType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.BIGINT) + .nullable(bigIntType.isNullable()); + break; + case DATE: + DateType dataType = (DateType) 
column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.DATE) + .nullable(dataType.isNullable()); + break; + case TIME_WITHOUT_TIME_ZONE: + TimeType timeType = (TimeType) column.getType(); + fieldInfoBuilder + .add( + column.getName(), + SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, + timeType.getPrecision()) + .nullable(timeType.isNullable()); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) column.getType(); + fieldInfoBuilder + .add( + column.getName(), + SqlTypeName.TIMESTAMP, + timestampType.getPrecision()) + .nullable(timestampType.isNullable()); + break; + case TIMESTAMP_WITH_TIME_ZONE: + ZonedTimestampType zonedTimestampType = (ZonedTimestampType) column.getType(); + fieldInfoBuilder + .add( + column.getName(), + SqlTypeName.TIMESTAMP, + zonedTimestampType.getPrecision()) + .nullable(zonedTimestampType.isNullable()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType localZonedTimestampType = + (LocalZonedTimestampType) column.getType(); + fieldInfoBuilder + .add( + column.getName(), + SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + localZonedTimestampType.getPrecision()) + .nullable(localZonedTimestampType.isNullable()); + break; + case FLOAT: + FloatType floatType = (FloatType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.FLOAT) + .nullable(floatType.isNullable()); + break; + case DOUBLE: + DoubleType doubleType = (DoubleType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.DOUBLE) + .nullable(doubleType.isNullable()); + break; + case CHAR: + CharType charType = (CharType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.CHAR, charType.getLength()) + .nullable(charType.isNullable()); + break; + case VARCHAR: + VarCharType varCharType = (VarCharType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.VARCHAR, varCharType.getLength()) + .nullable(varCharType.isNullable()); + break; 
+ case BINARY: + BinaryType binaryType = (BinaryType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.BINARY, binaryType.getLength()) + .nullable(binaryType.isNullable()); + break; + case VARBINARY: + VarBinaryType varBinaryType = (VarBinaryType) column.getType(); + fieldInfoBuilder + .add(column.getName(), SqlTypeName.VARBINARY, varBinaryType.getLength()) + .nullable(varBinaryType.isNullable()); + break; + case DECIMAL: + DecimalType decimalType = (DecimalType) column.getType(); + fieldInfoBuilder + .add( + column.getName(), + SqlTypeName.DECIMAL, + decimalType.getPrecision(), + decimalType.getScale()) + .nullable(decimalType.isNullable()); + break; + case ROW: + List dataTypes = + ((RowType) column.getType()) + .getFieldTypes().stream() + .map((type) -> convertCalciteType(typeFactory, type)) + .collect(Collectors.toList()); + fieldInfoBuilder + .add( + column.getName(), + typeFactory.createStructType( + dataTypes, + ((RowType) column.getType()).getFieldNames())) + .nullable(true); + break; + case ARRAY: + DataType elementType = ((ArrayType) column.getType()).getElementType(); + fieldInfoBuilder + .add( + column.getName(), + typeFactory.createArrayType( + convertCalciteType(typeFactory, elementType), -1)) + .nullable(true); + break; + case MAP: + RelDataType keyType = + convertCalciteType( + typeFactory, ((MapType) column.getType()).getKeyType()); + RelDataType valueType = + convertCalciteType( + typeFactory, ((MapType) column.getType()).getValueType()); + fieldInfoBuilder + .add(column.getName(), typeFactory.createMapType(keyType, valueType)) + .nullable(true); + break; + default: + throw new UnsupportedOperationException( + "Unsupported type: " + column.getType()); + } + } + return fieldInfoBuilder.build(); + } + public static RelDataType convertCalciteType( RelDataTypeFactory typeFactory, DataType dataType) { switch (dataType.getTypeRoot()) { @@ -126,25 +305,43 @@ public class DataTypeConverter { case DATE: return 
typeFactory.createSqlType(SqlTypeName.DATE); case TIME_WITHOUT_TIME_ZONE: - return typeFactory.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE); + TimeType timeType = (TimeType) dataType; + return typeFactory.createSqlType( + SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, timeType.getPrecision()); case TIMESTAMP_WITHOUT_TIME_ZONE: - return typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + TimestampType timestampType = (TimestampType) dataType; + return typeFactory.createSqlType( + SqlTypeName.TIMESTAMP, timestampType.getPrecision()); + case TIMESTAMP_WITH_TIME_ZONE: + ZonedTimestampType zonedTimestampType = (ZonedTimestampType) dataType; + return typeFactory.createSqlType( + SqlTypeName.TIMESTAMP, zonedTimestampType.getPrecision()); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + LocalZonedTimestampType localZonedTimestampType = + (LocalZonedTimestampType) dataType; + return typeFactory.createSqlType( + SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + localZonedTimestampType.getPrecision()); case FLOAT: return typeFactory.createSqlType(SqlTypeName.FLOAT); case DOUBLE: return typeFactory.createSqlType(SqlTypeName.DOUBLE); case CHAR: - return typeFactory.createSqlType(SqlTypeName.CHAR); + CharType charType = (CharType) dataType; + return typeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength()); case VARCHAR: - return typeFactory.createSqlType(SqlTypeName.VARCHAR); + VarCharType varCharType = (VarCharType) dataType; + return typeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength()); case BINARY: - return typeFactory.createSqlType(SqlTypeName.BINARY); + BinaryType binaryType = (BinaryType) dataType; + return typeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength()); case VARBINARY: - return typeFactory.createSqlType(SqlTypeName.VARBINARY); + VarBinaryType varBinaryType = (VarBinaryType) dataType; + return typeFactory.createSqlType(SqlTypeName.VARBINARY, 
varBinaryType.getLength()); case DECIMAL: - return typeFactory.createSqlType(SqlTypeName.DECIMAL); + DecimalType decimalType = (DecimalType) dataType; + return typeFactory.createSqlType( + SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale()); case ROW: List dataTypes = ((RowType) dataType) @@ -197,9 +394,9 @@ public class DataTypeConverter { case VARCHAR: return DataTypes.STRING(); case BINARY: - return DataTypes.BINARY(BinaryType.MAX_LENGTH); + return DataTypes.BINARY(relDataType.getPrecision()); case VARBINARY: - return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH); + return DataTypes.VARBINARY(relDataType.getPrecision()); case DECIMAL: return DataTypes.DECIMAL(relDataType.getPrecision(), relDataType.getScale()); case ARRAY: @@ -298,7 +495,7 @@ public class DataTypeConverter { case VARBINARY: return convertToBinary(value); case DECIMAL: - return convertToDecimalOriginal(value); + return convertToDecimal(value); case ROW: return value; case ARRAY: @@ -620,6 +817,7 @@ public class DataTypeConverter { } } + // convert to DecimalData private static Object convertToDecimal(Object obj) { if (obj instanceof BigDecimal) { BigDecimal bigDecimalValue = (BigDecimal) obj; @@ -632,16 +830,4 @@ public class DataTypeConverter { "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); } } - - private static Object convertToDecimalOriginal(Object obj) { - if (obj instanceof BigDecimal) { - return obj; - } else if (obj instanceof DecimalData) { - DecimalData decimalData = (DecimalData) obj; - return decimalData.toBigDecimal(); - } else { - throw new UnsupportedOperationException( - "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); - } - } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 3c5aae6db..fc5dfd447 100644 --- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -303,7 +303,7 @@ public class TransformParserTest { testFilterExpression("cast(1 as bigint)", "castToLong(1)"); testFilterExpression("cast(1 as float)", "castToFloat(1)"); testFilterExpression("cast(1 as double)", "castToDouble(1)"); - testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 0)"); + testFilterExpression("cast(1 as decimal)", "castToDecimalData(1, 10, 0)"); testFilterExpression("cast(1 as char)", "castToString(1)"); testFilterExpression("cast(1 as varchar)", "castToString(1)"); testFilterExpression("cast(null as int)", "castToInteger(null)"); @@ -314,7 +314,7 @@ public class TransformParserTest { testFilterExpression("cast(null as bigint)", "castToLong(null)"); testFilterExpression("cast(null as float)", "castToFloat(null)"); testFilterExpression("cast(null as double)", "castToDouble(null)"); - testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)"); + testFilterExpression("cast(null as decimal)", "castToDecimalData(null, 10, 0)"); testFilterExpression("cast(null as char)", "castToString(null)"); testFilterExpression("cast(null as varchar)", "castToString(null)"); testFilterExpression( @@ -328,15 +328,18 @@ public class TransformParserTest { List testColumns = Arrays.asList( Column.physicalColumn("id", DataTypes.INT(), "id"), - Column.physicalColumn("name", DataTypes.STRING(), "string"), + Column.physicalColumn("name", DataTypes.STRING(), "name"), Column.physicalColumn("age", DataTypes.INT(), "age"), - Column.physicalColumn("address", DataTypes.STRING(), "address"), + Column.physicalColumn( + "createTime", DataTypes.TIMESTAMP(3), "newCreateTime"), + Column.physicalColumn("address", DataTypes.VARCHAR(50), "newAddress"), + Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"), Column.physicalColumn("weight", 
DataTypes.DOUBLE(), "weight"), Column.physicalColumn("height", DataTypes.DOUBLE(), "height")); List result = TransformParser.generateProjectionColumns( - "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi", + "id, upper(name) as name, age + 1 as newage, createTime as newCreateTime, address as newAddress, deposit as deposits, weight / (height * height) as bmi", testColumns, Collections.emptyList(), new SupportedMetadataColumn[0]); @@ -346,6 +349,9 @@ public class TransformParserTest { "ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}", "ProjectionColumn{column=`name` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}", "ProjectionColumn{column=`newage` INT, expression='`TB`.`age` + 1', scriptExpression='age + 1', originalColumnNames=[age], transformExpressionKey=null}", + "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3) 'newCreateTime', expression='createTime', scriptExpression='createTime', originalColumnNames=[createTime], transformExpressionKey=null}", + "ProjectionColumn{column=`newAddress` VARCHAR(50) 'newAddress', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}", + "ProjectionColumn{column=`deposits` DECIMAL(10, 2) 'deposit', expression='deposit', scriptExpression='deposit', originalColumnNames=[deposit], transformExpressionKey=null}", "ProjectionColumn{column=`bmi` DOUBLE, expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', scriptExpression='weight / height * height', originalColumnNames=[weight, height, height], transformExpressionKey=null}"); Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]"); @@ -359,9 +365,11 @@ public class TransformParserTest { List metadataExpected = Arrays.asList( "ProjectionColumn{column=`id` INT 'id', expression='id', 
scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}", - "ProjectionColumn{column=`name` STRING 'string', expression='name', scriptExpression='name', originalColumnNames=[name], transformExpressionKey=null}", + "ProjectionColumn{column=`name` STRING 'name', expression='name', scriptExpression='name', originalColumnNames=[name], transformExpressionKey=null}", "ProjectionColumn{column=`age` INT 'age', expression='age', scriptExpression='age', originalColumnNames=[age], transformExpressionKey=null}", - "ProjectionColumn{column=`address` STRING 'address', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}", + "ProjectionColumn{column=`createTime` TIMESTAMP(3) 'newCreateTime', expression='createTime', scriptExpression='createTime', originalColumnNames=[createTime], transformExpressionKey=null}", + "ProjectionColumn{column=`address` VARCHAR(50) 'newAddress', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}", + "ProjectionColumn{column=`deposit` DECIMAL(10, 2) 'deposit', expression='deposit', scriptExpression='deposit', originalColumnNames=[deposit], transformExpressionKey=null}", "ProjectionColumn{column=`weight` DOUBLE 'weight', expression='weight', scriptExpression='weight', originalColumnNames=[weight], transformExpressionKey=null}", "ProjectionColumn{column=`height` DOUBLE 'height', expression='height', scriptExpression='height', originalColumnNames=[height], transformExpressionKey=null}", "ProjectionColumn{column=`__namespace_name__` STRING NOT NULL, expression='__namespace_name__', scriptExpression='__namespace_name__', originalColumnNames=[__namespace_name__], transformExpressionKey=null}", @@ -384,6 +392,42 @@ public class TransformParserTest { "Unrecognized projection expression: 1 + 1. 
Should be AS "); } + @Test + public void testGenerateProjectionColumnsWithPrecision() { + List testColumns = + Arrays.asList( + Column.physicalColumn("id", DataTypes.INT(), "id"), + Column.physicalColumn("name", DataTypes.VARCHAR(50), "name"), + Column.physicalColumn("sex", DataTypes.CHAR(1), "sex"), + Column.physicalColumn("address", DataTypes.BINARY(50), "address"), + Column.physicalColumn("phone", DataTypes.VARBINARY(50), "phone"), + Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"), + Column.physicalColumn("birthday", DataTypes.TIMESTAMP(3), "birthday"), + Column.physicalColumn( + "birthday_ltz", DataTypes.TIMESTAMP_LTZ(3), "birthday_ltz"), + Column.physicalColumn("update_time", DataTypes.TIME(3), "update_time")); + + List result = + TransformParser.generateProjectionColumns( + "id, UPPER(name) as name2, UPPER(sex) as sex2, COALESCE(address,address) as address2, COALESCE(phone,phone) as phone2, COALESCE(deposit,deposit) as deposit2, COALESCE(birthday,birthday) as birthday2, COALESCE(birthday_ltz,birthday_ltz) as birthday_ltz2, COALESCE(update_time,update_time) as update_time2", + testColumns, + Collections.emptyList(), + new SupportedMetadataColumn[0]); + + List expected = + Arrays.asList( + "ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}", + "ProjectionColumn{column=`name2` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}", + "ProjectionColumn{column=`sex2` STRING, expression='UPPER(`TB`.`sex`)', scriptExpression='upper(sex)', originalColumnNames=[sex], transformExpressionKey=null}", + "ProjectionColumn{column=`address2` BINARY(50), expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE `TB`.`address` END', scriptExpression='(null != address ? 
address : address)', originalColumnNames=[address, address, address], transformExpressionKey=null}", + "ProjectionColumn{column=`phone2` VARBINARY(50), expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE `TB`.`phone` END', scriptExpression='(null != phone ? phone : phone)', originalColumnNames=[phone, phone, phone], transformExpressionKey=null}", + "ProjectionColumn{column=`deposit2` DECIMAL(10, 2), expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE `TB`.`deposit` END', scriptExpression='(null != deposit ? deposit : deposit)', originalColumnNames=[deposit, deposit, deposit], transformExpressionKey=null}", + "ProjectionColumn{column=`birthday2` TIMESTAMP(3), expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE `TB`.`birthday` END', scriptExpression='(null != birthday ? birthday : birthday)', originalColumnNames=[birthday, birthday, birthday], transformExpressionKey=null}", + "ProjectionColumn{column=`birthday_ltz2` TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN `TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null != birthday_ltz ? birthday_ltz : birthday_ltz)', originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz], transformExpressionKey=null}", + "ProjectionColumn{column=`update_time2` TIME(3), expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time` ELSE `TB`.`update_time` END', scriptExpression='(null != update_time ? update_time : update_time)', originalColumnNames=[update_time, update_time, update_time], transformExpressionKey=null}"); + Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]"); + } + @Test public void testGenerateReferencedColumns() { List testColumns =