From d77cf7fa759d64f257f67c4a9b713893cec84f5c Mon Sep 17 00:00:00 2001 From: wenmo <32723967+aiwenmo@users.noreply.github.com> Date: Thu, 7 Nov 2024 23:39:38 +0800 Subject: [PATCH] [FLINK-36647][transform] Support Timestampdiff and Timestampadd function in cdc pipeline transform This closes #3698 --- .../content.zh/docs/core-concept/transform.md | 1 + docs/content/docs/core-concept/transform.md | 1 + .../flink/cdc/common/utils/DateTimeUtils.java | 90 ++ .../functions/SystemFunctionUtils.java | 168 ++-- .../cdc/runtime/parser/JaninoCompiler.java | 97 ++- .../cdc/runtime/parser/TransformParser.java | 4 +- .../metadata/TransformSqlOperatorTable.java | 20 + .../metadata/TransformSqlReturnTypes.java | 3 + .../runtime/typeutils/DataTypeConverter.java | 23 +- .../transform/PostTransformOperatorTest.java | 797 +++++++++++++++++- .../runtime/parser/TransformParserTest.java | 129 ++- 11 files changed, 1233 insertions(+), 100 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index ec88aeadf..787edc7c2 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). | | NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. | | DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. | +| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp obtained by adding interval to timepoint. 
The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 388b0ff46..b04e1d763 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -157,6 +157,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). | | NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. | | DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. | +| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp obtained by adding interval to timepoint. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. 
| | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java index 1fb080de9..bb01e4a8e 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java @@ -25,6 +25,7 @@ import java.text.SimpleDateFormat; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Calendar; import java.util.Date; import java.util.TimeZone; @@ -198,4 +199,93 @@ public class DateTimeUtils { Date dateTime = new Date(ts); return formatter.format(dateTime); } + + // -------------------------------------------------------------------------------------------- + // Compare + // -------------------------------------------------------------------------------------------- + + public static Integer timestampDiff( + String timeIntervalUnit, + long fromDate, + String fromTimezone, + long toDate, + String toTimezone) { + Calendar from = Calendar.getInstance(TimeZone.getTimeZone(fromTimezone)); + from.setTime(new Date(fromDate)); + Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone)); + to.setTime(new Date(toDate)); + long 
second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000; + switch (timeIntervalUnit) { + case "SECOND": + if (second > Integer.MAX_VALUE) { + return null; + } + return (int) second; + case "MINUTE": + if (second > Integer.MAX_VALUE) { + return null; + } + return (int) second / 60; + case "HOUR": + if (second > Integer.MAX_VALUE) { + return null; + } + return (int) second / 3600; + case "DAY": + if (second > Integer.MAX_VALUE) { + return null; + } + return (int) second / (24 * 3600); + case "MONTH": + return to.get(Calendar.YEAR) * 12 + + to.get(Calendar.MONTH) + - (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH)); + case "YEAR": + return to.get(Calendar.YEAR) - from.get(Calendar.YEAR); + default: + throw new RuntimeException( + String.format( + "Unsupported timestamp interval unit %s. Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR", + timeIntervalUnit)); + } + } + + // -------------------------------------------------------------------------------------------- + // Add + // -------------------------------------------------------------------------------------------- + + public static long timestampAdd( + String timeIntervalUnit, int interval, long timePoint, String timezone) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeZone(TimeZone.getTimeZone(timezone)); + calendar.setTime(new Date(timePoint)); + int field; + switch (timeIntervalUnit) { + case "SECOND": + field = Calendar.SECOND; + break; + case "MINUTE": + field = Calendar.MINUTE; + break; + case "HOUR": + field = Calendar.HOUR; + break; + case "DAY": + field = Calendar.DATE; + break; + case "MONTH": + field = Calendar.MONTH; + break; + case "YEAR": + field = Calendar.YEAR; + break; + default: + throw new RuntimeException( + String.format( + "Unsupported timestamp interval unit %s. 
Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR", + timeIntervalUnit)); + } + calendar.add(field, interval); + return calendar.getTimeInMillis(); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 0a062d3e8..7fc7482e6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -35,8 +35,6 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; -import java.util.Calendar; -import java.util.Date; import java.util.TimeZone; import java.util.UUID; import java.util.regex.Matcher; @@ -130,83 +128,127 @@ public class SystemFunctionUtils { } } - public static int timestampDiff( - String symbol, + // Be compatible with the existing definition of Function TIMESTAMP_DIFF + public static Integer timestampDiff( + String timeIntervalUnit, LocalZonedTimestampData fromTimestamp, - LocalZonedTimestampData toTimestamp) { - return timestampDiff( - symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond()); - } - - public static int timestampDiff( - String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) { - return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); - } - - public static int timestampDiff( - String symbol, TimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) { - return timestampDiff( - symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond()); + LocalZonedTimestampData toTimestamp, + String timezone) { + if (fromTimestamp == null || toTimestamp == null) { + return null; + } + return DateTimeUtils.timestampDiff( + timeIntervalUnit, + fromTimestamp.getEpochMillisecond(), + timezone, + 
toTimestamp.getEpochMillisecond(), + timezone); + } + + // Be compatible with the existing definition of Function TIMESTAMP_DIFF + public static Integer timestampDiff( + String timeIntervalUnit, + TimestampData fromTimestamp, + TimestampData toTimestamp, + String timezone) { + if (fromTimestamp == null || toTimestamp == null) { + return null; + } + return DateTimeUtils.timestampDiff( + timeIntervalUnit, + fromTimestamp.getMillisecond(), + "UTC", + toTimestamp.getMillisecond(), + "UTC"); + } + + // Be compatible with the existing definition of Function TIMESTAMP_DIFF + public static Integer timestampDiff( + String timeIntervalUnit, + TimestampData fromTimestamp, + LocalZonedTimestampData toTimestamp, + String timezone) { + if (fromTimestamp == null || toTimestamp == null) { + return null; + } + return DateTimeUtils.timestampDiff( + timeIntervalUnit, + fromTimestamp.getMillisecond(), + "UTC", + toTimestamp.getEpochMillisecond(), + timezone); } - public static int timestampDiff( - String symbol, LocalZonedTimestampData fromTimestamp, TimestampData toTimestamp) { - return timestampDiff( - symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond()); + // Be compatible with the existing definition of Function TIMESTAMP_DIFF + public static Integer timestampDiff( + String timeIntervalUnit, + LocalZonedTimestampData fromTimestamp, + TimestampData toTimestamp, + String timezone) { + if (fromTimestamp == null || toTimestamp == null) { + return null; + } + return DateTimeUtils.timestampDiff( + timeIntervalUnit, + fromTimestamp.getEpochMillisecond(), + timezone, + toTimestamp.getMillisecond(), + "UTC"); } - public static int timestampDiff( - String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) { - return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); + public static Integer timestampdiff( + String timeIntervalUnit, + LocalZonedTimestampData fromTimestamp, + LocalZonedTimestampData toTimestamp, + 
String timezone) { + return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone); } - public static int timestampDiff( - String symbol, LocalZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) { - return timestampDiff( - symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getMillisecond()); + public static Integer timestampdiff( + String timeIntervalUnit, + TimestampData fromTimestamp, + TimestampData toTimestamp, + String timezone) { + return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone); } - public static int timestampDiff( - String symbol, ZonedTimestampData fromTimestamp, LocalZonedTimestampData toTimestamp) { - return timestampDiff( - symbol, fromTimestamp.getMillisecond(), toTimestamp.getEpochMillisecond()); + public static Integer timestampdiff( + String timeIntervalUnit, + TimestampData fromTimestamp, + LocalZonedTimestampData toTimestamp, + String timezone) { + return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone); } - public static int timestampDiff( - String symbol, TimestampData fromTimestamp, ZonedTimestampData toTimestamp) { - return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); + public static Integer timestampdiff( + String timeIntervalUnit, + LocalZonedTimestampData fromTimestamp, + TimestampData toTimestamp, + String timezone) { + return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, timezone); } - public static int timestampDiff( - String symbol, ZonedTimestampData fromTimestamp, TimestampData toTimestamp) { - return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond()); + public static LocalZonedTimestampData timestampadd( + String timeIntervalUnit, + Integer interval, + LocalZonedTimestampData timePoint, + String timezone) { + if (interval == null || timePoint == null) { + return null; + } + return LocalZonedTimestampData.fromEpochMillis( + DateTimeUtils.timestampAdd( + 
timeIntervalUnit, interval, timePoint.getEpochMillisecond(), timezone)); } - public static int timestampDiff(String symbol, long fromDate, long toDate) { - Calendar from = Calendar.getInstance(); - from.setTime(new Date(fromDate)); - Calendar to = Calendar.getInstance(); - to.setTime(new Date(toDate)); - Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000; - switch (symbol) { - case "SECOND": - return second.intValue(); - case "MINUTE": - return second.intValue() / 60; - case "HOUR": - return second.intValue() / 3600; - case "DAY": - return second.intValue() / (24 * 3600); - case "MONTH": - return to.get(Calendar.YEAR) * 12 - + to.get(Calendar.MONDAY) - - (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY)); - case "YEAR": - return to.get(Calendar.YEAR) - from.get(Calendar.YEAR); - default: - LOG.error("Unsupported timestamp diff: {}", symbol); - throw new RuntimeException("Unsupported timestamp diff: " + symbol); + public static TimestampData timestampadd( + String timeIntervalUnit, Integer interval, TimestampData timePoint, String timezone) { + if (interval == null || timePoint == null) { + return null; } + return TimestampData.fromMillis( + DateTimeUtils.timestampAdd( + timeIntervalUnit, interval, timePoint.getMillisecond(), "UTC")); } public static boolean betweenAsymmetric(String value, String minValue, String maxValue) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 14cb79026..d9f644b0b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -68,7 +68,13 @@ public class JaninoCompiler { Arrays.asList("DATE_FORMAT"); private static final List TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS = - Arrays.asList("TO_DATE", "TO_TIMESTAMP", 
"FROM_UNIXTIME"); + Arrays.asList( + "TO_DATE", + "TO_TIMESTAMP", + "FROM_UNIXTIME", + "TIMESTAMPADD", + "TIMESTAMPDIFF", + "TIMESTAMP_DIFF"); public static final String DEFAULT_EPOCH_TIME = "__epoch_time__"; public static final String DEFAULT_TIME_ZONE = "__time_zone__"; @@ -123,13 +129,14 @@ public class JaninoCompiler { private static Java.Rvalue translateSqlIdentifier(SqlIdentifier sqlIdentifier) { String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1); - if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName)) { + if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) { return generateTimezoneFreeTemporalFunctionOperation(columnName); - } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName)) { + } else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) { return generateTimezoneRequiredTemporalFunctionOperation(columnName); - } else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) { + } else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase())) { return generateTimezoneFreeTemporalConversionFunctionOperation(columnName); - } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName)) { + } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains( + columnName.toUpperCase())) { return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName); } else { return new Java.AmbiguousName(Location.NOWHERE, new String[] {columnName}); @@ -165,14 +172,15 @@ public class JaninoCompiler { for (SqlNode sqlNode : operandList) { translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors); } - if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(sqlBasicCall.getOperator().getName())) { + if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains( + sqlBasicCall.getOperator().getName().toUpperCase())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); } else if 
(TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains( - sqlBasicCall.getOperator().getName())) { + sqlBasicCall.getOperator().getName().toUpperCase())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME})); atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains( - sqlBasicCall.getOperator().getName())) { + sqlBasicCall.getOperator().getName().toUpperCase())) { atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE})); } return sqlBasicCallToJaninoRvalue( @@ -275,6 +283,10 @@ public class JaninoCompiler { return generateCompareOperation(sqlBasicCall, atoms); case CAST: return generateCastOperation(sqlBasicCall, atoms); + case TIMESTAMP_DIFF: + return generateTimestampDiffOperation(sqlBasicCall, atoms); + case TIMESTAMP_ADD: + return generateTimestampAddOperation(sqlBasicCall, atoms); case OTHER: return generateOtherOperation(sqlBasicCall, atoms); default: @@ -341,6 +353,75 @@ public class JaninoCompiler { Location.NOWHERE, null, StringUtils.convertToCamelCase(compareMethodName), atoms); } + private static Java.Rvalue generateTimestampDiffOperation( + SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + if (atoms.length != 4) { + throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); + } + String timeIntervalUnit = atoms[0].toString().toUpperCase(); + switch (timeIntervalUnit) { + case "\"SECOND\"": + case "\"MINUTE\"": + case "\"HOUR\"": + case "\"DAY\"": + case "\"MONTH\"": + case "\"YEAR\"": + break; + default: + throw new ParseException( + "Unsupported time interval unit in timestamp diff function: " + + timeIntervalUnit); + } + List timestampDiffFunctionParam = new ArrayList<>(); + timestampDiffFunctionParam.add( + new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit})); + timestampDiffFunctionParam.add(atoms[1]); + timestampDiffFunctionParam.add(atoms[2]); + 
timestampDiffFunctionParam.add(atoms[3]); + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()), + timestampDiffFunctionParam.toArray(new Java.Rvalue[0])); + } + + private static Java.Rvalue generateTimestampAddOperation( + SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + if (atoms.length != 4) { + throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); + } + String timeIntervalUnit = atoms[0].toString().toUpperCase(); + switch (timeIntervalUnit) { + case "\"SECOND\"": + case "\"MINUTE\"": + case "\"HOUR\"": + case "\"DAY\"": + case "\"MONTH\"": + case "\"YEAR\"": + break; + default: + throw new ParseException( + "Unsupported time interval unit in timestamp add function: " + + timeIntervalUnit); + } + List timestampDiffFunctionParam = new ArrayList<>(); + timestampDiffFunctionParam.add( + new Java.AmbiguousName(Location.NOWHERE, new String[] {timeIntervalUnit})); + timestampDiffFunctionParam.add(atoms[1]); + timestampDiffFunctionParam.add(atoms[2]); + timestampDiffFunctionParam.add(atoms[3]); + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()), + timestampDiffFunctionParam.toArray(new Java.Rvalue[0])); + } + + private static Java.Rvalue generateCharLengthOperation(Java.Rvalue[] atoms) { + return new Java.MethodInvocation( + Location.NOWHERE, null, StringUtils.convertToCamelCase("CHAR_LENGTH"), atoms); + } + private static Java.Rvalue generateOtherOperation( SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (sqlBasicCall.getOperator().getName().equals("||")) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 5c8417633..ee29bbf2c 100644 --- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -165,7 +165,9 @@ public class TransformParser { SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable), calciteCatalogReader, factory, - SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); + SqlValidator.Config.DEFAULT + .withIdentifierExpansion(true) + .withConformance(SqlConformanceEnum.MYSQL_5)); SqlNode validateSqlNode = validator.validate(sqlNode); SqlToRelConverter sqlToRelConverter = new SqlToRelConverter( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index d47db49f8..bfeba7ab3 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -254,6 +254,26 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { OperandTypes.family( SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP), SqlFunctionCategory.TIMEDATE); + public static final SqlFunction TIMESTAMPDIFF = + new SqlFunction( + "TIMESTAMPDIFF", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.INTEGER), + SqlTypeTransforms.FORCE_NULLABLE), + null, + OperandTypes.family( + SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP), + SqlFunctionCategory.TIMEDATE); + public static final SqlFunction TIMESTAMPADD = + new SqlFunction( + "TIMESTAMPADD", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ARG2_TIMESTAMP_FORCE_NULLABLE, + null, + OperandTypes.family( + SqlTypeFamily.ANY, SqlTypeFamily.INTEGER, SqlTypeFamily.TIMESTAMP), + SqlFunctionCategory.TIMEDATE); 
public static final SqlFunction TO_DATE = new SqlFunction( "TO_DATE", diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlReturnTypes.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlReturnTypes.java index 32831d20b..fb647fcd6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlReturnTypes.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlReturnTypes.java @@ -188,4 +188,7 @@ public class TransformSqlReturnTypes { return opBinding.getTypeFactory().leastRestrictive(types); } } + + public static final SqlReturnTypeInference ARG2_TIMESTAMP_FORCE_NULLABLE = + new OrdinalReturnTypeInference(2) {}; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java index cd2542e7e..4d6dffcaf 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.data.GenericMapData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.MapData; import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.ArrayType; @@ -98,6 +99,8 @@ public class DataTypeConverter { return Integer.class; case TIMESTAMP_WITHOUT_TIME_ZONE: return TimestampData.class; + case TIMESTAMP_WITH_TIME_ZONE: + return ZonedTimestampData.class; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return LocalZonedTimestampData.class; case FLOAT: @@ 
-313,9 +316,8 @@ public class DataTypeConverter { return typeFactory.createSqlType( SqlTypeName.TIMESTAMP, timestampType.getPrecision()); case TIMESTAMP_WITH_TIME_ZONE: - ZonedTimestampType zonedTimestampType = (ZonedTimestampType) dataType; - return typeFactory.createSqlType( - SqlTypeName.TIMESTAMP, zonedTimestampType.getPrecision()); + // TODO: Bump Calcite to support its TIMESTAMP_TZ type via #FLINK-37123 + throw new UnsupportedOperationException("Unsupported type: TIMESTAMP_TZ"); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) dataType; @@ -436,6 +438,8 @@ public class DataTypeConverter { return convertToTime(value); case TIMESTAMP_WITHOUT_TIME_ZONE: return convertToTimestamp(value); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToZonedTimestampData(value); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return convertToLocalTimeZoneTimestamp(value); case FLOAT: @@ -482,6 +486,8 @@ public class DataTypeConverter { return convertToTime(value); case TIMESTAMP_WITHOUT_TIME_ZONE: return convertToTimestamp(value); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToZonedTimestampData(value); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return convertToLocalTimeZoneTimestamp(value); case FLOAT: @@ -777,6 +783,17 @@ public class DataTypeConverter { + obj.getClass().getName()); } + private static Object convertToZonedTimestampData(Object obj) { + if (obj instanceof ZonedTimestampData) { + return obj; + } + throw new IllegalArgumentException( + "Unable to convert to TIMESTAMP_TZ from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + private static Object convertToLocalTimeZoneTimestamp(Object obj) { if (obj instanceof String) { String str = (String) obj; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java 
index f744967ba..11ae3ee5a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -158,6 +159,57 @@ public class PostTransformOperatorTest { .physicalColumn("minute_diff", DataTypes.INT()) .physicalColumn("hour_diff", DataTypes.INT()) .physicalColumn("day_diff", DataTypes.INT()) + .physicalColumn("month_diff", DataTypes.INT()) + .physicalColumn("year_diff", DataTypes.INT()) + .primaryKey("col1") + .build(); + + private static final TableId TIMESTAMPDIFF_DATA_TABLEID = + TableId.tableId("my_company", "my_branch", "timestampdiff_data_table"); + private static final Schema TIMESTAMPDIFF_DATA_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("time_interval_unit", DataTypes.STRING()) + .physicalColumn("start_timestamp", DataTypes.TIMESTAMP()) + .physicalColumn("end_timestamp", DataTypes.TIMESTAMP()) + .physicalColumn("start_timestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .physicalColumn("end_timestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .primaryKey("col1") + .build(); + private static final Schema EXPECTED_TIMESTAMPDIFF_DATA_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("time_interval_unit", DataTypes.STRING()) + .physicalColumn("timestamp_timestamp", DataTypes.INT()) + .physicalColumn("timestamp_timestamp_ltz", DataTypes.INT()) + .physicalColumn("timestamp_ltz_timestamp", DataTypes.INT()) + 
.physicalColumn("timestamp_ltz_timestamp_ltz", DataTypes.INT()) + .primaryKey("col1") + .build(); + + private static final TableId TIMESTAMPADD_DATA_TABLEID = + TableId.tableId("my_company", "my_branch", "timestampadd_data_table"); + private static final Schema TIMESTAMPADD_DATA_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("time_interval_unit", DataTypes.STRING()) + .physicalColumn("interval_value", DataTypes.INT()) + .physicalColumn("time_point_timestamp", DataTypes.TIMESTAMP(0)) + .physicalColumn("time_point_timestamp_ltz", DataTypes.TIMESTAMP_LTZ(0)) + .primaryKey("col1") + .build(); + + private static final TableId TIMESTAMPADD_TABLEID = + TableId.tableId("my_company", "my_branch", "timestampadd_table"); + private static final Schema TIMESTAMPADD_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("second_add", DataTypes.STRING()) + .physicalColumn("minute_add", DataTypes.STRING()) + .physicalColumn("hour_add", DataTypes.STRING()) + .physicalColumn("day_add", DataTypes.STRING()) + .physicalColumn("month_add", DataTypes.STRING()) + .physicalColumn("year_add", DataTypes.STRING()) .primaryKey("col1") .build(); @@ -205,7 +257,7 @@ public class PostTransformOperatorTest { private static final Schema TIMEZONE_SCHEMA = Schema.newBuilder() .physicalColumn("col1", DataTypes.STRING().notNull()) - .physicalColumn("datetime", DataTypes.STRING()) + .physicalColumn("datetime_value", DataTypes.STRING()) .primaryKey("col1") .build(); @@ -1184,22 +1236,26 @@ public class PostTransformOperatorTest { } @Test - void testTimestampDiffTransform() throws Exception { + void testTimestampdiffTransform() throws Exception { PostTransformOperator transform = PostTransformOperator.newBuilder() .addTransform( TIMESTAMPDIFF_TABLEID.identifier(), - "col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff," - + " TIMESTAMP_DIFF('MINUTE', 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff," - + " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff," - + " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff", + "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff," + + " TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff," + + " TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff," + + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff," + + " TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as month_diff," + + " TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as year_diff", "col1='1'") .addTransform( TIMESTAMPDIFF_TABLEID.identifier(), - "col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff," - + " TIMESTAMP_DIFF('MINUTE', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff," - + " TIMESTAMP_DIFF('HOUR', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff," - + " TIMESTAMP_DIFF('DAY', LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff", + "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff," + + " TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff," + + " TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff," + + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff," + + " TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as month_diff," + + " TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as year_diff", "col1='2'") .addTimezone("Asia/Shanghai") .build(); @@ -1218,12 +1274,14 @@ public class PostTransformOperatorTest { DataChangeEvent.insertEvent( TIMESTAMPDIFF_TABLEID, recordDataGenerator.generate( - new Object[] {new 
BinaryStringData("1"), null, null, null, null})); + new Object[] { + new BinaryStringData("1"), null, null, null, null, null, null + })); DataChangeEvent insertEventExpect = DataChangeEvent.insertEvent( TIMESTAMPDIFF_TABLEID, recordDataGenerator.generate( - new Object[] {new BinaryStringData("1"), 0, 0, 0, 0})); + new Object[] {new BinaryStringData("1"), 0, 0, 0, 0, 0, 0})); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) @@ -1239,12 +1297,448 @@ public class PostTransformOperatorTest { DataChangeEvent.insertEvent( TIMESTAMPDIFF_TABLEID, recordDataGenerator.generate( - new Object[] {new BinaryStringData("2"), null, null, null, null})); + new Object[] { + new BinaryStringData("2"), null, null, null, null, null, null + })); DataChangeEvent insertEventExpect2 = DataChangeEvent.insertEvent( TIMESTAMPDIFF_TABLEID, recordDataGenerator.generate( - new Object[] {new BinaryStringData("2"), 0, 0, 0, 0})); + new Object[] {new BinaryStringData("2"), 0, 0, 0, 0, 0, 0})); + + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + } + + @Test + void testTimestampdiffTransformData() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, time_interval_unit," + + " TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='SECOND'") + .addTransform( + 
TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, time_interval_unit," + + " TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='MINUTE'") + .addTransform( + TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, time_interval_unit," + + " TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='HOUR'") + .addTransform( + TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, time_interval_unit," + + " TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='DAY'") + .addTransform( + TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, time_interval_unit," + + " TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='MONTH'") + .addTransform( + TIMESTAMPDIFF_DATA_TABLEID.identifier(), + "col1, 
time_interval_unit," + + " TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp) as timestamp_timestamp," + + " TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," + + " TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," + + " TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", + "time_interval_unit='YEAR'") + .addTimezone("UTC") + .build(); + RegularEventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(TIMESTAMPDIFF_DATA_TABLEID, TIMESTAMPDIFF_DATA_SCHEMA); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + TIMESTAMPDIFF_DATA_TABLEID, + EXPECTED_TIMESTAMPDIFF_DATA_SCHEMA))); + + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) TIMESTAMPDIFF_DATA_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator recordDataGeneratorExpect = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_TIMESTAMPDIFF_DATA_SCHEMA.toRowDataType())); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Second: 1735689600 + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("SECOND"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + 
recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("SECOND"), + 1735689600, + 1735689600, + 1735689600, + 1735689600 + })); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Minute: 28928160 + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("MINUTE"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("MINUTE"), + 28928160, + 28928160, + 28928160, + 28928160 + })); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Hour: 482136 + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("HOUR"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("3"), 
+ new BinaryStringData("HOUR"), + 482136, + 482136, + 482136, + 482136 + })); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Day: 20089 + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + new BinaryStringData("DAY"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("4"), + new BinaryStringData("DAY"), + 20089, + 20089, + 20089, + 20089 + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Month: 660 + DataChangeEvent insertEvent5 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("5"), + new BinaryStringData("MONTH"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect5 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("5"), + new BinaryStringData("MONTH"), + 660, + 660, + 660, + 660 + })); + transform.processElement(new 
StreamRecord<>(insertEvent5)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect5)); + // 1970-01-01 00:00:00 ~ 2025-01-01 00:00:00, Year: 55 + DataChangeEvent insertEvent6 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("6"), + new BinaryStringData("YEAR"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0) + })); + DataChangeEvent insertEventExpect6 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("6"), + new BinaryStringData("YEAR"), + 55, + 55, + 55, + 55 + })); + + transform.processElement(new StreamRecord<>(insertEvent6)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect6)); + // 1970-01-01 00:00:00 ~ 9999-12-31 15:59:59, Year: 8029 + DataChangeEvent insertEvent7 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("7"), + new BinaryStringData("YEAR"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(253402271999000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(253402271999000L, 0) + })); + DataChangeEvent insertEventExpect7 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("7"), + new BinaryStringData("YEAR"), + 8029, + 8029, + 8029, + 8029 + })); + transform.processElement(new StreamRecord<>(insertEvent7)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + 
.isEqualTo(new StreamRecord<>(insertEventExpect7)); + // 1970-01-01 00:00:00 ~ 9999-12-31 15:59:59, Second: null ( > Integer.MAX_VALUE) + DataChangeEvent insertEvent8 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("8"), + new BinaryStringData("SECOND"), + TimestampData.fromMillis(0, 0), + TimestampData.fromMillis(253402271999000L, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(253402271999000L, 0) + })); + DataChangeEvent insertEventExpect8 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("8"), + new BinaryStringData("SECOND"), + null, + null, + null, + null + })); + transform.processElement(new StreamRecord<>(insertEvent8)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect8)); + // 1970-01-01 00:00:00 ~ null, Year: null + DataChangeEvent insertEvent9 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("9"), + new BinaryStringData("YEAR"), + TimestampData.fromMillis(0, 0), + null, + LocalZonedTimestampData.fromEpochMillis(0, 0), + null + })); + DataChangeEvent insertEventExpect9 = + DataChangeEvent.insertEvent( + TIMESTAMPDIFF_DATA_TABLEID, + recordDataGeneratorExpect.generate( + new Object[] { + new BinaryStringData("9"), + new BinaryStringData("YEAR"), + null, + null, + null, + null + })); + transform.processElement(new StreamRecord<>(insertEvent9)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect9)); + } + + @Test + void testTimestampaddTransform() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( 
+ TIMESTAMPADD_TABLEID.identifier(), + "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add," + + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add," + + " DATE_FORMAT(TIMESTAMPADD(HOUR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add," + + " DATE_FORMAT(TIMESTAMPADD(DAY, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add," + + " DATE_FORMAT(TIMESTAMPADD(MONTH, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add," + + " DATE_FORMAT(TIMESTAMPADD(YEAR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add", + "col1='1'") + .addTransform( + TIMESTAMPADD_TABLEID.identifier(), + "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add," + + " DATE_FORMAT(TIMESTAMPADD(MINUTE, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add," + + " DATE_FORMAT(TIMESTAMPADD(HOUR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add," + + " DATE_FORMAT(TIMESTAMPADD(DAY, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add," + + " DATE_FORMAT(TIMESTAMPADD(MONTH, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add," + + " DATE_FORMAT(TIMESTAMPADD(YEAR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add", + "col1='2'") + .addTimezone("UTC") + .build(); + RegularEventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(TIMESTAMPADD_TABLEID, TIMESTAMPADD_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) 
TIMESTAMPADD_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + TIMESTAMPADD_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), null, null, null, null, null, null + })); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + TIMESTAMPADD_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("2024-10-01 00:00:01"), + new BinaryStringData("2024-10-01 00:01:00"), + new BinaryStringData("2024-10-01 01:00:00"), + new BinaryStringData("2024-10-02 00:00:00"), + new BinaryStringData("2024-11-01 00:00:00"), + new BinaryStringData("2025-10-01 00:00:00") + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(TIMESTAMPADD_TABLEID, TIMESTAMPADD_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), null, null, null, null, null, null + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("2024-09-30 23:59:59"), + new BinaryStringData("2024-09-30 23:59:00"), + new BinaryStringData("2024-09-30 23:00:00"), + new BinaryStringData("2024-09-30 00:00:00"), + new BinaryStringData("2024-09-01 00:00:00"), + new BinaryStringData("2023-10-01 00:00:00") + })); transform.processElement(new StreamRecord<>(insertEvent2)); Assertions.assertThat( @@ -1253,13 +1747,270 @@ public class 
PostTransformOperatorTest { transformFunctionEventEventOperatorTestHarness.close(); } + @Test + void testTimestampaddTransformData() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(SECOND, interval_value, time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(SECOND, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='SECOND'") + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='MINUTE'") + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(HOUR, interval_value, time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(HOUR, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='HOUR'") + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(DAY, interval_value, time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(DAY, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='DAY'") + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(MONTH, interval_value, time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(MONTH, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='MONTH'") + .addTransform( + TIMESTAMPADD_DATA_TABLEID.identifier(), + "col1, time_interval_unit, interval_value," + + " TIMESTAMPADD(YEAR, interval_value, 
time_point_timestamp) as time_point_timestamp," + + " TIMESTAMPADD(YEAR, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", + "time_interval_unit='YEAR'") + .addTimezone("UTC") + .build(); + RegularEventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(TIMESTAMPADD_DATA_TABLEID, TIMESTAMPADD_DATA_SCHEMA); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + TIMESTAMPADD_DATA_TABLEID, TIMESTAMPADD_DATA_SCHEMA))); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) TIMESTAMPADD_DATA_SCHEMA.toRowDataType())); + + // 1970-01-01 00:00:00 + Second: 1735689600 = 2025-01-01 00:00:00 + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("SECOND"), + 1735689600, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("SECOND"), + 1735689600, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + + // 1970-01-01 00:00:00 + Minute: 28928160 = 2025-01-01 00:00:00 + 
DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("MINUTE"), + 28928160, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("MINUTE"), + 28928160, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + + // 1970-01-01 00:00:00 + Hour: 482136 = 2025-01-01 00:00:00 + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("HOUR"), + 482136, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("HOUR"), + 482136, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + + // 1970-01-01 00:00:00 + Day: 20089 = 2025-01-01 00:00:00 + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new 
BinaryStringData("4"), + new BinaryStringData("DAY"), + 20089, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + new BinaryStringData("DAY"), + 20089, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + + // 1970-01-01 00:00:00 + Month: 660 = 2025-01-01 00:00:00 + DataChangeEvent insertEvent5 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("5"), + new BinaryStringData("MONTH"), + 660, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect5 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("5"), + new BinaryStringData("MONTH"), + 660, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent5)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect5)); + + // 1970-01-01 00:00:00 + Year: 55 = 2025-01-01 00:00:00 + DataChangeEvent insertEvent6 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("6"), + new BinaryStringData("YEAR"), + 55, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent 
insertEventExpect6 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("6"), + new BinaryStringData("YEAR"), + 55, + TimestampData.fromMillis(1735689600000L, 0), + LocalZonedTimestampData.fromEpochMillis(1735689600000L, 0), + })); + transform.processElement(new StreamRecord<>(insertEvent6)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect6)); + + // 1970-01-01 00:00:00 + Year: null = null + DataChangeEvent insertEvent7 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("7"), + new BinaryStringData("YEAR"), + null, + TimestampData.fromMillis(0, 0), + LocalZonedTimestampData.fromEpochMillis(0, 0), + })); + DataChangeEvent insertEventExpect7 = + DataChangeEvent.insertEvent( + TIMESTAMPADD_DATA_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("7"), + new BinaryStringData("YEAR"), + null, + null, + null, + })); + transform.processElement(new StreamRecord<>(insertEvent7)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect7)); + } + @Test void testTimezoneTransform() throws Exception { PostTransformOperator transform = PostTransformOperator.newBuilder() .addTransform( TIMEZONE_TABLEID.identifier(), - "col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01 00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime", + "col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01 00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime_value", null) .addTimezone("UTC") .build(); @@ -1296,7 +2047,6 @@ public class PostTransformOperatorTest { Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) .isEqualTo(new StreamRecord<>(insertEventExpect)); - 
transformFunctionEventEventOperatorTestHarness.close(); } @Test @@ -1402,11 +2152,11 @@ public class PostTransformOperatorTest { + ",cast(castBoolean as tinyint) as castTinyint" + ",cast(castBoolean as smallint) as castSmallint" + ",cast(castBoolean as bigint) as castBigint" - + ",cast(castBoolean as float) as castFloat" - + ",cast(castBoolean as double) as castDouble" + + ",castFloat" + + ",castDouble" + ",cast(castBoolean as char) as castChar" + ",cast(castBoolean as varchar) as castVarchar" - + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal" + + ",castDecimal" + ", castTimestamp", "col1 = '3'") .addTransform( @@ -1638,11 +2388,11 @@ public class PostTransformOperatorTest { new Byte("1"), new Short("1"), new Long(1), - new Float(1.0f), - new Double(1.0d), + null, + null, new BinaryStringData("true"), new BinaryStringData("true"), - DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + null, null })); transform.processElement(new StreamRecord<>(insertEvent3)); @@ -1930,7 +2680,6 @@ public class PostTransformOperatorTest { Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) .isEqualTo(new StreamRecord<>(insertEventExpect10)); - transformFunctionEventEventOperatorTestHarness.close(); } @Test @@ -2185,7 +2934,7 @@ public class PostTransformOperatorTest { testExpressionConditionTransform( "TO_TIMESTAMP('1970-01-01 00:00:00') = TO_TIMESTAMP('1970-01-01', 'yyyy-MM-dd')"); testExpressionConditionTransform( - "TIMESTAMP_DIFF('DAY', TO_TIMESTAMP('1970-01-01 00:00:00'), TO_TIMESTAMP('1970-01-02 00:00:00')) = 1"); + "TIMESTAMPDIFF(DAY, TO_TIMESTAMP('1970-01-01 00:00:00'), TO_TIMESTAMP('1970-01-02 00:00:00')) = 1"); testExpressionConditionTransform("2 between 1 and 3"); testExpressionConditionTransform("4 not between 1 and 3"); testExpressionConditionTransform("2 in (1, 2, 3)"); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index fc5dfd447..45ce576ec 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -262,7 +262,100 @@ public class TransformParserTest { testFilterExpression( "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)"); testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)"); - testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)"); + testFilterExpression( + "TIMESTAMP_DIFF('SECOND', dt1, dt2)", + "timestampDiff(\"SECOND\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestamp_diff('second', dt1, dt2)", + "timestampDiff(\"second\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMP_DIFF('MINUTE', dt1, dt2)", + "timestampDiff(\"MINUTE\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestamp_diff('minute', dt1, dt2)", + "timestampDiff(\"minute\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMP_DIFF('HOUR', dt1, dt2)", + "timestampDiff(\"HOUR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestamp_diff('hour', dt1, dt2)", + "timestampDiff(\"hour\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMP_DIFF('DAY', dt1, dt2)", + "timestampDiff(\"DAY\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestamp_diff('day', dt1, dt2)", + "timestampDiff(\"day\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMP_DIFF('MONTH', dt1, dt2)", + "timestampDiff(\"MONTH\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestamp_diff('month', dt1, dt2)", + "timestampDiff(\"month\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMP_DIFF('YEAR', dt1, dt2)", + "timestampDiff(\"YEAR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + 
"timestamp_diff('year', dt1, dt2)", + "timestampDiff(\"year\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(SECOND, dt1, dt2)", + "timestampdiff(\"SECOND\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(second, dt1, dt2)", + "timestampdiff(\"SECOND\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(MINUTE, dt1, dt2)", + "timestampdiff(\"MINUTE\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(minute, dt1, dt2)", + "timestampdiff(\"MINUTE\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(HOUR, dt1, dt2)", + "timestampdiff(\"HOUR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(hour, dt1, dt2)", + "timestampdiff(\"HOUR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(DAY, dt1, dt2)", "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(day, dt1, dt2)", "timestampdiff(\"DAY\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(MONTH, dt1, dt2)", + "timestampdiff(\"MONTH\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(month, dt1, dt2)", + "timestampdiff(\"MONTH\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPDIFF(YEAR, dt1, dt2)", + "timestampdiff(\"YEAR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "timestampdiff(year, dt1, dt2)", + "timestampdiff(\"YEAR\", dt1, dt2, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(SECOND, 1, dt)", "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); + testFilterExpression( + "timestampadd(second, 1, dt)", "timestampadd(\"SECOND\", 1, dt, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(MINUTE, 1, dt)", "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); + testFilterExpression( + "timestampadd(minute, 1, dt)", "timestampadd(\"MINUTE\", 1, dt, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(HOUR, 1, dt)", "timestampadd(\"HOUR\", 1, dt, 
__time_zone__)"); + testFilterExpression( + "timestampadd(hour, 1, dt)", "timestampadd(\"HOUR\", 1, dt, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(DAY, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); + testFilterExpression( + "timestampadd(day, 1, dt)", "timestampadd(\"DAY\", 1, dt, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(MONTH, 1, dt)", "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); + testFilterExpression( + "timestampadd(month, 1, dt)", "timestampadd(\"MONTH\", 1, dt, __time_zone__)"); + testFilterExpression( + "TIMESTAMPADD(YEAR, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); + testFilterExpression( + "timestampadd(year, 1, dt)", "timestampadd(\"YEAR\", 1, dt, __time_zone__)"); testFilterExpression("IF(a>b,a,b)", "greaterThan(a, b) ? a : b"); testFilterExpression("NULLIF(a,b)", "nullif(a, b)"); testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
@@ -323,6 +416,40 @@ public class TransformParserTest {
         testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)");
     }
 
+    /**
+     * Verifies that filter expressions with an invalid time unit fail to translate.
+     *
+     * <p>Every case must raise exactly a {@link ParseException} whose message equals the one
+     * listed beside the expression.
+     */
+    @Test
+    public void testTranslateFilterToJaninoExpressionError() {
+        // Each row is {filter expression, exact message of the expected ParseException}.
+        String[][] invalidCases = {
+            {"TIMESTAMPDIFF(SECONDS, dt1, dt2)", "Statements can not be parsed."},
+            {
+                "TIMESTAMPDIFF(QUARTER, dt1, dt2)",
+                "Unsupported time interval unit in timestamp diff function: \"QUARTER\""
+            },
+            {"TIMESTAMPADD(SECONDS, dt1, dt2)", "Statements can not be parsed."},
+            {
+                "TIMESTAMPADD(QUARTER, dt1, dt2)",
+                "Unsupported time interval unit in timestamp add function: \"QUARTER\""
+            },
+        };
+        // Cases are checked one after another, in declaration order.
+        for (String[] invalidCase : invalidCases) {
+            String expression = invalidCase[0];
+            String expectedMessage = invalidCase[1];
+            Assertions.assertThatThrownBy(
+                            () ->
+                                    TransformParser.translateFilterExpressionToJaninoExpression(
+                                            expression, Collections.emptyList()))
+                    .isExactlyInstanceOf(ParseException.class)
+                    .hasMessage(expectedMessage);
+        }
+    }
+
     @Test
     public void testGenerateProjectionColumns() {
         List testColumns =