[FLINK-35251][cdc][runtime] Fix bug of serializing derivation mapping in SchemaDerivation

This closes  #3267.
pull/3271/head
Qingsheng Ren 11 months ago committed by GitHub
parent ec643c9dd7
commit 9cc3451ddf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -123,7 +123,7 @@ public class SchemaDerivation {
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
// Serialize derivation mapping in SchemaDerivation
Map<TableId, Set<TableId>> derivationMapping = schemaDerivation.getDerivationMapping();
out.write(derivationMapping.size());
out.writeInt(derivationMapping.size());
for (Map.Entry<TableId, Set<TableId>> entry : derivationMapping.entrySet()) {
// Routed table ID
TableId routedTableId = entry.getKey();

@ -36,11 +36,17 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@ -362,4 +368,26 @@ class SchemaDerivationTest {
.isInstanceOf(IllegalStateException.class)
.hasMessage("Incompatible types: \"INT\" and \"STRING\"");
}
@Test
void testSerde() throws Exception {
Map<TableId, Set<TableId>> derivationMapping = new HashMap<>();
Set<TableId> originalTableIds = new HashSet<>();
originalTableIds.add(TABLE_1);
originalTableIds.add(TABLE_2);
derivationMapping.put(MERGED_TABLE, originalTableIds);
SchemaDerivation schemaDerivation =
new SchemaDerivation(new SchemaManager(), ROUTES, derivationMapping);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
byte[] serialized = baos.toByteArray();
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
Map<TableId, Set<TableId>> deserialized =
SchemaDerivation.deserializerDerivationMapping(in);
assertThat(deserialized).isEqualTo(derivationMapping);
}
}
}
}

Loading…
Cancel
Save