[FLINK-36858][pipeline-connector][kafka] Fix JsonRowDataSerializationSchema compatibility issue with Flink 1.20

This closes #3784

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
MOBIN committed via GitHub
parent 75b8a0cdf3
commit a130718c96
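The incompatibility stems from the JsonRowDataSerializationSchema constructor: as documented in the new utility class introduced below, Flink 1.20 added a sixth ignoreNullFields parameter that does not exist in Flink 1.19. A rough sketch of the two shapes (argument names follow that utility class):

    // Flink 1.19.x: five-argument constructor
    new JsonRowDataSerializationSchema(
            rowType, timestampFormat, mapNullKeyMode, mapNullKeyLiteral, encodeDecimalAsPlainNumber);
    // Flink 1.20.x: six-argument constructor, adding ignoreNullFields
    new JsonRowDataSerializationSchema(
            rowType, timestampFormat, mapNullKeyMode, mapNullKeyLiteral, encodeDecimalAsPlainNumber, ignoreNullFields);

Calling one fixed constructor therefore breaks on the other version, so the connector now resolves the constructor reflectively.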

@@ -115,6 +115,19 @@ limitations under the License.
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <executions>
                <execution>
                    <id>test-jar</id>
                    <goals>
                        <goal>test-jar</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -57,6 +58,9 @@ public class ChangeLogJsonFormatFactory {
        final boolean encodeDecimalAsPlainNumber =
                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        final boolean ignoreNullFields =
                JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(formatOptions);
        switch (type) {
            case DEBEZIUM_JSON:
                {
@@ -65,7 +69,8 @@ public class ChangeLogJsonFormatFactory {
                            mapNullKeyMode,
                            mapNullKeyLiteral,
                            zoneId,
                            encodeDecimalAsPlainNumber,
                            ignoreNullFields);
                }
            case CANAL_JSON:
                {
@@ -74,7 +79,8 @@ public class ChangeLogJsonFormatFactory {
                            mapNullKeyMode,
                            mapNullKeyLiteral,
                            zoneId,
                            encodeDecimalAsPlainNumber,
                            ignoreNullFields);
                }
            default:
                {

@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -73,6 +74,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
    private final boolean encodeDecimalAsPlainNumber;

    private final boolean ignoreNullFields;

    private final ZoneId zoneId;

    private InitializationContext context;
@@ -82,13 +85,15 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
            String mapNullKeyLiteral,
            ZoneId zoneId,
            boolean encodeDecimalAsPlainNumber,
            boolean ignoreNullFields) {
        this.timestampFormat = timestampFormat;
        this.mapNullKeyMode = mapNullKeyMode;
        this.mapNullKeyLiteral = mapNullKeyLiteral;
        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
        this.zoneId = zoneId;
        jsonSerializers = new HashMap<>();
        this.ignoreNullFields = ignoreNullFields;
    }

    @Override
@@ -114,12 +119,13 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
            LogicalType rowType =
                    DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
            JsonRowDataSerializationSchema jsonSerializer =
                    JsonRowDataSerializationSchemaUtils.createSerializationSchema(
                            createJsonRowType(fromLogicalToDataType(rowType)),
                            timestampFormat,
                            mapNullKeyMode,
                            mapNullKeyLiteral,
                            encodeDecimalAsPlainNumber,
                            ignoreNullFields);
            try {
                jsonSerializer.open(context);
            } catch (Exception e) {

@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -72,6 +73,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Event>
    private final boolean encodeDecimalAsPlainNumber;

    private final boolean ignoreNullFields;

    private final ZoneId zoneId;

    private InitializationContext context;
@@ -81,13 +84,15 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Event>
            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
            String mapNullKeyLiteral,
            ZoneId zoneId,
            boolean encodeDecimalAsPlainNumber,
            boolean ignoreNullFields) {
        this.timestampFormat = timestampFormat;
        this.mapNullKeyMode = mapNullKeyMode;
        this.mapNullKeyLiteral = mapNullKeyLiteral;
        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
        this.zoneId = zoneId;
        jsonSerializers = new HashMap<>();
        this.ignoreNullFields = ignoreNullFields;
    }

    @Override
@@ -113,12 +118,13 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Event>
            LogicalType rowType =
                    DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
            JsonRowDataSerializationSchema jsonSerializer =
                    JsonRowDataSerializationSchemaUtils.createSerializationSchema(
                            createJsonRowType(fromLogicalToDataType(rowType)),
                            timestampFormat,
                            mapNullKeyMode,
                            mapNullKeyLiteral,
                            encodeDecimalAsPlainNumber,
                            ignoreNullFields);
            try {
                jsonSerializer.open(context);
            } catch (Exception e) {

@@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -61,6 +62,8 @@ public class JsonSerializationSchema implements SerializationSchema<Event> {
    private final boolean encodeDecimalAsPlainNumber;

    private final boolean ignoreNullFields;

    private final ZoneId zoneId;

    private InitializationContext context;
@@ -70,13 +73,15 @@ public class JsonSerializationSchema implements SerializationSchema<Event> {
            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
            String mapNullKeyLiteral,
            ZoneId zoneId,
            boolean encodeDecimalAsPlainNumber,
            boolean ignoreNullFields) {
        this.timestampFormat = timestampFormat;
        this.mapNullKeyMode = mapNullKeyMode;
        this.mapNullKeyLiteral = mapNullKeyLiteral;
        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
        this.zoneId = zoneId;
        jsonSerializers = new HashMap<>();
        this.ignoreNullFields = ignoreNullFields;
    }

    @Override
@@ -131,11 +136,12 @@ public class JsonSerializationSchema implements SerializationSchema<Event> {
        // the row should never be null
        DataType dataType = DataTypes.ROW(fields).notNull();
        LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
        return JsonRowDataSerializationSchemaUtils.createSerializationSchema(
                (RowType) rowType,
                timestampFormat,
                mapNullKeyMode,
                mapNullKeyLiteral,
                encodeDecimalAsPlainNumber,
                ignoreNullFields);
    }
}

@@ -21,6 +21,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -54,12 +55,16 @@ public class KeySerializationFactory {
                    final boolean encodeDecimalAsPlainNumber =
                            formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
                    final boolean ignoreNullFields =
                            JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(
                                    formatOptions);
                    return new JsonSerializationSchema(
                            timestampFormat,
                            mapNullKeyMode,
                            mapNullKeyLiteral,
                            zoneId,
                            encodeDecimalAsPlainNumber,
                            ignoreNullFields);
                }
            case CSV:
                {

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.utils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Arrays;
/**
 * Utils for creating JsonRowDataSerializationSchema. TODO: remove this class after bumping to
 * Flink 1.20 or higher.
 */
public class JsonRowDataSerializationSchemaUtils {

    /**
     * In Flink >= 1.20 the constructor of JsonRowDataSerializationSchema takes 6 parameters,
     * while in Flink < 1.20 it takes 5, so the matching constructor is resolved reflectively.
     */
    public static JsonRowDataSerializationSchema createSerializationSchema(
            RowType rowType,
            TimestampFormat timestampFormat,
            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
            String mapNullKeyLiteral,
            boolean encodeDecimalAsPlainNumber,
            boolean ignoreNullFields) {
        try {
            Class<?>[] fullParams =
                    new Class[] {
                        RowType.class,
                        TimestampFormat.class,
                        JsonFormatOptions.MapNullKeyMode.class,
                        String.class,
                        boolean.class,
                        boolean.class
                    };
            Object[] fullParamValues =
                    new Object[] {
                        rowType,
                        timestampFormat,
                        mapNullKeyMode,
                        mapNullKeyLiteral,
                        encodeDecimalAsPlainNumber,
                        ignoreNullFields
                    };
            for (int i = fullParams.length; i >= 5; i--) {
                try {
                    Constructor<?> constructor =
                            JsonRowDataSerializationSchema.class.getConstructor(
                                    Arrays.copyOfRange(fullParams, 0, i));
                    return (JsonRowDataSerializationSchema)
                            constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
                } catch (NoSuchMethodException ignored) {
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to create JsonRowDataSerializationSchema, please check that your Flink version is 1.19 or 1.20.",
                    e);
        }
        throw new RuntimeException(
                "Failed to find an appropriate constructor for JsonRowDataSerializationSchema, please check that your Flink version is 1.19 or 1.20.");
    }

    /** Only Flink >= 1.20 has the ENCODE_IGNORE_NULL_FIELDS option. */
    public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) {
        try {
            Field field = JsonFormatOptions.class.getField("ENCODE_IGNORE_NULL_FIELDS");
            ConfigOption<Boolean> encodeOption = (ConfigOption<Boolean>) field.get(null);
            return formatOptions.get(encodeOption);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return false;
        }
    }
}
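For illustration, a minimal usage sketch of the utility above (the concrete argument values here are hypothetical; the serialization schemas in this change pass their configured options instead):

    JsonRowDataSerializationSchema jsonSerializer =
            JsonRowDataSerializationSchemaUtils.createSerializationSchema(
                    rowType, // RowType of the JSON payload
                    TimestampFormat.SQL, // illustrative timestamp format
                    JsonFormatOptions.MapNullKeyMode.FAIL, // illustrative null-key handling
                    "null", // map null-key literal
                    false, // encodeDecimalAsPlainNumber
                    false); // ignoreNullFields; silently dropped on Flink < 1.20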

@@ -108,6 +108,13 @@ limitations under the License.
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-flink-${flink.major.version}</artifactId>
@@ -161,6 +168,13 @@ limitations under the License.
            <version>${scala.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
@@ -280,6 +294,16 @@ limitations under the License.
                                    </outputDirectory>
                                </artifactItem>
                                <artifactItem>
                                    <groupId>org.apache.flink</groupId>
                                    <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
                                    <version>${project.version}</version>
                                    <destFileName>kafka-cdc-pipeline-connector.jar</destFileName>
                                    <type>jar</type>
                                    <outputDirectory>${project.build.directory}/dependencies
                                    </outputDirectory>
                                </artifactItem>
                                <artifactItem>
                                    <groupId>org.apache.flink</groupId>
                                    <artifactId>flink-cdc-pipeline-connector-oceanbase</artifactId>

@@ -0,0 +1,388 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.kafka.sink.KafkaUtil;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.assertj.core.api.Assertions.assertThat;
/** End-to-end tests for the MySQL CDC to Kafka pipeline job. */
@RunWith(Parameterized.class)
public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MysqlToKafkaE2eITCase.class);
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the data source for easier verifying)
// ------------------------------------------------------------------------------------------
protected static final String MYSQL_TEST_USER = "mysqluser";
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
protected static final long EVENT_WAITING_TIMEOUT = 60000L;
private static AdminClient admin;
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final int ZK_TIMEOUT_MILLIS = 30000;
private static final short TOPIC_REPLICATION_FACTOR = 1;
private TableId table;
private String topic;
private KafkaConsumer<byte[], byte[]> consumer;
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer)
new MySqlContainer(
MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
.withConfigurationOverride("docker/mysql/my.cnf")
.withSetupSQL("docker/mysql/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final KafkaContainer KAFKA_CONTAINER =
KafkaUtil.createKafkaContainer(KAFKA, LOG)
.withNetworkAliases("kafka")
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
protected final UniqueDatabase mysqlInventoryDatabase =
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
@BeforeClass
public static void initializeContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL)).join();
Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
Map<String, Object> properties = new HashMap<>();
properties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
admin = AdminClient.create(properties);
LOG.info("Containers are started.");
}
@Before
public void before() throws Exception {
super.before();
createTestTopic(1, TOPIC_REPLICATION_FACTOR);
Properties properties = getKafkaClientConfiguration();
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
mysqlInventoryDatabase.createAndInitialize();
}
@After
public void after() {
super.after();
admin.deleteTopics(Collections.singletonList(topic));
consumer.close();
mysqlInventoryDatabase.dropDatabase();
}
@Test
public void testSyncWholeDatabaseWithDebeziumJson() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: kafka\n"
+ " properties.bootstrap.servers: kafka:9092\n"
+ " topic: %s\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
topic,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
int expectedEventCount = 13;
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
List<String> expectedRecords =
getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json.txt");
assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
stat.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
stat.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
stat.execute("DELETE FROM products WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
expectedEventCount = 20;
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
assertThat(expectedRecords)
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
}
@Test
public void testSyncWholeDatabaseWithCanalJson() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: kafka\n"
+ " properties.bootstrap.servers: kafka:9092\n"
+ " topic: %s\n"
+ " value.format: canal-json\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
topic,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
int expectedEventCount = 13;
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
List<String> expectedRecords =
getExpectedRecords("expectedEvents/mysqlToKafka/canal-json.txt");
assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
stat.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
stat.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
stat.execute("DELETE FROM products WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
expectedEventCount = 20;
waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
assertThat(expectedRecords)
.containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
}
private void waitUntilSpecificEventCount(
List<ConsumerRecord<byte[], byte[]>> actualEvent, int expectedCount) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + MysqlToKafkaE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(actualEvent::add);
if (actualEvent.size() == expectedCount) {
result = true;
break;
}
Thread.sleep(1000);
}
if (!result) {
throw new TimeoutException(
"Failed to reach the expected event count: "
+ expectedCount
+ ", actually received: "
+ actualEvent.size());
}
}
private static Properties getKafkaClientConfiguration() {
final Properties standardProps = new Properties();
standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put("group.id", UUID.randomUUID().toString());
standardProps.put("enable.auto.commit", false);
standardProps.put("auto.offset.reset", "earliest");
standardProps.put("max.partition.fetch.bytes", 256);
standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
standardProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
return standardProps;
}
private void createTestTopic(int numPartitions, short replicationFactor)
throws ExecutionException, InterruptedException {
table =
TableId.tableId(
"default_namespace", "default_schema", UUID.randomUUID().toString());
topic = table.toString();
final CreateTopicsResult result =
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
result.all().get();
}
private List<String> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records)
throws IOException {
List<String> result = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
result.add(new String(record.value(), "UTF-8"));
}
return result;
}
protected List<String> getExpectedRecords(String resourceDirFormat) throws Exception {
URL url =
MysqlToKafkaE2eITCase.class
.getClassLoader()
.getResource(String.format(resourceDirFormat));
return Files.readAllLines(Paths.get(url.toURI())).stream()
.filter(this::isValidJsonRecord)
.map(
line ->
line.replace(
"$databaseName", mysqlInventoryDatabase.getDatabaseName()))
.collect(Collectors.toList());
}
protected boolean isValidJsonRecord(String line) {
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.readTree(line);
return !StringUtils.isEmpty(line);
} catch (JsonProcessingException e) {
return false;
}
}
}

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"old":null,"data":[{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
{"old":null,"data":[{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
{"old":null,"data":[{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
{"old":null,"data":[{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
{"old":null,"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":null,"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"DELETE","database":"$databaseName","table":"products","pkNames":["id"]}
{"old":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"before":null,"after":{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
{"before":null,"after":{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
{"before":null,"after":{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
{"before":null,"after":{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}
{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}
{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}
{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}
{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":null,"op":"d","source":{"db":"$databaseName","table":"products"}}