diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 892243628..dfa90a728 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b4979beca..67eeaf2d8 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index b923107e2..1fb080de9 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -43,6 +43,16 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
+ /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+ private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+ /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+ private static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+ /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+ private static final String TIMESTAMP_FORMAT_STRING =
+ DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
@@ -109,7 +119,7 @@ public class DateTimeUtils {
} catch (ParseException e) {
LOG.error(
String.format(
- "Exception when parsing datetime string '%s' in format '%s'",
+ "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
dateStr, format),
e);
return Long.MIN_VALUE;
@@ -128,6 +138,56 @@ public class DateTimeUtils {
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
}
+ // --------------------------------------------------------------------------------------------
+ // UNIX TIME
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * "yyyy-MM-dd HH:mm:ss" format.
+ */
+ public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
+ return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
+ }
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * given format.
+ */
+ public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
+ SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+ formatter.setTimeZone(timeZone);
+ Date date = new Date(unixTime * 1000);
+ try {
+ return formatter.format(date);
+ } catch (Exception e) {
+ LOG.error("Exception when formatting.", e);
+ return null;
+ }
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, TimeZone timeZone) {
+ return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
+ long ts = internalParseTimestampMillis(dateStr, format, timeZone);
+ if (ts == Long.MIN_VALUE) {
+ return Long.MIN_VALUE;
+ } else {
+ // return the seconds
+ return ts / 1000;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Format
// --------------------------------------------------------------------------------------------
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 6f6d52a3b..a7555203c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -80,6 +80,27 @@ public class SystemFunctionUtils {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
+ public static String fromUnixtime(long seconds, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
+ }
+
+ public static String fromUnixtime(long seconds, String format, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(long epochTime, String timezone) {
+ return epochTime / 1000;
+ }
+
+ public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(
+ String dateTimeStr, String format, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
+ }
+
public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 6f5b26125..f60bf968c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -57,13 +57,18 @@ public class JaninoCompiler {
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
private static final List TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
- Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
+ Arrays.asList(
+ "LOCALTIME",
+ "LOCALTIMESTAMP",
+ "CURRENT_TIME",
+ "CURRENT_DATE",
+ "UNIX_TIMESTAMP");
private static final List TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");
private static final List TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
- Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+ Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index bb2c3503d..d47db49f8 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -212,6 +212,27 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
return SqlSyntax.FUNCTION;
}
};
+ public static final SqlFunction UNIX_TIMESTAMP =
+ new SqlFunction(
+ "UNIX_TIMESTAMP",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BIGINT_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.NILADIC,
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
+ public static final SqlFunction FROM_UNIXTIME =
+ new SqlFunction(
+ "FROM_UNIXTIME",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 2de893a09..f30c19946 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -113,6 +113,40 @@ public class PostTransformOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId FROM_UNIX_TIME_TABLEID =
+ TableId.tableId("my_company", "my_branch", "from_unix_time_table");
+ private static final Schema FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("seconds", DataTypes.BIGINT())
+ .physicalColumn("format_str", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("from_unix_time", DataTypes.STRING())
+ .physicalColumn("from_unix_time_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+
+ private static final TableId UNIX_TIMESTAMP_TABLEID =
+ TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
+ private static final Schema UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("date_time_str", DataTypes.STRING())
+ .physicalColumn("unix_timestamp_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("unix_timestamp", DataTypes.BIGINT())
+ .physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
+ .primaryKey("col1")
+ .build();
+
private static final TableId TIMESTAMPDIFF_TABLEID =
TableId.tableId("my_company", "my_branch", "timestampdiff_table");
private static final Schema TIMESTAMPDIFF_SCHEMA =
@@ -799,6 +833,322 @@ public class PostTransformOperatorTest {
transformFunctionEventEventOperatorTestHarness.close();
}
+ @Test
+ void testFromUnixTimeTransform() throws Exception {
+ // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00
+ testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00");
+ // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44
+ testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44");
+ // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44
+ testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44");
+ // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44
+ testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44");
+ }
+
+ private void testFromUnixTimeTransformWithTimeZone(
+ String timeZone, Long seconds, String unixTimeStr) throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ FROM_UNIX_TIME_TABLEID.identifier(),
+ "col1, FROM_UNIXTIME(seconds) as from_unix_time,"
+ + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format",
+ null)
+ .addTimezone(timeZone)
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ seconds,
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss")
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData(unixTimeStr),
+ new BinaryStringData(unixTimeStr)
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ /*
+ Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds),
+ using the specified timezone in table config.
+
+ If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”,
+ this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed,
+ the default value Long.MIN_VALUE(-9223372036854775808) will be returned.
+ */
+ @Test
+ void testUnixTimestampTransformInBerlin() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+ null)
+ .addTimezone("Europe/Berlin")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 25201L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 25201L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 25201L, -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ @Test
+ void testUnixTimestampTransformInShanghai() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+ null)
+ .addTimezone("Asia/Shanghai")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 1L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 1L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 1L, -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
@Test
void testTimestampDiffTransform() throws Exception {
PostTransformOperator transform =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index bc5d12005..478d4e92e 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -243,6 +243,17 @@ public class TransformParserTest {
testFilterExpression(
"id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
testFilterExpression("NOW()", "now(__epoch_time__)");
+ testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)");
+ testFilterExpression(
+ "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')",
+ "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)");
+ testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01')",
+ "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')",
+ "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)");
testFilterExpression("YEAR(dt)", "year(dt)");
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");