diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java
index 2adcbb714..f2488a273 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.common.types;
 
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.Preconditions;
 
 import javax.annotation.Nullable;
@@ -116,4 +117,12 @@ public class DataField implements Serializable {
                     escapeSingleQuotes(description));
         }
     }
+
+    public org.apache.flink.table.api.DataTypes.Field toFlinkDataTypeField() {
+        return description == null
+                ? org.apache.flink.table.api.DataTypes.FIELD(
+                        name, DataTypeUtils.toFlinkDataType(type))
+                : org.apache.flink.table.api.DataTypes.FIELD(
+                        name, DataTypeUtils.toFlinkDataType(type), description);
+    }
 }
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java
index 38ef13c0d..cc3405008 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java
@@ -24,12 +24,15 @@ import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.DataField;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {
@@ -134,8 +137,12 @@ public class DataTypeUtils {
                         toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)));
             case ROW:
                 Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
-                return org.apache.flink.table.api.DataTypes.ROW(
-                        children.toArray(new org.apache.flink.table.types.DataType[] {}));
+                RowType rowType = (RowType) type;
+                List<org.apache.flink.table.api.DataTypes.Field> fields =
+                        rowType.getFields().stream()
+                                .map(DataField::toFlinkDataTypeField)
+                                .collect(Collectors.toList());
+                return org.apache.flink.table.api.DataTypes.ROW(fields);
             default:
                 throw new IllegalArgumentException("Illegal type: " + type);
         }
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java
new file mode 100644
index 000000000..5853f5769
--- /dev/null
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/types/utils/DataTypeUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.types.utils;
+
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link org.apache.flink.cdc.common.types.utils.DataTypeUtils}. */
+class DataTypeUtilsTest {
+    private static final DataType[] ALL_TYPES =
+            new DataType[] {
+                DataTypes.BOOLEAN(),
+                DataTypes.BYTES(),
+                DataTypes.BINARY(10),
+                DataTypes.VARBINARY(10),
+                DataTypes.CHAR(10),
+                DataTypes.VARCHAR(10),
+                DataTypes.STRING(),
+                DataTypes.INT(),
+                DataTypes.TINYINT(),
+                DataTypes.SMALLINT(),
+                DataTypes.BIGINT(),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT(),
+                DataTypes.DECIMAL(6, 3),
+                DataTypes.DATE(),
+                DataTypes.TIME(),
+                DataTypes.TIME(6),
+                DataTypes.TIMESTAMP(),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.TIMESTAMP_LTZ(),
+                DataTypes.TIMESTAMP_LTZ(6),
+                DataTypes.TIMESTAMP_TZ(),
+                DataTypes.TIMESTAMP_TZ(6),
+                DataTypes.ARRAY(DataTypes.BIGINT()),
+                DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                        DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
+            };
+
+    @Test
+    void testToFlinkDataType() {
+        List<DataField> list =
+                IntStream.range(0, ALL_TYPES.length)
+                        .mapToObj(i -> DataTypes.FIELD("f" + i, ALL_TYPES[i]))
+                        .collect(Collectors.toList());
+
+        org.apache.flink.table.types.DataType dataType =
+                DataTypeUtils.toFlinkDataType(new RowType(list));
+
+        org.apache.flink.table.types.DataType expectedDataType =
+                ROW(
+                        FIELD("f0", BOOLEAN()),
+                        FIELD("f1", BYTES()),
+                        FIELD("f2", BINARY(10)),
+                        FIELD("f3", VARBINARY(10)),
+                        FIELD("f4", CHAR(10)),
+                        FIELD("f5", VARCHAR(10)),
+                        FIELD("f6", STRING()),
+                        FIELD("f7", INT()),
+                        FIELD("f8", TINYINT()),
+                        FIELD("f9", SMALLINT()),
+                        FIELD("f10", BIGINT()),
+                        FIELD("f11", DOUBLE()),
+                        FIELD("f12", FLOAT()),
+                        FIELD("f13", DECIMAL(6, 3)),
+                        FIELD("f14", DATE()),
+                        FIELD("f15", TIME()),
+                        FIELD("f16", TIME(6)),
+                        FIELD("f17", TIMESTAMP_WITH_TIME_ZONE()),
+                        FIELD("f18", TIMESTAMP_WITH_TIME_ZONE(6)),
+                        FIELD("f19", TIMESTAMP_LTZ()),
+                        FIELD("f20", TIMESTAMP_LTZ(6)),
+                        FIELD("f21", TIMESTAMP_WITH_TIME_ZONE()),
+                        FIELD("f22", TIMESTAMP_WITH_TIME_ZONE(6)),
+                        FIELD("f23", ARRAY(BIGINT())),
+                        FIELD("f24", MAP(SMALLINT(), STRING())),
+                        FIELD("f25", ROW(FIELD("f1", STRING()), FIELD("f2", STRING(), "desc"))),
+                        FIELD("f26", ROW(SMALLINT(), STRING())));
+
+        assertThat(dataType).isEqualTo(expectedDataType);
+    }
+}