[FLINK-36741][transform] Fix the decimal precision and length lost during transform

This closes #3740
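Root cause, in brief: when transform projections go through Calcite, DataTypeConverter built the Calcite SQL types without their precision, length, and scale arguments (see the convertCalciteType hunks below), so Calcite substituted its defaults and the declared DECIMAL/CHAR/BINARY/TIMESTAMP parameters were lost. A minimal sketch of that failure mode, assuming stock Calcite defaults (illustration only, not code from this commit):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class PrecisionLossSketch {
        public static void main(String[] args) {
            RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
            // Without explicit arguments, Calcite falls back to the type
            // system's default precision and scale:
            RelDataType lossy = typeFactory.createSqlType(SqlTypeName.DECIMAL);
            // Passing the declared parameters through preserves the type:
            RelDataType exact = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10, 2);
            System.out.println(lossy); // DECIMAL(19, 0) under stock defaults
            System.out.println(exact); // DECIMAL(10, 2)
        }
    }

The commit threads the declared parameters through every conversion path and replaces the BigDecimal-based cast helpers with DecimalData-aware ones.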

@@ -1979,7 +1979,7 @@ class FlinkPipelineTransformITCase {
+ "2147483648 AS greater_than_int_max, "
+ "-2147483648 AS int_min, "
+ "-2147483649 AS less_than_int_min, "
- + "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
+ + "CAST(1234567890123456789 AS DECIMAL(19, 0)) AS really_big_decimal",
"CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1
null,
null,

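The updated cast target matches the literal exactly: 1234567890123456789 has 19 digits, so DECIMAL(19, 0) is the tightest decimal type that holds it. A quick check:

    import java.math.BigDecimal;

    public class LiteralPrecision {
        public static void main(String[] args) {
            // precision() counts the significant digits of the unscaled value
            System.out.println(new BigDecimal("1234567890123456789").precision()); // 19
        }
    }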
@@ -841,7 +841,7 @@ public class FlinkPipelineUdfITCase {
@ParameterizedTest
@MethodSource("testParams")
@Disabled("For manual test as there is a limit for quota.")
- void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception {
+ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -909,6 +909,112 @@ public class FlinkPipelineUdfITCase {
.hasSize(9);
}
@ParameterizedTest
@MethodSource("testParams")
void testComplicatedUdfReturnTypes(ValuesDataSink.SinkApi sinkApi, String language)
throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup transform
TransformDef transformDef =
new TransformDef(
"default_namespace.default_schema.table1",
"*, get_char() AS char_col, get_varchar() AS varchar_col, get_binary() AS binary_col, get_varbinary() AS varbinary_col, get_ts() AS ts_col, get_ts_ltz() AS ts_ltz_col, get_decimal() AS decimal_col, get_non_null() AS non_null_col",
null,
"col1",
null,
"key1=value1",
"",
null);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(transformDef),
Arrays.asList(
new UdfDef(
"get_char",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.CharTypeReturningClass",
language)),
new UdfDef(
"get_varchar",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.VarCharTypeReturningClass",
language)),
new UdfDef(
"get_binary",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.BinaryTypeReturningClass",
language)),
new UdfDef(
"get_varbinary",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.VarBinaryTypeReturningClass",
language)),
new UdfDef(
"get_ts",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.TimestampTypeReturningClass",
language)),
new UdfDef(
"get_ts_ltz",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.LocalZonedTimestampTypeReturningClass",
language)),
new UdfDef(
"get_decimal",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeReturningClass",
language)),
new UdfDef(
"get_non_null",
String.format(
"org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeNonNullReturningClass",
language))),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.contains(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`char_col` STRING,`varchar_col` STRING,`binary_col` BINARY(17),`varbinary_col` VARBINARY(17),`ts_col` TIMESTAMP(2),`ts_ltz_col` TIMESTAMP_LTZ(2),`decimal_col` DECIMAL(10, 3),`non_null_col` DECIMAL(10, 3)}, primaryKeys=col1, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[2, x, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=UPDATE, meta=()}");
}
private static Stream<Arguments> testParams() {
return Stream.of(
arguments(ValuesDataSink.SinkApi.SINK_FUNCTION, "java"),

@@ -67,10 +67,13 @@ public class ValuesDataSinkHelper {
return fields.stream()
.map(
o -> {
+ if (o == null) {
+ return "null";
+ }
if (o instanceof byte[]) {
return BaseEncoding.base64().encode((byte[]) o);
} else {
- return o;
+ return o.toString();
}
})
.collect(Collectors.toList());

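A self-contained sketch of the updated helper (class and method names assumed, not from the commit): the explicit toString() keeps the mapped stream uniformly String-typed now that non-JDK values such as DecimalData reach the sink, while byte arrays are still base64-encoded.

    import com.google.common.io.BaseEncoding;

    import java.util.List;
    import java.util.stream.Collectors;

    public class FieldStringifier {
        public static List<String> stringify(List<Object> fields) {
            return fields.stream()
                    .map(
                            o -> {
                                if (o == null) {
                                    return "null";
                                }
                                if (o instanceof byte[]) {
                                    // "xyzzy".getBytes() encodes to "eHl6enk=",
                                    // matching the expected events above.
                                    return BaseEncoding.base64().encode((byte[]) o);
                                }
                                return o.toString();
                            })
                    .collect(Collectors.toList());
        }
    }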
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class BinaryTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.BINARY(17);
}
public byte[] eval() {
return "xyzzy".getBytes();
}
}

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class CharTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.CHAR(17);
}
public String eval() {
return "This is a string.";
}
}

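Incidentally, the sample value is exactly 17 characters, matching the CHAR(17) hint above:

    public class CharLengthCheck {
        public static void main(String[] args) {
            System.out.println("This is a string.".length()); // 17
        }
    }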
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
import java.math.BigDecimal;
/** This is an example UDF class for testing purposes only. */
public class DecimalTypeNonNullReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.DECIMAL(10, 3).notNull();
}
public DecimalData eval() {
return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
}
}

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
import java.math.BigDecimal;
/** This is an example UDF class for testing purposes only. */
public class DecimalTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.DECIMAL(10, 3);
}
public DecimalData eval() {
return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
}
}

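A hedged usage note (assuming the CDC DecimalData behaves like Flink's): fromBigDecimal materializes the value at the declared precision and scale, and 12.315 (precision 5, scale 3) fits DECIMAL(10, 3) exactly, which is why the expected rows above print 12.315 unchanged.

    import org.apache.flink.cdc.common.data.DecimalData;

    import java.math.BigDecimal;

    public class DecimalUdfValueSketch {
        public static void main(String[] args) {
            // 12.315 fits DECIMAL(10, 3), so no rounding occurs.
            DecimalData d = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
            System.out.println(d); // 12.315
        }
    }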
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class LocalZonedTimestampTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.TIMESTAMP_LTZ(2);
}
public LocalZonedTimestampData eval() {
return LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000);
}
}

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class TimestampTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.TIMESTAMP(2);
}
public TimestampData eval() {
return TimestampData.fromMillis(24 * 60 * 60 * 1000);
}
}

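A quick sanity check on the expected output above: one day of epoch millis renders as 1970-01-02T00:00, and 24 * 60 * 60 * 1000 = 86,400,000 fits comfortably in an int, so the literal is safe.

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    public class EpochDaySketch {
        public static void main(String[] args) {
            long millis = 24 * 60 * 60 * 1000;
            System.out.println(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
            // 1970-01-02T00:00
        }
    }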
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class VarBinaryTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.VARBINARY(17);
}
public byte[] eval() {
return "xyzzy".getBytes();
}
}

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.java.precision;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
/** This is an example UDF class for testing purposes only. */
public class VarCharTypeReturningClass implements UserDefinedFunction {
@Override
public DataType getReturnType() {
return DataTypes.VARCHAR(17);
}
public String eval() {
return "This is a string.";
}
}

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class BinaryTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.BINARY(17)
def eval: Array[Byte] = "xyzzy".getBytes
}

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class CharTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.CHAR(17)
def eval = "This is a string."
}

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.data.DecimalData
import org.apache.flink.cdc.common.types.{DataType, DataTypes}
import org.apache.flink.cdc.common.udf.UserDefinedFunction
import java.math.BigDecimal
/** This is an example UDF class for testing purposes only. */
class DecimalTypeNonNullReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.DECIMAL(10, 3).notNull()
def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3)
}

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.data.DecimalData
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
import java.math.BigDecimal
/** This is an example UDF class for testing purposes only. */
class DecimalTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.DECIMAL(10, 3)
def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3)
}

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.data.LocalZonedTimestampData
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class LocalZonedTimestampTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.TIMESTAMP_LTZ(2)
def eval: LocalZonedTimestampData = LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000)
}

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.data.TimestampData
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class TimestampTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.TIMESTAMP(2)
def eval: TimestampData = TimestampData.fromMillis(24 * 60 * 60 * 1000)
}

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class VarBinaryTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.VARBINARY(17)
def eval: Array[Byte] = "xyzzy".getBytes
}

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.udf.examples.scala.precision
import org.apache.flink.cdc.common.types.DataType
import org.apache.flink.cdc.common.types.DataTypes
import org.apache.flink.cdc.common.udf.UserDefinedFunction
/** This is an example UDF class for testing purposes only. */
class VarCharTypeReturningClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.VARCHAR(17)
def eval = "This is a string."
}

@@ -17,6 +17,7 @@
package org.apache.flink.cdc.runtime.functions;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -258,6 +259,14 @@ public class SystemFunctionUtils {
return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0;
}
public static boolean betweenAsymmetric(
DecimalData value, DecimalData minValue, DecimalData maxValue) {
if (value == null) {
return false;
}
return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0;
}
public static boolean notBetweenAsymmetric(String value, String minValue, String maxValue) {
return !betweenAsymmetric(value, minValue, maxValue);
}
@@ -287,6 +296,11 @@ public class SystemFunctionUtils {
return !betweenAsymmetric(value, minValue, maxValue);
}
public static boolean notBetweenAsymmetric(
DecimalData value, DecimalData minValue, DecimalData maxValue) {
return !betweenAsymmetric(value, minValue, maxValue);
}
public static boolean in(String value, String... str) {
return Arrays.stream(str).anyMatch(item -> value.equals(item));
}
@@ -315,6 +329,10 @@ public class SystemFunctionUtils {
return Arrays.stream(values).anyMatch(item -> value.equals(item));
}
public static boolean in(DecimalData value, DecimalData... values) {
return Arrays.stream(values).anyMatch(item -> value.equals(item));
}
public static boolean notIn(String value, String... values) {
return !in(value, values);
}
@@ -343,6 +361,10 @@ public class SystemFunctionUtils {
return !in(value, values);
}
public static boolean notIn(DecimalData value, DecimalData... values) {
return !in(value, values);
}
public static int charLength(String str) {
return str.length();
}
@@ -577,11 +599,27 @@ public class SystemFunctionUtils {
return round(b0, 0);
}
/** SQL <code>ROUND</code> operator applied to DecimalData values. */
public static DecimalData round(DecimalData b0) {
return round(b0, 0);
}
/** SQL <code>ROUND</code> operator applied to BigDecimal values. */
public static BigDecimal round(BigDecimal b0, int b1) {
return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1);
}
/** SQL <code>ROUND</code> operator applied to DecimalData values. */
public static DecimalData round(DecimalData b0, int b1) {
return DecimalData.fromBigDecimal(
b0.toBigDecimal()
.movePointRight(b1)
.setScale(0, RoundingMode.HALF_UP)
.movePointLeft(b1),
b0.precision(),
b0.scale());
}
/** SQL <code>ROUND</code> operator applied to float values. */
public static float round(float b0) {
return round(b0, 0);
@@ -649,6 +687,8 @@ public class SystemFunctionUtils {
return !object.equals(0d);
} else if (object instanceof BigDecimal) {
return ((BigDecimal) object).compareTo(BigDecimal.ZERO) != 0;
} else if (object instanceof DecimalData) {
return ((DecimalData) object).compareTo(DecimalData.zero(1, 0)) != 0;
}
return Boolean.valueOf(castToString(object));
}
@@ -663,6 +703,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).byteValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().byteValue();
}
if (object instanceof Double) {
return ((Double) object).byteValue();
}
@@ -693,6 +736,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).shortValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().shortValue();
}
if (object instanceof Double) {
return ((Double) object).shortValue();
}
@@ -723,6 +769,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).intValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().intValue();
}
if (object instanceof Double) {
return ((Double) object).intValue();
}
@@ -753,6 +802,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).longValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().longValue();
}
if (object instanceof Double) {
return ((Double) object).longValue();
}
@@ -783,6 +835,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).floatValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().floatValue();
}
if (object instanceof Double) {
return ((Double) object).floatValue();
}
@@ -806,6 +861,9 @@ public class SystemFunctionUtils {
if (object instanceof BigDecimal) {
return ((BigDecimal) object).doubleValue();
}
if (object instanceof DecimalData) {
return ((DecimalData) object).toBigDecimal().doubleValue();
}
if (object instanceof Double) {
return (Double) object;
}
@@ -843,6 +901,30 @@ public class SystemFunctionUtils {
return bigDecimal;
}
public static DecimalData castToDecimalData(Object object, int precision, int scale) {
if (object == null) {
return null;
}
if (object instanceof Boolean) {
object = (Boolean) object ? 1 : 0;
}
BigDecimal bigDecimal;
try {
bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision));
bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP);
} catch (NumberFormatException ignored) {
return null;
}
// If the precision overflows, null will be returned. Otherwise, we may accidentally emit a
// non-serializable object into the pipeline that breaks downstream.
if (bigDecimal.precision() > precision) {
return null;
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
public static TimestampData castToTimestamp(Object object, String timezone) {
if (object == null) {
return null;

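The new castToDecimalData mirrors the BigDecimal path but enforces the target type: the value is rounded to the declared scale, and anything whose digits still exceed the declared precision comes back as null instead of leaking an oversized value downstream. A standalone illustration of those rules (helper name hypothetical, not the commit's API):

    import java.math.BigDecimal;
    import java.math.MathContext;
    import java.math.RoundingMode;

    public class DecimalCastRules {
        // Hypothetical helper replicating the rounding/overflow checks above.
        static BigDecimal castOrNull(String input, int precision, int scale) {
            BigDecimal d =
                    new BigDecimal(input, new MathContext(precision))
                            .setScale(scale, RoundingMode.HALF_UP);
            return d.precision() > precision ? null : d;
        }

        public static void main(String[] args) {
            System.out.println(castOrNull("12.3456", 10, 3)); // 12.346 fits DECIMAL(10, 3)
            System.out.println(castOrNull("12345678901", 10, 3)); // null: precision overflow
        }
    }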
@@ -470,7 +470,7 @@ public class JaninoCompiler {
return new Java.MethodInvocation(
Location.NOWHERE,
null,
- "castToBigDecimal",
+ "castToDecimalData",
newAtoms.toArray(new Java.Rvalue[0]));
case "CHAR":
case "VARCHAR":
@@ -496,7 +496,7 @@ public class JaninoCompiler {
return String.format(
"(%s) __instanceOf%s.eval",
DataTypeConverter.convertOriginalClass(udfFunction.getReturnTypeHint())
- .getName(),
+ .getCanonicalName(),
udfFunction.getClassName());
} else {
return String.format("__instanceOf%s.eval", udfFunction.getClassName());

@@ -176,8 +176,8 @@ public class TransformParser {
new HepPlanner(new HepProgramBuilder().build()),
new RexBuilder(factory)),
StandardConvertletTable.INSTANCE,
- SqlToRelConverter.config().withTrimUnusedFields(false));
- RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, true);
+ SqlToRelConverter.config().withTrimUnusedFields(true));
+ RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, false);
return relRoot.rel;
}

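For context on the two flipped flags (semantics per Calcite's javadoc, worth double-checking against the version in use): withTrimUnusedFields controls whether the converter prunes input fields the query never references, and the last convertQuery argument marks whether the node is the top-level query of the statement. A minimal construction sketch:

    import org.apache.calcite.sql2rel.SqlToRelConverter;

    public class ConverterConfigSketch {
        public static void main(String[] args) {
            // Enable pruning of unreferenced input fields, as the diff does:
            SqlToRelConverter.Config config =
                    SqlToRelConverter.config().withTrimUnusedFields(true);
            System.out.println(config.isTrimUnusedFields()); // true
        }
    }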
@@ -23,9 +23,7 @@ import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.impl.AbstractTable;
- import org.apache.calcite.util.Pair;
import java.util.ArrayList;
import java.util.List;
/** TransformTable to generate the metadata of calcite. */
@@ -46,14 +44,6 @@ public class TransformTable extends AbstractTable {
@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
- List<String> names = new ArrayList<>();
- List<RelDataType> types = new ArrayList<>();
- for (Column column : columns) {
- names.add(column.getName());
- RelDataType sqlType =
- DataTypeConverter.convertCalciteType(relDataTypeFactory, column.getType());
- types.add(sqlType);
- }
- return relDataTypeFactory.createStructType(Pair.zip(names, types));
+ return DataTypeConverter.convertCalciteRelDataType(relDataTypeFactory, columns);
}
}

@@ -27,14 +27,27 @@ import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.ArrayType;
import org.apache.flink.cdc.common.types.BigIntType;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.DoubleType;
import org.apache.flink.cdc.common.types.FloatType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -98,7 +111,7 @@ public class DataTypeConverter {
case VARBINARY:
return byte[].class;
case DECIMAL:
- return BigDecimal.class;
+ return DecimalData.class;
case ROW:
return Object.class;
case ARRAY:
@@ -110,6 +123,172 @@ public class DataTypeConverter {
}
}
public static RelDataType convertCalciteRelDataType(
RelDataTypeFactory typeFactory, List<Column> columns) {
RelDataTypeFactory.Builder fieldInfoBuilder = typeFactory.builder();
for (Column column : columns) {
switch (column.getType().getTypeRoot()) {
case BOOLEAN:
BooleanType booleanType = (BooleanType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.BOOLEAN)
.nullable(booleanType.isNullable());
break;
case TINYINT:
TinyIntType tinyIntType = (TinyIntType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.TINYINT)
.nullable(tinyIntType.isNullable());
break;
case SMALLINT:
SmallIntType smallIntType = (SmallIntType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.SMALLINT)
.nullable(smallIntType.isNullable());
break;
case INTEGER:
IntType intType = (IntType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.INTEGER)
.nullable(intType.isNullable());
break;
case BIGINT:
BigIntType bigIntType = (BigIntType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.BIGINT)
.nullable(bigIntType.isNullable());
break;
case DATE:
DateType dataType = (DateType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.DATE)
.nullable(dataType.isNullable());
break;
case TIME_WITHOUT_TIME_ZONE:
TimeType timeType = (TimeType) column.getType();
fieldInfoBuilder
.add(
column.getName(),
SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE,
timeType.getPrecision())
.nullable(timeType.isNullable());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) column.getType();
fieldInfoBuilder
.add(
column.getName(),
SqlTypeName.TIMESTAMP,
timestampType.getPrecision())
.nullable(timestampType.isNullable());
break;
case TIMESTAMP_WITH_TIME_ZONE:
ZonedTimestampType zonedTimestampType = (ZonedTimestampType) column.getType();
fieldInfoBuilder
.add(
column.getName(),
SqlTypeName.TIMESTAMP,
zonedTimestampType.getPrecision())
.nullable(zonedTimestampType.isNullable());
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) column.getType();
fieldInfoBuilder
.add(
column.getName(),
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
localZonedTimestampType.getPrecision())
.nullable(localZonedTimestampType.isNullable());
break;
case FLOAT:
FloatType floatType = (FloatType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.FLOAT)
.nullable(floatType.isNullable());
break;
case DOUBLE:
DoubleType doubleType = (DoubleType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.DOUBLE)
.nullable(doubleType.isNullable());
break;
case CHAR:
CharType charType = (CharType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.CHAR, charType.getLength())
.nullable(charType.isNullable());
break;
case VARCHAR:
VarCharType varCharType = (VarCharType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.VARCHAR, varCharType.getLength())
.nullable(varCharType.isNullable());
break;
case BINARY:
BinaryType binaryType = (BinaryType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.BINARY, binaryType.getLength())
.nullable(binaryType.isNullable());
break;
case VARBINARY:
VarBinaryType varBinaryType = (VarBinaryType) column.getType();
fieldInfoBuilder
.add(column.getName(), SqlTypeName.VARBINARY, varBinaryType.getLength())
.nullable(varBinaryType.isNullable());
break;
case DECIMAL:
DecimalType decimalType = (DecimalType) column.getType();
fieldInfoBuilder
.add(
column.getName(),
SqlTypeName.DECIMAL,
decimalType.getPrecision(),
decimalType.getScale())
.nullable(decimalType.isNullable());
break;
case ROW:
List<RelDataType> dataTypes =
((RowType) column.getType())
.getFieldTypes().stream()
.map((type) -> convertCalciteType(typeFactory, type))
.collect(Collectors.toList());
fieldInfoBuilder
.add(
column.getName(),
typeFactory.createStructType(
dataTypes,
((RowType) column.getType()).getFieldNames()))
.nullable(true);
break;
case ARRAY:
DataType elementType = ((ArrayType) column.getType()).getElementType();
fieldInfoBuilder
.add(
column.getName(),
typeFactory.createArrayType(
convertCalciteType(typeFactory, elementType), -1))
.nullable(true);
break;
case MAP:
RelDataType keyType =
convertCalciteType(
typeFactory, ((MapType) column.getType()).getKeyType());
RelDataType valueType =
convertCalciteType(
typeFactory, ((MapType) column.getType()).getValueType());
fieldInfoBuilder
.add(column.getName(), typeFactory.createMapType(keyType, valueType))
.nullable(true);
break;
default:
throw new UnsupportedOperationException(
"Unsupported type: " + column.getType());
}
}
return fieldInfoBuilder.build();
}
public static RelDataType convertCalciteType(
RelDataTypeFactory typeFactory, DataType dataType) {
switch (dataType.getTypeRoot()) {
@@ -126,25 +305,43 @@ public class DataTypeConverter {
case DATE:
return typeFactory.createSqlType(SqlTypeName.DATE);
case TIME_WITHOUT_TIME_ZONE:
- return typeFactory.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
+ TimeType timeType = (TimeType) dataType;
+ return typeFactory.createSqlType(
+ SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, timeType.getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+ TimestampType timestampType = (TimestampType) dataType;
+ return typeFactory.createSqlType(
+ SqlTypeName.TIMESTAMP, timestampType.getPrecision());
+ case TIMESTAMP_WITH_TIME_ZONE:
+ ZonedTimestampType zonedTimestampType = (ZonedTimestampType) dataType;
+ return typeFactory.createSqlType(
+ SqlTypeName.TIMESTAMP, zonedTimestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ LocalZonedTimestampType localZonedTimestampType =
+ (LocalZonedTimestampType) dataType;
+ return typeFactory.createSqlType(
+ SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ localZonedTimestampType.getPrecision());
case FLOAT:
return typeFactory.createSqlType(SqlTypeName.FLOAT);
case DOUBLE:
return typeFactory.createSqlType(SqlTypeName.DOUBLE);
case CHAR:
- return typeFactory.createSqlType(SqlTypeName.CHAR);
+ CharType charType = (CharType) dataType;
+ return typeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength());
case VARCHAR:
- return typeFactory.createSqlType(SqlTypeName.VARCHAR);
+ VarCharType varCharType = (VarCharType) dataType;
+ return typeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength());
case BINARY:
- return typeFactory.createSqlType(SqlTypeName.BINARY);
+ BinaryType binaryType = (BinaryType) dataType;
+ return typeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength());
case VARBINARY:
- return typeFactory.createSqlType(SqlTypeName.VARBINARY);
+ VarBinaryType varBinaryType = (VarBinaryType) dataType;
+ return typeFactory.createSqlType(SqlTypeName.VARBINARY, varBinaryType.getLength());
case DECIMAL:
- return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+ DecimalType decimalType = (DecimalType) dataType;
+ return typeFactory.createSqlType(
+ SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale());
case ROW:
List<RelDataType> dataTypes =
((RowType) dataType)
@@ -197,9 +394,9 @@ public class DataTypeConverter {
case VARCHAR:
return DataTypes.STRING();
case BINARY:
- return DataTypes.BINARY(BinaryType.MAX_LENGTH);
+ return DataTypes.BINARY(relDataType.getPrecision());
case VARBINARY:
- return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH);
+ return DataTypes.VARBINARY(relDataType.getPrecision());
case DECIMAL:
return DataTypes.DECIMAL(relDataType.getPrecision(), relDataType.getScale());
case ARRAY:
@@ -298,7 +495,7 @@ public class DataTypeConverter {
case VARBINARY:
return convertToBinary(value);
case DECIMAL:
- return convertToDecimalOriginal(value);
+ return convertToDecimal(value);
case ROW:
return value;
case ARRAY:
@@ -620,6 +817,7 @@ public class DataTypeConverter {
}
}
+ // convert to DecimalData
private static Object convertToDecimal(Object obj) {
if (obj instanceof BigDecimal) {
BigDecimal bigDecimalValue = (BigDecimal) obj;
@@ -632,16 +830,4 @@ public class DataTypeConverter {
"Unsupported Decimal value type: " + obj.getClass().getSimpleName());
}
}
- private static Object convertToDecimalOriginal(Object obj) {
- if (obj instanceof BigDecimal) {
- return obj;
- } else if (obj instanceof DecimalData) {
- DecimalData decimalData = (DecimalData) obj;
- return decimalData.toBigDecimal();
- } else {
- throw new UnsupportedOperationException(
- "Unsupported Decimal value type: " + obj.getClass().getSimpleName());
- }
- }
}

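The elided body of convertToDecimal presumably normalizes both input kinds to DecimalData, the inverse of the deleted convertToDecimalOriginal (which normalized to BigDecimal). A hedged reconstruction, not the commit's exact code:

    import org.apache.flink.cdc.common.data.DecimalData;

    import java.math.BigDecimal;

    public class DecimalNormalizer {
        // Hypothetical stand-in: BigDecimal inputs are wrapped at their own
        // precision/scale, DecimalData values pass through unchanged.
        static Object toDecimalData(Object obj) {
            if (obj instanceof DecimalData) {
                return obj;
            }
            if (obj instanceof BigDecimal) {
                BigDecimal d = (BigDecimal) obj;
                return DecimalData.fromBigDecimal(d, d.precision(), d.scale());
            }
            throw new UnsupportedOperationException(
                    "Unsupported Decimal value type: " + obj.getClass().getSimpleName());
        }
    }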
@@ -303,7 +303,7 @@ public class TransformParserTest {
testFilterExpression("cast(1 as bigint)", "castToLong(1)");
testFilterExpression("cast(1 as float)", "castToFloat(1)");
testFilterExpression("cast(1 as double)", "castToDouble(1)");
- testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 0)");
+ testFilterExpression("cast(1 as decimal)", "castToDecimalData(1, 10, 0)");
testFilterExpression("cast(1 as char)", "castToString(1)");
testFilterExpression("cast(1 as varchar)", "castToString(1)");
testFilterExpression("cast(null as int)", "castToInteger(null)");
@@ -314,7 +314,7 @@ public class TransformParserTest {
testFilterExpression("cast(null as bigint)", "castToLong(null)");
testFilterExpression("cast(null as float)", "castToFloat(null)");
testFilterExpression("cast(null as double)", "castToDouble(null)");
- testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)");
+ testFilterExpression("cast(null as decimal)", "castToDecimalData(null, 10, 0)");
testFilterExpression("cast(null as char)", "castToString(null)");
testFilterExpression("cast(null as varchar)", "castToString(null)");
testFilterExpression(
@@ -328,15 +328,18 @@ public class TransformParserTest {
List<Column> testColumns =
Arrays.asList(
Column.physicalColumn("id", DataTypes.INT(), "id"),
- Column.physicalColumn("name", DataTypes.STRING(), "string"),
+ Column.physicalColumn("name", DataTypes.STRING(), "name"),
Column.physicalColumn("age", DataTypes.INT(), "age"),
- Column.physicalColumn("address", DataTypes.STRING(), "address"),
+ Column.physicalColumn(
+ "createTime", DataTypes.TIMESTAMP(3), "newCreateTime"),
+ Column.physicalColumn("address", DataTypes.VARCHAR(50), "newAddress"),
+ Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"),
Column.physicalColumn("weight", DataTypes.DOUBLE(), "weight"),
Column.physicalColumn("height", DataTypes.DOUBLE(), "height"));
List<ProjectionColumn> result =
TransformParser.generateProjectionColumns(
- "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi",
+ "id, upper(name) as name, age + 1 as newage, createTime as newCreateTime, address as newAddress, deposit as deposits, weight / (height * height) as bmi",
testColumns,
Collections.emptyList(),
new SupportedMetadataColumn[0]);
@@ -346,6 +349,9 @@ public class TransformParserTest {
"ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}",
"ProjectionColumn{column=`name` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}",
"ProjectionColumn{column=`newage` INT, expression='`TB`.`age` + 1', scriptExpression='age + 1', originalColumnNames=[age], transformExpressionKey=null}",
+ "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3) 'newCreateTime', expression='createTime', scriptExpression='createTime', originalColumnNames=[createTime], transformExpressionKey=null}",
+ "ProjectionColumn{column=`newAddress` VARCHAR(50) 'newAddress', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}",
+ "ProjectionColumn{column=`deposits` DECIMAL(10, 2) 'deposit', expression='deposit', scriptExpression='deposit', originalColumnNames=[deposit], transformExpressionKey=null}",
"ProjectionColumn{column=`bmi` DOUBLE, expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', scriptExpression='weight / height * height', originalColumnNames=[weight, height, height], transformExpressionKey=null}");
Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]");
@@ -359,9 +365,11 @@ public class TransformParserTest {
List<String> metadataExpected =
Arrays.asList(
"ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}",
- "ProjectionColumn{column=`name` STRING 'string', expression='name', scriptExpression='name', originalColumnNames=[name], transformExpressionKey=null}",
+ "ProjectionColumn{column=`name` STRING 'name', expression='name', scriptExpression='name', originalColumnNames=[name], transformExpressionKey=null}",
"ProjectionColumn{column=`age` INT 'age', expression='age', scriptExpression='age', originalColumnNames=[age], transformExpressionKey=null}",
- "ProjectionColumn{column=`address` STRING 'address', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}",
+ "ProjectionColumn{column=`createTime` TIMESTAMP(3) 'newCreateTime', expression='createTime', scriptExpression='createTime', originalColumnNames=[createTime], transformExpressionKey=null}",
+ "ProjectionColumn{column=`address` VARCHAR(50) 'newAddress', expression='address', scriptExpression='address', originalColumnNames=[address], transformExpressionKey=null}",
+ "ProjectionColumn{column=`deposit` DECIMAL(10, 2) 'deposit', expression='deposit', scriptExpression='deposit', originalColumnNames=[deposit], transformExpressionKey=null}",
"ProjectionColumn{column=`weight` DOUBLE 'weight', expression='weight', scriptExpression='weight', originalColumnNames=[weight], transformExpressionKey=null}",
"ProjectionColumn{column=`height` DOUBLE 'height', expression='height', scriptExpression='height', originalColumnNames=[height], transformExpressionKey=null}",
"ProjectionColumn{column=`__namespace_name__` STRING NOT NULL, expression='__namespace_name__', scriptExpression='__namespace_name__', originalColumnNames=[__namespace_name__], transformExpressionKey=null}",
@@ -384,6 +392,42 @@ public class TransformParserTest {
"Unrecognized projection expression: 1 + 1. Should be <EXPR> AS <IDENTIFIER>");
}
@Test
public void testGenerateProjectionColumnsWithPrecision() {
List<Column> testColumns =
Arrays.asList(
Column.physicalColumn("id", DataTypes.INT(), "id"),
Column.physicalColumn("name", DataTypes.VARCHAR(50), "name"),
Column.physicalColumn("sex", DataTypes.CHAR(1), "sex"),
Column.physicalColumn("address", DataTypes.BINARY(50), "address"),
Column.physicalColumn("phone", DataTypes.VARBINARY(50), "phone"),
Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"),
Column.physicalColumn("birthday", DataTypes.TIMESTAMP(3), "birthday"),
Column.physicalColumn(
"birthday_ltz", DataTypes.TIMESTAMP_LTZ(3), "birthday_ltz"),
Column.physicalColumn("update_time", DataTypes.TIME(3), "update_time"));
List<ProjectionColumn> result =
TransformParser.generateProjectionColumns(
"id, UPPER(name) as name2, UPPER(sex) as sex2, COALESCE(address,address) as address2, COALESCE(phone,phone) as phone2, COALESCE(deposit,deposit) as deposit2, COALESCE(birthday,birthday) as birthday2, COALESCE(birthday_ltz,birthday_ltz) as birthday_ltz2, COALESCE(update_time,update_time) as update_time2",
testColumns,
Collections.emptyList(),
new SupportedMetadataColumn[0]);
List<String> expected =
Arrays.asList(
"ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}",
"ProjectionColumn{column=`name2` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}",
"ProjectionColumn{column=`sex2` STRING, expression='UPPER(`TB`.`sex`)', scriptExpression='upper(sex)', originalColumnNames=[sex], transformExpressionKey=null}",
"ProjectionColumn{column=`address2` BINARY(50), expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE `TB`.`address` END', scriptExpression='(null != address ? address : address)', originalColumnNames=[address, address, address], transformExpressionKey=null}",
"ProjectionColumn{column=`phone2` VARBINARY(50), expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE `TB`.`phone` END', scriptExpression='(null != phone ? phone : phone)', originalColumnNames=[phone, phone, phone], transformExpressionKey=null}",
"ProjectionColumn{column=`deposit2` DECIMAL(10, 2), expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE `TB`.`deposit` END', scriptExpression='(null != deposit ? deposit : deposit)', originalColumnNames=[deposit, deposit, deposit], transformExpressionKey=null}",
"ProjectionColumn{column=`birthday2` TIMESTAMP(3), expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE `TB`.`birthday` END', scriptExpression='(null != birthday ? birthday : birthday)', originalColumnNames=[birthday, birthday, birthday], transformExpressionKey=null}",
"ProjectionColumn{column=`birthday_ltz2` TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN `TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null != birthday_ltz ? birthday_ltz : birthday_ltz)', originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz], transformExpressionKey=null}",
"ProjectionColumn{column=`update_time2` TIME(3), expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time` ELSE `TB`.`update_time` END', scriptExpression='(null != update_time ? update_time : update_time)', originalColumnNames=[update_time, update_time, update_time], transformExpressionKey=null}");
Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]");
}
@Test
public void testGenerateReferencedColumns() {
List<Column> testColumns =
