diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 60d81baa2..4bd22a0a1 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -401,7 +401,19 @@ During a snapshot operation, the connector will query each included table to pro
         Schema change events are applied to a "shadow" table and then swapped with the original table later.
         <br>
         This is an experimental feature, and subject to change in the future.
-      </td> 
+      </td>
+    </tr>
+    <tr>
+      <td>use.legacy.json.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
+          It determines whether to use the legacy JSON format when retrieving JSON type data in binlog. 
+          If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
+          JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
+          When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
+      </td>
     </tr>
     </tbody>
 </table>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 9bcd2a617..b05e67921 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -320,6 +320,18 @@ pipeline:
       <td>Boolean</td>
       <td>Whether treat TINYINT(1) as boolean, by default is true.</td>
     </tr>
+    <tr>
+      <td>use.legacy.json.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
+          It determines whether to use the legacy JSON format when retrieving JSON type data in binlog. 
+          If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
+          JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
+          When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 48819c50c..3072b86a0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -94,6 +94,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USE_LEGACY_JSON_FORMAT;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -148,6 +149,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         boolean scanBinlogNewlyAddedTableEnabled =
                 config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
         boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
+        boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -198,7 +200,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .jdbcProperties(getJdbcProperties(configMap))
                         .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                         .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
-                        .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean);
+                        .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
+                        .useLegacyJsonFormat(useLegacyJsonFormat);
 
         List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
 
@@ -331,6 +334,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
         options.add(METADATA_LIST);
         options.add(INCLUDE_COMMENTS_ENABLED);
+        options.add(USE_LEGACY_JSON_FORMAT);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 89878d0ea..b57ebe044 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -305,4 +305,12 @@ public class MySqlDataSourceOptions {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");
+
+    @Experimental
+    public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
+            ConfigOptions.key("use.legacy.json.format")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
index dd15fdf6a..0df85eaa3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
@@ -106,11 +106,39 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
         testCommonDataTypes(fullTypesMySql57Database);
     }
 
+    @Test
+    public void testMysql57JsonDataTypes() throws Throwable {
+        // Set `useLegacyJsonFormat` as false, so the json string will have no whitespace
+        // before value and after comma in json format be formatted with legacy format.
+        testJsonDataType(fullTypesMySql57Database, false);
+    }
+
+    @Test
+    public void testMysql57JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
+        // Set `useLegacyJsonFormat` as true, so the json string will have whitespace before
+        // value and after comma in json format be formatted with legacy format.
+        testJsonDataType(fullTypesMySql57Database, true);
+    }
+
     @Test
     public void testMySql8CommonDataTypes() throws Throwable {
         testCommonDataTypes(fullTypesMySql8Database);
     }
 
+    @Test
+    public void testMySql8JsonDataTypes() throws Throwable {
+        // Set `useLegacyJsonFormat` as false, so the json string will have no whitespace
+        // before value and after comma in json format be formatted with legacy format.
+        testJsonDataType(fullTypesMySql8Database, false);
+    }
+
+    @Test
+    public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
+        // Set `useLegacyJsonFormat` as true, so the json string will have whitespace before
+        // value and after comma in json format be formatted with legacy format.
+        testJsonDataType(fullTypesMySql8Database, true);
+    }
+
     @Test
     public void testMysql57TimeDataTypes() throws Throwable {
         RowType recordType =
@@ -321,9 +349,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
                 };
 
         database.createAndInitialize();
+        Boolean useLegacyJsonFormat = true;
         CloseableIterator<Event> iterator =
                 env.fromSource(
-                                getFlinkSourceProvider(new String[] {"precision_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"precision_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                         .getSource(),
                                 WatermarkStrategy.noWatermarks(),
                                 "Event-Source")
@@ -351,9 +383,15 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
 
     private void testCommonDataTypes(UniqueDatabase database) throws Exception {
         database.createAndInitialize();
+        // Set useLegacyJsonFormat option as true, so the json string will have no whitespace before
+        // value and after comma in json format.be formatted with legacy format.
+        Boolean useLegacyJsonFormat = true;
         CloseableIterator<Event> iterator =
                 env.fromSource(
-                                getFlinkSourceProvider(new String[] {"common_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"common_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                         .getSource(),
                                 WatermarkStrategy.noWatermarks(),
                                 "Event-Source")
@@ -446,7 +484,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
         }
 
         expectedSnapshot[30] = null;
-        // The json string from binlog will remove useless space
+        // Legacy format removes useless space in json string from binlog
         expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
         Object[] expectedStreamRecord = expectedSnapshot;
 
@@ -457,6 +495,66 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
                 .isEqualTo(expectedStreamRecord);
     }
 
+    private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
+            throws Exception {
+        database.createAndInitialize();
+        CloseableIterator<Event> iterator =
+                env.fromSource(
+                                getFlinkSourceProvider(
+                                                new String[] {"json_types"},
+                                                database,
+                                                useLegacyJsonFormat)
+                                        .getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                "Event-Source")
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                    BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+                    BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
+                    BinaryStringData.fromString(
+                            "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+                    1
+                };
+
+        // skip CreateTableEvent
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
+                .isEqualTo(expectedSnapshot);
+
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
+        }
+
+        Object[] expectedStreamRecord = expectedSnapshot;
+        List<Event> streamResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
+        RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
+
+        expectedSnapshot[4] = null;
+
+        if (useLegacyJsonFormat) {
+            // removed whitespace before value and after comma in json format string value
+            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
+                    .containsExactly(
+                            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                            BinaryStringData.fromString("{\"key1\":\"value1\"}"),
+                            BinaryStringData.fromString(
+                                    "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
+                            BinaryStringData.fromString(
+                                    "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
+                            null);
+        } else {
+            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
+                    .containsExactly(expectedStreamRecord);
+        }
+    }
+
     private Instant toInstant(String ts) {
         return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
     }
@@ -468,9 +566,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
             Object[] expectedStreamRecord)
             throws Exception {
         database.createAndInitialize();
+        Boolean useLegacyJsonFormat = true;
         CloseableIterator<Event> iterator =
                 env.fromSource(
-                                getFlinkSourceProvider(new String[] {"time_types"}, database)
+                                getFlinkSourceProvider(
+                                                new String[] {"time_types"},
+                                                database,
+                                                useLegacyJsonFormat)
                                         .getSource(),
                                 WatermarkStrategy.noWatermarks(),
                                 "Event-Source")
@@ -498,7 +600,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
     }
 
     private FlinkSourceProvider getFlinkSourceProvider(
-            String[] captureTables, UniqueDatabase database) {
+            String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) {
         String[] captureTableIds =
                 Arrays.stream(captureTables)
                         .map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -517,7 +619,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
                         .username(database.getUsername())
                         .password(database.getPassword())
                         .serverTimeZone(ZoneId.of("UTC").toString())
-                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));
+                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
+                        .useLegacyJsonFormat(useLegacyJsonFormat);
         return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
     }
 
@@ -577,4 +680,12 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
                     DataTypes.STRING(),
                     DataTypes.STRING(),
                     DataTypes.STRING());
+
+    private static final RowType JSON_TYPES =
+            RowType.of(
+                    DataTypes.DECIMAL(20, 0).notNull(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.STRING(),
+                    DataTypes.INT());
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
index 96f3b5f92..92fafb568 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
@@ -222,6 +222,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
         fullTypesMySql57Database.createAndInitialize();
 
         String[] tables = new String[] {"precision_types"};
+
         MySqlMetadataAccessor metadataAccessor =
                 getMetadataAccessor(tables, fullTypesMySql57Database, true);
 
@@ -229,6 +230,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
                 metadataAccessor.getTableSchema(
                         TableId.tableId(
                                 fullTypesMySql57Database.getDatabaseName(), "precision_types"));
+
         Schema expectedSchema =
                 Schema.newBuilder()
                         .primaryKey("id")
@@ -304,6 +306,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
                 metadataAccessor.getTableSchema(
                         TableId.tableId(
                                 fullTypesMySql8Database.getDatabaseName(), "precision_types"));
+
         Schema expectedSchema =
                 Schema.newBuilder()
                         .primaryKey("id")
@@ -370,7 +373,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
     private void testAccessDatabaseAndTable(UniqueDatabase database) {
         database.createAndInitialize();
 
-        String[] tables = new String[] {"common_types", "time_types", "precision_types"};
+        String[] tables =
+                new String[] {"common_types", "time_types", "precision_types", "json_types"};
         MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);
 
         assertThatThrownBy(metadataAccessor::listNamespaces)
@@ -528,6 +532,66 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
         return new MySqlMetadataAccessor(sourceConfig);
     }
 
+    @Test
+    public void testMysql57AccessJsonTypesSchema() {
+        fullTypesMySql57Database.createAndInitialize();
+
+        String[] tables = new String[] {"json_types"};
+        MySqlMetadataAccessor metadataAccessor =
+                getMetadataAccessor(tables, fullTypesMySql57Database, true);
+
+        Schema actualSchema =
+                metadataAccessor.getTableSchema(
+                        TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .primaryKey("id")
+                        .fromRowDataType(
+                                RowType.of(
+                                        new DataType[] {
+                                            DataTypes.DECIMAL(20, 0).notNull(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.INT()
+                                        },
+                                        new String[] {
+                                            "id", "json_c0", "json_c1", "json_c2", "int_c",
+                                        }))
+                        .build();
+        assertThat(actualSchema).isEqualTo(expectedSchema);
+    }
+
+    @Test
+    public void testMysql8AccessJsonTypesSchema() {
+        fullTypesMySql57Database.createAndInitialize();
+
+        String[] tables = new String[] {"json_types"};
+        MySqlMetadataAccessor metadataAccessor =
+                getMetadataAccessor(tables, fullTypesMySql57Database, true);
+
+        Schema actualSchema =
+                metadataAccessor.getTableSchema(
+                        TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .primaryKey("id")
+                        .fromRowDataType(
+                                RowType.of(
+                                        new DataType[] {
+                                            DataTypes.DECIMAL(20, 0).notNull(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING(),
+                                            DataTypes.INT()
+                                        },
+                                        new String[] {
+                                            "id", "json_c0", "json_c1", "json_c2", "int_c",
+                                        }))
+                        .build();
+        assertThat(actualSchema).isEqualTo(expectedSchema);
+    }
+
     private MySqlSourceConfig getConfig(
             String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
         String[] captureTableIds =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
index 9699ed908..e7dc07e28 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
@@ -180,4 +180,22 @@ VALUES (DEFAULT,
         23,
         29,
         31,
-        37);
\ No newline at end of file
+        37);
+
+CREATE TABLE json_types
+(
+    id                   SERIAL,
+    json_c0               JSON,
+    json_c1               JSON,
+    json_c2               JSON,
+    int_c  INTEGER,
+    PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO json_types
+VALUES (DEFAULT,
+        '{"key1":"value1"}',
+        '{"key1":"value1","key2":"value2"}',
+        '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
+        1
+       );
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
index 8abe8868c..92548381f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -185,3 +185,21 @@ VALUES (DEFAULT,
         29,
         31,
         37);
+
+CREATE TABLE json_types
+(
+    id                   SERIAL,
+    json_c0               JSON,
+    json_c1               JSON,
+    json_c2               JSON,
+    int_c  INTEGER,
+    PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO json_types
+VALUES (DEFAULT,
+        '{"key1":"value1"}',
+        '{"key1":"value1","key2":"value2"}',
+        '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
+        1
+        );
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
new file mode 100644
index 000000000..7154496c0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization.json;
+
+import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Base64;
+
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat;
+
+/**
+ * Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before value and after comma.
+ *
+ * <p>Line 105: Added whitespace before value, Line 213: Added whitespace after comma
+ */
+public class JsonStringFormatter implements JsonFormatter {
+
+    /**
+     * Value used for lookup tables to indicate that matching characters do not need to be escaped.
+     */
+    private static final int ESCAPE_NONE = 0;
+
+    /**
+     * Value used for lookup tables to indicate that matching characters are to be escaped using
+     * standard escaping; for JSON this means (for example) using "backslash - u" escape method.
+     */
+    private static final int ESCAPE_GENERIC = -1;
+
+    /**
+     * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8
+     * characters) must be escaped. A value of '0' means no escaping is required; positive values
+     * must be escaped with a preceding backslash; and negative values that generic escaping (e.g.,
+     */
+    private static final int[] ESCAPES;
+
+    static {
+        int[] escape = new int[128];
+        // Generic escape for control characters ...
+        for (int i = 0; i < 32; ++i) {
+            escape[i] = ESCAPE_GENERIC;
+        }
+        // Backslash escape for other specific characters ...
+        escape['"'] = '"';
+        escape['\\'] = '\\';
+        // Escaping of slash is optional, so let's not add it
+        escape[0x08] = 'b';
+        escape[0x09] = 't';
+        escape[0x0C] = 'f';
+        escape[0x0A] = 'n';
+        escape[0x0D] = 'r';
+        ESCAPES = escape;
+    }
+
+    private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray();
+
+    private final StringBuilder sb = new StringBuilder();
+
+    @Override
+    public String toString() {
+        return getString();
+    }
+
+    public String getString() {
+        return sb.toString();
+    }
+
+    @Override
+    public void beginObject(int numElements) {
+        sb.append('{');
+    }
+
+    @Override
+    public void beginArray(int numElements) {
+        sb.append('[');
+    }
+
+    @Override
+    public void endObject() {
+        sb.append('}');
+    }
+
+    @Override
+    public void endArray() {
+        sb.append(']');
+    }
+
+    @Override
+    public void name(String name) {
+        sb.append('"');
+        appendString(name);
+        if (useLegacyJsonFormat) {
+            sb.append("\":");
+        } else {
+            sb.append("\": ");
+        }
+    }
+
+    @Override
+    public void value(String value) {
+        sb.append('"');
+        appendString(value);
+        sb.append('"');
+    }
+
+    @Override
+    public void value(int value) {
+        sb.append(Integer.toString(value));
+    }
+
+    @Override
+    public void value(long value) {
+        sb.append(Long.toString(value));
+    }
+
+    @Override
+    public void value(double value) {
+        // Double's toString method will result in scientific notation and loss of precision
+        String str = Double.toString(value);
+        if (str.contains("E")) {
+            value(new BigDecimal(value));
+        } else {
+            sb.append(str);
+        }
+    }
+
+    @Override
+    public void value(BigInteger value) {
+        // Using the BigInteger.toString() method will result in scientific notation, so instead ...
+        value(new BigDecimal(value));
+    }
+
+    @Override
+    public void value(BigDecimal value) {
+        // Using the BigInteger.toString() method will result in scientific notation, so instead ...
+        sb.append(value.toPlainString());
+    }
+
+    @Override
+    public void value(boolean value) {
+        sb.append(Boolean.toString(value));
+    }
+
+    @Override
+    public void valueNull() {
+        sb.append("null");
+    }
+
+    @Override
+    public void valueYear(int year) {
+        sb.append(year);
+    }
+
+    @Override
+    public void valueDate(int year, int month, int day) {
+        sb.append('"');
+        appendDate(year, month, day);
+        sb.append('"');
+    }
+
+    @Override
+    // checkstyle, please ignore ParameterNumber for the next line
+    public void valueDatetime(
+            int year, int month, int day, int hour, int min, int sec, int microSeconds) {
+        sb.append('"');
+        appendDate(year, month, day);
+        sb.append(' ');
+        appendTime(hour, min, sec, microSeconds);
+        sb.append('"');
+    }
+
+    @Override
+    public void valueTime(int hour, int min, int sec, int microSeconds) {
+        sb.append('"');
+        if (hour < 0) {
+            sb.append('-');
+            hour = Math.abs(hour);
+        }
+        appendTime(hour, min, sec, microSeconds);
+        sb.append('"');
+    }
+
+    @Override
+    public void valueTimestamp(long secondsPastEpoch, int microSeconds) {
+        sb.append(secondsPastEpoch);
+        appendSixDigitUnsignedInt(microSeconds, false);
+    }
+
+    @Override
+    public void valueOpaque(ColumnType type, byte[] value) {
+        sb.append('"');
+        sb.append(Base64.getEncoder().encodeToString(value));
+        sb.append('"');
+    }
+
+    @Override
+    public void nextEntry() {
+        if (useLegacyJsonFormat) {
+            sb.append(",");
+        } else {
+            sb.append(", ");
+        }
+    }
+
+    /**
+     * Append a string by escaping any characters that must be escaped.
+     *
+     * @param original the string to be written; may not be null
+     */
+    protected void appendString(String original) {
+        for (int i = 0, len = original.length(); i < len; ++i) {
+            char c = original.charAt(i);
+            int ch = c;
+            if (ch < 0 || ch >= ESCAPES.length || ESCAPES[ch] == 0) {
+                sb.append(c);
+                continue;
+            }
+            int escape = ESCAPES[ch];
+            if (escape > 0) { // 2-char escape, fine
+                sb.append('\\');
+                sb.append((char) escape);
+            } else {
+                unicodeEscape(ch);
+            }
+        }
+    }
+
+    /**
+     * Append a generic Unicode escape for given character.
+     *
+     * @param charToEscape the character to escape
+     */
+    private void unicodeEscape(int charToEscape) {
+        sb.append('\\');
+        sb.append('u');
+        if (charToEscape > 0xFF) {
+            int hi = (charToEscape >> 8) & 0xFF;
+            sb.append(HEX_CODES[hi >> 4]);
+            sb.append(HEX_CODES[hi & 0xF]);
+            charToEscape &= 0xFF;
+        } else {
+            sb.append('0');
+            sb.append('0');
+        }
+        // We know it's a control char, so only the last 2 chars are non-0
+        sb.append(HEX_CODES[charToEscape >> 4]);
+        sb.append(HEX_CODES[charToEscape & 0xF]);
+    }
+
+    protected void appendTwoDigitUnsignedInt(int value) {
+        assert value >= 0;
+        assert value < 100;
+        if (value < 10) {
+            sb.append("0").append(value);
+        } else {
+            sb.append(value);
+        }
+    }
+
+    protected void appendFourDigitUnsignedInt(int value) {
+        if (value < 10) {
+            sb.append("000").append(value);
+        } else if (value < 100) {
+            sb.append("00").append(value);
+        } else if (value < 1000) {
+            sb.append("0").append(value);
+        } else {
+            sb.append(value);
+        }
+    }
+
+    protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) {
+        assert value > 0;
+        assert value < 1000000;
+        // Add prefixes if necessary ...
+        if (value < 10) {
+            sb.append("00000");
+        } else if (value < 100) {
+            sb.append("0000");
+        } else if (value < 1000) {
+            sb.append("000");
+        } else if (value < 10000) {
+            sb.append("00");
+        } else if (value < 100000) {
+            sb.append("0");
+        }
+        if (trimTrailingZeros) {
+            // Remove any trailing 0's ...
+            for (int i = 0; i != 6; ++i) {
+                if (value % 10 == 0) {
+                    value /= 10;
+                }
+            }
+            sb.append(value);
+        }
+    }
+
+    protected void appendDate(int year, int month, int day) {
+        if (year < 0) {
+            sb.append('-');
+            year = Math.abs(year);
+        }
+        appendFourDigitUnsignedInt(year);
+        sb.append('-');
+        appendTwoDigitUnsignedInt(month);
+        sb.append('-');
+        appendTwoDigitUnsignedInt(day);
+    }
+
+    protected void appendTime(int hour, int min, int sec, int microSeconds) {
+        appendTwoDigitUnsignedInt(hour);
+        sb.append(':');
+        appendTwoDigitUnsignedInt(min);
+        sb.append(':');
+        appendTwoDigitUnsignedInt(sec);
+        if (microSeconds != 0) {
+            sb.append('.');
+            appendSixDigitUnsignedInt(microSeconds, true);
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
index 49187838f..fb512a1eb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.mysql;
 
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
@@ -157,6 +158,11 @@ public class MySqlSource {
             return this;
         }
 
+        public Builder<T> useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+            MySqlSourceConfig.useLegacyJsonFormat = useLegacyJsonFormat;
+            return this;
+        }
+
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index 3fce1eb61..05081db57 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -253,6 +253,15 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * Whether to use legacy json format. The default value is true, which means there is no
+     * whitespace before value and after comma in json format.
+     */
+    public MySqlSourceBuilder<T> useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+        this.configFactory.useLegacyJsonFormat(useLegacyJsonFormat);
+        return this;
+    }
+
     /**
      * Whether to close idle readers at the end of the snapshot phase. This feature depends on
      * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 56d510516..a7cd09ddf 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -67,6 +67,7 @@ public class MySqlSourceConfig implements Serializable {
     private final Map<ObjectPath, String> chunkKeyColumns;
     private final boolean skipSnapshotBackfill;
     private final boolean parseOnLineSchemaChanges;
+    public static boolean useLegacyJsonFormat = true;
 
     // --------------------------------------------------------------------------------------------
     // Debezium Configurations
@@ -103,7 +104,8 @@ public class MySqlSourceConfig implements Serializable {
             Map<ObjectPath, String> chunkKeyColumns,
             boolean skipSnapshotBackfill,
             boolean parseOnLineSchemaChanges,
-            boolean treatTinyInt1AsBoolean) {
+            boolean treatTinyInt1AsBoolean,
+            boolean useLegacyJsonFormat) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -133,6 +135,7 @@ public class MySqlSourceConfig implements Serializable {
         this.skipSnapshotBackfill = skipSnapshotBackfill;
         this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
         this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
+        this.useLegacyJsonFormat = useLegacyJsonFormat;
     }
 
     public String getHostname() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index c1459bf23..1ec841947 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -72,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable {
     private boolean skipSnapshotBackfill = false;
     private boolean parseOnLineSchemaChanges = false;
     private boolean treatTinyInt1AsBoolean = true;
+    private boolean useLegacyJsonFormat = true;
 
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
@@ -278,6 +279,15 @@ public class MySqlSourceConfigFactory implements Serializable {
         return this;
     }
 
+    /**
+     * Whether to use legacy json format. The default value is true, which means there is no
+     * whitespace before value and after comma in json format.
+     */
+    public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+        this.useLegacyJsonFormat = useLegacyJsonFormat;
+        return this;
+    }
+
     /**
      * Whether to close idle readers at the end of the snapshot phase. This feature depends on
      * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
@@ -399,6 +409,7 @@ public class MySqlSourceConfigFactory implements Serializable {
                 chunkKeyColumns,
                 skipSnapshotBackfill,
                 parseOnLineSchemaChanges,
-                treatTinyInt1AsBoolean);
+                treatTinyInt1AsBoolean,
+                useLegacyJsonFormat);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index 87de37e05..5f2935932 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -270,4 +270,12 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
+            ConfigOptions.key("use.legacy.json.format")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 8d023ad30..3c5d6d63b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -99,6 +99,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final String chunkKeyColumn;
     final boolean skipSnapshotBackFill;
     final boolean parseOnlineSchemaChanges;
+    private final boolean useLegacyJsonFormat;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -137,7 +138,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Duration heartbeatInterval,
             @Nullable String chunkKeyColumn,
             boolean skipSnapshotBackFill,
-            boolean parseOnlineSchemaChanges) {
+            boolean parseOnlineSchemaChanges,
+            boolean useLegacyJsonFormat) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -168,6 +170,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.heartbeatInterval = heartbeatInterval;
         this.chunkKeyColumn = chunkKeyColumn;
         this.skipSnapshotBackFill = skipSnapshotBackFill;
+        this.useLegacyJsonFormat = useLegacyJsonFormat;
     }
 
     @Override
@@ -224,6 +227,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
                             .skipSnapshotBackfill(skipSnapshotBackFill)
                             .parseOnLineSchemaChanges(parseOnlineSchemaChanges)
+                            .useLegacyJsonFormat(useLegacyJsonFormat)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -310,7 +314,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         heartbeatInterval,
                         chunkKeyColumn,
                         skipSnapshotBackFill,
-                        parseOnlineSchemaChanges);
+                        parseOnlineSchemaChanges,
+                        useLegacyJsonFormat);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index c3404db42..5dadc646b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -104,6 +104,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
                 config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean parseOnLineSchemaChanges =
                 config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
+        boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
 
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -148,7 +149,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
                 heartbeatInterval,
                 chunkKeyColumn,
                 skipSnapshotBackFill,
-                parseOnLineSchemaChanges);
+                parseOnLineSchemaChanges,
+                useLegacyJsonFormat);
     }
 
     @Override
@@ -195,6 +197,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
+        options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index e5acc934f..6890ca78f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -56,6 +56,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -129,7 +130,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -176,7 +178,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         "testCol",
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -219,7 +222,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -260,7 +264,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -280,6 +285,7 @@ public class MySqlTableSourceFactoryTest {
         options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
         options.put("scan.incremental.close-idle-reader.enabled", "true");
         options.put("scan.incremental.snapshot.backfill.skip", "true");
+        options.put("use.legacy.json.format", "true");
 
         DynamicTableSource actualSource = createTableSource(options);
         Properties dbzProperties = new Properties();
@@ -317,7 +323,8 @@ public class MySqlTableSourceFactoryTest {
                         Duration.ofMillis(15213),
                         "testCol",
                         true,
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        true);
         assertEquals(expectedSource, actualSource);
         assertTrue(actualSource instanceof MySqlTableSource);
         MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
@@ -372,7 +379,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -411,7 +419,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -451,7 +460,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -492,7 +502,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -531,7 +542,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -575,7 +587,8 @@ public class MySqlTableSourceFactoryTest {
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
+                        USE_LEGACY_JSON_FORMAT.defaultValue());
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
index e9145a7e8..b75fd69f0 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
@@ -111,4 +111,4 @@ CREATE TABLE user_info
     balance   DECIMAL(18, 2),
     balance2  DECIMAL(18, 2),
     PRIMARY KEY (user_id)
-) DEFAULT CHARSET=utf8;
\ No newline at end of file
+) DEFAULT CHARSET=utf8;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
index 0a202f1ca..842cfcc51 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -100,4 +100,4 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215
         ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
         ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
         ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
-        ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
\ No newline at end of file
+        ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));