private void clearOperator() {
this.transforms = null;
- this.processorMap = null;
+ this.preTransformProcessorMap = null;
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
new file mode 100644
index 000000000..1ede3ce77
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The processor of pre-transform projection in {@link PreTransformOperator}.
+ *
+ * <p>A pre-transform projection processor handles:
+ *
+ * <ul>
+ * <li>CreateTableEvent: removes unused (unreferenced) columns from given schema.
+ * <li>SchemaChangeEvent: update the columns of TransformProjection.
+ * <li>DataChangeEvent: omits unused columns in data row.
+ * </ul>
+ */
+public class PreTransformProcessor {
+ private PreTransformChangeInfo tableChangeInfo;
+ private TransformProjection transformProjection;
+ private @Nullable TransformFilter transformFilter;
+ private List<Boolean> cachedProjectionColumnsState;
+
+ public PreTransformProcessor(
+ PreTransformChangeInfo tableChangeInfo,
+ TransformProjection transformProjection,
+ @Nullable TransformFilter transformFilter) {
+ this.tableChangeInfo = tableChangeInfo;
+ this.transformProjection = transformProjection;
+ this.transformFilter = transformFilter;
+ this.cachedProjectionColumnsState =
+ cacheIsProjectionColumnMap(tableChangeInfo, transformProjection);
+ }
+
+ public boolean hasTableChangeInfo() {
+ return this.tableChangeInfo != null;
+ }
+
+ /**
+ * This method analyses (directly and indirectly) referenced columns, and peels unused columns
+ * from schema. For example, given original schema with columns (A, B, C, D, E) with projection
+ * rule (A, B + 1 as newB) and filtering rule (C > 0), a peeled schema containing (A, B, C) only
+ * will be sent to downstream, and (D, E) column along with corresponding data will be trimmed.
+ */
+ public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTableEvent) {
+ List<Column> preTransformColumns =
+ TransformParser.generateReferencedColumns(
+ transformProjection.getProjection(),
+ transformFilter != null ? transformFilter.getExpression() : null,
+ createTableEvent.getSchema().getColumns());
+ Schema schema = createTableEvent.getSchema().copy(preTransformColumns);
+ return new CreateTableEvent(createTableEvent.tableId(), schema);
+ }
+
+ public BinaryRecordData processFillDataField(BinaryRecordData data) {
+ List<Object> valueList = new ArrayList<>();
+ List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns();
+
+ for (int i = 0; i < columns.size(); i++) {
+ if (cachedProjectionColumnsState.get(i)) {
+ valueList.add(null);
+ } else {
+ valueList.add(
+ getValueFromBinaryRecordData(
+ columns.get(i).getName(),
+ data,
+ tableChangeInfo.getSourceSchema().getColumns(),
+ tableChangeInfo.getSourceFieldGetters()));
+ }
+ }
+
+ return tableChangeInfo
+ .getPreTransformedRecordDataGenerator()
+ .generate(valueList.toArray(new Object[0]));
+ }
+
+ private Object getValueFromBinaryRecordData(
+ String columnName,
+ BinaryRecordData binaryRecordData,
+ List<Column> columns,
+ RecordData.FieldGetter[] fieldGetters) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (columnName.equals(columns.get(i).getName())) {
+ return DataTypeConverter.convert(
+ fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
+ }
+ }
+ return null;
+ }
+
+ private List<Boolean> cacheIsProjectionColumnMap(
+ PreTransformChangeInfo tableChangeInfo, TransformProjection transformProjection) {
+ List<Boolean> cachedMap = new ArrayList<>();
+ if (!hasTableChangeInfo()) {
+ return cachedMap;
+ }
+
+ for (Column column : tableChangeInfo.getPreTransformedSchema().getColumns()) {
+ boolean isProjectionColumn = false;
+ for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
+ if (column.getName().equals(projectionColumn.getColumnName())
+ && projectionColumn.isValidTransformedProjectionColumn()) {
+ isProjectionColumn = true;
+ break;
+ }
+ }
+ cachedMap.add(isProjectionColumn);
+ }
+
+ return cachedMap;
+ }
+}
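
To make the peeling described in preTransformCreateTableEvent concrete, here is a hypothetical sketch (the table id, schema, and the processor's rule strings are invented for illustration; it assumes a PreTransformProcessor configured with projection "a, b + 1 AS newB" and filter "c > 0"):

    // Hypothetical five-column source schema (a, b, c, d, e).
    Schema source =
            Schema.newBuilder()
                    .physicalColumn("a", DataTypes.STRING())
                    .physicalColumn("b", DataTypes.INT())
                    .physicalColumn("c", DataTypes.INT())
                    .physicalColumn("d", DataTypes.STRING())
                    .physicalColumn("e", DataTypes.STRING())
                    .build();
    // The projection references (a, b) and the filter references (c), so the
    // forwarded CreateTableEvent carries a schema of (a, b, c) only; (d, e)
    // and their corresponding data are trimmed before leaving the operator.
    CreateTableEvent peeled =
            processor.preTransformCreateTableEvent(
                    new CreateTableEvent(TableId.tableId("db", "schema", "t"), source));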
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java
new file mode 100644
index 000000000..305d7f387
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Pre-Transformation rule used by {@link PreTransformOperator}. */
+public class PreTransformer {
+ private final Selectors selectors;
+
+ private final Optional<TransformProjection> projection;
+ private final Optional<TransformFilter> filter;
+
+ public PreTransformer(
+ Selectors selectors,
+ @Nullable TransformProjection projection,
+ @Nullable TransformFilter filter) {
+ this.selectors = selectors;
+ this.projection = projection != null ? Optional.of(projection) : Optional.empty();
+ this.filter = filter != null ? Optional.of(filter) : Optional.empty();
+ }
+
+ public Selectors getSelectors() {
+ return selectors;
+ }
+
+ public Optional<TransformProjection> getProjection() {
+ return projection;
+ }
+
+ public Optional<TransformFilter> getFilter() {
+ return filter;
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
index 500e36c99..4bbe3d558 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.StringUtils;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -59,6 +60,14 @@ public class ProjectionColumn implements Serializable {
this.originalColumnNames = originalColumnNames;
}
+ public ProjectionColumn copy() {
+ return new ProjectionColumn(
+ column.copy(column.getName()),
+ expression,
+ scriptExpression,
+ new ArrayList<>(originalColumnNames));
+ }
+
public Column getColumn() {
return column;
}
@@ -103,4 +112,22 @@ public class ProjectionColumn implements Serializable {
scriptExpression,
originalColumnNames);
}
+
+ @Override
+ public String toString() {
+ return "ProjectionColumn{"
+ + "column="
+ + column
+ + ", expression='"
+ + expression
+ + '\''
+ + ", scriptExpression='"
+ + scriptExpression
+ + '\''
+ + ", originalColumnNames="
+ + originalColumnNames
+ + ", transformExpressionKey="
+ + transformExpressionKey
+ + '}';
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 6c5202342..0d7df06ab 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -39,13 +39,13 @@ import java.util.List;
public class ProjectionColumnProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ProjectionColumnProcessor.class);
- private TableInfo tableInfo;
+ private PostTransformChangeInfo tableInfo;
private ProjectionColumn projectionColumn;
private String timezone;
private TransformExpressionKey transformExpressionKey;
public ProjectionColumnProcessor(
- TableInfo tableInfo, ProjectionColumn projectionColumn, String timezone) {
+ PostTransformChangeInfo tableInfo, ProjectionColumn projectionColumn, String timezone) {
this.tableInfo = tableInfo;
this.projectionColumn = projectionColumn;
this.timezone = timezone;
@@ -53,10 +53,14 @@ public class ProjectionColumnProcessor {
}
public static ProjectionColumnProcessor of(
- TableInfo tableInfo, ProjectionColumn projectionColumn, String timezone) {
+ PostTransformChangeInfo tableInfo, ProjectionColumn projectionColumn, String timezone) {
return new ProjectionColumnProcessor(tableInfo, projectionColumn, timezone);
}
+ public ProjectionColumn getProjectionColumn() {
+ return projectionColumn;
+ }
+
public Object evaluate(BinaryRecordData after, long epochTime) {
ExpressionEvaluator expressionEvaluator =
TransformExpressionCompiler.compileExpression(transformExpressionKey);
@@ -75,30 +79,36 @@ public class ProjectionColumnProcessor {
private Object[] generateParams(BinaryRecordData after, long epochTime) {
List<Object> params = new ArrayList<>();
- List<Column> columns = tableInfo.getSchema().getColumns();
- RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
+ List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
+ RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
for (String originalColumnName : projectionColumn.getOriginalColumnNames()) {
- if (originalColumnName.equals(TransformParser.DEFAULT_NAMESPACE_NAME)) {
- params.add(tableInfo.getNamespace());
- continue;
- }
- if (originalColumnName.equals(TransformParser.DEFAULT_SCHEMA_NAME)) {
- params.add(tableInfo.getSchemaName());
- continue;
- }
- if (originalColumnName.equals(TransformParser.DEFAULT_TABLE_NAME)) {
- params.add(tableInfo.getTableName());
- continue;
+ switch (originalColumnName) {
+ case TransformParser.DEFAULT_NAMESPACE_NAME:
+ params.add(tableInfo.getNamespace());
+ continue;
+ case TransformParser.DEFAULT_SCHEMA_NAME:
+ params.add(tableInfo.getSchemaName());
+ continue;
+ case TransformParser.DEFAULT_TABLE_NAME:
+ params.add(tableInfo.getTableName());
+ continue;
}
+
+ boolean argumentFound = false;
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if (column.getName().equals(originalColumnName)) {
params.add(
DataTypeConverter.convertToOriginal(
fieldGetters[i].getFieldOrNull(after), column.getType()));
+ argumentFound = true;
break;
}
}
+ if (!argumentFound) {
+ throw new IllegalArgumentException(
+ "Failed to evaluate argument " + originalColumnName);
+ }
}
params.add(timezone);
params.add(epochTime);
@@ -108,12 +118,11 @@ public class ProjectionColumnProcessor {
private TransformExpressionKey generateTransformExpressionKey() {
List<String> argumentNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
- List<Column> columns = tableInfo.getSchema().getColumns();
+ List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
String scriptExpression = projectionColumn.getScriptExpression();
List<String> originalColumnNames = projectionColumn.getOriginalColumnNames();
for (String originalColumnName : originalColumnNames) {
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
+ for (Column column : columns) {
if (column.getName().equals(originalColumnName)) {
argumentNames.add(originalColumnName);
paramTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableInfo.java
deleted file mode 100644
index 93bfcafec..000000000
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableInfo.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.transform;
-
-import org.apache.flink.cdc.common.data.RecordData;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.utils.SchemaUtils;
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
-import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
-
-import java.util.List;
-
-/** The TableInfo applies to cache schema and fieldGetters. */
-public class TableInfo {
- private TableId tableId;
- private Schema schema;
- private RecordData.FieldGetter[] fieldGetters;
- private BinaryRecordDataGenerator recordDataGenerator;
-
- public TableInfo(
- TableId tableId,
- Schema schema,
- RecordData.FieldGetter[] fieldGetters,
- BinaryRecordDataGenerator recordDataGenerator) {
- this.tableId = tableId;
- this.schema = schema;
- this.fieldGetters = fieldGetters;
- this.recordDataGenerator = recordDataGenerator;
- }
-
- public String getName() {
- return tableId.identifier();
- }
-
- public String getTableName() {
- return tableId.getTableName();
- }
-
- public String getSchemaName() {
- return tableId.getSchemaName();
- }
-
- public String getNamespace() {
- return tableId.getNamespace();
- }
-
- public TableId getTableId() {
- return tableId;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public RecordData.FieldGetter[] getFieldGetters() {
- return fieldGetters;
- }
-
- public BinaryRecordDataGenerator getRecordDataGenerator() {
- return recordDataGenerator;
- }
-
- public static TableInfo of(TableId tableId, Schema schema) {
- List<RecordData.FieldGetter> fieldGetters =
- SchemaUtils.createFieldGetters(schema.getColumns());
- BinaryRecordDataGenerator recordDataGenerator =
- new BinaryRecordDataGenerator(DataTypeConverter.toRowType(schema.getColumns()));
- return new TableInfo(
- tableId,
- schema,
- fieldGetters.toArray(new RecordData.FieldGetter[0]),
- recordDataGenerator);
- }
-}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 7eaefc3c8..9bae6a573 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -17,11 +17,11 @@
package org.apache.flink.cdc.runtime.operators.transform;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.codehaus.janino.ExpressionEvaluator;
@@ -31,17 +31,22 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_NAMESPACE_NAME;
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_SCHEMA_NAME;
+import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_TABLE_NAME;
/** The processor of the transform filter. It processes the data change event of matched table. */
public class TransformFilterProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TransformFilterProcessor.class);
- private TableInfo tableInfo;
+ private PostTransformChangeInfo tableInfo;
private TransformFilter transformFilter;
private String timezone;
private TransformExpressionKey transformExpressionKey;
public TransformFilterProcessor(
- TableInfo tableInfo, TransformFilter transformFilter, String timezone) {
+ PostTransformChangeInfo tableInfo, TransformFilter transformFilter, String timezone) {
this.tableInfo = tableInfo;
this.transformFilter = transformFilter;
this.timezone = timezone;
@@ -49,7 +54,7 @@ public class TransformFilterProcessor {
}
public static TransformFilterProcessor of(
- TableInfo tableInfo, TransformFilter transformFilter, String timezone) {
+ PostTransformChangeInfo tableInfo, TransformFilter transformFilter, String timezone) {
return new TransformFilterProcessor(tableInfo, transformFilter, timezone);
}
@@ -68,22 +73,52 @@ public class TransformFilterProcessor {
}
}
+ private Tuple2<List<String>, List<Class<?>>> generateArguments() {
+ List<String> argNames = new ArrayList<>();
+ List<Class<?>> argTypes = new ArrayList<>();
+ String scriptExpression = transformFilter.getScriptExpression();
+ List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
+ List<String> columnNames = transformFilter.getColumnNames();
+ for (String columnName : columnNames) {
+ for (Column column : columns) {
+ if (column.getName().equals(columnName)) {
+ if (!argNames.contains(columnName)) {
+ argNames.add(columnName);
+ argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
+ }
+ break;
+ }
+ }
+ }
+ Stream.of(DEFAULT_NAMESPACE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME)
+ .forEach(
+ metadataColumn -> {
+ if (scriptExpression.contains(metadataColumn)
+ && !argNames.contains(metadataColumn)) {
+ argNames.add(metadataColumn);
+ argTypes.add(String.class);
+ }
+ });
+ return Tuple2.of(argNames, argTypes);
+ }
+
private Object[] generateParams(BinaryRecordData after, long epochTime) {
List<Object> params = new ArrayList<>();
- List<Column> columns = tableInfo.getSchema().getColumns();
- RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
- for (String columnName : transformFilter.getColumnNames()) {
- if (columnName.equals(TransformParser.DEFAULT_NAMESPACE_NAME)) {
- params.add(tableInfo.getNamespace());
- continue;
- }
- if (columnName.equals(TransformParser.DEFAULT_SCHEMA_NAME)) {
- params.add(tableInfo.getSchemaName());
- continue;
- }
- if (columnName.equals(TransformParser.DEFAULT_TABLE_NAME)) {
- params.add(tableInfo.getTableName());
- continue;
+ List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
+
+ Tuple2<List<String>, List<Class<?>>> args = generateArguments();
+ RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
+ for (String columnName : args.f0) {
+ switch (columnName) {
+ case DEFAULT_NAMESPACE_NAME:
+ params.add(tableInfo.getNamespace());
+ continue;
+ case DEFAULT_SCHEMA_NAME:
+ params.add(tableInfo.getSchemaName());
+ continue;
+ case DEFAULT_TABLE_NAME:
+ params.add(tableInfo.getTableName());
+ continue;
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
@@ -101,48 +136,17 @@ public class TransformFilterProcessor {
}
private TransformExpressionKey generateTransformExpressionKey() {
- List<String> argumentNames = new ArrayList<>();
- List<Class<?>> paramTypes = new ArrayList<>();
- List<Column> columns = tableInfo.getSchema().getColumns();
- String scriptExpression = transformFilter.getScriptExpression();
- List<String> columnNames = transformFilter.getColumnNames();
- for (String columnName : columnNames) {
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
- if (column.getName().equals(columnName)) {
- argumentNames.add(columnName);
- paramTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
- break;
- }
- }
- }
- if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME)
- && !argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
- paramTypes.add(String.class);
- }
-
- if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME)
- && !argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
- paramTypes.add(String.class);
- }
-
- if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME)
- && !argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
- paramTypes.add(String.class);
- }
+ Tuple2<List<String>, List<Class<?>>> args = generateArguments();
- argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
- paramTypes.add(String.class);
- argumentNames.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
- paramTypes.add(Long.class);
+ args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
+ args.f1.add(String.class);
+ args.f0.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
+ args.f1.add(Long.class);
return TransformExpressionKey.of(
- JaninoCompiler.loadSystemFunction(scriptExpression),
- argumentNames,
- paramTypes,
+ JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
+ args.f0,
+ args.f1,
Boolean.class);
}
}
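
Tracing generateArguments() on a hypothetical input helps show the consolidation (the schema, filter, and values below are invented for illustration): given a pre-transformed schema (id INT, name STRING) and the filter expression id > 10 AND id < 100 AND __table_name__ = 'orders', the column pass adds "id" once (the argNames.contains guard collapses repeated references, which the old inline version did not do for regular columns), and the metadata pass appends __table_name__ because it occurs in the script expression:

    // Sketch of the resulting tuple for the hypothetical filter above:
    //   args.f0 = [id, __table_name__]
    //   args.f1 = [Integer.class, String.class]
    // generateParams() walks args.f0 in the same order, so the value array it
    // builds stays positionally aligned with the compiled Janino parameters.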
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index 7049bbdfd..307c890fd 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -19,9 +19,9 @@ package org.apache.flink.cdc.runtime.operators.transform;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* The processor of transform projection applies to process a row of filtering tables.
@@ -41,146 +41,127 @@ import java.util.concurrent.ConcurrentHashMap;
* <ul>
* <li>CreateTableEvent: add the user-defined computed columns into Schema.
* <li>SchemaChangeEvent: update the columns of TransformProjection.
- * <li>DataChangeEvent: Fill data field to row in TransformSchemaOperator. Process the data column
+ * <li>DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column
* and the user-defined expression computed columns.
* </ul>
*/
public class TransformProjectionProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TransformProjectionProcessor.class);
- private TableInfo tableInfo;
- private TableChangeInfo tableChangeInfo;
- private TransformProjection transformProjection;
- private String timezone;
- private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
+ private final PostTransformChangeInfo postTransformChangeInfo;
+ private final TransformProjection transformProjection;
+ private final String timezone;
+ private final List<ProjectionColumnProcessor> cachedProjectionColumnProcessors;
public TransformProjectionProcessor(
- TableInfo tableInfo,
- TableChangeInfo tableChangeInfo,
+ PostTransformChangeInfo postTransformChangeInfo,
TransformProjection transformProjection,
String timezone) {
- this.tableInfo = tableInfo;
- this.tableChangeInfo = tableChangeInfo;
+ this.postTransformChangeInfo = postTransformChangeInfo;
this.transformProjection = transformProjection;
this.timezone = timezone;
- this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
- }
-
- public boolean hasTableChangeInfo() {
- return this.tableChangeInfo != null;
+ this.cachedProjectionColumnProcessors =
+ cacheProjectionColumnProcessors(postTransformChangeInfo, transformProjection);
}
public boolean hasTableInfo() {
- return this.tableInfo != null;
+ return this.postTransformChangeInfo != null;
}
public static TransformProjectionProcessor of(
- TableInfo tableInfo, TransformProjection transformProjection, String timezone) {
- return new TransformProjectionProcessor(tableInfo, null, transformProjection, timezone);
+ PostTransformChangeInfo tableInfo,
+ TransformProjection transformProjection,
+ String timezone) {
+ return new TransformProjectionProcessor(tableInfo, transformProjection, timezone);
}
public static TransformProjectionProcessor of(
- TableChangeInfo tableChangeInfo, TransformProjection transformProjection) {
- return new TransformProjectionProcessor(null, tableChangeInfo, transformProjection, null);
+ TransformProjection transformProjection, String timezone) {
+ return new TransformProjectionProcessor(null, transformProjection, timezone);
}
public static TransformProjectionProcessor of(TransformProjection transformProjection) {
- return new TransformProjectionProcessor(null, null, transformProjection, null);
- }
-
- public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEvent) {
- List<ProjectionColumn> projectionColumns =
- TransformParser.generateProjectionColumns(
- transformProjection.getProjection(),
- createTableEvent.getSchema().getColumns());
- transformProjection.setProjectionColumns(projectionColumns);
- List<Column> allColumnList = transformProjection.getAllColumnList();
- // add the column of projection into Schema
- Schema schema = createTableEvent.getSchema().copy(allColumnList);
- return new CreateTableEvent(createTableEvent.tableId(), schema);
+ return new TransformProjectionProcessor(null, transformProjection, null);
}
- public void processSchemaChangeEvent(Schema schema) {
+ public Schema processSchemaChangeEvent(Schema schema) {
List<ProjectionColumn> projectionColumns =
TransformParser.generateProjectionColumns(
transformProjection.getProjection(), schema.getColumns());
transformProjection.setProjectionColumns(projectionColumns);
+ return schema.copy(
+ projectionColumns.stream()
+ .map(ProjectionColumn::getColumn)
+ .collect(Collectors.toList()));
}
- public BinaryRecordData processFillDataField(BinaryRecordData data) {
+ public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
List<Object> valueList = new ArrayList<>();
- for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
- boolean isProjectionColumn = false;
- for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
- if (column.getName().equals(projectionColumn.getColumnName())
- && projectionColumn.isValidTransformedProjectionColumn()) {
- valueList.add(null);
- isProjectionColumn = true;
- break;
- }
- }
- if (!isProjectionColumn) {
- valueList.add(
- getValueFromBinaryRecordData(
- column.getName(),
- data,
- tableChangeInfo.getOriginalSchema().getColumns(),
- tableChangeInfo.getFieldGetters()));
- }
- }
- return tableChangeInfo
- .getRecordDataGenerator()
- .generate(valueList.toArray(new Object[valueList.size()]));
- }
+ List<Column> columns = postTransformChangeInfo.getPostTransformedSchema().getColumns();
- public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
- List<Object> valueList = new ArrayList<>();
- for (Column column : tableInfo.getSchema().getColumns()) {
- boolean isProjectionColumn = false;
- for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
- if (column.getName().equals(projectionColumn.getColumnName())
- && projectionColumn.isValidTransformedProjectionColumn()) {
- if (!projectionColumnProcessorMap.containsKey(
- projectionColumn.getColumnName())) {
- projectionColumnProcessorMap.put(
- projectionColumn.getColumnName(),
- ProjectionColumnProcessor.of(
- tableInfo, projectionColumn, timezone));
- }
- ProjectionColumnProcessor projectionColumnProcessor =
- projectionColumnProcessorMap.get(projectionColumn.getColumnName());
- valueList.add(
- DataTypeConverter.convert(
- projectionColumnProcessor.evaluate(after, epochTime),
- projectionColumn.getDataType()));
- isProjectionColumn = true;
- break;
- }
- }
- if (!isProjectionColumn) {
+ for (int i = 0; i < columns.size(); i++) {
+ ProjectionColumnProcessor projectionColumnProcessor =
+ cachedProjectionColumnProcessors.get(i);
+ if (projectionColumnProcessor != null) {
+ ProjectionColumn projectionColumn = projectionColumnProcessor.getProjectionColumn();
+ valueList.add(
+ DataTypeConverter.convert(
+ projectionColumnProcessor.evaluate(payload, epochTime),
+ projectionColumn.getDataType()));
+ } else {
+ Column column = columns.get(i);
valueList.add(
getValueFromBinaryRecordData(
column.getName(),
- after,
- tableInfo.getSchema().getColumns(),
- tableInfo.getFieldGetters()));
+ column.getType(),
+ payload,
+ postTransformChangeInfo.getPreTransformedSchema().getColumns(),
+ postTransformChangeInfo.getPreTransformedFieldGetters()));
}
}
- return tableInfo
+
+ return postTransformChangeInfo
.getRecordDataGenerator()
- .generate(valueList.toArray(new Object[valueList.size()]));
+ .generate(valueList.toArray(new Object[0]));
}
private Object getValueFromBinaryRecordData(
String columnName,
+ DataType expectedType,
BinaryRecordData binaryRecordData,
List<Column> columns,
RecordData.FieldGetter[] fieldGetters) {
for (int i = 0; i < columns.size(); i++) {
if (columnName.equals(columns.get(i).getName())) {
return DataTypeConverter.convert(
- fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
+ fieldGetters[i].getFieldOrNull(binaryRecordData), expectedType);
}
}
return null;
}
+
+ private List<ProjectionColumnProcessor> cacheProjectionColumnProcessors(
+ PostTransformChangeInfo tableInfo, TransformProjection transformProjection) {
+ List<ProjectionColumnProcessor> cachedProjectionColumnProcessors = new ArrayList<>();
+ if (!hasTableInfo()) {
+ return cachedProjectionColumnProcessors;
+ }
+
+ for (Column column : tableInfo.getPostTransformedSchema().getColumns()) {
+ ProjectionColumn matchedProjectionColumn = null;
+ for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
+ if (column.getName().equals(projectionColumn.getColumnName())
+ && projectionColumn.isValidTransformedProjectionColumn()) {
+ matchedProjectionColumn = projectionColumn;
+ break;
+ }
+ }
+
+ cachedProjectionColumnProcessors.add(
+ Optional.ofNullable(matchedProjectionColumn)
+ .map(col -> ProjectionColumnProcessor.of(tableInfo, col, timezone))
+ .orElse(null));
+ }
+
+ return cachedProjectionColumnProcessors;
+ }
}
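
One way to read the new cache layout (column names below are invented): cachedProjectionColumnProcessors is index-aligned with the post-transformed schema and holds null for passthrough columns, so the per-record path in processData is a flat list walk rather than the previous per-column ConcurrentHashMap lookup:

    // Post-transformed schema:          [id, name, UPPER(name) AS uname]
    // cachedProjectionColumnProcessors: [null, null, processorFor(uname)]
    // processData(): index 2 evaluates its expression via the cached processor;
    // indexes 0 and 1 copy the raw field from the pre-transformed record using
    // getPreTransformedFieldGetters().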
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
new file mode 100644
index 000000000..27b265226
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.cdc.runtime.parser.TransformParser.normalizeFilter;
+
+/** A rule defining pre-transformations where filtered rows and irrelevant columns are removed. */
+public class TransformRule implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String tableInclusions;
+ private final @Nullable String projection;
+ private final @Nullable String filter;
+ private final String primaryKey;
+ private final String partitionKey;
+ private final String tableOption;
+
+ public TransformRule(
+ String tableInclusions,
+ @Nullable String projection,
+ @Nullable String filter,
+ String primaryKey,
+ String partitionKey,
+ String tableOption) {
+ this.tableInclusions = tableInclusions;
+ this.projection = projection;
+ this.filter = normalizeFilter(projection, filter);
+ this.primaryKey = primaryKey;
+ this.partitionKey = partitionKey;
+ this.tableOption = tableOption;
+ }
+
+ public String getTableInclusions() {
+ return tableInclusions;
+ }
+
+ @Nullable
+ public String getProjection() {
+ return projection;
+ }
+
+ @Nullable
+ public String getFilter() {
+ return filter;
+ }
+
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public String getPartitionKey() {
+ return partitionKey;
+ }
+
+ public String getTableOption() {
+ return tableOption;
+ }
+}
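
A hypothetical construction sketch (identifiers invented) showing that filter normalization happens eagerly in the constructor, so anything reading getFilter() sees a predicate expressed over source columns even when the pipeline definition filtered on a computed column:

    TransformRule rule =
            new TransformRule(
                    "mydb.orders",                  // tableInclusions
                    "id, price * amount AS total",  // projection
                    "total > 100",                  // filter as written by the user
                    "id",                           // primaryKey
                    "id",                           // partitionKey
                    "");                            // tableOption
    // getFilter() now returns the normalizeFilter output, roughly
    // "price * amount > 100", with the computed alias substituted away.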
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 23cf4b376..4db3f354f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
-import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable;
@@ -41,15 +40,16 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.fun.SqlCase;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
@@ -60,14 +60,21 @@ import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.cdc.common.utils.StringUtils.isNullOrWhitespaceOnly;
/** Use Flink's calcite parser to parse the statement of flink cdc pipeline transform. */
public class TransformParser {
@@ -141,16 +148,86 @@ public class TransformParser {
}
}
- // Parse all columns
+ // Returns referenced columns (directly and indirectly) by projection and filter expression.
+ // For example, given projection expression "a, c, upper(x) as d, y as e", filter expression "z
+ // > 0", and columns array [a, b, c, x, y, z], returns referenced column array [a, c, x, y, z].
+ public static List<Column> generateReferencedColumns(
+ String projectionExpression, @Nullable String filterExpression, List<Column> columns) {
+ if (isNullOrWhitespaceOnly(projectionExpression)) {
+ return new ArrayList<>();
+ }
+
+ Set<String> referencedColumnNames = new HashSet<>();
+
+ SqlSelect sqlProject = parseProjectionExpression(projectionExpression);
+ if (!sqlProject.getSelectList().isEmpty()) {
+ for (SqlNode sqlNode : sqlProject.getSelectList()) {
+ if (sqlNode instanceof SqlBasicCall) {
+ SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
+ if (SqlKind.AS.equals(sqlBasicCall.getOperator().kind)) {
+ referencedColumnNames.addAll(
+ parseColumnNameList(sqlBasicCall.getOperandList().get(0)));
+ } else {
+ throw new ParseException(
+ "Unrecognized projection expression: "
+ + sqlBasicCall
+ + ". Should be AS ");
+ }
+ } else if (sqlNode instanceof SqlIdentifier) {
+ SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
+ if (sqlIdentifier.isStar()) {
+ // wildcard star character matches all columns
+ return columns;
+ }
+ referencedColumnNames.add(
+ sqlIdentifier.names.get(sqlIdentifier.names.size() - 1));
+ }
+ }
+ }
+
+ if (!isNullOrWhitespaceOnly(filterExpression)) {
+ SqlSelect sqlFilter = parseFilterExpression(filterExpression);
+ referencedColumnNames.addAll(parseColumnNameList(sqlFilter.getWhere()));
+ }
+
+ return columns.stream()
+ .filter(e -> referencedColumnNames.contains(e.getName()))
+ .collect(Collectors.toList());
+ }
+
+ // Expands wildcard character * to full column list.
+ // For example, given projection expression "a AS new_a, *, c as new_c"
+ // and schema [a, b, c], expand it to [a as new_a, a, b, c, c as new_c].
+ // This step is necessary since passing wildcard to sqlToRel will capture
+ // unexpected metadata columns.
+ private static void expandWildcard(SqlSelect sqlSelect, List columns) {
+ List<SqlNode> expandedNodes = new ArrayList<>();
+ for (SqlNode sqlNode : sqlSelect.getSelectList().getList()) {
+ if (sqlNode instanceof SqlIdentifier && ((SqlIdentifier) sqlNode).isStar()) {
+ expandedNodes.addAll(
+ columns.stream()
+ .map(c -> new SqlIdentifier(c.getName(), SqlParserPos.QUOTED_ZERO))
+ .collect(Collectors.toList()));
+ } else {
+ expandedNodes.add(sqlNode);
+ }
+ }
+ sqlSelect.setSelectList(new SqlNodeList(expandedNodes, SqlParserPos.ZERO));
+ }
+
+ // Returns projected columns based on given projection expression.
+ // For example, given projection expression "a, b, c, upper(a) as d, b as e" and columns array
+ // [a, b, c, x, y, z], returns projection column array [a, b, c, d, e].
public static List<ProjectionColumn> generateProjectionColumns(
String projectionExpression, List<Column> columns) {
- if (StringUtils.isNullOrWhitespaceOnly(projectionExpression)) {
+ if (isNullOrWhitespaceOnly(projectionExpression)) {
return new ArrayList<>();
}
SqlSelect sqlSelect = parseProjectionExpression(projectionExpression);
if (sqlSelect.getSelectList().isEmpty()) {
return new ArrayList<>();
}
+ expandWildcard(sqlSelect, columns);
RelNode relNode = sqlToRel(columns, sqlSelect);
Map<String, RelDataType> relDataTypeMap =
relNode.getRowType().getFieldList().stream()
@@ -158,18 +235,23 @@ public class TransformParser {
Collectors.toMap(
RelDataTypeField::getName, RelDataTypeField::getType));
+ Map<String, DataType> rawDataTypeMap =
+ columns.stream().collect(Collectors.toMap(Column::getName, Column::getType));
+
Map<String, Boolean> isNotNullMap =
columns.stream()
.collect(
Collectors.toMap(
Column::getName, column -> !column.getType().isNullable()));
+
List<ProjectionColumn> projectionColumns = new ArrayList<>();
+
for (SqlNode sqlNode : sqlSelect.getSelectList()) {
if (sqlNode instanceof SqlBasicCall) {
SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
if (SqlKind.AS.equals(sqlBasicCall.getOperator().kind)) {
Optional<SqlNode> transformOptional = Optional.empty();
- String columnName = null;
+ String columnName;
List<SqlNode> operandList = sqlBasicCall.getOperandList();
if (operandList.size() == 2) {
transformOptional = Optional.of(operandList.get(0));
@@ -177,7 +259,11 @@ public class TransformParser {
if (sqlNode1 instanceof SqlIdentifier) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode1;
columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
+ } else {
+ columnName = null;
}
+ } else {
+ columnName = null;
}
if (isMetadataColumn(columnName)) {
continue;
@@ -209,14 +295,25 @@ public class TransformParser {
projectionColumns.add(projectionColumn);
}
} else {
- throw new ParseException("Unrecognized projection: " + sqlBasicCall.toString());
+ throw new ParseException(
+ "Unrecognized projection expression: "
+ + sqlBasicCall
+ + ". Should be AS ");
}
} else if (sqlNode instanceof SqlIdentifier) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
- DataType columnType =
- DataTypeConverter.convertCalciteRelDataTypeToDataType(
- relDataTypeMap.get(columnName));
+ DataType columnType;
+ if (rawDataTypeMap.containsKey(columnName)) {
+ columnType = rawDataTypeMap.get(columnName);
+ } else if (relDataTypeMap.containsKey(columnName)) {
+ columnType =
+ DataTypeConverter.convertCalciteRelDataTypeToDataType(
+ relDataTypeMap.get(columnName));
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to deduce column %s type", columnName));
+ }
if (isMetadataColumn(columnName)) {
projectionColumns.add(
ProjectionColumn.of(
@@ -244,7 +341,7 @@ public class TransformParser {
}
public static String translateFilterExpressionToJaninoExpression(String filterExpression) {
- if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+ if (isNullOrWhitespaceOnly(filterExpression)) {
return "";
}
SqlSelect sqlSelect = TransformParser.parseFilterExpression(filterExpression);
@@ -257,7 +354,7 @@ public class TransformParser {
public static List<String> parseComputedColumnNames(String projection) {
List<String> columnNames = new ArrayList<>();
- if (StringUtils.isNullOrWhitespaceOnly(projection)) {
+ if (isNullOrWhitespaceOnly(projection)) {
return columnNames;
}
SqlSelect sqlSelect = parseProjectionExpression(projection);
@@ -298,7 +395,7 @@ public class TransformParser {
}
public static List<String> parseFilterColumnNameList(String filterExpression) {
- if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+ if (isNullOrWhitespaceOnly(filterExpression)) {
return new ArrayList<>();
}
SqlSelect sqlSelect = parseFilterExpression(filterExpression);
@@ -315,12 +412,12 @@ public class TransformParser {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
columnNameList.add(columnName);
- } else if (sqlNode instanceof SqlBasicCall) {
- SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
- findSqlIdentifier(sqlBasicCall.getOperandList(), columnNameList);
- } else if (sqlNode instanceof SqlCase) {
- SqlCase sqlCase = (SqlCase) sqlNode;
- findSqlIdentifier(sqlCase.getWhenOperands().getList(), columnNameList);
+ } else if (sqlNode instanceof SqlCall) {
+ SqlCall sqlCall = (SqlCall) sqlNode;
+ findSqlIdentifier(sqlCall.getOperandList(), columnNameList);
+ } else if (sqlNode instanceof SqlNodeList) {
+ SqlNodeList sqlNodeList = (SqlNodeList) sqlNode;
+ findSqlIdentifier(sqlNodeList.getList(), columnNameList);
}
return columnNameList;
}
@@ -331,13 +428,12 @@ public class TransformParser {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
columnNameList.add(columnName);
- } else if (sqlNode instanceof SqlBasicCall) {
- SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
- findSqlIdentifier(sqlBasicCall.getOperandList(), columnNameList);
- } else if (sqlNode instanceof SqlCase) {
- SqlCase sqlCase = (SqlCase) sqlNode;
- SqlNodeList whenOperands = sqlCase.getWhenOperands();
- findSqlIdentifier(whenOperands.getList(), columnNameList);
+ } else if (sqlNode instanceof SqlCall) {
+ SqlCall sqlCall = (SqlCall) sqlNode;
+ findSqlIdentifier(sqlCall.getOperandList(), columnNameList);
+ } else if (sqlNode instanceof SqlNodeList) {
+ SqlNodeList sqlNodeList = (SqlNodeList) sqlNode;
+ findSqlIdentifier(sqlNodeList.getList(), columnNameList);
}
}
}
@@ -384,10 +480,81 @@ public class TransformParser {
StringBuilder statement = new StringBuilder();
statement.append("SELECT * FROM ");
statement.append(DEFAULT_TABLE);
- if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+ if (!isNullOrWhitespaceOnly(filterExpression)) {
statement.append(" WHERE ");
statement.append(filterExpression);
}
return parseSelect(statement.toString());
}
+
+ public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, SqlNode> replaceMap) {
+ if (sqlNode instanceof SqlCall) {
+ SqlCall sqlCall = (SqlCall) sqlNode;
+
+ List<SqlNode> operands = sqlCall.getOperandList();
+ IntStream.range(0, sqlCall.operandCount())
+ .forEach(
+ i ->
+ sqlCall.setOperand(
+ i, rewriteExpression(operands.get(i), replaceMap)));
+ return sqlCall;
+ } else if (sqlNode instanceof SqlIdentifier) {
+ SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
+ if (sqlIdentifier.names.size() == 1) {
+ String name = sqlIdentifier.names.get(0);
+ if (replaceMap.containsKey(name)) {
+ return replaceMap.get(name);
+ }
+ }
+ return sqlIdentifier;
+ } else if (sqlNode instanceof SqlNodeList) {
+ SqlNodeList sqlNodeList = (SqlNodeList) sqlNode;
+ IntStream.range(0, sqlNodeList.size())
+ .forEach(
+ i ->
+ sqlNodeList.set(
+ i, rewriteExpression(sqlNodeList.get(i), replaceMap)));
+ return sqlNodeList;
+ } else {
+ return sqlNode;
+ }
+ }
+
+ // Filter expression might hold reference to a calculated column, which causes confusion about
+ // the sequence of projection and filtering operations. This function rewrites filtering about
+ // calculated columns to circumvent this problem.
+ public static String normalizeFilter(String projection, String filter) {
+ if (isNullOrWhitespaceOnly(projection) || isNullOrWhitespaceOnly(filter)) {
+ return filter;
+ }
+
+ SqlSelect sqlSelect = parseProjectionExpression(projection);
+ if (sqlSelect.getSelectList().isEmpty()) {
+ return filter;
+ }
+
+ Map<String, SqlNode> calculatedExpression = new HashMap<>();
+ for (SqlNode sqlNode : sqlSelect.getSelectList()) {
+ if (sqlNode instanceof SqlBasicCall) {
+ SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
+ if (SqlKind.AS.equals(sqlBasicCall.getOperator().kind)) {
+ List<SqlNode> operandList = sqlBasicCall.getOperandList();
+ if (operandList.size() == 2) {
+ SqlIdentifier alias = (SqlIdentifier) operandList.get(1);
+ String name = alias.names.get(alias.names.size() - 1);
+ SqlNode expression = operandList.get(0);
+ calculatedExpression.put(name, expression);
+ }
+ }
+ }
+ }
+
+ SqlNode sqlFilter = parseFilterExpression(filter).getWhere();
+ sqlFilter = rewriteExpression(sqlFilter, calculatedExpression);
+ if (sqlFilter != null) {
+ return sqlFilter.toString();
+ } else {
+ return filter;
+ }
+ }
}
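
To see normalizeFilter end to end on an invented rule: the filter references the computed column "total", parseProjectionExpression collects the alias map {total -> price * amount}, and rewriteExpression substitutes the aliased expression into the WHERE clause, so filtering no longer depends on a column that only exists after projection:

    String projection = "id, price * amount AS total";
    String filter = "total > 100";
    // Yields an expression equivalent to "price * amount > 100"; the exact
    // string comes from SqlNode#toString, so identifier quoting may differ.
    String normalized = TransformParser.normalizeFilter(projection, filter);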
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 13ddbb6b4..6d66fa878 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -39,13 +39,14 @@ import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
+import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
/** Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner}. */
@Internal
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
- implements OneInputStreamOperator<Event, PartitioningEvent> {
+ implements OneInputStreamOperator<Event, PartitioningEvent>, Serializable {
private static final long serialVersionUID = 1L;
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
similarity index 83%
rename from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index a19722ef1..34b710374 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -33,11 +33,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
-/** Unit tests for the {@link TransformDataOperator}. */
-public class TransformDataOperatorTest {
+/** Unit tests for the {@link PostTransformOperator}. */
+public class PostTransformOperatorTest {
private static final TableId CUSTOMERS_TABLEID =
TableId.tableId("my_company", "my_branch", "customers");
private static final Schema CUSTOMERS_SCHEMA =
@@ -70,12 +71,17 @@ public class TransformDataOperatorTest {
private static final TableId METADATA_TABLEID =
TableId.tableId("my_company", "my_branch", "metadata_table");
private static final Schema METADATA_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_METADATA_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("identifier_name", DataTypes.STRING())
- .physicalColumn("__namespace_name__", DataTypes.STRING())
- .physicalColumn("__schema_name__", DataTypes.STRING())
- .physicalColumn("__table_name__", DataTypes.STRING())
+ .physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__schema_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__table_name__", DataTypes.STRING().notNull())
.primaryKey("col1")
.build();
@@ -126,8 +132,8 @@ public class TransformDataOperatorTest {
.physicalColumn("nullBigint", DataTypes.BIGINT())
.physicalColumn("nullFloat", DataTypes.FLOAT())
.physicalColumn("nullDouble", DataTypes.DOUBLE())
- .physicalColumn("nullChar", DataTypes.CHAR(1))
- .physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
+ .physicalColumn("nullChar", DataTypes.STRING())
+ .physicalColumn("nullVarchar", DataTypes.STRING())
.physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
.physicalColumn("nullTimestamp", DataTypes.TIMESTAMP(3))
.primaryKey("col1")
@@ -145,8 +151,8 @@ public class TransformDataOperatorTest {
.physicalColumn("castBigint", DataTypes.BIGINT())
.physicalColumn("castFloat", DataTypes.FLOAT())
.physicalColumn("castDouble", DataTypes.DOUBLE())
- .physicalColumn("castChar", DataTypes.CHAR(1))
- .physicalColumn("castVarchar", DataTypes.VARCHAR(1))
+ .physicalColumn("castChar", DataTypes.STRING())
+ .physicalColumn("castVarchar", DataTypes.STRING())
.physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
.physicalColumn("castTimestamp", DataTypes.TIMESTAMP(3))
.primaryKey("col1")
@@ -170,16 +176,66 @@ public class TransformDataOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId REDUCE_TABLEID =
+ TableId.tableId("my_company", "my_branch", "reduce_table");
+
+ private static final Schema REDUCE_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_REDUCE_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("uid", DataTypes.STRING())
+ .physicalColumn("newage", DataTypes.INT())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("seventeen", DataTypes.INT())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final TableId WILDCARD_TABLEID =
+ TableId.tableId("my_company", "my_branch", "wildcard_table");
+
+ private static final Schema WILDCARD_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_WILDCARD_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("newage", DataTypes.INT())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
@Test
void testDataChangeEventTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
CUSTOMERS_TABLEID.identifier(),
"*, concat(col1,col2) col12",
"col1 = '1'")
.build();
- EventOperatorTestHarness<TransformDataOperator, Event>
+ EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -252,7 +308,6 @@ public class TransformDataOperatorTest {
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect));
- transform.processElement(new StreamRecord<>(insertEventIgnored));
transform.processElement(new StreamRecord<>(updateEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
@@ -261,8 +316,8 @@ public class TransformDataOperatorTest {
@Test
void testDataChangeEventTransformTwice() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
CUSTOMERS_TABLEID.identifier(),
"*, concat(col1, '1') col12",
@@ -272,7 +327,7 @@ public class TransformDataOperatorTest {
"*, concat(col1, '2') col12",
"col1 = '2'")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -366,11 +421,11 @@ public class TransformDataOperatorTest {
@Test
void testDataChangeEventTransformProjectionDataTypeConvert() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
- .addTransform(DATATYPE_TABLEID.identifier(), "*", null)
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(DATATYPE_TABLEID.identifier(), "*", null, null, null, null)
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -412,14 +467,14 @@ public class TransformDataOperatorTest {
@Test
void testMetadataTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
METADATA_TABLEID.identifier(),
"*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
" __table_name__ = 'metadata_table' ")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -428,16 +483,17 @@ public class TransformDataOperatorTest {
CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) METADATA_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) EXPECTED_METADATA_SCHEMA.toRowDataType()));
// Insert
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
METADATA_TABLEID,
- recordDataGenerator.generate(
- new Object[] {new BinaryStringData("1"), null, null, null, null}));
+ recordDataGenerator.generate(new Object[] {new BinaryStringData("1")}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
METADATA_TABLEID,
- recordDataGenerator.generate(
+ expectedRecordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("my_company.my_branch.metadata_table"),
@@ -450,7 +506,7 @@ public class TransformDataOperatorTest {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
- new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA)));
+ new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
transform.processElement(new StreamRecord<>(insertEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
@@ -459,14 +515,14 @@ public class TransformDataOperatorTest {
@Test
void testMetadataASTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
METADATA_AS_TABLEID.identifier(),
"sid, name, UPPER(name) as name_upper, __table_name__ as tbname",
"sid < 3")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -506,8 +562,8 @@ public class TransformDataOperatorTest {
@Test
void testTimestampTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
TIMESTAMP_TABLEID.identifier(),
"col1, IF(LOCALTIME = CURRENT_TIME, 1, 0) as time_equal,"
@@ -516,7 +572,7 @@ public class TransformDataOperatorTest {
"LOCALTIMESTAMP = CAST(CURRENT_TIMESTAMP AS TIMESTAMP)")
.addTimezone("UTC")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -551,8 +607,8 @@ public class TransformDataOperatorTest {
@Test
void testTimestampDiffTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
TIMESTAMPDIFF_TABLEID.identifier(),
"col1, TIMESTAMP_DIFF('SECOND', LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
@@ -569,7 +625,7 @@ public class TransformDataOperatorTest {
"col1='2'")
.addTimezone("Asia/Shanghai")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -620,15 +676,15 @@ public class TransformDataOperatorTest {
@Test
void testTimezoneTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
TIMEZONE_TABLEID.identifier(),
"col1, DATE_FORMAT(TO_TIMESTAMP('2024-08-01 00:00:00'), 'yyyy-MM-dd HH:mm:ss') as datetime",
null)
.addTimezone("UTC")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -665,8 +721,8 @@ public class TransformDataOperatorTest {
@Test
void testNullCastTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
NULL_TABLEID.identifier(),
"col1"
@@ -684,7 +740,7 @@ public class TransformDataOperatorTest {
+ ",cast(colString as TIMESTAMP(3)) as nullTimestamp",
null)
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -725,8 +781,8 @@ public class TransformDataOperatorTest {
@Test
void testCastTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
CAST_TABLEID.identifier(),
"col1"
@@ -878,7 +934,7 @@ public class TransformDataOperatorTest {
+ ",cast('1970-01-01T00:00:01.234' as TIMESTAMP(3)) as castTimestamp",
"col1 = '10'")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -1297,8 +1353,8 @@ public class TransformDataOperatorTest {
@Test
void testCastErrorTransform() throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
CAST_TABLEID.identifier(),
"col1"
@@ -1312,10 +1368,10 @@ public class TransformDataOperatorTest {
+ ",cast(castFloat as char) as castChar"
+ ",cast(castFloat as varchar) as castVarchar"
+ ",cast(castFloat as DECIMAL(4,2)) as castDecimal"
- + ",cast(castFloat as TIMESTAMP) as castTimestamp",
+ + ",cast(castFloat as TIMESTAMP(3)) as castTimestamp",
"col1 = '1'")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -1417,15 +1473,15 @@ public class TransformDataOperatorTest {
}
private void testExpressionConditionTransform(String expression) throws Exception {
- TransformDataOperator transform =
- TransformDataOperator.newBuilder()
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
.addTransform(
CONDITION_TABLEID.identifier(),
"col1, IF(" + expression + ", true, false) as condition_result",
expression)
.addTimezone("UTC")
.build();
-        EventOperatorTestHarness<TransformDataOperator, Event>
+        EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
@@ -1457,4 +1513,202 @@ public class TransformDataOperatorTest {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect));
}
+
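+    // Projects renamed/derived columns (uid, newage, seventeen) while filtering on ref2,
+    // a column that is referenced by the filter but absent from the projected output.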
+ @Test
+ public void testReduceSchemaTransform() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ REDUCE_TABLEID.identifier(),
+ "id, upper(id) as uid, age + 1 as newage, lower(ref1) as ref1, 17 as seventeen",
+ "newage > 17 and ref2 > 17")
+ .addTimezone("GMT")
+ .build();
+        EventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(REDUCE_TABLEID, REDUCE_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) REDUCE_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) EXPECTED_REDUCE_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ REDUCE_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Reference"),
+ 42
+ }));
+
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ REDUCE_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ new BinaryStringData("ID001"),
+ 18,
+ new BinaryStringData("reference"),
+ 17
+ }));
+
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ REDUCE_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Reference"),
+ 42
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("UpdatedReference"),
+ 41
+ }));
+
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ REDUCE_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ new BinaryStringData("ID001"),
+ 18,
+ new BinaryStringData("reference"),
+ 17
+ }),
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ new BinaryStringData("ID001"),
+ 19,
+ new BinaryStringData("updatedreference"),
+ 17
+ }));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(REDUCE_TABLEID, EXPECTED_REDUCE_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
+
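+    // Wildcard projection: all upstream columns pass through unchanged and the computed
+    // column (newage) is appended at the end of the schema.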
+ @Test
+ public void testWildcardSchemaTransform() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ WILDCARD_TABLEID.identifier(),
+ "*, age + 1 as newage",
+ "newage > 17")
+ .addTimezone("GMT")
+ .build();
+        EventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(WILDCARD_TABLEID, WILDCARD_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) WILDCARD_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) EXPECTED_WILDCARD_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ WILDCARD_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ }));
+
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ WILDCARD_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ 18
+ }));
+
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ WILDCARD_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("Arisu"),
+ }));
+
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ WILDCARD_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ 18
+ }),
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("Arisu"),
+ 19
+ }));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(WILDCARD_TABLEID, EXPECTED_WILDCARD_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
new file mode 100644
index 000000000..c447d51c2
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/** Unit tests for the {@link PreTransformOperator}. */
+public class PreTransformOperatorTest {
+ private static final TableId CUSTOMERS_TABLEID =
+ TableId.tableId("my_company", "my_branch", "customers");
+ private static final Schema CUSTOMERS_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema CUSTOMERS_LATEST_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .physicalColumn("col3", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECT_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col2")
+ .partitionKey("col12")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+ private static final Schema EXPECT_LATEST_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .physicalColumn("col3", DataTypes.STRING())
+ .primaryKey("col2")
+ .partitionKey("col12")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema NULLABILITY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_NULLABILITY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema REFERENCED_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_REFERENCED_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema WILDCARD_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_WILDCARD_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .partitionKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final TableId METADATA_TABLEID =
+ TableId.tableId("my_company", "my_branch", "metadata_table");
+ private static final Schema METADATA_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final Schema EXPECTED_METADATA_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+ .build();
+
+ private static final TableId METADATA_AS_TABLEID =
+ TableId.tableId("my_company", "my_branch", "metadata_as_table");
+ private static final Schema METADATA_AS_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("sid", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("name_upper", DataTypes.STRING())
+ .physicalColumn("tbname", DataTypes.STRING())
+ .primaryKey("sid")
+ .build();
+
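+    // Covers the full pre-transform flow: create table, add column, insert and update
+    // events, with primary key, partition key and table options overridden by the rule.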
+ @Test
+ void testEventTransform() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ CUSTOMERS_TABLEID.identifier(),
+ "*, concat(col1,col2) col12",
+ null,
+ "col2",
+ "col12",
+ "key1=value1,key2=value2")
+ .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
+ // Add column
+ AddColumnEvent.ColumnWithPosition columnWithPosition =
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col3", DataTypes.STRING()));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator recordDataGeneratorExpect =
+ new BinaryRecordDataGenerator(((RowType) EXPECT_LATEST_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("2"),
+ new BinaryStringData("3"),
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("2"),
+ new BinaryStringData("3")
+ }));
+
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("2"),
+ new BinaryStringData("3")
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("3"),
+ new BinaryStringData("3")
+ }));
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("2"),
+ new BinaryStringData("3")
+ }),
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("3"),
+ new BinaryStringData("3")
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(new CreateTableEvent(CUSTOMERS_TABLEID, EXPECT_SCHEMA)));
+ transform.processElement(new StreamRecord<>(addColumnEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(addColumnEvent));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
+
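+    // Nullability constraints (id STRING NOT NULL) should survive the pre-transform stage.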
+ @Test
+ public void testNullabilityColumn() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ CUSTOMERS_TABLEID.identifier(),
+ "id, upper(id) uid, name, upper(name) uname",
+ null,
+ "id",
+ "id",
+ "key1=value1,key2=value2")
+ .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
+ transform.processElement(new StreamRecord<>(createTableEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
+ }
+
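+    // Pre-transform keeps every column referenced by the projection or the filter
+    // (id, age, ref1, ref2) and prunes unreferenced ones (name); derived columns such
+    // as uid and newage are only materialized by the post-transform stage.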
+ @Test
+ public void testReduceTransformColumn() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ CUSTOMERS_TABLEID.identifier(),
+ "id, upper(id) as uid, age + 1 as newage, lower(ref1) as ref1",
+ "newage > 17 and ref2 > 17",
+ "id",
+ "id",
+ "key1=value1,key2=value2")
+ .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(CUSTOMERS_TABLEID, REFERENCED_SCHEMA);
+ transform.processElement(new StreamRecord<>(createTableEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ CUSTOMERS_TABLEID, EXPECTED_REFERENCED_SCHEMA)));
+
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) REFERENCED_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator recordDataGeneratorExpect =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_REFERENCED_SCHEMA.toRowDataType()));
+
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ new BinaryStringData("Reference"),
+ 42,
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Reference"),
+ 42
+ }));
+
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ new BinaryStringData("Reference"),
+ 42,
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("Arisu"),
+ new BinaryStringData("UpdatedReference"),
+ 41,
+ }));
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Reference"),
+ 42
+ }),
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("UpdatedReference"),
+ 41
+ }));
+
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
+
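+    // With a wildcard projection the pre-transform schema is forwarded unchanged; the
+    // computed column (newage) is appended downstream by the post-transform operator.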
+ @Test
+ public void testWildcardTransformColumn() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ CUSTOMERS_TABLEID.identifier(),
+ "*, age + 1 as newage",
+ "newage > 17",
+ "id",
+ "id",
+ "key1=value1,key2=value2")
+ .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(CUSTOMERS_TABLEID, WILDCARD_SCHEMA);
+ transform.processElement(new StreamRecord<>(createTableEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(CUSTOMERS_TABLEID, EXPECTED_WILDCARD_SCHEMA)));
+
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) WILDCARD_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator recordDataGeneratorExpect =
+ new BinaryRecordDataGenerator(((RowType) EXPECTED_WILDCARD_SCHEMA.toRowDataType()));
+
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"), 17, new BinaryStringData("Alice")
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ }));
+
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"), 17, new BinaryStringData("Alice")
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("id001"), 18, new BinaryStringData("Arisu")
+ }));
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ CUSTOMERS_TABLEID,
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 17,
+ new BinaryStringData("Alice"),
+ }),
+ recordDataGeneratorExpect.generate(
+ new Object[] {
+ new BinaryStringData("id001"),
+ 18,
+ new BinaryStringData("Arisu"),
+ }));
+
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
+
+ @Test
+ void testMetadataTransform() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ METADATA_TABLEID.identifier(),
+ "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
+ " __table_name__ = 'metadata_table' ")
+ .build();
+
+        EventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA);
+ transform.processElement(new StreamRecord<>(createTableEvent));
+
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java
deleted file mode 100644
index 82992a64e..000000000
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.transform;
-
-import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.AddColumnEvent;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
-import org.apache.flink.cdc.common.event.DataChangeEvent;
-import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Column;
-import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataTypes;
-import org.apache.flink.cdc.common.types.RowType;
-import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-
-import java.util.Collections;
-
-/** Unit tests for the {@link TransformSchemaOperator}. */
-public class TransformSchemaOperatorTest {
- private static final TableId CUSTOMERS_TABLEID =
- TableId.tableId("my_company", "my_branch", "customers");
- private static final Schema CUSTOMERS_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
- .physicalColumn("col2", DataTypes.STRING())
- .primaryKey("col1")
- .build();
- private static final Schema CUSTOMERS_LATEST_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
- .physicalColumn("col2", DataTypes.STRING())
- .physicalColumn("col3", DataTypes.STRING())
- .primaryKey("col1")
- .build();
- private static final Schema EXPECT_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
- .physicalColumn("col2", DataTypes.STRING())
- .physicalColumn("col12", DataTypes.STRING())
- .primaryKey("col2")
- .partitionKey("col12")
- .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
- .build();
- private static final Schema EXPECT_LATEST_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
- .physicalColumn("col2", DataTypes.STRING())
- .physicalColumn("col12", DataTypes.STRING())
- .physicalColumn("col3", DataTypes.STRING())
- .primaryKey("col2")
- .partitionKey("col12")
- .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
- .build();
-
- private static final Schema NULLABILITY_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("id", DataTypes.STRING().notNull())
- .physicalColumn("name", DataTypes.STRING())
- .primaryKey("id")
- .partitionKey("id")
- .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
- .build();
-
- private static final Schema EXPECTED_NULLABILITY_SCHEMA =
- Schema.newBuilder()
- .physicalColumn("id", DataTypes.STRING().notNull())
- .physicalColumn("uid", DataTypes.STRING())
- .physicalColumn("name", DataTypes.STRING())
- .physicalColumn("uname", DataTypes.STRING())
- .primaryKey("id")
- .partitionKey("id")
- .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
- .build();
-
- @Test
- void testEventTransform() throws Exception {
- TransformSchemaOperator transform =
- TransformSchemaOperator.newBuilder()
- .addTransform(
- CUSTOMERS_TABLEID.identifier(),
- "*, concat(col1,col2) col12",
- "col2",
- "col12",
- "key1=value1,key2=value2")
- .build();
-        EventOperatorTestHarness<TransformSchemaOperator, Event>
- transformFunctionEventEventOperatorTestHarness =
- new EventOperatorTestHarness<>(transform, 1);
- // Initialization
- transformFunctionEventEventOperatorTestHarness.open();
- // Create table
- CreateTableEvent createTableEvent =
- new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
- // Add column
- AddColumnEvent.ColumnWithPosition columnWithPosition =
- new AddColumnEvent.ColumnWithPosition(
- Column.physicalColumn("col3", DataTypes.STRING()));
- AddColumnEvent addColumnEvent =
- new AddColumnEvent(
- CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
- BinaryRecordDataGenerator recordDataGenerator =
- new BinaryRecordDataGenerator(((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
- BinaryRecordDataGenerator recordDataGeneratorExpect =
- new BinaryRecordDataGenerator(((RowType) EXPECT_LATEST_SCHEMA.toRowDataType()));
- // Insert
- DataChangeEvent insertEvent =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- new BinaryStringData("3"),
- }));
- DataChangeEvent insertEventExpect =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGeneratorExpect.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- null,
- new BinaryStringData("3")
- }));
-
- // Update
- DataChangeEvent updateEvent =
- DataChangeEvent.updateEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- new BinaryStringData("3")
- }),
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("3"),
- new BinaryStringData("3")
- }));
- DataChangeEvent updateEventExpect =
- DataChangeEvent.updateEvent(
- CUSTOMERS_TABLEID,
- recordDataGeneratorExpect.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- null,
- new BinaryStringData("3")
- }),
- recordDataGeneratorExpect.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("3"),
- null,
- new BinaryStringData("3")
- }));
- transform.processElement(new StreamRecord<>(createTableEvent));
- Assertions.assertThat(
- transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(
- new StreamRecord<>(new CreateTableEvent(CUSTOMERS_TABLEID, EXPECT_SCHEMA)));
- transform.processElement(new StreamRecord<>(addColumnEvent));
- Assertions.assertThat(
- transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(addColumnEvent));
- transform.processElement(new StreamRecord<>(insertEvent));
- Assertions.assertThat(
- transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(insertEventExpect));
- transform.processElement(new StreamRecord<>(updateEvent));
- Assertions.assertThat(
- transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(updateEventExpect));
- }
-
- @Test
- public void testNullabilityColumn() throws Exception {
- TransformSchemaOperator transform =
- TransformSchemaOperator.newBuilder()
- .addTransform(
- CUSTOMERS_TABLEID.identifier(),
- "id, upper(id) uid, name, upper(name) uname",
- "id",
- "id",
- "key1=value1,key2=value2")
- .build();
-        EventOperatorTestHarness<TransformSchemaOperator, Event>
- transformFunctionEventEventOperatorTestHarness =
- new EventOperatorTestHarness<>(transform, 1);
- // Initialization
- transformFunctionEventEventOperatorTestHarness.open();
- // Create table
- CreateTableEvent createTableEvent =
- new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
- transform.processElement(new StreamRecord<>(createTableEvent));
-
- Assertions.assertThat(
- transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(
- new StreamRecord<>(
- new CreateTableEvent(
- CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
- }
-}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
new file mode 100644
index 000000000..009d700fe
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/** Unit tests for the {@link PreTransformOperator} and {@link PostTransformOperator}. */
+public class UnifiedTransformOperatorTest {
+
+    /** Defines a unified transform test case. */
+ static class UnifiedTransformTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnifiedTransformTestCase.class);
+
+ private final TableId tableId;
+ private final String projectionExpression;
+ private final String filterExpression;
+
+ private Schema sourceSchema;
+ private Schema preTransformedSchema;
+ private Schema postTransformedSchema;
+
+        private final List<Event> sourceEvents;
+        private final List<Event> preTransformedEvents;
+        private final List<Event> postTransformedEvents;
+
+        private final List<RecordData.FieldGetter> sourceFieldGetters;
+        private final List<RecordData.FieldGetter> preTransformedFieldGetters;
+        private final List<RecordData.FieldGetter> postTransformedFieldGetters;
+
+ private PreTransformOperator preTransformOperator;
+ private PostTransformOperator postTransformOperator;
+
+ private final BinaryRecordDataGenerator sourceRecordGenerator;
+ private final BinaryRecordDataGenerator preTransformedRecordGenerator;
+ private final BinaryRecordDataGenerator postTransformedRecordGenerator;
+
+        private EventOperatorTestHarness<PreTransformOperator, Event> preTransformOperatorHarness;
+        private EventOperatorTestHarness<PostTransformOperator, Event> postTransformOperatorHarness;
+
+ public static UnifiedTransformTestCase of(
+ TableId tableId,
+ String projectionExpression,
+ String filterExpression,
+ Schema sourceSchema,
+ Schema preTransformedSchema,
+ Schema postTransformedSchema) {
+ return new UnifiedTransformTestCase(
+ tableId,
+ projectionExpression,
+ filterExpression,
+ sourceSchema,
+ preTransformedSchema,
+ postTransformedSchema);
+ }
+
+ private Object[] stringify(Object... objects) {
+ return Arrays.stream(objects)
+ .map(o -> o instanceof String ? new BinaryStringData((String) o) : o)
+ .toArray();
+ }
+
+ public UnifiedTransformTestCase insertSource(Object... record) {
+ sourceEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId, sourceRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
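+        // The no-argument expectation methods below register a null sentinel, meaning
+        // the corresponding event is expected to be filtered out (nothing is emitted).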
+ public UnifiedTransformTestCase insertPreTransformed() {
+ preTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase insertPreTransformed(Object... record) {
+ preTransformedEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId, preTransformedRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase insertPostTransformed() {
+ postTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase insertPostTransformed(Object... record) {
+ postTransformedEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId, postTransformedRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase updateSource(Object[] beforeRecord, Object[] afterRecord) {
+ sourceEvents.add(
+ DataChangeEvent.updateEvent(
+ tableId,
+ sourceRecordGenerator.generate(stringify(beforeRecord)),
+ sourceRecordGenerator.generate(stringify(afterRecord))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase updatePreTransformed() {
+ preTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase updatePreTransformed(
+ Object[] beforeRecord, Object[] afterRecord) {
+ preTransformedEvents.add(
+ DataChangeEvent.updateEvent(
+ tableId,
+ preTransformedRecordGenerator.generate(stringify(beforeRecord)),
+ preTransformedRecordGenerator.generate(stringify(afterRecord))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase updatePostTransformed() {
+ postTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase updatePostTransformed(
+ Object[] beforeRecord, Object[] afterRecord) {
+ postTransformedEvents.add(
+ DataChangeEvent.updateEvent(
+ tableId,
+ postTransformedRecordGenerator.generate(stringify(beforeRecord)),
+ postTransformedRecordGenerator.generate(stringify(afterRecord))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase deleteSource(Object... record) {
+ sourceEvents.add(
+ DataChangeEvent.deleteEvent(
+ tableId, sourceRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase deletePreTransformed() {
+ preTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase deletePreTransformed(Object... record) {
+ preTransformedEvents.add(
+ DataChangeEvent.deleteEvent(
+ tableId, preTransformedRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
+ public UnifiedTransformTestCase deletePostTransformed() {
+ postTransformedEvents.add(null);
+ return this;
+ }
+
+ public UnifiedTransformTestCase deletePostTransformed(Object... record) {
+ postTransformedEvents.add(
+ DataChangeEvent.deleteEvent(
+ tableId, postTransformedRecordGenerator.generate(stringify(record))));
+ return this;
+ }
+
+ private UnifiedTransformTestCase(
+ TableId tableId,
+ String projectionExpression,
+ String filterExpression,
+ Schema sourceSchema,
+ Schema preTransformedSchema,
+ Schema postTransformedSchema) {
+ this.tableId = tableId;
+ this.projectionExpression = projectionExpression;
+ this.filterExpression = filterExpression;
+
+ this.sourceSchema = sourceSchema;
+ this.preTransformedSchema = preTransformedSchema;
+ this.postTransformedSchema = postTransformedSchema;
+
+ this.sourceRecordGenerator =
+ new BinaryRecordDataGenerator(((RowType) sourceSchema.toRowDataType()));
+ this.preTransformedRecordGenerator =
+ new BinaryRecordDataGenerator(((RowType) preTransformedSchema.toRowDataType()));
+ this.postTransformedRecordGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) postTransformedSchema.toRowDataType()));
+
+ this.sourceEvents = new ArrayList<>();
+ this.preTransformedEvents = new ArrayList<>();
+ this.postTransformedEvents = new ArrayList<>();
+
+ this.sourceEvents.add(new CreateTableEvent(tableId, sourceSchema));
+ this.preTransformedEvents.add(new CreateTableEvent(tableId, preTransformedSchema));
+ this.postTransformedEvents.add(new CreateTableEvent(tableId, postTransformedSchema));
+
+ this.sourceFieldGetters = SchemaUtils.createFieldGetters(sourceSchema);
+ this.preTransformedFieldGetters = SchemaUtils.createFieldGetters(preTransformedSchema);
+ this.postTransformedFieldGetters =
+ SchemaUtils.createFieldGetters(postTransformedSchema);
+ }
+
+ private UnifiedTransformTestCase initializeHarness() throws Exception {
+ preTransformOperator =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ tableId.identifier(), projectionExpression, filterExpression)
+ .build();
+ preTransformOperatorHarness = new EventOperatorTestHarness<>(preTransformOperator, 1);
+ preTransformOperatorHarness.open();
+
+ postTransformOperator =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ tableId.identifier(), projectionExpression, filterExpression)
+ .build();
+ postTransformOperatorHarness = new EventOperatorTestHarness<>(postTransformOperator, 1);
+ postTransformOperatorHarness.open();
+ return this;
+ }
+
+ private void destroyHarness() throws Exception {
+ if (preTransformOperatorHarness != null) {
+ preTransformOperatorHarness.close();
+ }
+ if (postTransformOperatorHarness != null) {
+ postTransformOperatorHarness.close();
+ }
+ }
+
+ private void logBinaryDataContents(
+                String prefix, Event event, List<RecordData.FieldGetter> fieldGetters) {
+ LOG.info("{}: {}", prefix, event);
+ if (event instanceof DataChangeEvent) {
+ LOG.info(
+ " Before Record Data: {}",
+ SchemaUtils.restoreOriginalData(
+ ((DataChangeEvent) event).before(), fieldGetters));
+ LOG.info(
+ " After Record Data: {}",
+ SchemaUtils.restoreOriginalData(
+ ((DataChangeEvent) event).after(), fieldGetters));
+ }
+ }
+
+ public UnifiedTransformTestCase runTests() throws Exception {
+ for (int i = 0; i < sourceEvents.size(); i++) {
+ Event sourceEvent = sourceEvents.get(i);
+ logBinaryDataContents("Source Event", sourceEvent, sourceFieldGetters);
+
+ preTransformOperator.processElement(new StreamRecord<>(sourceEvent));
+
+ Event expectedPreTransformEvent = preTransformedEvents.get(i);
+ Event actualPreTransformEvent =
+ Optional.ofNullable(preTransformOperatorHarness.getOutputRecords().poll())
+ .map(StreamRecord::getValue)
+ .orElse(null);
+
+ logBinaryDataContents(
+ "Expected PreTransform ",
+ expectedPreTransformEvent,
+ preTransformedFieldGetters);
+ logBinaryDataContents(
+ " Actual PreTransform ",
+ actualPreTransformEvent,
+ preTransformedFieldGetters);
+ Assertions.assertThat(actualPreTransformEvent).isEqualTo(expectedPreTransformEvent);
+
+ postTransformOperator.processElement(
+ new StreamRecord<>(preTransformedEvents.get(i)));
+ Event expectedPostTransformEvent = postTransformedEvents.get(i);
+ Event actualPostTransformEvent =
+ Optional.ofNullable(postTransformOperatorHarness.getOutputRecords().poll())
+ .map(StreamRecord::getValue)
+ .orElse(null);
+ logBinaryDataContents(
+ "Expected PostTransform",
+ expectedPostTransformEvent,
+ postTransformedFieldGetters);
+ logBinaryDataContents(
+ " Actual PostTransform",
+ actualPostTransformEvent,
+ postTransformedFieldGetters);
+ Assertions.assertThat(actualPostTransformEvent)
+ .isEqualTo(expectedPostTransformEvent);
+ }
+
+ sourceEvents.clear();
+ preTransformedEvents.clear();
+ postTransformedEvents.clear();
+ return this;
+ }
+ }
+
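+    // End-to-end: pre-transform prunes the unused name column; post-transform appends
+    // the computed column and applies the id > 100 filter.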
+ @Test
+ public void testDataChangeEventTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id, age, id + age as computed",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("computed", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, 17)
+ .insertPostTransformed(1000, 17, 1017)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, 18)
+ .insertPostTransformed(2000, 18, 2018)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(new Object[] {2000, 18}, new Object[] {2000, 16})
+ .updatePostTransformed(new Object[] {2000, 18, 2018}, new Object[] {2000, 16, 2016})
+ // filtered out data row
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, 17)
+ .deletePostTransformed(1000, 17, 1017)
+ .runTests()
+ .destroyHarness();
+ }
+
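+    // Same flow as above, asserting that NOT NULL constraints on name and age are
+    // preserved through both transform stages.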
+ @Test
+ public void testSchemaNullabilityTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id, name, age, id + age as computed",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .physicalColumn("computed", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(1000, "Alice", 17, 1017)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(2000, "Bob", 18, 2018)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {2000, "Bob", 18, 2018},
+ new Object[] {2000, "Barcarolle", 16, 2016})
+ // filtered out data row
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(1000, "Alice", 17, 1017)
+ .runTests()
+ .destroyHarness();
+ }
+
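+    // Column pruning plus filtering: name is dropped upstream, while ref2 survives
+    // pre-transform solely because the filter expression references it.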
+ @Test
+ public void testReduceColumnsTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "reduce_column");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id, upper(id) as uid, age + 1 as newage, lower(ref1) as lowerref",
+ "newage > 17 and ref2 > 17",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("name", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("uid", DataTypes.STRING())
+ .physicalColumn("newage", DataTypes.INT())
+ .physicalColumn("lowerref", DataTypes.STRING())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource("id001", "Alice", 17, "Reference001", 2021)
+ .insertPreTransformed("id001", 17, "Reference001", 2021)
+ .insertPostTransformed("id001", "ID001", 18, "reference001")
+ // this data record is filtered out since newage <= 17
+ .insertSource("id002", "Bob", 15, "Reference002", 2017)
+ .insertPreTransformed("id002", 15, "Reference002", 2017)
+ .insertPostTransformed()
+ // this data record is filtered out since ref2 <= 17
+ .insertSource("id003", "Bill", 18, "Reference003", 0)
+ .insertPreTransformed("id003", 18, "Reference003", 0)
+ .insertPostTransformed()
+ .insertSource("id004", "Carol", 18, "Reference004", 2018)
+ .insertPreTransformed("id004", 18, "Reference004", 2018)
+ .insertPostTransformed("id004", "ID004", 19, "reference004")
+ // verify that update events are transformed on both pre- and post-transform sides
+ .updateSource(
+ new Object[] {"id004", "Carol", 18, "Reference004", 2018},
+ new Object[] {"id004", "Colin", 18, "NeoReference004", 2018})
+ .updatePreTransformed(
+ new Object[] {"id004", 18, "Reference004", 2018},
+ new Object[] {"id004", 18, "NeoReference004", 2018})
+ .updatePostTransformed(
+ new Object[] {"id004", "ID004", 19, "reference004"},
+ new Object[] {"id004", "ID004", 19, "neoreference004"})
+ // update to values that no longer satisfy the filter condition
+ .updateSource(
+ new Object[] {"id004", "Colin", 18, "NeoReference004", 2018},
+ new Object[] {"id004", "Colin", 10, "NeoReference004", 2018})
+ .updatePreTransformed(
+ new Object[] {"id004", 18, "NeoReference004", 2018},
+ new Object[] {"id004", 10, "NeoReference004", 2018})
+ .updatePostTransformed()
+ .deleteSource("id001", "Alice", 17, "Reference001", 2021)
+ .deletePreTransformed("id001", 17, "Reference001", 2021)
+ .deletePostTransformed("id001", "ID001", 18, "reference001")
+ .runTests()
+ .destroyHarness();
+ }
+
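+ /** Verifies wildcard (*) projection with an additional computed column, in both column orders. */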
+ @Test
+ public void testWildcardTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "wildcard");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "*, id + age as computed",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("computed", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(1000, "Alice", 17, 1017)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(2000, "Bob", 18, 2018)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {2000, "Bob", 18, 2018},
+ new Object[] {2000, "Barcarolle", 16, 2016})
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(1000, "Alice", 17, 1017)
+ .runTests()
+ .destroyHarness();
+
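+ // Same scenario, but with the wildcard placed after the computed column, so
+ // `computed` is expected to precede the expanded original columns.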
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id + age as computed, *",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("computed", DataTypes.INT())
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(1017, 1000, "Alice", 17)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(2018, 2000, "Bob", 18)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {2018, 2000, "Bob", 18},
+ new Object[] {2016, 2000, "Barcarolle", 16})
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(1017, 1000, "Alice", 17)
+ .runTests()
+ .destroyHarness();
+ }
+
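+ /** Verifies that __namespace_name__, __schema_name__, and __table_name__ metadata columns are populated from the table identifier. */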
+ @Test
+ public void testMetadataTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "metadata");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "*, __namespace_name__, __schema_name__, __table_name__",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__schema_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__table_name__", DataTypes.STRING().notNull())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(1000, "Alice", 17, "my_company", "my_branch", "metadata")
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(2000, "Bob", 18, "my_company", "my_branch", "metadata")
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {2000, "Bob", 18, "my_company", "my_branch", "metadata"},
+ new Object[] {
+ 2000, "Barcarolle", 16, "my_company", "my_branch", "metadata"
+ })
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(1000, "Alice", 17, "my_company", "my_branch", "metadata")
+ .runTests()
+ .destroyHarness();
+ }
+
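+ /** Verifies that metadata columns can be referenced inside calculated expressions. */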
+ @Test
+ public void testCalculatedMetadataTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "metadata_transform");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("identifier_name", DataTypes.STRING())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(1000, "Alice", 17, "my_company.my_branch.metadata_transform")
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(2000, "Bob", 18, "my_company.my_branch.metadata_transform")
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {2000, "Bob", 18, "my_company.my_branch.metadata_transform"},
+ new Object[] {
+ 2000, "Barcarolle", 16, "my_company.my_branch.metadata_transform"
+ })
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(1000, "Alice", 17, "my_company.my_branch.metadata_transform")
+ .runTests()
+ .destroyHarness();
+
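+ // Same scenario, but with the calculated identifier_name column placed before the wildcard.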
+ UnifiedTransformTestCase.of(
+ tableId,
+ "__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, *",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("identifier_name", DataTypes.STRING())
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed("my_company.my_branch.metadata_transform", 2000, "Bob", 18)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {"my_company.my_branch.metadata_transform", 2000, "Bob", 18},
+ new Object[] {
+ "my_company.my_branch.metadata_transform", 2000, "Barcarolle", 16
+ })
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
+ .runTests()
+ .destroyHarness();
+ }
+
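+ /** Verifies combining plain metadata columns and a calculated metadata expression in a single projection. */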
+ @Test
+ public void testMetadataAndCalculatedTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "metadata_transform");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __namespace_name__, __schema_name__, __table_name__",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("identifier_name", DataTypes.STRING())
+ .physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__schema_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__table_name__", DataTypes.STRING().notNull())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(
+ 1000,
+ "Alice",
+ 17,
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform")
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(
+ 2000,
+ "Bob",
+ 18,
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform")
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {
+ 2000,
+ "Bob",
+ 18,
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform"
+ },
+ new Object[] {
+ 2000,
+ "Barcarolle",
+ 16,
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform"
+ })
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(
+ 1000,
+ "Alice",
+ 17,
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform")
+ .runTests()
+ .destroyHarness();
+
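+ // Same scenario, but with the metadata and calculated columns placed before the wildcard.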
+ UnifiedTransformTestCase.of(
+ tableId,
+ "__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __namespace_name__, __schema_name__, __table_name__, *",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("identifier_name", DataTypes.STRING())
+ .physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__schema_name__", DataTypes.STRING().notNull())
+ .physicalColumn("__table_name__", DataTypes.STRING().notNull())
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform",
+ 1000,
+ "Alice",
+ 17)
+ .insertSource(2000, "Bob", 18)
+ .insertPreTransformed(2000, "Bob", 18)
+ .insertPostTransformed(
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform",
+ 2000,
+ "Bob",
+ 18)
+ .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePreTransformed(
+ new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
+ .updatePostTransformed(
+ new Object[] {
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform",
+ 2000,
+ "Bob",
+ 18
+ },
+ new Object[] {
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform",
+ 2000,
+ "Barcarolle",
+ 16
+ })
+ // this data row is filtered out since id <= 100
+ .insertSource(50, "Carol", 19)
+ .insertPreTransformed(50, "Carol", 19)
+ .insertPostTransformed()
+ .deleteSource(1000, "Alice", 17)
+ .deletePreTransformed(1000, "Alice", 17)
+ .deletePostTransformed(
+ "my_company.my_branch.metadata_transform",
+ "my_company",
+ "my_branch",
+ "metadata_transform",
+ 1000,
+ "Alice",
+ 17)
+ .runTests()
+ .destroyHarness();
+ }
+
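+ /** Verifies CAST expressions in projection rules, including nested casts and casting to VARCHAR. */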
+ @Test
+ public void testTransformWithCast() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "transform_with_cast");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id, age + 1 as newage, CAST(CAST(id AS INT) + age AS BIGINT) as longevity, CAST(age AS VARCHAR) as string_age",
+ "newage > 17 and ref2 > 17",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("name", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .physicalColumn("ref1", DataTypes.STRING())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("age", DataTypes.INT().notNull())
+ .physicalColumn("ref2", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("newage", DataTypes.INT())
+ .physicalColumn("longevity", DataTypes.BIGINT())
+ .physicalColumn("string_age", DataTypes.STRING())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource("1001", "Alice", 17, "Reference001", 2021)
+ .insertPreTransformed("1001", 17, 2021)
+ .insertPostTransformed("1001", 18, 1018L, "17")
+ // this data record is filtered out since newage <= 17
+ .insertSource("1002", "Bob", 15, "Reference002", 2017)
+ .insertPreTransformed("1002", 15, 2017)
+ .insertPostTransformed()
+ // this data record is filtered out since ref2 <= 17
+ .insertSource("1003", "Bill", 18, "Reference003", 0)
+ .insertPreTransformed("1003", 18, 0)
+ .insertPostTransformed()
+ .insertSource("1004", "Carol", 18, "Reference004", 2018)
+ .insertPreTransformed("1004", 18, 2018)
+ .insertPostTransformed("1004", 19, 1022L, "18")
+ // verify that update events are transformed on both pre- and post-transform sides
+ .updateSource(
+ new Object[] {"1004", "Carol", 18, "Reference004", 2018},
+ new Object[] {"1004", "Colin", 19, "NeoReference004", 2018})
+ .updatePreTransformed(
+ new Object[] {"1004", 18, 2018}, new Object[] {"1004", 19, 2018})
+ .updatePostTransformed(
+ new Object[] {"1004", 19, 1022L, "18"},
+ new Object[] {"1004", 20, 1023L, "19"})
+ // update to values that no longer satisfy the filter condition
+ .updateSource(
+ new Object[] {"1004", "Colin", 19, "NeoReference004", 2018},
+ new Object[] {"1004", "Colin", 10, "NeoReference004", 2018})
+ .updatePreTransformed(
+ new Object[] {"1004", 19, 2018}, new Object[] {"1004", 10, 2018})
+ .updatePostTransformed()
+ .deleteSource("1001", "Alice", 17, "Reference001", 2021)
+ .deletePreTransformed("1001", 17, 2021)
+ .deletePostTransformed("1001", 18, 1018L, "17")
+ .runTests()
+ .destroyHarness();
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 66a405a97..270e77534 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -17,8 +17,11 @@
package org.apache.flink.cdc.runtime.parser;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable;
@@ -43,9 +46,10 @@ import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.apache.calcite.tools.RelBuilder;
-import org.junit.Assert;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,9 +70,11 @@ public class TransformParserTest {
SqlSelect parse =
TransformParser.parseSelect(
"select CONCAT(id, order_id) as uniq_id, * from tb where uniq_id > 10 and id is not null");
- Assert.assertEquals(
- "`CONCAT`(`id`, `order_id`) AS `uniq_id`, *", parse.getSelectList().toString());
- Assert.assertEquals("`uniq_id` > 10 AND `id` IS NOT NULL", parse.getWhere().toString());
+ Assertions.assertThat(parse.getSelectList().toString())
+ .isEqualTo("`CONCAT`(`id`, `order_id`) AS `uniq_id`, *");
+
+ Assertions.assertThat(parse.getWhere().toString())
+ .isEqualTo("`uniq_id` > 10 AND `id` IS NOT NULL");
}
@Test
@@ -101,15 +107,17 @@ public class TransformParserTest {
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
SqlNode validateSqlNode = validator.validate(parse);
- Assert.assertEquals(
- "SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`",
- parse.getSelectList().toString());
- Assert.assertEquals("`tb`.`id` IS NOT NULL", parse.getWhere().toString());
- Assert.assertEquals(
- "SELECT SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`\n"
- + "FROM `default_schema`.`tb` AS `tb`\n"
- + "WHERE `tb`.`id` IS NOT NULL",
- validateSqlNode.toString().replaceAll("\r\n", "\n"));
+
+ Assertions.assertThat(parse.getSelectList().toString())
+ .isEqualTo("SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`");
+
+ Assertions.assertThat(parse.getWhere().toString()).isEqualTo("`tb`.`id` IS NOT NULL");
+
+ Assertions.assertThat(validateSqlNode.toString().replaceAll("\r\n", "\n"))
+ .isEqualTo(
+ "SELECT SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`\n"
+ + "FROM `default_schema`.`tb` AS `tb`\n"
+ + "WHERE `tb`.`id` IS NOT NULL");
}
@Test
@@ -160,29 +168,33 @@ public class TransformParserTest {
RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null);
relRoot = relRoot.withRel(RelDecorrelator.decorrelateQuery(relRoot.rel, relBuilder));
RelNode relNode = relRoot.rel;
- Assert.assertEquals(
- "SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`",
- parse.getSelectList().toString());
- Assert.assertEquals("`tb`.`id` IS NOT NULL", parse.getWhere().toString());
- Assert.assertEquals(
- "SELECT SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`\n"
- + "FROM `default_schema`.`tb` AS `tb`\n"
- + "WHERE `tb`.`id` IS NOT NULL",
- validateSqlNode.toString().replaceAll("\r\n", "\n"));
+
+ Assertions.assertThat(parse.getSelectList().toString())
+ .isEqualTo("SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`");
+
+ Assertions.assertThat(parse.getWhere().toString()).isEqualTo("`tb`.`id` IS NOT NULL");
+
+ Assertions.assertThat(validateSqlNode.toString().replaceAll("\r\n", "\n"))
+ .isEqualTo(
+ "SELECT SUBSTR(`tb`.`id`, 1) AS `uniq_id`, `tb`.`id`, `tb`.`order_id`\n"
+ + "FROM `default_schema`.`tb` AS `tb`\n"
+ + "WHERE `tb`.`id` IS NOT NULL");
}
@Test
public void testParseComputedColumnNames() {
List computedColumnNames =
TransformParser.parseComputedColumnNames("CONCAT(id, order_id) as uniq_id, *");
- Assert.assertEquals(new String[] {"uniq_id"}, computedColumnNames.toArray());
+
+ Assertions.assertThat(computedColumnNames.toArray()).isEqualTo(new String[] {"uniq_id"});
}
@Test
public void testParseFilterColumnNameList() {
List computedColumnNames =
TransformParser.parseFilterColumnNameList(" uniq_id > 10 and id is not null");
- Assert.assertEquals(new String[] {"uniq_id", "id"}, computedColumnNames.toArray());
+ Assertions.assertThat(computedColumnNames.toArray())
+ .isEqualTo(new String[] {"uniq_id", "id"});
}
@Test
@@ -298,9 +310,112 @@ public class TransformParserTest {
testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, __time_zone__)");
}
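+ /** Verifies that generateProjectionColumns resolves expressions, types, and referenced columns, and rejects calculated columns without an AS alias. */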
+ @Test
+ public void testGenerateProjectionColumns() {
+ List testColumns =
+ Arrays.asList(
+ Column.physicalColumn("id", DataTypes.INT(), "id"),
+ Column.physicalColumn("name", DataTypes.STRING(), "string"),
+ Column.physicalColumn("age", DataTypes.INT(), "age"),
+ Column.physicalColumn("address", DataTypes.STRING(), "address"),
+ Column.physicalColumn("weight", DataTypes.DOUBLE(), "weight"),
+ Column.physicalColumn("height", DataTypes.DOUBLE(), "height"));
+
+ List result =
+ TransformParser.generateProjectionColumns(
+ "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi",
+ testColumns);
+
+ List expected =
+ Arrays.asList(
+ "ProjectionColumn{column=`id` INT, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`name` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}",
+ "ProjectionColumn{column=`newage` INT, expression='`TB`.`age` + 1', scriptExpression='age + 1', originalColumnNames=[age], transformExpressionKey=null}",
+ "ProjectionColumn{column=`bmi` DOUBLE, expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', scriptExpression='weight / height * height', originalColumnNames=[weight, height, height], transformExpressionKey=null}");
+ Assertions.assertThat(result.toString()).isEqualTo("[" + String.join(", ", expected) + "]");
+
+ List metadataResult =
+ TransformParser.generateProjectionColumns(
+ "*, __namespace_name__, __schema_name__, __table_name__", testColumns);
+
+ List metadataExpected =
+ Arrays.asList(
+ "ProjectionColumn{column=`id` INT, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`name` STRING, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`age` INT, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`address` STRING, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`weight` DOUBLE, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`height` DOUBLE, expression='null', scriptExpression='null', originalColumnNames=null, transformExpressionKey=null}",
+ "ProjectionColumn{column=`__namespace_name__` STRING NOT NULL, expression='__namespace_name__', scriptExpression='__namespace_name__', originalColumnNames=[__namespace_name__], transformExpressionKey=null}",
+ "ProjectionColumn{column=`__schema_name__` STRING NOT NULL, expression='__schema_name__', scriptExpression='__schema_name__', originalColumnNames=[__schema_name__], transformExpressionKey=null}",
+ "ProjectionColumn{column=`__table_name__` STRING NOT NULL, expression='__table_name__', scriptExpression='__table_name__', originalColumnNames=[__table_name__], transformExpressionKey=null}");
+ Assertions.assertThat(metadataResult.toString())
+ .isEqualTo("[" + String.join(", ", metadataExpected) + "]");
+
+ // calculated columns must use AS to provide an alias name
+ Assertions.assertThatThrownBy(
+ () -> TransformParser.generateProjectionColumns("id, 1 + 1", testColumns))
+ .isExactlyInstanceOf(ParseException.class);
+ }
+
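+ /** Verifies that generateReferencedColumns keeps only columns referenced by projection or filter rules. */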
+ @Test
+ public void testGenerateReferencedColumns() {
+ List testColumns =
+ Arrays.asList(
+ Column.physicalColumn("id", DataTypes.INT(), "id"),
+ Column.physicalColumn("name", DataTypes.STRING(), "string"),
+ Column.physicalColumn("age", DataTypes.INT(), "age"),
+ Column.physicalColumn("address", DataTypes.STRING(), "address"),
+ Column.physicalColumn("weight", DataTypes.DOUBLE(), "weight"),
+ Column.physicalColumn("height", DataTypes.DOUBLE(), "height"),
+ Column.physicalColumn("birthday", DataTypes.DATE(), "birthday"));
+
+ List result =
+ TransformParser.generateReferencedColumns(
+ "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi",
+ "bmi > 17 and char_length(address) > 10",
+ testColumns);
+
+ List expected =
+ Arrays.asList(
+ "`id` INT 'id'",
+ "`name` STRING 'string'",
+ "`age` INT 'age'",
+ "`address` STRING 'address'",
+ "`weight` DOUBLE 'weight'",
+ "`height` DOUBLE 'height'");
+ Assertions.assertThat(result.toString()).isEqualTo("[" + String.join(", ", expected) + "]");
+
+ // calculated columns must use AS to provide an alias name
+ Assertions.assertThatThrownBy(
+ () ->
+ TransformParser.generateReferencedColumns(
+ "id, 1 + 1", null, testColumns))
+ .isExactlyInstanceOf(ParseException.class);
+ }
+
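+ /** Verifies that normalizeFilter inlines calculated-column definitions into filter predicates. */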
+ @Test
+ public void testNormalizeFilter() {
+ Assertions.assertThat(TransformParser.normalizeFilter("a, b, c, d", "a > 0 and b > 0"))
+ .isEqualTo("`a` > 0 AND `b` > 0");
+ Assertions.assertThat(TransformParser.normalizeFilter("a, b, c, d", null)).isEqualTo(null);
+ Assertions.assertThat(
+ TransformParser.normalizeFilter(
+ "abs(a) as cal_a, char_length(b) as cal_b, c, d",
+ "a > 4 and cal_a > 8 and cal_b < 17 and c != d"))
+ .isEqualTo("`a` > 4 AND ABS(`a`) > 8 AND CHAR_LENGTH(`b`) < 17 AND `c` <> `d`");
+
+ Assertions.assertThat(
+ TransformParser.normalizeFilter(
+ "x, y, z, 1 - x as u, 1 - y as v, 1 - z as w",
+ "concat(u, concat(v, concat(w, x), y), z) != 10"))
+ .isEqualTo(
+ "`concat`(1 - `x`, `concat`(1 - `y`, `concat`(1 - `z`, `x`), `y`), `z`) <> 10");
+ }
+
private void testFilterExpression(String expression, String expressionExpect) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(expression);
- Assert.assertEquals(expressionExpect, janinoExpression);
+ Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}
}