From d5981c9acb6e1f8936952f83fa732d9b89310978 Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Thu, 23 Nov 2023 16:35:29 +0800 Subject: [PATCH] [cdc-runtime] Rewrite DataSinkWriterOperator to avoid classloading issues using reflection When the CDC job runs on Flink cluster, DataSinkWriterOperator is loaded by user code classloader, while SinkWriterOperator is loaded by app classloader, and the protection mechanism forbids us from accessing SinkWriterOperator across different classloaders. Therefore we have to manually load the SinkWriterOperator in user code classloader, and use reflections to control it. This closes #2742 Co-authored-by: lvyanquan --- .../sink/DataSinkWriterOperator.java | 188 ++++++++++++++++++ .../sink/DataSinkWriterOperatorFactory.java | 1 - .../sink/DataSinkWriterOperator.java | 116 ----------- 3 files changed, 188 insertions(+), 117 deletions(-) create mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperator.java delete mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperator.java new file mode 100644 index 000000000..41249431f --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -0,0 +1,188 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.operators.sink; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.FlushEvent; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; + +/** + * An operator that processes records to be written into a {@link + * org.apache.flink.api.connector.sink2.Sink}. + * + *

The operator is a proxy of SinkWriterOperator in Flink. + * + *

The operator is always part of a sink pipeline and is the first operator. + * + * @param the type of the committable (to send to downstream operators) + */ +@Internal +public class DataSinkWriterOperator extends AbstractStreamOperator> + implements OneInputStreamOperator>, BoundedOneInput { + + private SchemaEvolutionClient schemaEvolutionClient; + + private final OperatorID schemaOperatorID; + + private final Sink sink; + + private final ProcessingTimeService processingTimeService; + + private final MailboxExecutor mailboxExecutor; + + /** Operator that actually execute sink logic. */ + private Object flinkWriterOperator; + + /** + * The internal {@link SinkWriter} of flinkWriterOperator, obtained it through reflection to + * deal with {@link FlushEvent}. + */ + private SinkWriter copySinkWriter; + + public DataSinkWriterOperator( + Sink sink, + ProcessingTimeService processingTimeService, + MailboxExecutor mailboxExecutor, + OperatorID schemaOperatorID) { + this.sink = sink; + this.processingTimeService = processingTimeService; + this.mailboxExecutor = mailboxExecutor; + this.schemaOperatorID = schemaOperatorID; + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output>> output) { + super.setup(containingTask, config, output); + flinkWriterOperator = createFlinkWriterOperator(); + this.>>getFlinkWriterOperator() + .setup(containingTask, config, output); + schemaEvolutionClient = + new SchemaEvolutionClient( + containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), + schemaOperatorID); + } + + @Override + public void open() throws Exception { + this.>>getFlinkWriterOperator().open(); + copySinkWriter = getFieldValue("sinkWriter"); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask()); + this.>>getFlinkWriterOperator() + .initializeState(context); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + Event event = element.getValue(); + if (event instanceof FlushEvent) { + copySinkWriter.flush(false); + schemaEvolutionClient.notifyFlushSuccess( + getRuntimeContext().getIndexOfThisSubtask(), ((FlushEvent) event).getTableId()); + } else { + this.>>getFlinkWriterOperator() + .processElement(element); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + this.>>getFlinkWriterOperator() + .prepareSnapshotPreBarrier(checkpointId); + } + + @Override + public void close() throws Exception { + this.>>getFlinkWriterOperator() + .close(); + } + + @Override + public void endInput() throws Exception { + this.getFlinkWriterOperator().endInput(); + } + + // -------------------------- Reflection helper functions -------------------------- + + private Object createFlinkWriterOperator() { + try { + Class flinkWriterClass = + getRuntimeContext() + .getUserCodeClassLoader() + .loadClass( + "org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator"); + Constructor constructor = + flinkWriterClass.getDeclaredConstructor( + Sink.class, ProcessingTimeService.class, MailboxExecutor.class); + constructor.setAccessible(true); + return constructor.newInstance(sink, processingTimeService, mailboxExecutor); + } catch (Exception e) { + throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e); + } + } + + /** + * Finds a field by name from its declaring class. This also searches for the field in super + * classes. + * + * @param fieldName the name of the field to find. + * @return the Object value of this field. + */ + @SuppressWarnings("unchecked") + private T getFieldValue(String fieldName) throws IllegalAccessException { + Class clazz = flinkWriterOperator.getClass(); + while (clazz != null) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return ((T) field.get(flinkWriterOperator)); + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new RuntimeException("failed to get sinkWriter"); + } + + @SuppressWarnings("unchecked") + private T getFlinkWriterOperator() { + return (T) flinkWriterOperator; + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java index 836c51d25..bf565c6f5 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java @@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; -import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator; import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.common.event.Event; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java deleted file mode 100644 index 352f97c6a..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/DataSinkWriterOperator.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -import com.ververica.cdc.common.event.Event; -import com.ververica.cdc.common.event.FlushEvent; -import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient; - -import java.lang.reflect.Field; - -/** - * An operator that processes records to be written into a {@link - * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process committables with the - * same parallelism or send them downstream to a {@link CommitterOperator} with a different - * parallelism. - * - *

The operator is always part of a sink pipeline and is the first operator. - * - * @param the type of the committable (to send to downstream operators) - */ -public class DataSinkWriterOperator extends SinkWriterOperator { - - private SchemaEvolutionClient schemaEvolutionClient; - - private SinkWriter copySinkWriter; - - private final OperatorID schemaOperatorID; - - public DataSinkWriterOperator( - Sink sink, - ProcessingTimeService processingTimeService, - MailboxExecutor mailboxExecutor, - OperatorID schemaOperatorID) { - super(sink, processingTimeService, mailboxExecutor); - this.schemaOperatorID = schemaOperatorID; - } - - @Override - public void setup(StreamTask containingTask, StreamConfig config, Output output) { - super.setup(containingTask, config, output); - schemaEvolutionClient = - new SchemaEvolutionClient( - containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), - schemaOperatorID); - } - - @Override - public void open() throws Exception { - super.open(); - copySinkWriter = (SinkWriter) getFieldValue("sinkWriter"); - } - - /** - * Finds a field by name from its declaring class. This also searches for the field in super - * classes. - * - * @param fieldName the name of the field to find. - * @return the Object value of this field. - */ - private Object getFieldValue(String fieldName) throws IllegalAccessException { - Class clazz = this.getClass(); - while (clazz != null) { - try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - return field.get(this); - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - return null; - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask()); - } - - @Override - public void processElement(StreamRecord element) throws Exception { - Event event = element.getValue(); - if (event instanceof FlushEvent) { - copySinkWriter.flush(false); - schemaEvolutionClient.notifyFlushSuccess( - getRuntimeContext().getIndexOfThisSubtask(), ((FlushEvent) event).getTableId()); - } else { - super.processElement(element); - } - } -}