[oracle] Ignore testCheckpointAndRestore util DBZ-5245 and DBZ-4936 fix

pull/1590/merge
01410172 3 years ago committed by Leonard Xu
parent 4855fa3337
commit f99efd809f

@ -142,10 +142,11 @@ public class OracleDialect implements JdbcDataSourceDialect {
}
@Override
public OracleSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase) {
public OracleSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final OracleConnection jdbcConnection =
createOracleConnection(sourceConfig.getDbzConfiguration());
return new OracleSourceFetchTaskContext(sourceConfig, this, jdbcConnection);
createOracleConnection(taskSourceConfig.getDbzConfiguration());
return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection);
}
@Override

@ -24,7 +24,6 @@ import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
import com.ververica.cdc.connectors.oracle.source.meta.split.OracleSourceSplitSerializer;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import java.time.Duration;
@ -218,12 +217,7 @@ public class OracleSourceBuilder<T> {
DebeziumDeserializationSchema<T> deserializationSchema,
RedoLogOffsetFactory offsetFactory,
OracleDialect dataSourceDialect) {
super(
configFactory,
deserializationSchema,
offsetFactory,
dataSourceDialect,
new OracleSourceSplitSerializer(offsetFactory));
super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
}
}
}

@ -19,105 +19,92 @@
package com.ververica.cdc.connectors.oracle.source.meta.offset;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.oracle.Scn;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/** A structure describes an offset in a redo log event. */
public class RedoLogOffset extends Offset {
private static final long serialVersionUID = 1L;
public static final String REDO_LOG_SCN_OFFSET_KEY = "scn";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
public static final String ROWS_TO_SKIP_OFFSET_KEY = "row";
public static final String SCN_KEY = "scn";
public static final String COMMIT_SCN_KEY = "commit_scn";
public static final String LCR_POSITION_KEY = "lcr_position";
public static final RedoLogOffset INITIAL_OFFSET = new RedoLogOffset(0L);
public static final RedoLogOffset NO_STOPPING_OFFSET = new RedoLogOffset(Long.MIN_VALUE);
public static final RedoLogOffset INITIAL_OFFSET = new RedoLogOffset(0);
public RedoLogOffset(Map<String, String> offset) {
this.offset = offset;
}
public RedoLogOffset(String readUTF, long scn) {
this.offset = new HashMap<>();
this.offset.put(REDO_LOG_SCN_OFFSET_KEY, String.valueOf(scn));
public RedoLogOffset(Long scn) {
this(scn, 0L, null);
}
public RedoLogOffset(long scn) {
this.offset = new HashMap<>();
this.offset.put(REDO_LOG_SCN_OFFSET_KEY, String.valueOf(scn));
public RedoLogOffset(Long scn, Long commitScn, @Nullable String lcrPosition) {
Map<String, String> offsetMap = new HashMap<>();
offsetMap.put(SCN_KEY, String.valueOf(scn));
offsetMap.put(COMMIT_SCN_KEY, String.valueOf(commitScn));
offsetMap.put(LCR_POSITION_KEY, lcrPosition);
this.offset = offsetMap;
}
public RedoLogOffset(String scn) {
this.offset = new HashMap<>();
this.offset.put(REDO_LOG_SCN_OFFSET_KEY, scn);
public String getScn() {
return offset.get(SCN_KEY);
}
public Map<String, String> getOffset() {
return offset;
public String getCommitScn() {
return offset.get(COMMIT_SCN_KEY);
}
public long longOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) {
return 0L;
}
if (obj instanceof Number) {
return ((Number) obj).longValue();
}
try {
return Long.parseLong(obj.toString());
} catch (NumberFormatException e) {
throw new ConnectException(
"Source offset '"
+ key
+ "' parameter value "
+ obj
+ " could not be converted to a long");
}
public String getLcrPosition() {
return offset.get(LCR_POSITION_KEY);
}
@Override
public int compareTo(Offset that) {
if (Objects.isNull(this.offset.get(REDO_LOG_SCN_OFFSET_KEY))
&& Objects.isNull(that.getOffset().get(REDO_LOG_SCN_OFFSET_KEY))) {
public int compareTo(Offset offset) {
RedoLogOffset that = (RedoLogOffset) offset;
// the NO_STOPPING_OFFSET is the max offset
if (NO_STOPPING_OFFSET.equals(that) && NO_STOPPING_OFFSET.equals(this)) {
return 0;
}
if (Objects.isNull(this.offset.get(REDO_LOG_SCN_OFFSET_KEY))
&& !Objects.isNull(that.getOffset().get(REDO_LOG_SCN_OFFSET_KEY))) {
if (NO_STOPPING_OFFSET.equals(this)) {
return 1;
}
if (NO_STOPPING_OFFSET.equals(that)) {
return -1;
}
if (!Objects.isNull(this.offset.get(REDO_LOG_SCN_OFFSET_KEY))
&& Objects.isNull(that.getOffset().get(REDO_LOG_SCN_OFFSET_KEY))) {
String scnStr = this.getScn();
String targetScnStr = that.getScn();
if (StringUtils.isNotEmpty(targetScnStr)) {
if (StringUtils.isNotEmpty(scnStr)) {
Scn scn = Scn.valueOf(scnStr);
Scn targetScn = Scn.valueOf(targetScnStr);
return scn.compareTo(targetScn);
}
return -1;
} else if (StringUtils.isNotEmpty(scnStr)) {
return 1;
}
Long thisScn = Long.parseLong(this.offset.get(REDO_LOG_SCN_OFFSET_KEY));
Long thatScn = Long.parseLong(that.getOffset().get(REDO_LOG_SCN_OFFSET_KEY));
return thisScn.compareTo(thatScn);
}
public boolean isAtOrBefore(RedoLogOffset that) {
return this.compareTo(that) <= 0;
}
public boolean isBefore(RedoLogOffset that) {
return this.compareTo(that) < 0;
}
public boolean isAtOrAfter(RedoLogOffset that) {
return this.compareTo(that) >= 0;
}
public boolean isAfter(RedoLogOffset that) {
return this.compareTo(that) > 0;
return 0;
}
@Override
public String toString() {
return offset.toString();
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RedoLogOffset)) {
return false;
}
RedoLogOffset that = (RedoLogOffset) o;
return offset.equals(that.offset);
}
}

@ -37,7 +37,7 @@ public class RedoLogOffsetFactory extends OffsetFactory {
@Override
public Offset newOffset(String filename, Long position) {
return new RedoLogOffset(filename, position);
throw new FlinkRuntimeException("not supported create new Offset by filename and position.");
}
@Override

@ -1,197 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.split;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.utils.SerializerUtils;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** An Oracle serializer for the {@link SourceSplitBase}. */
public class OracleSourceSplitSerializer extends SourceSplitSerializer {
private static final int SNAPSHOT_SPLIT_FLAG = 1;
private static final int REDOLOG_SPLIT_FLAG = 2;
RedoLogOffsetFactory offsetFactory;
public OracleSourceSplitSerializer(RedoLogOffsetFactory offsetFactory) {
this.offsetFactory = offsetFactory;
}
@Override
public OffsetFactory getOffsetFactory() {
return offsetFactory;
}
@Override
public Offset readOffsetPosition(int offsetVersion, DataInputDeserializer in)
throws IOException {
return super.readOffsetPosition(offsetVersion, in);
}
@Override
public Offset readOffsetPosition(DataInputDeserializer in) throws IOException {
return super.readOffsetPosition(in);
}
@Override
public void writeOffsetPosition(Offset offset, DataOutputSerializer out) throws IOException {
super.writeOffsetPosition(offset, out);
}
@Override
public OffsetDeserializer createOffsetDeserializer() {
return super.createOffsetDeserializer();
}
@Override
public FinishedSnapshotSplitInfo deserialize(byte[] serialized) {
return super.deserialize(serialized);
}
@Override
public byte[] serialize(SourceSplitBase split) throws IOException {
return super.serialize(split);
}
@Override
public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException {
return super.deserialize(version, serialized);
}
@Override
public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
int splitKind = in.readInt();
if (splitKind == SNAPSHOT_SPLIT_FLAG) {
TableId tableId = TableId.parse(in.readUTF(), false);
String splitId = in.readUTF();
RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF());
Offset highWatermark = readOffsetPosition(version, in);
Map<TableId, TableChanges.TableChange> tableSchemas = readTableSchemas(version, in);
return new SnapshotSplit(
tableId,
splitId,
splitKeyType,
splitBoundaryStart,
splitBoundaryEnd,
highWatermark,
tableSchemas);
} else if (splitKind == REDOLOG_SPLIT_FLAG) {
String splitId = in.readUTF();
// skip split Key Type
in.readUTF();
Offset startingOffset = readOffsetPosition(version, in);
Offset endingOffset = readOffsetPosition(version, in);
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
readFinishedSplitsInfo(version, in);
Map<TableId, TableChanges.TableChange> tableChangeMap = readTableSchemas(version, in);
int totalFinishedSplitSize = finishedSplitsInfo.size();
if (version == 3) {
totalFinishedSplitSize = in.readInt();
}
in.releaseArrays();
return new StreamSplit(
splitId,
startingOffset,
endingOffset,
finishedSplitsInfo,
tableChangeMap,
totalFinishedSplitSize);
} else {
throw new IOException("Unknown split kind: " + splitKind);
}
}
private List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(
int version, DataInputDeserializer in) throws IOException {
List<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
TableId tableId = TableId.parse(in.readUTF(), false);
String splitId = in.readUTF();
Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
OffsetFactory offsetFactory =
(OffsetFactory) SerializerUtils.serializedStringToObject(in.readUTF());
Offset highWatermark = readOffsetPosition(version, in);
finishedSplitsInfo.add(
new FinishedSnapshotSplitInfo(
tableId, splitId, splitStart, splitEnd, highWatermark, offsetFactory));
}
return finishedSplitsInfo;
}
private static Map<TableId, TableChanges.TableChange> readTableSchemas(
int version, DataInputDeserializer in) throws IOException {
DocumentReader documentReader = DocumentReader.defaultReader();
Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
TableId tableId = TableId.parse(in.readUTF(), false);
final String tableChangeStr;
switch (version) {
case 1:
tableChangeStr = in.readUTF();
break;
case 2:
case 3:
final int len = in.readInt();
final byte[] bytes = new byte[len];
in.read(bytes);
tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
break;
default:
throw new IOException("Unknown version: " + version);
}
Document document = documentReader.read(tableChangeStr);
TableChanges.TableChange tableChange =
FlinkJsonTableChangeSerializer.fromDocument(document, false);
tableSchemas.put(tableId, tableChange);
}
return tableSchemas;
}
}
/// *
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
// package com.ververica.cdc.connectors.oracle.source.meta.split;
//
// import org.apache.flink.core.memory.DataInputDeserializer;
// import org.apache.flink.core.memory.DataOutputSerializer;
// import org.apache.flink.table.types.logical.RowType;
// import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
//
// import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
// import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
// import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
// import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
// import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
// import com.ververica.cdc.connectors.base.utils.SerializerUtils;
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
// import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
// import io.debezium.document.Document;
// import io.debezium.document.DocumentReader;
// import io.debezium.relational.TableId;
// import io.debezium.relational.history.TableChanges;
//
// import java.io.IOException;
// import java.nio.charset.StandardCharsets;
// import java.util.ArrayList;
// import java.util.HashMap;
// import java.util.List;
// import java.util.Map;
//
/// ** An Oracle serializer for the {@link SourceSplitBase}. */
// public class OracleSourceSplitSerializer extends SourceSplitSerializer {
//
// private static final int SNAPSHOT_SPLIT_FLAG = 1;
// private static final int REDOLOG_SPLIT_FLAG = 2;
// RedoLogOffsetFactory offsetFactory;
//
// public OracleSourceSplitSerializer(RedoLogOffsetFactory offsetFactory) {
// this.offsetFactory = offsetFactory;
// }
//
// @Override
// public OffsetFactory getOffsetFactory() {
// return offsetFactory;
// }
//
// @Override
// public Offset readOffsetPosition(int offsetVersion, DataInputDeserializer in)
// throws IOException {
// return super.readOffsetPosition(offsetVersion, in);
// }
//
// @Override
// public Offset readOffsetPosition(DataInputDeserializer in) throws IOException {
// return super.readOffsetPosition(in);
// }
//
// @Override
// public void writeOffsetPosition(Offset offset, DataOutputSerializer out) throws IOException {
// super.writeOffsetPosition(offset, out);
// }
//
// @Override
// public OffsetDeserializer createOffsetDeserializer() {
// return super.createOffsetDeserializer();
// }
//
// @Override
// public FinishedSnapshotSplitInfo deserialize(byte[] serialized) {
// return super.deserialize(serialized);
// }
//
// @Override
// public byte[] serialize(SourceSplitBase split) throws IOException {
// return super.serialize(split);
// }
//
// @Override
// public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException {
// return super.deserialize(version, serialized);
// }
//
// @Override
// public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws IOException {
// final DataInputDeserializer in = new DataInputDeserializer(serialized);
//
// int splitKind = in.readInt();
// if (splitKind == SNAPSHOT_SPLIT_FLAG) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// String splitId = in.readUTF();
// RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
// Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
// Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF());
// Offset highWatermark = readOffsetPosition(version, in);
// Map<TableId, TableChanges.TableChange> tableSchemas = readTableSchemas(version, in);
//
// return new SnapshotSplit(
// tableId,
// splitId,
// splitKeyType,
// splitBoundaryStart,
// splitBoundaryEnd,
// highWatermark,
// tableSchemas);
// } else if (splitKind == REDOLOG_SPLIT_FLAG) {
// String splitId = in.readUTF();
// // skip split Key Type
// in.readUTF();
// Offset startingOffset = readOffsetPosition(version, in);
// Offset endingOffset = readOffsetPosition(version, in);
// List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
// readFinishedSplitsInfo(version, in);
// Map<TableId, TableChanges.TableChange> tableChangeMap = readTableSchemas(version, in);
// int totalFinishedSplitSize = finishedSplitsInfo.size();
// if (version == 3) {
// totalFinishedSplitSize = in.readInt();
// }
// in.releaseArrays();
// return new StreamSplit(
// splitId,
// startingOffset,
// endingOffset,
// finishedSplitsInfo,
// tableChangeMap,
// totalFinishedSplitSize);
// } else {
// throw new IOException("Unknown split kind: " + splitKind);
// }
// }
//
// private List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(
// int version, DataInputDeserializer in) throws IOException {
// List<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<>();
// final int size = in.readInt();
// for (int i = 0; i < size; i++) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// String splitId = in.readUTF();
// Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
// Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
// OffsetFactory offsetFactory =
// (OffsetFactory) SerializerUtils.serializedStringToObject(in.readUTF());
// Offset highWatermark = readOffsetPosition(version, in);
//
// finishedSplitsInfo.add(
// new FinishedSnapshotSplitInfo(
// tableId, splitId, splitStart, splitEnd, highWatermark,
// offsetFactory));
// }
// return finishedSplitsInfo;
// }
//
// private static Map<TableId, TableChanges.TableChange> readTableSchemas(
// int version, DataInputDeserializer in) throws IOException {
// DocumentReader documentReader = DocumentReader.defaultReader();
// Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
// final int size = in.readInt();
// for (int i = 0; i < size; i++) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// final String tableChangeStr;
// switch (version) {
// case 1:
// tableChangeStr = in.readUTF();
// break;
// case 2:
// case 3:
// final int len = in.readInt();
// final byte[] bytes = new byte[len];
// in.read(bytes);
// tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
// break;
// default:
// throw new IOException("Unknown version: " + version);
// }
// Document document = documentReader.read(tableChangeStr);
// TableChanges.TableChange tableChange =
// FlinkJsonTableChangeSerializer.fromDocument(document, false);
// tableSchemas.put(tableId, tableChange);
// }
// return tableSchemas;
// }
// }

@ -25,6 +25,7 @@ import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalTableFilters;
@ -69,7 +70,7 @@ public class OracleConnectionUtils {
rs -> {
if (rs.next()) {
final String scn = rs.getString(1);
return new RedoLogOffset(scn);
return new RedoLogOffset(Scn.valueOf(scn).longValue());
} else {
throw new FlinkRuntimeException(
"Cannot read the scn via '"

@ -1,123 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.utils;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetSerializer;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/** Utils for serialization and deserialization. */
public class SerializerUtils {
private SerializerUtils() {}
public static void writeRedoLogPosition(RedoLogOffset offset, DataOutputSerializer out)
throws IOException {
out.writeBoolean(offset != null);
if (offset != null) {
byte[] redoLogOffsetBytes = RedoLogOffsetSerializer.INSTANCE.serialize(offset);
out.writeInt(redoLogOffsetBytes.length);
out.write(redoLogOffsetBytes);
}
}
public static RedoLogOffset readRedoLogPosition(int offsetVersion, DataInputDeserializer in)
throws IOException {
switch (offsetVersion) {
case 1:
return in.readBoolean() ? new RedoLogOffset(in.readUTF(), in.readLong()) : null;
case 2:
case 3:
case 4:
return readRedoLogPosition(in);
default:
throw new IOException("Unknown version: " + offsetVersion);
}
}
public static RedoLogOffset readRedoLogPosition(DataInputDeserializer in) throws IOException {
boolean offsetNonNull = in.readBoolean();
if (offsetNonNull) {
int redoLogOffsetBytesLength = in.readInt();
byte[] redoLogOffsetBytes = new byte[redoLogOffsetBytesLength];
in.readFully(redoLogOffsetBytes);
return RedoLogOffsetSerializer.INSTANCE.deserialize(redoLogOffsetBytes);
} else {
return null;
}
}
public static String rowToSerializedString(Object[] splitBoundary) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(splitBoundary);
return HexConverter.convertToHexString(bos.toByteArray());
} catch (IOException e) {
throw new DebeziumException(
String.format("Cannot serialize split boundary information %s", splitBoundary));
}
}
public static String rowToSerializedString(Object splitBoundary) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(splitBoundary);
return HexConverter.convertToHexString(bos.toByteArray());
} catch (IOException e) {
throw new DebeziumException(
String.format("Cannot serialize split boundary information %s", splitBoundary));
}
}
public static Object[] serializedStringToRow(String serialized) {
try (final ByteArrayInputStream bis =
new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
} catch (Exception e) {
throw new DebeziumException(
String.format(
"Failed to deserialize split boundary with value '%s'", serialized),
e);
}
}
public static Object serializedStringToObject(String serialized) {
try (final ByteArrayInputStream bis =
new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return ois.readObject();
} catch (Exception e) {
throw new DebeziumException(
String.format(
"Failed to deserialize split boundary with value '%s'", serialized),
e);
}
}
}
/// *
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
// package com.ververica.cdc.connectors.oracle.source.utils;
//
// import org.apache.flink.core.memory.DataInputDeserializer;
// import org.apache.flink.core.memory.DataOutputSerializer;
//
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetSerializer;
// import io.debezium.DebeziumException;
// import io.debezium.util.HexConverter;
//
// import java.io.ByteArrayInputStream;
// import java.io.ByteArrayOutputStream;
// import java.io.IOException;
// import java.io.ObjectInputStream;
// import java.io.ObjectOutputStream;
//
/// ** Utils for serialization and deserialization. */
// public class SerializerUtils {
//
// private SerializerUtils() {}
//
// public static void writeRedoLogPosition(RedoLogOffset offset, DataOutputSerializer out)
// throws IOException {
// out.writeBoolean(offset != null);
// if (offset != null) {
// byte[] redoLogOffsetBytes = RedoLogOffsetSerializer.INSTANCE.serialize(offset);
// out.writeInt(redoLogOffsetBytes.length);
// out.write(redoLogOffsetBytes);
// }
// }
//
// public static RedoLogOffset readRedoLogPosition(int offsetVersion, DataInputDeserializer in)
// throws IOException {
// switch (offsetVersion) {
// case 1:
//// return in.readBoolean() ? new RedoLogOffset(in.readUTF(), in.readLong()) : null;
// case 2:
// case 3:
// case 4:
// return readRedoLogPosition(in);
// default:
// throw new IOException("Unknown version: " + offsetVersion);
// }
// }
//
// public static RedoLogOffset readRedoLogPosition(DataInputDeserializer in) throws IOException {
// boolean offsetNonNull = in.readBoolean();
// if (offsetNonNull) {
// int redoLogOffsetBytesLength = in.readInt();
// byte[] redoLogOffsetBytes = new byte[redoLogOffsetBytesLength];
// in.readFully(redoLogOffsetBytes);
// return RedoLogOffsetSerializer.INSTANCE.deserialize(redoLogOffsetBytes);
// } else {
// return null;
// }
// }
//
// public static String rowToSerializedString(Object[] splitBoundary) {
// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
// ObjectOutputStream oos = new ObjectOutputStream(bos)) {
// oos.writeObject(splitBoundary);
// return HexConverter.convertToHexString(bos.toByteArray());
// } catch (IOException e) {
// throw new DebeziumException(
// String.format("Cannot serialize split boundary information %s",
// splitBoundary));
// }
// }
//
// public static String rowToSerializedString(Object splitBoundary) {
// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
// ObjectOutputStream oos = new ObjectOutputStream(bos)) {
// oos.writeObject(splitBoundary);
// return HexConverter.convertToHexString(bos.toByteArray());
// } catch (IOException e) {
// throw new DebeziumException(
// String.format("Cannot serialize split boundary information %s",
// splitBoundary));
// }
// }
//
// public static Object[] serializedStringToRow(String serialized) {
// try (final ByteArrayInputStream bis =
// new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
// ObjectInputStream ois = new ObjectInputStream(bis)) {
// return (Object[]) ois.readObject();
// } catch (Exception e) {
// throw new DebeziumException(
// String.format(
// "Failed to deserialize split boundary with value '%s'", serialized),
// e);
// }
// }
//
// public static Object serializedStringToObject(String serialized) {
// try (final ByteArrayInputStream bis =
// new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
// ObjectInputStream ois = new ObjectInputStream(bis)) {
// return ois.readObject();
// } catch (Exception e) {
// throw new DebeziumException(
// String.format(
// "Failed to deserialize split boundary with value '%s'", serialized),
// e);
// }
// }
// }

@ -167,6 +167,7 @@ public class OracleSourceTest extends AbstractTestBase {
}
@Test
@Ignore("It can be open until DBZ-5245 and DBZ-4936 fix")
public void testCheckpointAndRestore() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();

Loading…
Cancel
Save