[mongodb] Correct the TypeInformation argument of DebeziumDeserializationSchema in SourceFunction (#780)

pull/925/head
Zongwen Li 3 years ago committed by GitHub
parent 8002de4a5b
commit a8879bf4b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@@ -135,8 +135,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType());
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
new MongoDBConnectorDeserializationSchema(

@@ -28,9 +28,14 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.time.ZoneId;
@@ -43,6 +48,7 @@ import java.util.Map;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -185,6 +191,13 @@ public class MongoDBTableFactoryTest {
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
mongoDBSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
@Test

@@ -29,10 +29,15 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,6 +47,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -202,6 +208,13 @@ public class OracleTableSourceFactoryTest {
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
oracleTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
@Test

@@ -29,10 +29,15 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,6 +47,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -160,6 +166,13 @@ public class PostgreSQLTableFactoryTest {
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
postgreSQLTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
@Test

@@ -28,8 +28,13 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.time.ZoneId;
@@ -40,6 +45,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
/** Test for {@link SqlServerTableSource} created by {@link SqlServerTableFactory}. */
@@ -154,6 +160,13 @@ public class SqlServerTableFactoryTest {
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
sqlServerTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
private Map<String, String> getAllOptions() {

@@ -47,6 +47,12 @@ under the License.
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>

@@ -18,15 +18,23 @@
package com.ververica.cdc.connectors.utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/** Utilities for asserting {@link SourceRecord}. */
/** Utilities for asserting {@link SourceRecord} and {@link DebeziumSourceFunction}. */
public class AssertUtils {
/**
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#CREATE
@@ -220,4 +228,21 @@ public class AssertUtils {
hasValidKey(record, pkField, pk);
assertDelete(record, true);
}
/**
 * Asserts that the runtime {@link TypeInformation} produced by the given {@code
 * DebeziumSourceFunction<RowData>} is an {@link InternalTypeInfo} whose underlying data type
 * matches the data type of the resolved schema.
 *
 * @param debeziumSourceFunction the source function under test
 * @param expectedProducedType the data type derived from the resolved schema
 */
public static void assertProducedTypeOfSourceFunction(
        DebeziumSourceFunction<RowData> debeziumSourceFunction, DataType expectedProducedType) {
    final TypeInformation<RowData> actualTypeInfo = debeziumSourceFunction.getProducedType();
    // The table runtime always wraps RowData types in InternalTypeInfo.
    assertThat(actualTypeInfo, instanceOf(InternalTypeInfo.class));
    final DataType actualProducedType =
            ((InternalTypeInfo<RowData>) actualTypeInfo).getDataType();
    // Compare string renderings: this checks the full logical type including nullability.
    assertEquals(expectedProducedType.toString(), actualProducedType.toString());
}
}

Loading…
Cancel
Save