[FLINK-35272][cdc-runtime] Transform supports omitting and renaming computed column
This closes #3285.pull/3523/head
parent
2dabfc0815
commit
81d916fc73
@ -0,0 +1,91 @@
|
||||
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
-- contributor license agreements. See the NOTICE file distributed with
|
||||
-- this work for additional information regarding copyright ownership.
|
||||
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
-- (the "License"); you may not use this file except in compliance with
|
||||
-- the License. You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: column_type_test
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
|
||||
CREATE TABLE DATA_TYPES_TABLE
|
||||
(
|
||||
id INT,
|
||||
tiny_c TINYINT,
|
||||
tiny_un_c TINYINT UNSIGNED,
|
||||
tiny_un_z_c TINYINT UNSIGNED ZEROFILL,
|
||||
small_c SMALLINT,
|
||||
small_un_c SMALLINT UNSIGNED,
|
||||
small_un_z_c SMALLINT UNSIGNED ZEROFILL,
|
||||
medium_c MEDIUMINT,
|
||||
medium_un_c MEDIUMINT UNSIGNED,
|
||||
medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL,
|
||||
int_c INTEGER,
|
||||
int_un_c INTEGER UNSIGNED,
|
||||
int_un_z_c INTEGER UNSIGNED ZEROFILL,
|
||||
int11_c INT(11),
|
||||
big_c BIGINT,
|
||||
varchar_c VARCHAR(255),
|
||||
char_c CHAR(3),
|
||||
real_c REAL,
|
||||
float_c FLOAT,
|
||||
float_un_c FLOAT UNSIGNED,
|
||||
float_un_z_c FLOAT UNSIGNED ZEROFILL,
|
||||
double_c DOUBLE,
|
||||
double_un_c DOUBLE UNSIGNED,
|
||||
double_un_z_c DOUBLE UNSIGNED ZEROFILL,
|
||||
decimal_c DECIMAL(8, 4),
|
||||
decimal_un_c DECIMAL(8, 4) UNSIGNED,
|
||||
decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL,
|
||||
numeric_c NUMERIC(6, 0),
|
||||
big_decimal_c DECIMAL(65, 1),
|
||||
bit1_c BIT,
|
||||
tiny1_c TINYINT(1),
|
||||
boolean_c BOOLEAN,
|
||||
date_c DATE,
|
||||
datetime3_c DATETIME(3),
|
||||
datetime6_c DATETIME(6),
|
||||
timestamp_c TIMESTAMP,
|
||||
text_c TEXT,
|
||||
tiny_blob_c TINYBLOB,
|
||||
blob_c BLOB,
|
||||
medium_blob_c MEDIUMBLOB,
|
||||
long_blob_c LONGBLOB,
|
||||
year_c YEAR,
|
||||
enum_c enum('red', 'white') default 'red',
|
||||
point_c POINT,
|
||||
geometry_c GEOMETRY,
|
||||
linestring_c LINESTRING,
|
||||
polygon_c POLYGON,
|
||||
multipoint_c MULTIPOINT,
|
||||
multiline_c MULTILINESTRING,
|
||||
multipolygon_c MULTIPOLYGON,
|
||||
geometrycollection_c GEOMCOLLECTION,
|
||||
PRIMARY KEY (id)
|
||||
) DEFAULT CHARSET=utf8;
|
||||
|
||||
INSERT INTO DATA_TYPES_TABLE
|
||||
VALUES (1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647,
|
||||
4294967295, 4294967295, 2147483647, 9223372036854775807,
|
||||
'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,
|
||||
123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,
|
||||
'2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
|
||||
'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,
|
||||
'red',
|
||||
ST_GeomFromText('POINT(1 1)'),
|
||||
ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
|
||||
ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
|
||||
ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
|
||||
ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
|
||||
ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
|
||||
ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
|
||||
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
|
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.RecordData;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.utils.SchemaUtils;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* PostTransformChangeInfo caches pre-transformed / pre-transformed schema, schema field getters,
|
||||
* and binary record data generator for post-transform schema.
|
||||
*/
|
||||
public class PostTransformChangeInfo {
|
||||
private TableId tableId;
|
||||
private Schema preTransformedSchema;
|
||||
private Schema postTransformedSchema;
|
||||
private RecordData.FieldGetter[] preTransformedFieldGetters;
|
||||
private RecordData.FieldGetter[] postTransformedFieldGetters;
|
||||
private BinaryRecordDataGenerator recordDataGenerator;
|
||||
|
||||
public PostTransformChangeInfo(
|
||||
TableId tableId,
|
||||
Schema postTransformedSchema,
|
||||
RecordData.FieldGetter[] postTransformedFieldGetters,
|
||||
Schema preTransformedSchema,
|
||||
RecordData.FieldGetter[] preTransformedFieldGetters,
|
||||
BinaryRecordDataGenerator recordDataGenerator) {
|
||||
this.tableId = tableId;
|
||||
this.postTransformedSchema = postTransformedSchema;
|
||||
this.postTransformedFieldGetters = postTransformedFieldGetters;
|
||||
this.preTransformedSchema = preTransformedSchema;
|
||||
this.preTransformedFieldGetters = preTransformedFieldGetters;
|
||||
this.recordDataGenerator = recordDataGenerator;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return tableId.identifier();
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableId.getTableName();
|
||||
}
|
||||
|
||||
public String getSchemaName() {
|
||||
return tableId.getSchemaName();
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
return tableId.getNamespace();
|
||||
}
|
||||
|
||||
public TableId getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public Schema getPostTransformedSchema() {
|
||||
return postTransformedSchema;
|
||||
}
|
||||
|
||||
public Schema getPreTransformedSchema() {
|
||||
return preTransformedSchema;
|
||||
}
|
||||
|
||||
public RecordData.FieldGetter[] getPostTransformedFieldGetters() {
|
||||
return postTransformedFieldGetters;
|
||||
}
|
||||
|
||||
public RecordData.FieldGetter[] getPreTransformedFieldGetters() {
|
||||
return preTransformedFieldGetters;
|
||||
}
|
||||
|
||||
public BinaryRecordDataGenerator getRecordDataGenerator() {
|
||||
return recordDataGenerator;
|
||||
}
|
||||
|
||||
public static PostTransformChangeInfo of(
|
||||
TableId tableId, Schema postTransformedSchema, Schema preTransformedSchema) {
|
||||
|
||||
List<RecordData.FieldGetter> postTransformedFieldGetters =
|
||||
SchemaUtils.createFieldGetters(postTransformedSchema.getColumns());
|
||||
|
||||
List<RecordData.FieldGetter> preTransformedFieldGetters =
|
||||
SchemaUtils.createFieldGetters(preTransformedSchema.getColumns());
|
||||
|
||||
BinaryRecordDataGenerator postTransformedRecordDataGenerator =
|
||||
new BinaryRecordDataGenerator(
|
||||
DataTypeConverter.toRowType(postTransformedSchema.getColumns()));
|
||||
|
||||
return new PostTransformChangeInfo(
|
||||
tableId,
|
||||
postTransformedSchema,
|
||||
postTransformedFieldGetters.toArray(new RecordData.FieldGetter[0]),
|
||||
preTransformedSchema,
|
||||
preTransformedFieldGetters.toArray(new RecordData.FieldGetter[0]),
|
||||
postTransformedRecordDataGenerator);
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.schema.Selectors;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/** Post-Transformation rule used by {@link PostTransformOperator}. */
|
||||
public class PostTransformer {
|
||||
private final Selectors selectors;
|
||||
|
||||
private final Optional<TransformProjection> projection;
|
||||
private final Optional<TransformFilter> filter;
|
||||
|
||||
public PostTransformer(
|
||||
Selectors selectors,
|
||||
@Nullable TransformProjection projection,
|
||||
@Nullable TransformFilter filter) {
|
||||
this.selectors = selectors;
|
||||
this.projection = projection != null ? Optional.of(projection) : Optional.empty();
|
||||
this.filter = filter != null ? Optional.of(filter) : Optional.empty();
|
||||
}
|
||||
|
||||
public Selectors getSelectors() {
|
||||
return selectors;
|
||||
}
|
||||
|
||||
public Optional<TransformProjection> getProjection() {
|
||||
return projection;
|
||||
}
|
||||
|
||||
public Optional<TransformFilter> getFilter() {
|
||||
return filter;
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.RecordData;
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.schema.Column;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.runtime.parser.TransformParser;
|
||||
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The processor of pre-transform projection in {@link PreTransformOperator}.
|
||||
*
|
||||
* <p>A pre-transform projection processor handles:
|
||||
*
|
||||
* <ul>
|
||||
* <li>CreateTableEvent: removes unused (unreferenced) columns from given schema.
|
||||
* <li>SchemaChangeEvent: update the columns of TransformProjection.
|
||||
* <li>DataChangeEvent: omits unused columns in data row.
|
||||
* </ul>
|
||||
*/
|
||||
public class PreTransformProcessor {
|
||||
private PreTransformChangeInfo tableChangeInfo;
|
||||
private TransformProjection transformProjection;
|
||||
private @Nullable TransformFilter transformFilter;
|
||||
private List<Boolean> cachedProjectionColumnsState;
|
||||
|
||||
public PreTransformProcessor(
|
||||
PreTransformChangeInfo tableChangeInfo,
|
||||
TransformProjection transformProjection,
|
||||
@Nullable TransformFilter transformFilter) {
|
||||
this.tableChangeInfo = tableChangeInfo;
|
||||
this.transformProjection = transformProjection;
|
||||
this.transformFilter = transformFilter;
|
||||
this.cachedProjectionColumnsState =
|
||||
cacheIsProjectionColumnMap(tableChangeInfo, transformProjection);
|
||||
}
|
||||
|
||||
public boolean hasTableChangeInfo() {
|
||||
return this.tableChangeInfo != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method analyses (directly and indirectly) referenced columns, and peels unused columns
|
||||
* from schema. For example, given original schema with columns (A, B, C, D, E) with projection
|
||||
* rule (A, B + 1 as newB) and filtering rule (C > 0), a peeled schema containing (A, B, C) only
|
||||
* will be sent to downstream, and (D, E) column along with corresponding data will be trimmed.
|
||||
*/
|
||||
public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTableEvent) {
|
||||
List<Column> preTransformColumns =
|
||||
TransformParser.generateReferencedColumns(
|
||||
transformProjection.getProjection(),
|
||||
transformFilter != null ? transformFilter.getExpression() : null,
|
||||
createTableEvent.getSchema().getColumns());
|
||||
Schema schema = createTableEvent.getSchema().copy(preTransformColumns);
|
||||
return new CreateTableEvent(createTableEvent.tableId(), schema);
|
||||
}
|
||||
|
||||
public BinaryRecordData processFillDataField(BinaryRecordData data) {
|
||||
List<Object> valueList = new ArrayList<>();
|
||||
List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns();
|
||||
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (cachedProjectionColumnsState.get(i)) {
|
||||
valueList.add(null);
|
||||
} else {
|
||||
valueList.add(
|
||||
getValueFromBinaryRecordData(
|
||||
columns.get(i).getName(),
|
||||
data,
|
||||
tableChangeInfo.getSourceSchema().getColumns(),
|
||||
tableChangeInfo.getSourceFieldGetters()));
|
||||
}
|
||||
}
|
||||
|
||||
return tableChangeInfo
|
||||
.getPreTransformedRecordDataGenerator()
|
||||
.generate(valueList.toArray(new Object[0]));
|
||||
}
|
||||
|
||||
private Object getValueFromBinaryRecordData(
|
||||
String columnName,
|
||||
BinaryRecordData binaryRecordData,
|
||||
List<Column> columns,
|
||||
RecordData.FieldGetter[] fieldGetters) {
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (columnName.equals(columns.get(i).getName())) {
|
||||
return DataTypeConverter.convert(
|
||||
fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<Boolean> cacheIsProjectionColumnMap(
|
||||
PreTransformChangeInfo tableChangeInfo, TransformProjection transformProjection) {
|
||||
List<Boolean> cachedMap = new ArrayList<>();
|
||||
if (!hasTableChangeInfo()) {
|
||||
return cachedMap;
|
||||
}
|
||||
|
||||
for (Column column : tableChangeInfo.getPreTransformedSchema().getColumns()) {
|
||||
boolean isProjectionColumn = false;
|
||||
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
|
||||
if (column.getName().equals(projectionColumn.getColumnName())
|
||||
&& projectionColumn.isValidTransformedProjectionColumn()) {
|
||||
isProjectionColumn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
cachedMap.add(isProjectionColumn);
|
||||
}
|
||||
|
||||
return cachedMap;
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.schema.Selectors;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/** Pre-Transformation rule used by {@link PreTransformOperator}. */
|
||||
public class PreTransformer {
|
||||
private final Selectors selectors;
|
||||
|
||||
private final Optional<TransformProjection> projection;
|
||||
private final Optional<TransformFilter> filter;
|
||||
|
||||
public PreTransformer(
|
||||
Selectors selectors,
|
||||
@Nullable TransformProjection projection,
|
||||
@Nullable TransformFilter filter) {
|
||||
this.selectors = selectors;
|
||||
this.projection = projection != null ? Optional.of(projection) : Optional.empty();
|
||||
this.filter = filter != null ? Optional.of(filter) : Optional.empty();
|
||||
}
|
||||
|
||||
public Selectors getSelectors() {
|
||||
return selectors;
|
||||
}
|
||||
|
||||
public Optional<TransformProjection> getProjection() {
|
||||
return projection;
|
||||
}
|
||||
|
||||
public Optional<TransformFilter> getFilter() {
|
||||
return filter;
|
||||
}
|
||||
}
|
@ -1,90 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.RecordData;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.utils.SchemaUtils;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/** The TableInfo applies to cache schema and fieldGetters. */
|
||||
public class TableInfo {
|
||||
private TableId tableId;
|
||||
private Schema schema;
|
||||
private RecordData.FieldGetter[] fieldGetters;
|
||||
private BinaryRecordDataGenerator recordDataGenerator;
|
||||
|
||||
public TableInfo(
|
||||
TableId tableId,
|
||||
Schema schema,
|
||||
RecordData.FieldGetter[] fieldGetters,
|
||||
BinaryRecordDataGenerator recordDataGenerator) {
|
||||
this.tableId = tableId;
|
||||
this.schema = schema;
|
||||
this.fieldGetters = fieldGetters;
|
||||
this.recordDataGenerator = recordDataGenerator;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return tableId.identifier();
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableId.getTableName();
|
||||
}
|
||||
|
||||
public String getSchemaName() {
|
||||
return tableId.getSchemaName();
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
return tableId.getNamespace();
|
||||
}
|
||||
|
||||
public TableId getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public Schema getSchema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
public RecordData.FieldGetter[] getFieldGetters() {
|
||||
return fieldGetters;
|
||||
}
|
||||
|
||||
public BinaryRecordDataGenerator getRecordDataGenerator() {
|
||||
return recordDataGenerator;
|
||||
}
|
||||
|
||||
public static TableInfo of(TableId tableId, Schema schema) {
|
||||
List<RecordData.FieldGetter> fieldGetters =
|
||||
SchemaUtils.createFieldGetters(schema.getColumns());
|
||||
BinaryRecordDataGenerator recordDataGenerator =
|
||||
new BinaryRecordDataGenerator(DataTypeConverter.toRowType(schema.getColumns()));
|
||||
return new TableInfo(
|
||||
tableId,
|
||||
schema,
|
||||
fieldGetters.toArray(new RecordData.FieldGetter[0]),
|
||||
recordDataGenerator);
|
||||
}
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import static org.apache.flink.cdc.runtime.parser.TransformParser.normalizeFilter;
|
||||
|
||||
/** A rule defining pre-transformations where filtered rows and irrelevant columns are removed. */
|
||||
public class TransformRule implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final String tableInclusions;
|
||||
private final @Nullable String projection;
|
||||
private final @Nullable String filter;
|
||||
private final String primaryKey;
|
||||
private final String partitionKey;
|
||||
private final String tableOption;
|
||||
|
||||
public TransformRule(
|
||||
String tableInclusions,
|
||||
@Nullable String projection,
|
||||
@Nullable String filter,
|
||||
String primaryKey,
|
||||
String partitionKey,
|
||||
String tableOption) {
|
||||
this.tableInclusions = tableInclusions;
|
||||
this.projection = projection;
|
||||
this.filter = normalizeFilter(projection, filter);
|
||||
this.primaryKey = primaryKey;
|
||||
this.partitionKey = partitionKey;
|
||||
this.tableOption = tableOption;
|
||||
}
|
||||
|
||||
public String getTableInclusions() {
|
||||
return tableInclusions;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getProjection() {
|
||||
return projection;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
public String getPrimaryKey() {
|
||||
return primaryKey;
|
||||
}
|
||||
|
||||
public String getPartitionKey() {
|
||||
return partitionKey;
|
||||
}
|
||||
|
||||
public String getTableOption() {
|
||||
return tableOption;
|
||||
}
|
||||
}
|
@ -0,0 +1,520 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
|
||||
import org.apache.flink.cdc.common.event.AddColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DataChangeEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.Column;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.common.types.RowType;
|
||||
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
/** Unit tests for the {@link PreTransformOperator}. */
|
||||
public class PreTransformOperatorTest {
|
||||
private static final TableId CUSTOMERS_TABLEID =
|
||||
TableId.tableId("my_company", "my_branch", "customers");
|
||||
private static final Schema CUSTOMERS_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.primaryKey("col1")
|
||||
.build();
|
||||
private static final Schema CUSTOMERS_LATEST_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.physicalColumn("col3", DataTypes.STRING())
|
||||
.primaryKey("col1")
|
||||
.build();
|
||||
private static final Schema EXPECT_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.primaryKey("col2")
|
||||
.partitionKey("col12")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
private static final Schema EXPECT_LATEST_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.physicalColumn("col3", DataTypes.STRING())
|
||||
.primaryKey("col2")
|
||||
.partitionKey("col12")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema NULLABILITY_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema EXPECTED_NULLABILITY_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema REFERENCED_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("ref1", DataTypes.STRING())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema EXPECTED_REFERENCED_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("ref1", DataTypes.STRING())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema WILDCARD_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema EXPECTED_WILDCARD_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final TableId METADATA_TABLEID =
|
||||
TableId.tableId("my_company", "my_branch", "metadata_table");
|
||||
private static final Schema METADATA_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema EXPECTED_METADATA_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final TableId METADATA_AS_TABLEID =
|
||||
TableId.tableId("my_company", "my_branch", "metadata_as_table");
|
||||
private static final Schema METADATA_AS_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("sid", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("name_upper", DataTypes.STRING())
|
||||
.physicalColumn("tbname", DataTypes.STRING())
|
||||
.primaryKey("sid")
|
||||
.build();
|
||||
|
||||
@Test
|
||||
void testEventTransform() throws Exception {
|
||||
PreTransformOperator transform =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"*, concat(col1,col2) col12",
|
||||
null,
|
||||
"col2",
|
||||
"col12",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<PreTransformOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
|
||||
// Add column
|
||||
AddColumnEvent.ColumnWithPosition columnWithPosition =
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
Column.physicalColumn("col3", DataTypes.STRING()));
|
||||
AddColumnEvent addColumnEvent =
|
||||
new AddColumnEvent(
|
||||
CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
|
||||
BinaryRecordDataGenerator recordDataGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
|
||||
BinaryRecordDataGenerator recordDataGeneratorExpect =
|
||||
new BinaryRecordDataGenerator(((RowType) EXPECT_LATEST_SCHEMA.toRowDataType()));
|
||||
// Insert
|
||||
DataChangeEvent insertEvent =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3"),
|
||||
}));
|
||||
DataChangeEvent insertEventExpect =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
|
||||
// Update
|
||||
DataChangeEvent updateEvent =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3")
|
||||
}),
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("3"),
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
DataChangeEvent updateEventExpect =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3")
|
||||
}),
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("3"),
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(new CreateTableEvent(CUSTOMERS_TABLEID, EXPECT_SCHEMA)));
|
||||
transform.processElement(new StreamRecord<>(addColumnEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(addColumnEvent));
|
||||
transform.processElement(new StreamRecord<>(insertEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(insertEventExpect));
|
||||
transform.processElement(new StreamRecord<>(updateEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(updateEventExpect));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullabilityColumn() throws Exception {
|
||||
PreTransformOperator transform =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"id, upper(id) uid, name, upper(name) uname",
|
||||
null,
|
||||
"id",
|
||||
"id",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<PreTransformOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(
|
||||
new CreateTableEvent(
|
||||
CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReduceTransformColumn() throws Exception {
|
||||
PreTransformOperator transform =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"id, upper(id) as uid, age + 1 as newage, lower(ref1) as ref1",
|
||||
"newage > 17 and ref2 > 17",
|
||||
"id",
|
||||
"id",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<PreTransformOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, REFERENCED_SCHEMA);
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(
|
||||
new CreateTableEvent(
|
||||
CUSTOMERS_TABLEID, EXPECTED_REFERENCED_SCHEMA)));
|
||||
|
||||
BinaryRecordDataGenerator recordDataGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) REFERENCED_SCHEMA.toRowDataType()));
|
||||
BinaryRecordDataGenerator recordDataGeneratorExpect =
|
||||
new BinaryRecordDataGenerator(
|
||||
((RowType) EXPECTED_REFERENCED_SCHEMA.toRowDataType()));
|
||||
|
||||
// Insert
|
||||
DataChangeEvent insertEvent =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Alice"),
|
||||
new BinaryStringData("Reference"),
|
||||
42,
|
||||
}));
|
||||
DataChangeEvent insertEventExpect =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Reference"),
|
||||
42
|
||||
}));
|
||||
|
||||
// Update
|
||||
DataChangeEvent updateEvent =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Alice"),
|
||||
new BinaryStringData("Reference"),
|
||||
42,
|
||||
}),
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
18,
|
||||
new BinaryStringData("Arisu"),
|
||||
new BinaryStringData("UpdatedReference"),
|
||||
41,
|
||||
}));
|
||||
DataChangeEvent updateEventExpect =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Reference"),
|
||||
42
|
||||
}),
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
18,
|
||||
new BinaryStringData("UpdatedReference"),
|
||||
41
|
||||
}));
|
||||
|
||||
transform.processElement(new StreamRecord<>(insertEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(insertEventExpect));
|
||||
|
||||
transform.processElement(new StreamRecord<>(updateEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(updateEventExpect));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWildcardTransformColumn() throws Exception {
|
||||
PreTransformOperator transform =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"*, age + 1 as newage",
|
||||
"newage > 17",
|
||||
"id",
|
||||
"id",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<PreTransformOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, WILDCARD_SCHEMA);
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, EXPECTED_WILDCARD_SCHEMA)));
|
||||
|
||||
BinaryRecordDataGenerator recordDataGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) WILDCARD_SCHEMA.toRowDataType()));
|
||||
BinaryRecordDataGenerator recordDataGeneratorExpect =
|
||||
new BinaryRecordDataGenerator(((RowType) EXPECTED_WILDCARD_SCHEMA.toRowDataType()));
|
||||
|
||||
// Insert
|
||||
DataChangeEvent insertEvent =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"), 17, new BinaryStringData("Alice")
|
||||
}));
|
||||
DataChangeEvent insertEventExpect =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Alice"),
|
||||
}));
|
||||
|
||||
// Update
|
||||
DataChangeEvent updateEvent =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"), 17, new BinaryStringData("Alice")
|
||||
}),
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"), 18, new BinaryStringData("Arisu")
|
||||
}));
|
||||
DataChangeEvent updateEventExpect =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
17,
|
||||
new BinaryStringData("Alice"),
|
||||
}),
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("id001"),
|
||||
18,
|
||||
new BinaryStringData("Arisu"),
|
||||
}));
|
||||
|
||||
transform.processElement(new StreamRecord<>(insertEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(insertEventExpect));
|
||||
|
||||
transform.processElement(new StreamRecord<>(updateEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(updateEventExpect));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMetadataTransform() throws Exception {
|
||||
PreTransformOperator transform =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
METADATA_TABLEID.identifier(),
|
||||
"*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
|
||||
" __table_name__ = 'metadata_table' ")
|
||||
.build();
|
||||
|
||||
EventOperatorTestHarness<PreTransformOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA);
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(
|
||||
new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
|
||||
}
|
||||
}
|
@ -1,228 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
|
||||
import org.apache.flink.cdc.common.event.AddColumnEvent;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DataChangeEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.Column;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.common.types.RowType;
|
||||
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
/** Unit tests for the {@link TransformSchemaOperator}. */
|
||||
public class TransformSchemaOperatorTest {
|
||||
private static final TableId CUSTOMERS_TABLEID =
|
||||
TableId.tableId("my_company", "my_branch", "customers");
|
||||
private static final Schema CUSTOMERS_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.primaryKey("col1")
|
||||
.build();
|
||||
private static final Schema CUSTOMERS_LATEST_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.physicalColumn("col3", DataTypes.STRING())
|
||||
.primaryKey("col1")
|
||||
.build();
|
||||
private static final Schema EXPECT_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.physicalColumn("col12", DataTypes.STRING())
|
||||
.primaryKey("col2")
|
||||
.partitionKey("col12")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
private static final Schema EXPECT_LATEST_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.physicalColumn("col12", DataTypes.STRING())
|
||||
.physicalColumn("col3", DataTypes.STRING())
|
||||
.primaryKey("col2")
|
||||
.partitionKey("col12")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema NULLABILITY_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
private static final Schema EXPECTED_NULLABILITY_SCHEMA =
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("uid", DataTypes.STRING())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("uname", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.partitionKey("id")
|
||||
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
|
||||
.build();
|
||||
|
||||
@Test
|
||||
void testEventTransform() throws Exception {
|
||||
TransformSchemaOperator transform =
|
||||
TransformSchemaOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"*, concat(col1,col2) col12",
|
||||
"col2",
|
||||
"col12",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<TransformSchemaOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
|
||||
// Add column
|
||||
AddColumnEvent.ColumnWithPosition columnWithPosition =
|
||||
new AddColumnEvent.ColumnWithPosition(
|
||||
Column.physicalColumn("col3", DataTypes.STRING()));
|
||||
AddColumnEvent addColumnEvent =
|
||||
new AddColumnEvent(
|
||||
CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
|
||||
BinaryRecordDataGenerator recordDataGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
|
||||
BinaryRecordDataGenerator recordDataGeneratorExpect =
|
||||
new BinaryRecordDataGenerator(((RowType) EXPECT_LATEST_SCHEMA.toRowDataType()));
|
||||
// Insert
|
||||
DataChangeEvent insertEvent =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3"),
|
||||
}));
|
||||
DataChangeEvent insertEventExpect =
|
||||
DataChangeEvent.insertEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
null,
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
|
||||
// Update
|
||||
DataChangeEvent updateEvent =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
new BinaryStringData("3")
|
||||
}),
|
||||
recordDataGenerator.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("3"),
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
DataChangeEvent updateEventExpect =
|
||||
DataChangeEvent.updateEvent(
|
||||
CUSTOMERS_TABLEID,
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("2"),
|
||||
null,
|
||||
new BinaryStringData("3")
|
||||
}),
|
||||
recordDataGeneratorExpect.generate(
|
||||
new Object[] {
|
||||
new BinaryStringData("1"),
|
||||
new BinaryStringData("3"),
|
||||
null,
|
||||
new BinaryStringData("3")
|
||||
}));
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(new CreateTableEvent(CUSTOMERS_TABLEID, EXPECT_SCHEMA)));
|
||||
transform.processElement(new StreamRecord<>(addColumnEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(addColumnEvent));
|
||||
transform.processElement(new StreamRecord<>(insertEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(insertEventExpect));
|
||||
transform.processElement(new StreamRecord<>(updateEvent));
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(new StreamRecord<>(updateEventExpect));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullabilityColumn() throws Exception {
|
||||
TransformSchemaOperator transform =
|
||||
TransformSchemaOperator.newBuilder()
|
||||
.addTransform(
|
||||
CUSTOMERS_TABLEID.identifier(),
|
||||
"id, upper(id) uid, name, upper(name) uname",
|
||||
"id",
|
||||
"id",
|
||||
"key1=value1,key2=value2")
|
||||
.build();
|
||||
EventOperatorTestHarness<TransformSchemaOperator, Event>
|
||||
transformFunctionEventEventOperatorTestHarness =
|
||||
new EventOperatorTestHarness<>(transform, 1);
|
||||
// Initialization
|
||||
transformFunctionEventEventOperatorTestHarness.open();
|
||||
// Create table
|
||||
CreateTableEvent createTableEvent =
|
||||
new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
|
||||
transform.processElement(new StreamRecord<>(createTableEvent));
|
||||
|
||||
Assertions.assertThat(
|
||||
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
|
||||
.isEqualTo(
|
||||
new StreamRecord<>(
|
||||
new CreateTableEvent(
|
||||
CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
|
||||
}
|
||||
}
|
@ -0,0 +1,985 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.runtime.operators.transform;
|
||||
|
||||
import org.apache.flink.cdc.common.data.RecordData;
|
||||
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
|
||||
import org.apache.flink.cdc.common.event.CreateTableEvent;
|
||||
import org.apache.flink.cdc.common.event.DataChangeEvent;
|
||||
import org.apache.flink.cdc.common.event.Event;
|
||||
import org.apache.flink.cdc.common.event.TableId;
|
||||
import org.apache.flink.cdc.common.schema.Schema;
|
||||
import org.apache.flink.cdc.common.types.DataTypes;
|
||||
import org.apache.flink.cdc.common.types.RowType;
|
||||
import org.apache.flink.cdc.common.utils.SchemaUtils;
|
||||
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
|
||||
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/** Unit tests for the {@link PreTransformOperator} and {@link PostTransformOperator}. */
|
||||
public class UnifiedTransformOperatorTest {
|
||||
|
||||
/** Defines a unified transform test cases. */
|
||||
static class UnifiedTransformTestCase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UnifiedTransformTestCase.class);
|
||||
|
||||
private final TableId tableId;
|
||||
private final String projectionExpression;
|
||||
private final String filterExpression;
|
||||
|
||||
private Schema sourceSchema;
|
||||
private Schema preTransformedSchema;
|
||||
private Schema postTransformedSchema;
|
||||
|
||||
private final List<Event> sourceEvents;
|
||||
private final List<Event> preTransformedEvents;
|
||||
private final List<Event> postTransformedEvents;
|
||||
|
||||
private final List<RecordData.FieldGetter> sourceFieldGetters;
|
||||
private final List<RecordData.FieldGetter> preTransformedFieldGetters;
|
||||
private final List<RecordData.FieldGetter> postTransformedFieldGetters;
|
||||
|
||||
private PreTransformOperator preTransformOperator;
|
||||
private PostTransformOperator postTransformOperator;
|
||||
|
||||
private final BinaryRecordDataGenerator sourceRecordGenerator;
|
||||
private final BinaryRecordDataGenerator preTransformedRecordGenerator;
|
||||
private final BinaryRecordDataGenerator postTransformedRecordGenerator;
|
||||
|
||||
private EventOperatorTestHarness<PreTransformOperator, Event> preTransformOperatorHarness;
|
||||
private EventOperatorTestHarness<PostTransformOperator, Event> postTransformOperatorHarness;
|
||||
|
||||
public static UnifiedTransformTestCase of(
|
||||
TableId tableId,
|
||||
String projectionExpression,
|
||||
String filterExpression,
|
||||
Schema sourceSchema,
|
||||
Schema preTransformedSchema,
|
||||
Schema postTransformedSchema) {
|
||||
return new UnifiedTransformTestCase(
|
||||
tableId,
|
||||
projectionExpression,
|
||||
filterExpression,
|
||||
sourceSchema,
|
||||
preTransformedSchema,
|
||||
postTransformedSchema);
|
||||
}
|
||||
|
||||
private Object[] stringify(Object... objects) {
|
||||
return Arrays.stream(objects)
|
||||
.map(o -> o instanceof String ? new BinaryStringData((String) o) : o)
|
||||
.toArray();
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase insertSource(Object... record) {
|
||||
sourceEvents.add(
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId, sourceRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase insertPreTransformed() {
|
||||
preTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase insertPreTransformed(Object... record) {
|
||||
preTransformedEvents.add(
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId, preTransformedRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase insertPostTransformed() {
|
||||
postTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase insertPostTransformed(Object... record) {
|
||||
postTransformedEvents.add(
|
||||
DataChangeEvent.insertEvent(
|
||||
tableId, postTransformedRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase updateSource(Object[] beforeRecord, Object[] afterRecord) {
|
||||
sourceEvents.add(
|
||||
DataChangeEvent.updateEvent(
|
||||
tableId,
|
||||
sourceRecordGenerator.generate(stringify(beforeRecord)),
|
||||
sourceRecordGenerator.generate(stringify(afterRecord))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase updatePreTransformed() {
|
||||
preTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase updatePreTransformed(
|
||||
Object[] beforeRecord, Object[] afterRecord) {
|
||||
preTransformedEvents.add(
|
||||
DataChangeEvent.updateEvent(
|
||||
tableId,
|
||||
preTransformedRecordGenerator.generate(stringify(beforeRecord)),
|
||||
preTransformedRecordGenerator.generate(stringify(afterRecord))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase updatePostTransformed() {
|
||||
postTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase updatePostTransformed(
|
||||
Object[] beforeRecord, Object[] afterRecord) {
|
||||
postTransformedEvents.add(
|
||||
DataChangeEvent.updateEvent(
|
||||
tableId,
|
||||
postTransformedRecordGenerator.generate(stringify(beforeRecord)),
|
||||
postTransformedRecordGenerator.generate(stringify(afterRecord))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase deleteSource(Object... record) {
|
||||
sourceEvents.add(
|
||||
DataChangeEvent.deleteEvent(
|
||||
tableId, sourceRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase deletePreTransformed() {
|
||||
preTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase deletePreTransformed(Object... record) {
|
||||
preTransformedEvents.add(
|
||||
DataChangeEvent.deleteEvent(
|
||||
tableId, preTransformedRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase deletePostTransformed() {
|
||||
postTransformedEvents.add(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase deletePostTransformed(Object... record) {
|
||||
postTransformedEvents.add(
|
||||
DataChangeEvent.deleteEvent(
|
||||
tableId, postTransformedRecordGenerator.generate(stringify(record))));
|
||||
return this;
|
||||
}
|
||||
|
||||
private UnifiedTransformTestCase(
|
||||
TableId tableId,
|
||||
String projectionExpression,
|
||||
String filterExpression,
|
||||
Schema sourceSchema,
|
||||
Schema preTransformedSchema,
|
||||
Schema postTransformedSchema) {
|
||||
this.tableId = tableId;
|
||||
this.projectionExpression = projectionExpression;
|
||||
this.filterExpression = filterExpression;
|
||||
|
||||
this.sourceSchema = sourceSchema;
|
||||
this.preTransformedSchema = preTransformedSchema;
|
||||
this.postTransformedSchema = postTransformedSchema;
|
||||
|
||||
this.sourceRecordGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) sourceSchema.toRowDataType()));
|
||||
this.preTransformedRecordGenerator =
|
||||
new BinaryRecordDataGenerator(((RowType) preTransformedSchema.toRowDataType()));
|
||||
this.postTransformedRecordGenerator =
|
||||
new BinaryRecordDataGenerator(
|
||||
((RowType) postTransformedSchema.toRowDataType()));
|
||||
|
||||
this.sourceEvents = new ArrayList<>();
|
||||
this.preTransformedEvents = new ArrayList<>();
|
||||
this.postTransformedEvents = new ArrayList<>();
|
||||
|
||||
this.sourceEvents.add(new CreateTableEvent(tableId, sourceSchema));
|
||||
this.preTransformedEvents.add(new CreateTableEvent(tableId, preTransformedSchema));
|
||||
this.postTransformedEvents.add(new CreateTableEvent(tableId, postTransformedSchema));
|
||||
|
||||
this.sourceFieldGetters = SchemaUtils.createFieldGetters(sourceSchema);
|
||||
this.preTransformedFieldGetters = SchemaUtils.createFieldGetters(preTransformedSchema);
|
||||
this.postTransformedFieldGetters =
|
||||
SchemaUtils.createFieldGetters(postTransformedSchema);
|
||||
}
|
||||
|
||||
private UnifiedTransformTestCase initializeHarness() throws Exception {
|
||||
preTransformOperator =
|
||||
PreTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
tableId.identifier(), projectionExpression, filterExpression)
|
||||
.build();
|
||||
preTransformOperatorHarness = new EventOperatorTestHarness<>(preTransformOperator, 1);
|
||||
preTransformOperatorHarness.open();
|
||||
|
||||
postTransformOperator =
|
||||
PostTransformOperator.newBuilder()
|
||||
.addTransform(
|
||||
tableId.identifier(), projectionExpression, filterExpression)
|
||||
.build();
|
||||
postTransformOperatorHarness = new EventOperatorTestHarness<>(postTransformOperator, 1);
|
||||
postTransformOperatorHarness.open();
|
||||
return this;
|
||||
}
|
||||
|
||||
private void destroyHarness() throws Exception {
|
||||
if (preTransformOperatorHarness != null) {
|
||||
preTransformOperatorHarness.close();
|
||||
}
|
||||
if (postTransformOperatorHarness != null) {
|
||||
postTransformOperatorHarness.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void logBinaryDataContents(
|
||||
String prefix, Event event, List<RecordData.FieldGetter> fieldGetters) {
|
||||
LOG.info("{}: {}", prefix, event);
|
||||
if (event instanceof DataChangeEvent) {
|
||||
LOG.info(
|
||||
" Before Record Data: {}",
|
||||
SchemaUtils.restoreOriginalData(
|
||||
((DataChangeEvent) event).before(), fieldGetters));
|
||||
LOG.info(
|
||||
" After Record Data: {}",
|
||||
SchemaUtils.restoreOriginalData(
|
||||
((DataChangeEvent) event).after(), fieldGetters));
|
||||
}
|
||||
}
|
||||
|
||||
public UnifiedTransformTestCase runTests() throws Exception {
|
||||
for (int i = 0; i < sourceEvents.size(); i++) {
|
||||
Event sourceEvent = sourceEvents.get(i);
|
||||
logBinaryDataContents("Source Event", sourceEvent, sourceFieldGetters);
|
||||
|
||||
preTransformOperator.processElement(new StreamRecord<>(sourceEvent));
|
||||
|
||||
Event expectedPreTransformEvent = preTransformedEvents.get(i);
|
||||
Event actualPreTransformEvent =
|
||||
Optional.ofNullable(preTransformOperatorHarness.getOutputRecords().poll())
|
||||
.map(StreamRecord::getValue)
|
||||
.orElse(null);
|
||||
|
||||
logBinaryDataContents(
|
||||
"Expected PreTransform ",
|
||||
expectedPreTransformEvent,
|
||||
preTransformedFieldGetters);
|
||||
logBinaryDataContents(
|
||||
" Actual PreTransform ",
|
||||
actualPreTransformEvent,
|
||||
preTransformedFieldGetters);
|
||||
Assertions.assertThat(actualPreTransformEvent).isEqualTo(expectedPreTransformEvent);
|
||||
|
||||
postTransformOperator.processElement(
|
||||
new StreamRecord<>(preTransformedEvents.get(i)));
|
||||
Event expectedPostTransformEvent = postTransformedEvents.get(i);
|
||||
Event actualPostTransformEvent =
|
||||
Optional.ofNullable(postTransformOperatorHarness.getOutputRecords().poll())
|
||||
.map(StreamRecord::getValue)
|
||||
.orElse(null);
|
||||
logBinaryDataContents(
|
||||
"Expected PostTransform",
|
||||
expectedPostTransformEvent,
|
||||
postTransformedFieldGetters);
|
||||
logBinaryDataContents(
|
||||
" Actual PostTransform",
|
||||
actualPostTransformEvent,
|
||||
postTransformedFieldGetters);
|
||||
Assertions.assertThat(actualPostTransformEvent)
|
||||
.isEqualTo(expectedPostTransformEvent);
|
||||
}
|
||||
|
||||
sourceEvents.clear();
|
||||
preTransformedEvents.clear();
|
||||
postTransformedEvents.clear();
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataChangeEventTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"id, age, id + age as computed",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("computed", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, 17)
|
||||
.insertPostTransformed(1000, 17, 1017)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, 18)
|
||||
.insertPostTransformed(2000, 18, 2018)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(new Object[] {2000, 18}, new Object[] {2000, 16})
|
||||
.updatePostTransformed(new Object[] {2000, 18, 2018}, new Object[] {2000, 16, 2016})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, 17)
|
||||
.deletePostTransformed(1000, 17, 1017)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaNullabilityTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"id, name, age, id + age as computed",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.physicalColumn("computed", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(1000, "Alice", 17, 1017)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(2000, "Bob", 18, 2018)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {2000, "Bob", 18, 2018},
|
||||
new Object[] {2000, "Barcarolle", 16, 2016})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(1000, "Alice", 17, 1017)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReduceColumnsTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "reduce_column");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"id, upper(id) as uid, age + 1 as newage, lower(ref1) as lowerref",
|
||||
"newage > 17 and ref2 > 17",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("name", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.physicalColumn("ref1", DataTypes.STRING())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.physicalColumn("ref1", DataTypes.STRING())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("uid", DataTypes.STRING())
|
||||
.physicalColumn("newage", DataTypes.INT())
|
||||
.physicalColumn("lowerref", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource("id001", "Alice", 17, "Reference001", 2021)
|
||||
.insertPreTransformed("id001", 17, "Reference001", 2021)
|
||||
.insertPostTransformed("id001", "ID001", 18, "reference001")
|
||||
// this data record is filtered out since newage <= 17
|
||||
.insertSource("id002", "Bob", 15, "Reference002", 2017)
|
||||
.insertPreTransformed("id002", 15, "Reference002", 2017)
|
||||
.insertPostTransformed()
|
||||
// this data record is filtered out since ref2 <= 17
|
||||
.insertSource("id003", "Bill", 18, "Reference003", 0)
|
||||
.insertPreTransformed("id003", 18, "Reference003", 0)
|
||||
.insertPostTransformed()
|
||||
.insertSource("id004", "Carol", 18, "Reference004", 2018)
|
||||
.insertPreTransformed("id004", 18, "Reference004", 2018)
|
||||
.insertPostTransformed("id004", "ID004", 19, "reference004")
|
||||
// test update event transform
|
||||
.updateSource(
|
||||
new Object[] {"id004", "Carol", 18, "Reference004", 2018},
|
||||
new Object[] {"id004", "Colin", 18, "NeoReference004", 2018})
|
||||
.updatePreTransformed(
|
||||
new Object[] {"id004", 18, "Reference004", 2018},
|
||||
new Object[] {"id004", 18, "NeoReference004", 2018})
|
||||
.updatePostTransformed(
|
||||
new Object[] {"id004", "ID004", 19, "reference004"},
|
||||
new Object[] {"id004", "ID004", 19, "neoreference004"})
|
||||
// updated value to a filtered out condition
|
||||
.updateSource(
|
||||
new Object[] {"id004", "Colin", 18, "NeoReference004", 2018},
|
||||
new Object[] {"id004", "Colin", 10, "NeoReference004", 2018})
|
||||
.updatePreTransformed(
|
||||
new Object[] {"id004", 18, "NeoReference004", 2018},
|
||||
new Object[] {"id004", 10, "NeoReference004", 2018})
|
||||
.updatePostTransformed()
|
||||
.deleteSource("id001", "Alice", 17, "Reference001", 2021)
|
||||
.deletePreTransformed("id001", 17, "Reference001", 2021)
|
||||
.deletePostTransformed("id001", "ID001", 18, "reference001")
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWildcardTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "wildcard");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"*, id + age as computed",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("computed", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(1000, "Alice", 17, 1017)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(2000, "Bob", 18, 2018)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {2000, "Bob", 18, 2018},
|
||||
new Object[] {2000, "Barcarolle", 16, 2016})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(1000, "Alice", 17, 1017)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"id + age as computed, *",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("computed", DataTypes.INT())
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(1017, 1000, "Alice", 17)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(2018, 2000, "Bob", 18)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {2018, 2000, "Bob", 18},
|
||||
new Object[] {2016, 2000, "Barcarolle", 16})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(1017, 1000, "Alice", 17)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "metadata");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"*, __namespace_name__, __schema_name__, __table_name__",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__schema_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__table_name__", DataTypes.STRING().notNull())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(1000, "Alice", 17, "my_company", "my_branch", "metadata")
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(2000, "Bob", 18, "my_company", "my_branch", "metadata")
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {2000, "Bob", 18, "my_company", "my_branch", "metadata"},
|
||||
new Object[] {
|
||||
2000, "Barcarolle", 16, "my_company", "my_branch", "metadata"
|
||||
})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(1000, "Alice", 17, "my_company", "my_branch", "metadata")
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculatedMetadataTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "metadata_transform");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("identifier_name", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(1000, "Alice", 17, "my_company.my_branch.metadata_transform")
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(2000, "Bob", 18, "my_company.my_branch.metadata_transform")
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {2000, "Bob", 18, "my_company.my_branch.metadata_transform"},
|
||||
new Object[] {
|
||||
2000, "Barcarolle", 16, "my_company.my_branch.metadata_transform"
|
||||
})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(1000, "Alice", 17, "my_company.my_branch.metadata_transform")
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, *",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("identifier_name", DataTypes.STRING())
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed("my_company.my_branch.metadata_transform", 2000, "Bob", 18)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {"my_company.my_branch.metadata_transform", 2000, "Bob", 18},
|
||||
new Object[] {
|
||||
"my_company.my_branch.metadata_transform", 2000, "Barcarolle", 16
|
||||
})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed("my_company.my_branch.metadata_transform", 1000, "Alice", 17)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetadataAndCalculatedTransform() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "metadata_transform");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __namespace_name__, __schema_name__, __table_name__",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.physicalColumn("identifier_name", DataTypes.STRING())
|
||||
.physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__schema_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__table_name__", DataTypes.STRING().notNull())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(
|
||||
1000,
|
||||
"Alice",
|
||||
17,
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform")
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(
|
||||
2000,
|
||||
"Bob",
|
||||
18,
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform")
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {
|
||||
2000,
|
||||
"Bob",
|
||||
18,
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform"
|
||||
},
|
||||
new Object[] {
|
||||
2000,
|
||||
"Barcarolle",
|
||||
16,
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform"
|
||||
})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(
|
||||
1000,
|
||||
"Alice",
|
||||
17,
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform")
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"__namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __namespace_name__, __schema_name__, __table_name__, *",
|
||||
"id > 100",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("identifier_name", DataTypes.STRING())
|
||||
.physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__schema_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("__table_name__", DataTypes.STRING().notNull())
|
||||
.physicalColumn("id", DataTypes.INT())
|
||||
.physicalColumn("name", DataTypes.STRING())
|
||||
.physicalColumn("age", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource(1000, "Alice", 17)
|
||||
.insertPreTransformed(1000, "Alice", 17)
|
||||
.insertPostTransformed(
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform",
|
||||
1000,
|
||||
"Alice",
|
||||
17)
|
||||
.insertSource(2000, "Bob", 18)
|
||||
.insertPreTransformed(2000, "Bob", 18)
|
||||
.insertPostTransformed(
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform",
|
||||
2000,
|
||||
"Bob",
|
||||
18)
|
||||
.updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePreTransformed(
|
||||
new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16})
|
||||
.updatePostTransformed(
|
||||
new Object[] {
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform",
|
||||
2000,
|
||||
"Bob",
|
||||
18
|
||||
},
|
||||
new Object[] {
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform",
|
||||
2000,
|
||||
"Barcarolle",
|
||||
16
|
||||
})
|
||||
// filtered out data row
|
||||
.insertSource(50, "Carol", 19)
|
||||
.insertPreTransformed(50, "Carol", 19)
|
||||
.insertPostTransformed()
|
||||
.deleteSource(1000, "Alice", 17)
|
||||
.deletePreTransformed(1000, "Alice", 17)
|
||||
.deletePostTransformed(
|
||||
"my_company.my_branch.metadata_transform",
|
||||
"my_company",
|
||||
"my_branch",
|
||||
"metadata_transform",
|
||||
1000,
|
||||
"Alice",
|
||||
17)
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithCast() throws Exception {
|
||||
TableId tableId = TableId.tableId("my_company", "my_branch", "transform_with_cast");
|
||||
UnifiedTransformTestCase.of(
|
||||
tableId,
|
||||
"id, age + 1 as newage, CAST(CAST(id AS INT) + age AS BIGINT) as longevity, CAST(age AS VARCHAR) as string_age",
|
||||
"newage > 17 and ref2 > 17",
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("name", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.physicalColumn("ref1", DataTypes.STRING())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("age", DataTypes.INT().notNull())
|
||||
.physicalColumn("ref2", DataTypes.INT())
|
||||
.primaryKey("id")
|
||||
.build(),
|
||||
Schema.newBuilder()
|
||||
.physicalColumn("id", DataTypes.STRING().notNull())
|
||||
.physicalColumn("newage", DataTypes.INT())
|
||||
.physicalColumn("longevity", DataTypes.BIGINT())
|
||||
.physicalColumn("string_age", DataTypes.STRING())
|
||||
.primaryKey("id")
|
||||
.build())
|
||||
.initializeHarness()
|
||||
.insertSource("1001", "Alice", 17, "Reference001", 2021)
|
||||
.insertPreTransformed("1001", 17, 2021)
|
||||
.insertPostTransformed("1001", 18, 1018L, "17")
|
||||
// this data record is filtered out since newage <= 17
|
||||
.insertSource("1002", "Bob", 15, "Reference002", 2017)
|
||||
.insertPreTransformed("1002", 15, 2017)
|
||||
.insertPostTransformed()
|
||||
// this data record is filtered out since ref2 <= 17
|
||||
.insertSource("1003", "Bill", 18, "Reference003", 0)
|
||||
.insertPreTransformed("1003", 18, 0)
|
||||
.insertPostTransformed()
|
||||
.insertSource("1004", "Carol", 18, "Reference004", 2018)
|
||||
.insertPreTransformed("1004", 18, 2018)
|
||||
.insertPostTransformed("1004", 19, 1022L, "18")
|
||||
// test update event transform
|
||||
.updateSource(
|
||||
new Object[] {"1004", "Carol", 18, "Reference004", 2018},
|
||||
new Object[] {"1004", "Colin", 19, "NeoReference004", 2018})
|
||||
.updatePreTransformed(
|
||||
new Object[] {"1004", 18, 2018}, new Object[] {"1004", 19, 2018})
|
||||
.updatePostTransformed(
|
||||
new Object[] {"1004", 19, 1022L, "18"},
|
||||
new Object[] {"1004", 20, 1023L, "19"})
|
||||
// updated value to a filtered out condition
|
||||
.updateSource(
|
||||
new Object[] {"1004", "Colin", 19, "NeoReference004", 2018},
|
||||
new Object[] {"1004", "Colin", 10, "NeoReference004", 2018})
|
||||
.updatePreTransformed(
|
||||
new Object[] {"1004", 19, 2018}, new Object[] {"1004", 10, 2018})
|
||||
.updatePostTransformed()
|
||||
.deleteSource("1001", "Alice", 17, "Reference001", 2021)
|
||||
.deletePreTransformed("1001", 17, 2021)
|
||||
.deletePostTransformed("1001", 18, 1018L, "17")
|
||||
.runTests()
|
||||
.destroyHarness();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue