[FLINK-35259][cdc][transform] Fix FlinkCDC pipeline transform can't deal timestamp field (#3278)

pull/3284/head
Wink 9 months ago committed by GitHub
parent 4609f5f155
commit 0108d0e5d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,6 +17,9 @@
package org.apache.flink.cdc.runtime.functions;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.utils.DateTimeUtils;
import org.slf4j.Logger;
@ -42,8 +45,8 @@ public class SystemFunctionUtils {
return DateTimeUtils.timestampMillisToTime(epochTime);
}
public static long localtimestamp(long epochTime, String timezone) {
return epochTime;
public static TimestampData localtimestamp(long epochTime, String timezone) {
return TimestampData.fromMillis(epochTime);
}
// synonym: localtime
@ -55,18 +58,28 @@ public class SystemFunctionUtils {
return DateTimeUtils.timestampMillisToDate(epochTime);
}
public static long currentTimestamp(long epochTime, String timezone) {
return epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime);
public static TimestampData currentTimestamp(long epochTime, String timezone) {
return TimestampData.fromMillis(
epochTime + TimeZone.getTimeZone(timezone).getOffset(epochTime));
}
// synonym: currentTimestamp
public static long now(long epochTime, String timezone) {
return currentTimestamp(epochTime, timezone);
public static LocalZonedTimestampData now(long epochTime, String timezone) {
return LocalZonedTimestampData.fromEpochMillis(epochTime);
}
public static String dateFormat(long timestamp, String format) {
public static String dateFormat(LocalZonedTimestampData timestamp, String format) {
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
return dateFormat.format(new Date(timestamp));
return dateFormat.format(new Date(timestamp.getEpochMillisecond()));
}
public static String dateFormat(TimestampData timestamp, String format) {
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
return dateFormat.format(new Date(timestamp.getMillisecond()));
}
public static String dateFormat(ZonedTimestampData timestamp, String format) {
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
return dateFormat.format(new Date(timestamp.getMillisecond()));
}
public static int toDate(String str) {
@ -77,20 +90,38 @@ public class SystemFunctionUtils {
return DateTimeUtils.parseDate(str, format);
}
public static long toTimestamp(String str) {
public static TimestampData toTimestamp(String str) {
return toTimestamp(str, "yyyy-MM-dd HH:mm:ss");
}
public static long toTimestamp(String str, String format) {
public static TimestampData toTimestamp(String str, String format) {
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
try {
return dateFormat.parse(str).getTime();
return TimestampData.fromMillis(dateFormat.parse(str).getTime());
} catch (ParseException e) {
LOG.error("Unsupported date type convert: {}", str);
throw new RuntimeException(e);
}
}
public static int timestampDiff(
String symbol,
LocalZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
return timestampDiff(
symbol, fromTimestamp.getEpochMillisecond(), toTimestamp.getEpochMillisecond());
}
public static int timestampDiff(
String symbol, TimestampData fromTimestamp, TimestampData toTimestamp) {
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
}
public static int timestampDiff(
String symbol, ZonedTimestampData fromTimestamp, ZonedTimestampData toTimestamp) {
return timestampDiff(symbol, fromTimestamp.getMillisecond(), toTimestamp.getMillisecond());
}
public static int timestampDiff(String symbol, long fromDate, long toDate) {
Calendar from = Calendar.getInstance();
from.setTime(new Date(fromDate));

@ -451,6 +451,10 @@ public class DataTypeConverter {
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return LocalZonedTimestampData.fromInstant(instant);
} else if (obj instanceof Long) {
return LocalZonedTimestampData.fromEpochMillis((Long) obj);
} else if (obj instanceof LocalZonedTimestampData) {
return obj;
}
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP_LTZ from unexpected value '"

@ -465,7 +465,7 @@ public class TransformDataOperatorTest {
.addTransform(
TIMESTAMP_TABLEID.identifier(),
"col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal,"
+ " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP and NOW() = LOCALTIMESTAMP, 1, 0) as timestamp_equal,"
+ " IF(LOCALTIMESTAMP = CURRENT_TIMESTAMP, 1, 0) as timestamp_equal,"
+ " IF(TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) = CURRENT_DATE, 1, 0) as date_equal",
"LOCALTIMESTAMP = CURRENT_TIMESTAMP")
.addTimezone("GMT")

@ -17,6 +17,8 @@
package org.apache.flink.cdc.runtime.parser;
import org.apache.flink.cdc.common.data.TimestampData;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.Location;
import org.codehaus.janino.ExpressionEvaluator;
@ -119,9 +121,9 @@ public class JaninoCompilerTest {
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Long.class);
TimestampData.class);
Object evaluate = expressionEvaluator.evaluate(params.toArray());
Assert.assertEquals(localTime, evaluate);
Assert.assertEquals(TimestampData.fromMillis(localTime), evaluate);
}
@Test

Loading…
Cancel
Save