diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 0c3fb5dca..d9c02929b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1779,6 +1779,104 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); } + @Test + void testTransformWithLargeLiterals() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1"); + List events = generateSchemaEvolutionEvents(tableId); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "\\.*.\\.*.\\.*", + "*, 2147483647 AS int_max, " + + "2147483648 AS greater_than_int_max, " + + "-2147483648 AS int_min, " + + "-2147483649 AS less_than_int_min, " + + "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal", + "CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1 + null, + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + assertThat(outputEvents) + .containsExactly( + // Initial stage + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", + + // Add Column + "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[2nd, 5, Eva, 20, 2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", + + // Alter column type + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", + + // Rename column + "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[8th, 11, Kella, 18.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", + + // Drop column + "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[11th, 14, Nein, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}"); + } + private List generateSchemaEvolutionEvents(TableId tableId) { List events = new ArrayList<>(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 9a5c5680c..6f5b26125 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlNumericLiteral; import org.apache.calcite.sql.fun.SqlCase; import org.apache.calcite.sql.type.SqlTypeName; import org.codehaus.commons.compiler.CompileException; @@ -138,6 +139,13 @@ public class JaninoCompiler { if (sqlLiteral instanceof SqlCharStringLiteral) { // Double quotation marks represent strings in Janino. value = "\"" + value.substring(1, value.length() - 1) + "\""; + } else if (sqlLiteral instanceof SqlNumericLiteral) { + if (((SqlNumericLiteral) sqlLiteral).isInteger()) { + long longValue = sqlLiteral.longValue(true); + if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) { + value += "L"; + } + } } if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) { value = "\"" + value + "\""; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java index 6fbaef3bf..dbfd1fb38 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.parser; +import org.apache.flink.api.java.tuple.Tuple2; + import org.codehaus.commons.compiler.CompileException; import org.codehaus.commons.compiler.Location; import org.codehaus.janino.ExpressionEvaluator; @@ -34,6 +36,9 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link JaninoCompiler}. */ public class JaninoCompilerTest { @@ -120,4 +125,81 @@ public class JaninoCompilerTest { Object evaluate = expressionEvaluator.evaluate(params.toArray()); Assert.assertEquals(3.0, evaluate); } + + @Test + public void testLargeNumericLiterals() { + // Test parsing integer literals + Stream.of( + Tuple2.of("0", 0), + Tuple2.of("1", 1), + Tuple2.of("1", 1), + Tuple2.of("2147483647", 2147483647), + Tuple2.of("-2147483648", -2147483648)) + .forEach( + t -> { + String expression = t.f0; + List columnNames = new ArrayList<>(); + List> paramTypes = new ArrayList<>(); + ExpressionEvaluator expressionEvaluator = + JaninoCompiler.compileExpression( + JaninoCompiler.loadSystemFunction(expression), + columnNames, + paramTypes, + Integer.class); + try { + assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + }); + + // Test parsing double literals + Stream.of( + Tuple2.of("3.1415926", 3.1415926), + Tuple2.of("0.0", 0.0), + Tuple2.of("17.0", 17.0), + Tuple2.of("123456789.123456789", 123456789.123456789), + Tuple2.of("-987654321.987654321", -987654321.987654321)) + .forEach( + t -> { + String expression = t.f0; + List columnNames = new ArrayList<>(); + List> paramTypes = new ArrayList<>(); + ExpressionEvaluator expressionEvaluator = + JaninoCompiler.compileExpression( + JaninoCompiler.loadSystemFunction(expression), + columnNames, + paramTypes, + Double.class); + try { + assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + }); + + // Test parsing long literals + Stream.of( + Tuple2.of("2147483648L", 2147483648L), + Tuple2.of("-2147483649L", -2147483649L), + Tuple2.of("9223372036854775807L", 9223372036854775807L), + Tuple2.of("-9223372036854775808L", -9223372036854775808L)) + .forEach( + t -> { + String expression = t.f0; + List columnNames = new ArrayList<>(); + List> paramTypes = new ArrayList<>(); + ExpressionEvaluator expressionEvaluator = + JaninoCompiler.compileExpression( + JaninoCompiler.loadSystemFunction(expression), + columnNames, + paramTypes, + Long.class); + try { + assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index f17f27c9d..0fd97c1b2 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -36,6 +36,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; @@ -461,6 +462,34 @@ public class TransformParserTest { "__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) > 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")"); } + @Test + void testLargeNumericalLiterals() { + // For literals within [-2147483648, 2147483647] range, plain Integers are OK + testFilterExpression("id > 2147483647", "id > 2147483647"); + testFilterExpression("id < -2147483648", "id < -2147483648"); + + // For out-of-range literals, an extra `L` suffix is required + testFilterExpression("id > 2147483648", "id > 2147483648L"); + testFilterExpression("id > -2147483649", "id > -2147483649L"); + testFilterExpression("id < 9223372036854775807", "id < 9223372036854775807L"); + testFilterExpression("id > -9223372036854775808", "id > -9223372036854775808L"); + + // But there's still a limit + Assertions.assertThatThrownBy( + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "id > 9223372036854775808", Collections.emptyList())) + .isExactlyInstanceOf(CalciteContextException.class) + .hasMessageContaining("Numeric literal '9223372036854775808' out of range"); + + Assertions.assertThatThrownBy( + () -> + TransformParser.translateFilterExpressionToJaninoExpression( + "id < -9223372036854775809", Collections.emptyList())) + .isExactlyInstanceOf(CalciteContextException.class) + .hasMessageContaining("Numeric literal '-9223372036854775809' out of range"); + } + private void testFilterExpression(String expression, String expressionExpect) { String janinoExpression = TransformParser.translateFilterExpressionToJaninoExpression(