[cdc-common] Introduce AssertJ-style assertion system for Flink CDC data structures

pull/2793/head
Qingsheng Ren 1 year ago
parent 6eb980a614
commit 2bde0beaec

@ -0,0 +1,41 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.AddColumnEvent;
import org.assertj.core.internal.Iterables;
import java.util.List;
/** Assertions for {@link AddColumnEvent}. */
public class AddColumnEventAssert extends ChangeEventAssert<AddColumnEventAssert, AddColumnEvent> {
private final Iterables iterables = Iterables.instance();
public static AddColumnEventAssert assertThatAddColumnEvent(AddColumnEvent event) {
return new AddColumnEventAssert(event);
}
private AddColumnEventAssert(AddColumnEvent addColumnEvent) {
super(addColumnEvent, AddColumnEventAssert.class);
}
public AddColumnEventAssert containsAddedColumns(AddColumnEvent.ColumnWithPosition... columns) {
List<AddColumnEvent.ColumnWithPosition> actualAddedColumns = actual.getAddedColumns();
iterables.assertContainsExactlyInAnyOrder(info, actualAddedColumns, columns);
return myself;
}
}

@ -0,0 +1,44 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.AlterColumnTypeEvent;
import com.ververica.cdc.common.types.DataType;
import org.assertj.core.internal.Maps;
import java.util.Map;
/** Assertions for {@link AlterColumnTypeEvent}. */
public class AlterColumnTypeEventAssert
extends ChangeEventAssert<AlterColumnTypeEventAssert, AlterColumnTypeEvent> {
private final Maps maps = Maps.instance();
public static AlterColumnTypeEventAssert assertThatAlterColumnTypeEvent(
AlterColumnTypeEvent event) {
return new AlterColumnTypeEventAssert(event);
}
private AlterColumnTypeEventAssert(AlterColumnTypeEvent event) {
super(event, AlterColumnTypeEventAssert.class);
}
public AlterColumnTypeEventAssert containsTypeMapping(Map<String, DataType> typeMapping) {
Map<String, DataType> actualTypeMapping = actual.getTypeMapping();
maps.assertContainsAllEntriesOf(info, actualTypeMapping, typeMapping);
return myself;
}
}

@ -0,0 +1,45 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.ChangeEvent;
import com.ververica.cdc.common.event.TableId;
import org.assertj.core.api.AbstractAssert;
/** Assertions for {@link ChangeEvent}. */
public class ChangeEventAssert<SELF extends AbstractAssert<SELF, EVENT>, EVENT extends ChangeEvent>
extends AbstractAssert<SELF, EVENT> {
public static <SELF extends ChangeEventAssert<SELF, ChangeEvent>>
ChangeEventAssert<SELF, ChangeEvent> assertThatChangeEvent(ChangeEvent changeEvent) {
return new ChangeEventAssert<>(changeEvent, ChangeEventAssert.class);
}
protected ChangeEventAssert(EVENT event, Class<?> selfType) {
super(event, selfType);
}
public SELF hasTableId(TableId tableId) {
if (!actual.tableId().equals(tableId)) {
failWithActualExpectedAndMessage(
actual.tableId(),
tableId,
"Table ID of the DataChangeEvent is not as expected");
}
return myself;
}
}

@ -0,0 +1,44 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.schema.Schema;
/** Assertions for {@link CreateTableEvent}. */
public class CreateTableEventAssert
extends ChangeEventAssert<CreateTableEventAssert, CreateTableEvent> {
public static CreateTableEventAssert assertThatCreateTableEvent(CreateTableEvent event) {
return new CreateTableEventAssert(event);
}
protected CreateTableEventAssert(CreateTableEvent event) {
super(event, CreateTableEventAssert.class);
}
public CreateTableEventAssert hasSchema(Schema schema) {
isNotNull();
if (!actual.getSchema().equals(schema)) {
failWithActualExpectedAndMessage(
actual.getSchema(),
schema,
"The schema of the CreateTableEvent is not as expected");
}
return myself;
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.OperationType;
/** Assertions for {@link DataChangeEvent}. */
public class DataChangeEventAssert
extends ChangeEventAssert<DataChangeEventAssert, DataChangeEvent> {
public static DataChangeEventAssert assertThatDataChangeEvent(DataChangeEvent event) {
return new DataChangeEventAssert(event);
}
protected DataChangeEventAssert(DataChangeEvent dataChangeEvent) {
super(dataChangeEvent, DataChangeEventAssert.class);
}
public DataChangeEventAssert hasOperationType(OperationType operationType) {
if (!actual.op().equals(operationType)) {
failWithMessage(
"Expect DataChangeEvent to have operation type \"%s\", but was \"%s\"",
operationType, actual.op());
}
return this;
}
public RecordDataAssert<?> withBeforeRecordData() {
if (actual.before() == null) {
failWithMessage(
"DataChangeEvent with operation type \"%s\" does not have \"before\" field",
actual.op());
}
return RecordDataAssert.assertThatRecordData(actual.before());
}
public RecordDataAssert<?> withAfterRecordData() {
if (actual.after() == null) {
failWithMessage(
"DataChangeEvent with operation type \"%s\" does not have \"after\" field",
actual.op());
}
return RecordDataAssert.assertThatRecordData(actual.after());
}
}

@ -0,0 +1,43 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.schema.Column;
import org.assertj.core.internal.Iterables;
import java.util.List;
/** Assertions for {@link DropColumnEvent}. */
public class DropColumnEventAssert
extends ChangeEventAssert<DropColumnEventAssert, DropColumnEvent> {
private final Iterables iterables = Iterables.instance();
public static DropColumnEventAssert assertThatDropColumnEvent(DropColumnEvent event) {
return new DropColumnEventAssert(event);
}
private DropColumnEventAssert(DropColumnEvent event) {
super(event, DropColumnEventAssert.class);
}
public DropColumnEventAssert containsDroppedColumns(Column... droppedColumns) {
List<Column> actualDroppedColumns = actual.getDroppedColumns();
iterables.assertContainsExactlyInAnyOrder(info, actualDroppedColumns, droppedColumns);
return myself;
}
}

@ -0,0 +1,43 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import org.assertj.core.api.AbstractAssert;
/** Assertions for {@link EventAssert}. */
public class EventAssert extends AbstractAssert<EventAssert, Event> {
public static EventAssert assertThatEvent(Event event) {
return new EventAssert(event);
}
protected EventAssert(Event event) {
super(event, EventAssert.class);
}
public DataChangeEventAssert asDataChangeEvent() {
isInstanceOf(DataChangeEvent.class);
return DataChangeEventAssert.assertThatDataChangeEvent((DataChangeEvent) actual);
}
public SchemaChangeEventAssert asSchemaChangeEvent() {
isInstanceOf(SchemaChangeEvent.class);
return SchemaChangeEventAssert.assertThatSchemaChangeEvent((SchemaChangeEvent) actual);
}
}

@ -0,0 +1,69 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.AddColumnEvent;
import com.ververica.cdc.common.event.ChangeEvent;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import org.assertj.core.api.Assertions;
/** Collection for entries of customized event-related assertions. */
public class EventAssertions extends Assertions {
public static EventAssert assertThat(Event event) {
return EventAssert.assertThatEvent(event);
}
public static <SELF extends ChangeEventAssert<SELF, ChangeEvent>>
ChangeEventAssert<SELF, ChangeEvent> assertThat(ChangeEvent changeEvent) {
return ChangeEventAssert.assertThatChangeEvent(changeEvent);
}
public static DataChangeEventAssert assertThat(DataChangeEvent event) {
return DataChangeEventAssert.assertThatDataChangeEvent(event);
}
public static <SELF extends RecordDataAssert<SELF>> RecordDataAssert<SELF> assertThat(
RecordData recordData) {
return RecordDataAssert.assertThatRecordData(recordData);
}
public static SchemaChangeEventAssert assertThat(SchemaChangeEvent event) {
return SchemaChangeEventAssert.assertThatSchemaChangeEvent(event);
}
public static CreateTableEventAssert assertThat(CreateTableEvent event) {
return CreateTableEventAssert.assertThatCreateTableEvent(event);
}
public static AddColumnEventAssert assertThat(AddColumnEvent event) {
return AddColumnEventAssert.assertThatAddColumnEvent(event);
}
public static DropColumnEventAssert assertThat(DropColumnEvent event) {
return DropColumnEventAssert.assertThatDropColumnEvent(event);
}
public static RenameColumnEventAssert assertThat(RenameColumnEvent event) {
return RenameColumnEventAssert.assertThatRenameColumnEvent(event);
}
}

@ -0,0 +1,47 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.schema.Schema;
import org.assertj.core.api.AbstractAssert;
/** Assertions for {@link RecordData}. */
public class RecordDataAssert<SELF extends RecordDataAssert<SELF>>
extends AbstractAssert<SELF, RecordData> {
public static <SELF extends RecordDataAssert<SELF>> RecordDataAssert<SELF> assertThatRecordData(
RecordData recordData) {
return new RecordDataAssert<>(recordData, RecordDataAssert.class);
}
public RecordDataAssert(RecordData recordData, Class<?> selfType) {
super(recordData, selfType);
}
public RecordDataWithSchemaAssert withSchema(Schema schema) {
return new RecordDataWithSchemaAssert(actual, schema);
}
public SELF hasArity(int arity) {
if (actual.getArity() != arity) {
failWithActualExpectedAndMessage(
actual.getArity(), arity, "The RecordData has unexpected arity");
}
return myself;
}
}

@ -0,0 +1,49 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.utils.SchemaUtils;
import org.assertj.core.internal.Iterables;
import java.util.ArrayList;
import java.util.List;
/**
* Assertions for {@link RecordData} with schema so that fields in the RecordData can be checked.
*/
public class RecordDataWithSchemaAssert extends RecordDataAssert<RecordDataWithSchemaAssert> {
private final Schema schema;
private final Iterables iterables = Iterables.instance();
protected RecordDataWithSchemaAssert(RecordData recordData, Schema schema) {
super(recordData, RecordDataWithSchemaAssert.class);
this.schema = schema;
}
public RecordDataWithSchemaAssert hasFields(Object... fields) {
objects.assertNotNull(info, schema);
List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(schema);
List<Object> actualFields = new ArrayList<>();
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
actualFields.add(fieldGetter.getFieldOrNull(actual));
}
iterables.assertContainsExactly(info, actualFields, fields);
return myself;
}
}

@ -0,0 +1,42 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.RenameColumnEvent;
import org.assertj.core.internal.Maps;
import java.util.Map;
/** Assertions for {@link RenameColumnEvent}. */
public class RenameColumnEventAssert
extends ChangeEventAssert<RenameColumnEventAssert, RenameColumnEvent> {
private final Maps maps = Maps.instance();
public static RenameColumnEventAssert assertThatRenameColumnEvent(RenameColumnEvent event) {
return new RenameColumnEventAssert(event);
}
private RenameColumnEventAssert(RenameColumnEvent event) {
super(event, RenameColumnEventAssert.class);
}
public RenameColumnEventAssert containsNameMapping(Map<String, String> nameMapping) {
Map<String, String> actualNameMapping = actual.getNameMapping();
maps.assertContainsAllEntriesOf(info, actualNameMapping, nameMapping);
return myself;
}
}

@ -0,0 +1,63 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.testutils.assertions;
import com.ververica.cdc.common.event.AddColumnEvent;
import com.ververica.cdc.common.event.AlterColumnTypeEvent;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
/** Assertions for {@link SchemaChangeEvent}. */
public class SchemaChangeEventAssert
extends ChangeEventAssert<SchemaChangeEventAssert, SchemaChangeEvent> {
public static SchemaChangeEventAssert assertThatSchemaChangeEvent(SchemaChangeEvent event) {
return new SchemaChangeEventAssert(event);
}
protected SchemaChangeEventAssert(SchemaChangeEvent event) {
super(event, SchemaChangeEventAssert.class);
}
public CreateTableEventAssert asCreateTableEvent() {
isInstanceOf(CreateTableEvent.class);
return CreateTableEventAssert.assertThatCreateTableEvent((CreateTableEvent) actual);
}
public AddColumnEventAssert asAddColumnEvent() {
isInstanceOf(AddColumnEvent.class);
return AddColumnEventAssert.assertThatAddColumnEvent((AddColumnEvent) actual);
}
public DropColumnEventAssert asDropColumnEvent() {
isInstanceOf(DropColumnEvent.class);
return DropColumnEventAssert.assertThatDropColumnEvent((DropColumnEvent) actual);
}
public RenameColumnEventAssert asRenameColumnEvent() {
isInstanceOf(RenameColumnEvent.class);
return RenameColumnEventAssert.assertThatRenameColumnEvent(((RenameColumnEvent) actual));
}
public AlterColumnTypeEventAssert asAlterColumnTypeEvent() {
isInstanceOf(AlterColumnTypeEvent.class);
return AlterColumnTypeEventAssert.assertThatAlterColumnTypeEvent(
((AlterColumnTypeEvent) actual));
}
}
Loading…
Cancel
Save