[cdc-composer] Introduce partitioning related runtime functions and translator

pull/2749/head
Qingsheng Ren 1 year ago committed by Leonard Xu
parent bf5914e6ac
commit b52e88a43f

@ -0,0 +1,49 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.flink.translator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.runtime.partitioning.EventPartitioner;
import com.ververica.cdc.runtime.partitioning.PartitioningEventKeySelector;
import com.ververica.cdc.runtime.partitioning.PostPartitionProcessor;
import com.ververica.cdc.runtime.partitioning.PrePartitionOperator;
import com.ververica.cdc.runtime.typeutils.EventTypeInfo;
import com.ververica.cdc.runtime.typeutils.PartitioningEventTypeInfo;
/** Translator for building partitioning related transformations. */
@Internal
public class PartitioningTranslator {
public DataStream<Event> translate(
DataStream<Event> input,
int upstreamParallelism,
int downstreamParallelism,
OperatorID schemaOperatorID) {
return input.transform(
"PrePartition",
new PartitioningEventTypeInfo(),
new PrePartitionOperator(schemaOperatorID, downstreamParallelism))
.setParallelism(upstreamParallelism)
.partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
.map(new PostPartitionProcessor(), new EventTypeInfo())
.name("PostPartition");
}
}

@ -0,0 +1,36 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.partitioning;
import org.apache.flink.api.common.functions.Partitioner;
import com.ververica.cdc.common.annotation.Internal;
/** Partitioner that send {@link PartitioningEvent} to its target partition. */
@Internal
public class EventPartitioner implements Partitioner<Integer> {
@Override
public int partition(Integer target, int numPartitions) {
if (target >= numPartitions) {
throw new IllegalStateException(
String.format(
"The target of the event %d is greater than number of downstream partitions %d",
target, numPartitions));
}
return target;
}
}

@ -0,0 +1,62 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.partitioning;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import java.util.Objects;
/**
* A wrapper around {@link Event}, which contains the target partition number and will be used in
* {@link EventPartitioner}.
*/
@Internal
public class PartitioningEvent implements Event {
private final Event payload;
private final int targetPartition;
public PartitioningEvent(Event payload, int targetPartition) {
this.payload = payload;
this.targetPartition = targetPartition;
}
public Event getPayload() {
return payload;
}
public int getTargetPartition() {
return targetPartition;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitioningEvent that = (PartitioningEvent) o;
return targetPartition == that.targetPartition && Objects.equals(payload, that.payload);
}
@Override
public int hashCode() {
return Objects.hash(payload, targetPartition);
}
}

@ -0,0 +1,30 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.partitioning;
import org.apache.flink.api.java.functions.KeySelector;
import com.ververica.cdc.common.annotation.Internal;
/** Key selector for {@link PartitioningEvent}. */
@Internal
public class PartitioningEventKeySelector implements KeySelector<PartitioningEvent, Integer> {
@Override
public Integer getKey(PartitioningEvent event) {
return event.getTargetPartition();
}
}

@ -0,0 +1,34 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.partitioning;
import org.apache.flink.api.common.functions.RichMapFunction;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
/**
* Function for unloading {@link Event} from internal {@link PartitioningEvent} after {@link
* EventPartitioner}.
*/
@Internal
public class PostPartitionProcessor extends RichMapFunction<PartitioningEvent, Event> {
@Override
public Event map(PartitioningEvent value) throws Exception {
return value.getPayload();
}
}

@ -0,0 +1,170 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.partitioning;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.OperationType;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
/**
* Operator for processing events from {@link
* com.ververica.cdc.runtime.operators.schema.SchemaOperator} before {@link EventPartitioner}.
*/
@Internal
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
implements OneInputStreamOperator<Event, PartitioningEvent> {
private final OperatorID schemaOperatorId;
private final int downstreamParallelism;
private SchemaEvolutionClient schemaEvolutionClient;
private final Map<TableId, Function<DataChangeEvent, Integer>> cachedHashFunctions =
new HashMap<>();
public PrePartitionOperator(OperatorID schemaOperatorId, int downstreamParallelism) {
this.schemaOperatorId = schemaOperatorId;
this.downstreamParallelism = downstreamParallelism;
}
@Override
public void open() throws Exception {
super.open();
TaskOperatorEventGateway toCoordinator =
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, schemaOperatorId);
}
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
Event event = element.getValue();
if (event instanceof SchemaChangeEvent) {
// Update hash function
recreateHashFunction(((SchemaChangeEvent) event).tableId());
// Broadcast SchemaChangeEvent
broadcastEvent(event);
} else if (event instanceof FlushEvent) {
// Broadcast FlushEvent
broadcastEvent(event);
} else if (event instanceof DataChangeEvent) {
// Partition DataChangeEvent by table ID and primary keys
partitionBy(((DataChangeEvent) event));
}
}
private void partitionBy(DataChangeEvent dataChangeEvent) {
Function<DataChangeEvent, Integer> hashFunction =
cachedHashFunctions.get(dataChangeEvent.tableId());
output.collect(
new StreamRecord<>(
new PartitioningEvent(
dataChangeEvent,
hashFunction.apply(dataChangeEvent) % downstreamParallelism)));
}
private void broadcastEvent(Event toBroadcast) {
for (int i = 0; i < downstreamParallelism; i++) {
output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
}
}
private void recreateHashFunction(TableId tableId) throws Exception {
Optional<Schema> optionalSchema = schemaEvolutionClient.getLatestSchema(tableId);
if (!optionalSchema.isPresent()) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table %s", tableId));
}
cachedHashFunctions.put(tableId, new HashFunction(optionalSchema.get()));
}
private static class HashFunction implements Function<DataChangeEvent, Integer> {
private final List<RecordData.FieldGetter> primaryKeyGetters;
public HashFunction(Schema schema) {
primaryKeyGetters = createFieldGetters(schema);
}
@Override
public Integer apply(DataChangeEvent event) {
List<Object> objectsToHash = new ArrayList<>();
// Table ID
TableId tableId = event.tableId();
Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
objectsToHash.add(tableId.getTableName());
// Primary key
RecordData data =
event.op().equals(OperationType.DELETE) ? event.before() : event.after();
for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
}
// Calculate hash
return Objects.hash(objectsToHash.toArray());
}
private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
List<RecordData.FieldGetter> fieldGetters =
new ArrayList<>(schema.primaryKeys().size());
int[] primaryKeyPositions =
schema.primaryKeys().stream()
.mapToInt(
pk -> {
int i = 0;
while (!schema.getColumns().get(i).getName().equals(pk)) {
++i;
}
if (i >= schema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to find column \"%s\" which is defined as primary key",
pk));
}
return i;
})
.toArray();
for (int primaryKeyPosition : primaryKeyPositions) {
fieldGetters.add(
RecordData.createFieldGetter(
schema.getColumns().get(primaryKeyPosition).getType(),
primaryKeyPosition));
}
return fieldGetters;
}
}
}

@ -0,0 +1,103 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.runtime.partitioning.PartitioningEvent;
import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton;
import java.io.IOException;
/** A {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link PartitioningEvent}. */
@Internal
public class PartitioningEventSerializer extends TypeSerializerSingleton<PartitioningEvent> {
public static final PartitioningEventSerializer INSTANCE = new PartitioningEventSerializer();
private final EventSerializer eventSerializer = EventSerializer.INSTANCE;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public PartitioningEvent createInstance() {
return new PartitioningEvent(null, -1);
}
@Override
public PartitioningEvent copy(PartitioningEvent from) {
return new PartitioningEvent(
eventSerializer.copy(from.getPayload()), from.getTargetPartition());
}
@Override
public PartitioningEvent copy(PartitioningEvent from, PartitioningEvent reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(PartitioningEvent record, DataOutputView target) throws IOException {
eventSerializer.serialize(record.getPayload(), target);
target.writeInt(record.getTargetPartition());
}
@Override
public PartitioningEvent deserialize(DataInputView source) throws IOException {
Event payload = eventSerializer.deserialize(source);
int targetPartition = source.readInt();
return new PartitioningEvent(payload, targetPartition);
}
@Override
public PartitioningEvent deserialize(PartitioningEvent reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
PartitioningEvent deserialized = deserialize(source);
serialize(deserialized, target);
}
@Override
public TypeSerializerSnapshot<PartitioningEvent> snapshotConfiguration() {
return new PartitioningEventSerializerSnapshot();
}
/** {@link TypeSerializerSnapshot} for {@link PartitioningEventSerializer}. */
public static final class PartitioningEventSerializerSnapshot
extends SimpleTypeSerializerSnapshot<PartitioningEvent> {
public PartitioningEventSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}

@ -0,0 +1,85 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.runtime.partitioning.PartitioningEvent;
import com.ververica.cdc.runtime.serializer.event.PartitioningEventSerializer;
/** Type information for {@link PartitioningEvent}. */
@Internal
public class PartitioningEventTypeInfo extends TypeInformation<PartitioningEvent> {
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return false;
}
@Override
public int getArity() {
return 2;
}
@Override
public int getTotalFields() {
return 2;
}
@Override
public Class<PartitioningEvent> getTypeClass() {
return PartitioningEvent.class;
}
@Override
public boolean isKeyType() {
return false;
}
@Override
public TypeSerializer<PartitioningEvent> createSerializer(ExecutionConfig config) {
return PartitioningEventSerializer.INSTANCE;
}
@Override
public String toString() {
return "PartitioningEvent";
}
@Override
public boolean equals(Object obj) {
return obj instanceof PartitioningEvent;
}
@Override
public int hashCode() {
return getClass().hashCode();
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof PartitioningEvent;
}
}

@ -0,0 +1,78 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.event;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.partitioning.PartitioningEvent;
import com.ververica.cdc.runtime.serializer.SerializerTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/** Unit test for {@link PartitioningEventSerializer}. */
class PartitioningEventSerializerTest extends SerializerTestBase<PartitioningEvent> {
@Override
protected TypeSerializer<PartitioningEvent> createSerializer() {
return PartitioningEventSerializer.INSTANCE;
}
@Override
protected int getLength() {
return -1;
}
@Override
protected Class<PartitioningEvent> getTypeClass() {
return PartitioningEvent.class;
}
@Override
protected PartitioningEvent[] getTestData() {
Event[] flushEvents =
new Event[] {
new FlushEvent(TableId.tableId("table")),
new FlushEvent(TableId.tableId("schema", "table")),
new FlushEvent(TableId.tableId("namespace", "schema", "table"))
};
Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData();
Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData();
List<PartitioningEvent> partitioningEvents = new ArrayList<>();
partitioningEvents.addAll(
Arrays.stream(flushEvents)
.map(event -> new PartitioningEvent(event, 1))
.collect(Collectors.toList()));
partitioningEvents.addAll(
Arrays.stream(dataChangeEvents)
.map(event -> new PartitioningEvent(event, 2))
.collect(Collectors.toList()));
partitioningEvents.addAll(
Arrays.stream(schemaChangeEvents)
.map(event -> new PartitioningEvent(event, 3))
.collect(Collectors.toList()));
return partitioningEvents.toArray(new PartitioningEvent[0]);
}
}
Loading…
Cancel
Save