[FLINK-36864][cdc-runtime] Fix unable to use numeric literals that goes beyond Int32 range (#3785)

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
pull/3807/head
yuxiqian 1 month ago committed by GitHub
parent 6d21941661
commit 2ad80066bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1779,6 +1779,104 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
}
@Test
void testTransformWithLargeLiterals() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
List<Event> events = generateSchemaEvolutionEvents(tableId);
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"\\.*.\\.*.\\.*",
"*, 2147483647 AS int_max, "
+ "2147483648 AS greater_than_int_max, "
+ "-2147483648 AS int_min, "
+ "-2147483649 AS less_than_int_min, "
+ "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
"CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1
null,
null,
null,
null)),
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
// Add Column
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[2nd, 5, Eva, 20, 2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
// Alter column type
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
// Rename column
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[8th, 11, Kella, 18.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",
// Drop column
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[11th, 14, Nein, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}");
}
private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
List<Event> events = new ArrayList<>();

@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.calcite.sql.fun.SqlCase;
import org.apache.calcite.sql.type.SqlTypeName;
import org.codehaus.commons.compiler.CompileException;
@ -138,6 +139,13 @@ public class JaninoCompiler {
if (sqlLiteral instanceof SqlCharStringLiteral) {
// Double quotation marks represent strings in Janino.
value = "\"" + value.substring(1, value.length() - 1) + "\"";
} else if (sqlLiteral instanceof SqlNumericLiteral) {
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
long longValue = sqlLiteral.longValue(true);
if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
value += "L";
}
}
}
if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
value = "\"" + value + "\"";

@ -17,6 +17,8 @@
package org.apache.flink.cdc.runtime.parser;
import org.apache.flink.api.java.tuple.Tuple2;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.Location;
import org.codehaus.janino.ExpressionEvaluator;
@ -34,6 +36,9 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for the {@link JaninoCompiler}. */
public class JaninoCompilerTest {
@ -120,4 +125,81 @@ public class JaninoCompilerTest {
Object evaluate = expressionEvaluator.evaluate(params.toArray());
Assert.assertEquals(3.0, evaluate);
}
@Test
public void testLargeNumericLiterals() {
// Test parsing integer literals
Stream.of(
Tuple2.of("0", 0),
Tuple2.of("1", 1),
Tuple2.of("1", 1),
Tuple2.of("2147483647", 2147483647),
Tuple2.of("-2147483648", -2147483648))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Integer.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});
// Test parsing double literals
Stream.of(
Tuple2.of("3.1415926", 3.1415926),
Tuple2.of("0.0", 0.0),
Tuple2.of("17.0", 17.0),
Tuple2.of("123456789.123456789", 123456789.123456789),
Tuple2.of("-987654321.987654321", -987654321.987654321))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Double.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});
// Test parsing long literals
Stream.of(
Tuple2.of("2147483648L", 2147483648L),
Tuple2.of("-2147483649L", -2147483649L),
Tuple2.of("9223372036854775807L", 9223372036854775807L),
Tuple2.of("-9223372036854775808L", -9223372036854775808L))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Long.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});
}
}

@ -36,6 +36,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@ -461,6 +462,34 @@ public class TransformParserTest {
"__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) > 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")");
}
@Test
void testLargeNumericalLiterals() {
// For literals within [-2147483648, 2147483647] range, plain Integers are OK
testFilterExpression("id > 2147483647", "id > 2147483647");
testFilterExpression("id < -2147483648", "id < -2147483648");
// For out-of-range literals, an extra `L` suffix is required
testFilterExpression("id > 2147483648", "id > 2147483648L");
testFilterExpression("id > -2147483649", "id > -2147483649L");
testFilterExpression("id < 9223372036854775807", "id < 9223372036854775807L");
testFilterExpression("id > -9223372036854775808", "id > -9223372036854775808L");
// But there's still a limit
Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
"id > 9223372036854775808", Collections.emptyList()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '9223372036854775808' out of range");
Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
"id < -9223372036854775809", Collections.emptyList()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '-9223372036854775809' out of range");
}
private void testFilterExpression(String expression, String expressionExpect) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(

Loading…
Cancel
Save