[FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline

This closes .
pull/3482/head
Wink committed by GitHub
parent 39fd00ce49
commit 2fd03e683e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is yyyy-MM-dd HH:mm:ss). numeric is an internal timestamp value representing seconds since 1970-01-01 00:00:00 UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns 1970-01-01 00:00:44 if in UTC time zone, but returns 1970-01-01 09:00:44 if in Asia/Tokyo time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions

@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is yyyy-MM-dd HH:mm:ss). numeric is an internal timestamp value representing seconds since 1970-01-01 00:00:00 UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns 1970-01-01 00:00:44 if in UTC time zone, but returns 1970-01-01 09:00:44 if in Asia/Tokyo time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions

@ -43,6 +43,16 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
/** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
/** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
private static final String TIME_FORMAT_STRING = "HH:mm:ss";
/** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
private static final String TIMESTAMP_FORMAT_STRING =
DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
@ -109,7 +119,7 @@ public class DateTimeUtils {
} catch (ParseException e) {
LOG.error(
String.format(
"Exception when parsing datetime string '%s' in format '%s'",
"Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
dateStr, format),
e);
return Long.MIN_VALUE;
@ -128,6 +138,56 @@ public class DateTimeUtils {
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
}
// --------------------------------------------------------------------------------------------
// UNIX TIME
// --------------------------------------------------------------------------------------------
/**
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* "yyyy-MM-dd HH:mm:ss" format.
*/
public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
}
/**
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* given format.
*/
public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
formatter.setTimeZone(timeZone);
Date date = new Date(unixTime * 1000);
try {
return formatter.format(date);
} catch (Exception e) {
LOG.error("Exception when formatting.", e);
return null;
}
}
/**
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
* 00:00:00' UTC.
*/
public static long unixTimestamp(String dateStr, TimeZone timeZone) {
return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
}
/**
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
* 00:00:00' UTC.
*/
public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
long ts = internalParseTimestampMillis(dateStr, format, timeZone);
if (ts == Long.MIN_VALUE) {
return Long.MIN_VALUE;
} else {
// return the seconds
return ts / 1000;
}
}
// --------------------------------------------------------------------------------------------
// Format
// --------------------------------------------------------------------------------------------

@ -80,6 +80,27 @@ public class SystemFunctionUtils {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
public static String fromUnixtime(long seconds, String timezone) {
return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
}
public static String fromUnixtime(long seconds, String format, String timezone) {
return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
}
public static long unixTimestamp(long epochTime, String timezone) {
return epochTime / 1000;
}
public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
}
public static long unixTimestamp(
String dateTimeStr, String format, long epochTime, String timezone) {
return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
}
public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));

@ -57,13 +57,18 @@ public class JaninoCompiler {
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
Arrays.asList(
"LOCALTIME",
"LOCALTIMESTAMP",
"CURRENT_TIME",
"CURRENT_DATE",
"UNIX_TIMESTAMP");
private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("TO_DATE", "TO_TIMESTAMP");
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

@ -212,6 +212,27 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
return SqlSyntax.FUNCTION;
}
};
public static final SqlFunction UNIX_TIMESTAMP =
new SqlFunction(
"UNIX_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
ReturnTypes.BIGINT_NULLABLE,
null,
OperandTypes.or(
OperandTypes.NILADIC,
OperandTypes.family(SqlTypeFamily.CHARACTER),
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction FROM_UNIXTIME =
new SqlFunction(
"FROM_UNIXTIME",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.INTEGER),
OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",

@ -113,6 +113,40 @@ public class PostTransformOperatorTest {
.primaryKey("col1")
.build();
private static final TableId FROM_UNIX_TIME_TABLEID =
TableId.tableId("my_company", "my_branch", "from_unix_time_table");
private static final Schema FROM_UNIX_TIME_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("seconds", DataTypes.BIGINT())
.physicalColumn("format_str", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("from_unix_time", DataTypes.STRING())
.physicalColumn("from_unix_time_format", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final TableId UNIX_TIMESTAMP_TABLEID =
TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
private static final Schema UNIX_TIMESTAMP_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("date_time_str", DataTypes.STRING())
.physicalColumn("unix_timestamp_format", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("unix_timestamp", DataTypes.BIGINT())
.physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
.primaryKey("col1")
.build();
private static final TableId TIMESTAMPDIFF_TABLEID =
TableId.tableId("my_company", "my_branch", "timestampdiff_table");
private static final Schema TIMESTAMPDIFF_SCHEMA =
@ -799,6 +833,322 @@ public class PostTransformOperatorTest {
transformFunctionEventEventOperatorTestHarness.close();
}
@Test
void testFromUnixTimeTransform() throws Exception {
// In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00
testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00");
// In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44
testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44");
// In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44
testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44");
// In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44
testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44");
}
private void testFromUnixTimeTransformWithTimeZone(
String timeZone, Long seconds, String unixTimeStr) throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
FROM_UNIX_TIME_TABLEID.identifier(),
"col1, FROM_UNIXTIME(seconds) as from_unix_time,"
+ " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format",
null)
.addTimezone(timeZone)
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness.with(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType()));
BinaryRecordDataGenerator expectedRecordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType()));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
FROM_UNIX_TIME_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
seconds,
new BinaryStringData("yyyy-MM-dd HH:mm:ss")
}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
FROM_UNIX_TIME_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData(unixTimeStr),
new BinaryStringData(unixTimeStr)
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA)));
transform.processElement(new StreamRecord<>(insertEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect));
transformFunctionEventEventOperatorTestHarness.close();
}
/*
Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds),
using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as yyyy-MM-dd HH:mm:ss.SSS X,
this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed,
the default value Long.MIN_VALUE(-9223372036854775808) will be returned.
*/
@Test
void testUnixTimestampTransformInBerlin() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
UNIX_TIMESTAMP_TABLEID.identifier(),
"col1,"
+ " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
null)
.addTimezone("Europe/Berlin")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness.with(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
BinaryRecordDataGenerator expectedRecordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
// In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("1970-01-01 08:00:01.001"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
DataChangeEvent insertEventExpect1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("1"), 25201L, 25201L}));
transform.processElement(new StreamRecord<>(insertEvent1));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect1));
// In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// 1L
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("2"),
new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("2"), 25201L, 1L}));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
// In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
// 25201L
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("3"),
new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
DataChangeEvent insertEventExpect3 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("3"), 25201L, 25201L}));
transform.processElement(new StreamRecord<>(insertEvent3));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect3));
// In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// -9223372036854775808L
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("4"),
new BinaryStringData("1970-01-01 08:00:01.001"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
}));
DataChangeEvent insertEventExpect4 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {
new BinaryStringData("4"), 25201L, -9223372036854775808L
}));
transform.processElement(new StreamRecord<>(insertEvent4));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect4));
transformFunctionEventEventOperatorTestHarness.close();
}
@Test
void testUnixTimestampTransformInShanghai() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
UNIX_TIMESTAMP_TABLEID.identifier(),
"col1,"
+ " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
null)
.addTimezone("Asia/Shanghai")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness.with(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
BinaryRecordDataGenerator expectedRecordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
// In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("1970-01-01 08:00:01.001"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
DataChangeEvent insertEventExpect1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("1"), 1L, 1L}));
transform.processElement(new StreamRecord<>(insertEvent1));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect1));
// In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// 1L
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("2"),
new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("2"), 1L, 25201L}));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
// In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
// 1L
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("3"),
new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
DataChangeEvent insertEventExpect3 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {new BinaryStringData("3"), 1L, 1L}));
transform.processElement(new StreamRecord<>(insertEvent3));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect3));
// In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// -9223372036854775808L
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("4"),
new BinaryStringData("1970-01-01 08:00:01.001"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
}));
DataChangeEvent insertEventExpect4 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
new Object[] {
new BinaryStringData("4"), 1L, -9223372036854775808L
}));
transform.processElement(new StreamRecord<>(insertEvent4));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect4));
transformFunctionEventEventOperatorTestHarness.close();
}
@Test
void testTimestampDiffTransform() throws Exception {
PostTransformOperator transform =

@ -243,6 +243,17 @@ public class TransformParserTest {
testFilterExpression(
"id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
testFilterExpression("NOW()", "now(__epoch_time__)");
testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)");
testFilterExpression(
"FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')",
"fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)");
testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)");
testFilterExpression(
"UNIX_TIMESTAMP('1970-01-01 08:00:01')",
"unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)");
testFilterExpression(
"UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')",
"unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)");
testFilterExpression("YEAR(dt)", "year(dt)");
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");

Loading…
Cancel
Save