From 7455ad63ab29e4edee4314018e533ea06e188676 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 12 Feb 2022 22:39:03 +0800 Subject: [PATCH] [mysql] Use READ type for all records in snapshot reading (#842) --- .../cdc/connectors/mysql/source/utils/RecordUtils.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index b100b3b5c..97536a966 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -147,10 +147,11 @@ public class RecordUtils { Envelope.Operation.forCode( value.getString(Envelope.FieldName.OPERATION)); switch (operation) { + case CREATE: case UPDATE: Envelope envelope = Envelope.fromSchema(binlog.valueSchema()); Struct source = value.getStruct(Envelope.FieldName.SOURCE); - Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER); + Struct after = value.getStruct(Envelope.FieldName.AFTER); Instant fetchTs = Instant.ofEpochMilli( (Long) source.get(Envelope.FieldName.TIMESTAMP)); @@ -163,15 +164,12 @@ public class RecordUtils { binlog.keySchema(), binlog.key(), binlog.valueSchema(), - envelope.read(updateAfter, source, fetchTs)); + envelope.read(after, source, fetchTs)); snapshotRecords.put(key, record); break; case DELETE: snapshotRecords.remove(key); break; - case CREATE: - snapshotRecords.put(key, binlog); - break; case READ: throw new IllegalStateException( String.format(