|
|
|
@ -147,10 +147,11 @@ public class RecordUtils {
|
|
|
|
|
Envelope.Operation.forCode(
|
|
|
|
|
value.getString(Envelope.FieldName.OPERATION));
|
|
|
|
|
switch (operation) {
|
|
|
|
|
case CREATE:
|
|
|
|
|
case UPDATE:
|
|
|
|
|
Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
|
|
|
|
|
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
|
|
|
|
|
Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
|
|
|
|
|
Struct after = value.getStruct(Envelope.FieldName.AFTER);
|
|
|
|
|
Instant fetchTs =
|
|
|
|
|
Instant.ofEpochMilli(
|
|
|
|
|
(Long) source.get(Envelope.FieldName.TIMESTAMP));
|
|
|
|
@ -163,15 +164,12 @@ public class RecordUtils {
|
|
|
|
|
binlog.keySchema(),
|
|
|
|
|
binlog.key(),
|
|
|
|
|
binlog.valueSchema(),
|
|
|
|
|
envelope.read(updateAfter, source, fetchTs));
|
|
|
|
|
envelope.read(after, source, fetchTs));
|
|
|
|
|
snapshotRecords.put(key, record);
|
|
|
|
|
break;
|
|
|
|
|
case DELETE:
|
|
|
|
|
snapshotRecords.remove(key);
|
|
|
|
|
break;
|
|
|
|
|
case CREATE:
|
|
|
|
|
snapshotRecords.put(key, binlog);
|
|
|
|
|
break;
|
|
|
|
|
case READ:
|
|
|
|
|
throw new IllegalStateException(
|
|
|
|
|
String.format(
|
|
|
|
|