[FLINK-35805][transform] Add __data_event_type__ metadata column

This closes #3468
yuxiqian authored 6 months ago, committed by GitHub
parent 874ff4ff20
commit 3cb91fb51a

@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
| Field                | Data Type | Description                                                   |
|----------------------|-----------|---------------------------------------------------------------|
| __namespace_name__   | String    | Name of the namespace that contains the row.                  |
| __schema_name__      | String    | Name of the schema that contains the row.                     |
| __table_name__       | String    | Name of the table that contains the row.                      |
| __data_event_type__  | String    | Operation type of the data change event (+I, -U, +U, or -D).  |
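For instance, a transform rule can forward these columns alongside the physical ones; a minimal sketch of such a rule (the source table name and output column names are illustrative), mirroring the projection used in the E2E test later in this change:

```yaml
transform:
  - source-table: mydb.web_order
    # Expose the fully qualified table id and the change type (+I, -U, +U, -D) as extra columns.
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS op_type
```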
## Metadata relationship

@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
| Field                | Data Type | Description                                                   |
|----------------------|-----------|---------------------------------------------------------------|
| __namespace_name__   | String    | Name of the namespace that contains the row.                  |
| __schema_name__      | String    | Name of the schema that contains the row.                     |
| __table_name__       | String    | Name of the table that contains the row.                      |
| __data_event_type__  | String    | Operation type of the data change event (+I, -U, +U, or -D).  |
## Metadata relationship

@ -340,6 +340,66 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
}
    @ParameterizedTest
    @EnumSource
    void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

        // Setup value source
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
        SourceDef sourceDef =
                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

        // Setup value sink
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

        // Setup transform
        TransformDef transformDef =
                new TransformDef(
                        "default_namespace.default_schema.table1",
                        "*,concat(col1,'0') as col12,__data_event_type__ as rk",
                        "col1 <> '3'",
                        "col1",
                        "col12",
                        "key1=value1",
                        "");

        // Setup pipeline
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
        PipelineDef pipelineDef =
                new PipelineDef(
                        sourceDef,
                        sinkDef,
                        Collections.emptyList(),
                        new ArrayList<>(Collections.singletonList(transformDef)),
                        Collections.emptyList(),
                        pipelineConfig);

        // Execute the pipeline
        PipelineExecution execution = composer.compose(pipelineDef);
        execution.execute();

        // Check the order and content of all received events
        String[] outputEvents = outCaptor.toString().trim().split("\n");
        assertThat(outputEvents)
                .containsExactly(
                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
    }
@ParameterizedTest
@EnumSource
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {

@ -442,9 +442,9 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
@ -462,25 +462,25 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);
waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);
validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");
// generate binlogs
String mysqlJdbcUrl =
@ -492,9 +492,9 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
insertBinlogEvents(mysqlJdbcUrl);
validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
}
private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException {

@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@ -389,13 +390,15 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
// insert and update event only process afterData, delete only process beforeData
if (after != null) {
if (transformFilterProcessor.process(after, epochTime)) {
if (transformFilterProcessor.process(
after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
}
} else if (before != null) {
if (transformFilterProcessor.process(before, epochTime)) {
if (transformFilterProcessor.process(
before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
@ -412,11 +415,14 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
postTransformProcessor.processData(before, epochTime);
postTransformProcessor.processData(
before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
}
if (after != null) {
BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
BinaryRecordData projectedAfter =
postTransformProcessor.processData(
after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'));
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
}
return Optional.of(dataChangeEvent);
@ -499,4 +505,8 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
}
});
}

    private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
    }
}
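The helper above produces the RowKind-style shorthand that shows up in the expected test output: the first character marks whether the value labels a before or after image, the second is the first letter of the operation. A standalone sketch of the mapping, assuming the flink-cdc-common dependency and the OperationType constants seen in the test output (the class and method names below are illustrative, not part of the patch):

```java
import org.apache.flink.cdc.common.event.OperationType;

public class RowKindMappingSketch {
    // Mirrors PostTransformOperator#opTypeToRowKind: '+' for after images, '-' for before images.
    static String toRowKind(OperationType opType, char beforeOrAfter) {
        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
    }

    public static void main(String[] args) {
        System.out.println(toRowKind(OperationType.INSERT, '+')); // +I (inserts only carry an after image)
        System.out.println(toRowKind(OperationType.UPDATE, '-')); // -U (update, before image)
        System.out.println(toRowKind(OperationType.UPDATE, '+')); // +U (update, after image)
        System.out.println(toRowKind(OperationType.DELETE, '-')); // -D (deletes only carry a before image)
    }
}
```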

@ -21,7 +21,7 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.codehaus.janino.ExpressionEvaluator;
@ -33,6 +33,8 @@ import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
/**
* The processor of the projection column. It processes the data column and the user-defined
* computed columns.
@ -79,9 +81,9 @@ public class ProjectionColumnProcessor {
return projectionColumn;
}
public Object evaluate(BinaryRecordData after, long epochTime) {
public Object evaluate(BinaryRecordData record, long epochTime, String opType) {
try {
return expressionEvaluator.evaluate(generateParams(after, epochTime));
return expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} column:{} projection:{} execute failed. {}",
@ -93,7 +95,7 @@ public class ProjectionColumnProcessor {
}
}
private Object[] generateParams(BinaryRecordData after, long epochTime) {
private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
@ -103,15 +105,18 @@ public class ProjectionColumnProcessor {
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
case TransformParser.DEFAULT_NAMESPACE_NAME:
case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
case TransformParser.DEFAULT_SCHEMA_NAME:
case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
case TransformParser.DEFAULT_TABLE_NAME:
case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
params.add(opType);
continue;
}
boolean argumentFound = false;
@ -120,7 +125,7 @@ public class ProjectionColumnProcessor {
if (column.getName().equals(originalColumnName)) {
params.add(
DataTypeConverter.convertToOriginal(
fieldGetters[i].getFieldOrNull(after), column.getType()));
fieldGetters[i].getFieldOrNull(record), column.getType()));
argumentFound = true;
break;
}
@ -158,20 +163,14 @@ public class ProjectionColumnProcessor {
}
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
case TransformParser.DEFAULT_NAMESPACE_NAME:
argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
paramTypes.add(String.class);
break;
case TransformParser.DEFAULT_SCHEMA_NAME:
argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
paramTypes.add(String.class);
break;
case TransformParser.DEFAULT_TABLE_NAME:
argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
paramTypes.add(String.class);
break;
}
METADATA_COLUMNS.stream()
.filter(col -> col.f0.equals(originalColumnName))
.findFirst()
.ifPresent(
col -> {
argumentNames.add(col.f0);
paramTypes.add(col.f2);
});
}
argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
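For context, the argument names and types collected here are handed to a Janino ExpressionEvaluator, so a metadata column is simply one more named parameter. A minimal standalone sketch, assuming only the Janino dependency; the expression and values are illustrative, since the real evaluator code is generated from the SQL projection:

```java
import org.codehaus.janino.ExpressionEvaluator;

public class MetadataParameterSketch {
    public static void main(String[] args) throws Exception {
        ExpressionEvaluator evaluator = new ExpressionEvaluator();
        // Each referenced column, physical or metadata, becomes a named evaluator parameter.
        evaluator.setParameters(
                new String[] {"col1", "__data_event_type__"},
                new Class[] {String.class, String.class});
        evaluator.setExpressionType(String.class);
        evaluator.cook("col1 + \"_\" + __data_event_type__");
        // The operator supplies the op-type string ("+I", "-U", "+U" or "-D") at evaluation time.
        System.out.println(evaluator.evaluate(new Object[] {"1", "+I"})); // prints 1_+I
    }
}
```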

@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.codehaus.janino.ExpressionEvaluator;
@ -32,11 +33,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Stream;
import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_NAMESPACE_NAME;
import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_SCHEMA_NAME;
import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_TABLE_NAME;
import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
/** The processor of the transform filter. It processes the data change event of matched table. */
public class TransformFilterProcessor {
@ -74,9 +72,10 @@ public class TransformFilterProcessor {
tableInfo, transformFilter, timezone, udfDescriptors, udfFunctionInstances);
}
public boolean process(BinaryRecordData after, long epochTime) {
public boolean process(BinaryRecordData record, long epochTime, String opType) {
try {
return (Boolean) expressionEvaluator.evaluate(generateParams(after, epochTime));
return (Boolean)
expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} filter:{} execute failed. {}",
@ -102,19 +101,19 @@ public class TransformFilterProcessor {
}
}
}
Stream.of(DEFAULT_NAMESPACE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME)
METADATA_COLUMNS.stream()
.forEach(
metadataColumn -> {
if (scriptExpression.contains(metadataColumn)
&& !argNames.contains(metadataColumn)) {
argNames.add(metadataColumn);
argTypes.add(String.class);
col -> {
if (scriptExpression.contains(col.f0) && !argNames.contains(col.f0)) {
argNames.add(col.f0);
argTypes.add(col.f2);
}
});
return Tuple2.of(argNames, argTypes);
}
private Object[] generateParams(BinaryRecordData after, long epochTime) {
private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
@ -123,22 +122,25 @@ public class TransformFilterProcessor {
RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
for (String columnName : args.f0) {
switch (columnName) {
case DEFAULT_NAMESPACE_NAME:
case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
case DEFAULT_SCHEMA_NAME:
case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
case DEFAULT_TABLE_NAME:
case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
params.add(opType);
continue;
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if (column.getName().equals(columnName)) {
params.add(
DataTypeConverter.convertToOriginal(
fieldGetters[i].getFieldOrNull(after), column.getType()));
fieldGetters[i].getFieldOrNull(record), column.getType()));
break;
}
}

@ -113,7 +113,7 @@ public class TransformProjectionProcessor {
.collect(Collectors.toList()));
}
public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
public BinaryRecordData processData(BinaryRecordData payload, long epochTime, String opType) {
List<Object> valueList = new ArrayList<>();
List<Column> columns = postTransformChangeInfo.getPostTransformedSchema().getColumns();
@ -124,7 +124,7 @@ public class TransformProjectionProcessor {
ProjectionColumn projectionColumn = projectionColumnProcessor.getProjectionColumn();
valueList.add(
DataTypeConverter.convert(
projectionColumnProcessor.evaluate(payload, epochTime),
projectionColumnProcessor.evaluate(payload, epochTime, opType),
projectionColumn.getDataType()));
} else {
Column column = columns.get(i);

@ -20,7 +20,6 @@ package org.apache.flink.cdc.runtime.parser;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
@ -84,6 +83,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.cdc.common.utils.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
import static org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.convertCalciteType;
/** Use Flink's calcite parser to parse the statement of flink cdc pipeline transform. */
@ -91,9 +91,6 @@ public class TransformParser {
private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class);
private static final String DEFAULT_SCHEMA = "default_schema";
private static final String DEFAULT_TABLE = "TB";
public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
public static final String DEFAULT_TABLE_NAME = "__table_name__";
private static SqlParser getCalciteParser(String sql) {
return SqlParser.create(
@ -497,16 +494,14 @@ public class TransformParser {
private static List<Column> copyFillMetadataColumn(List<Column> columns) {
// Add metaColumn for SQLValidator.validate
List<Column> columnsWithMetadata = new ArrayList<>(columns);
columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING()));
columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING()));
columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING()));
METADATA_COLUMNS.stream()
.map(col -> Column.physicalColumn(col.f0, col.f1))
.forEach(columnsWithMetadata::add);
return columnsWithMetadata;
}
private static boolean isMetadataColumn(String columnName) {
return DEFAULT_TABLE_NAME.equals(columnName)
|| DEFAULT_SCHEMA_NAME.equals(columnName)
|| DEFAULT_NAMESPACE_NAME.equals(columnName);
return METADATA_COLUMNS.stream().anyMatch(col -> col.f0.equals(columnName));
}
public static SqlSelect parseFilterExpression(String filterExpression) {

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.parser.metadata;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Arrays;
import java.util.List;

/** Contains all supported metadata columns that could be used in transform expressions. */
public class MetadataColumns {
    public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
    public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
    public static final String DEFAULT_TABLE_NAME = "__table_name__";
    public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";

    public static final List<Tuple3<String, DataType, Class<?>>> METADATA_COLUMNS =
            Arrays.asList(
                    Tuple3.of(DEFAULT_NAMESPACE_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_SCHEMA_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_TABLE_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING(), String.class));
}
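The tuple layout is (column name, CDC DataType, Java class), so one list drives both schema construction and Janino parameter typing. A brief usage sketch, assuming the flink-cdc-common and runtime dependencies; the wrapper class below is illustrative, not part of the patch:

```java
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;

public class MetadataColumnsUsageSketch {
    // True when a referenced name is a supported metadata column (cf. TransformParser#isMetadataColumn).
    static boolean isMetadataColumn(String name) {
        return MetadataColumns.METADATA_COLUMNS.stream().anyMatch(col -> col.f0.equals(name));
    }

    public static void main(String[] args) {
        System.out.println(isMetadataColumn("__data_event_type__")); // true
        // f0 is the column name, f1 its CDC DataType, f2 the Java class handed to Janino.
        MetadataColumns.METADATA_COLUMNS.forEach(
                col -> System.out.println(Column.physicalColumn(col.f0, col.f1)));
    }
}
```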