[oracle] Support XMLType for Oracle CDC Connector (#1342)

This closes #1341
pull/1354/head
Matrix42 3 years ago committed by GitHub
parent 2f3c872306
commit 8f6078d12a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -80,6 +80,13 @@ under the License.
<artifactId>ojdbc8</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.xml</groupId>
<artifactId>xdb</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>

@ -418,6 +418,81 @@ public class OracleConnectorITCase extends AbstractTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testXmlType() throws Exception {
// Prepare xml type data
try (Connection connection = OracleTestUtils.testConnection(oracleContainer);
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TABLE debezium.xmltype_table ("
+ " ID NUMBER(4),"
+ " T15VARCHAR sys.xmltype,"
+ " PRIMARY KEY (ID))");
statement.execute(
"INSERT INTO debezium.xmltype_table "
+ "VALUES (11, sys.xmlType.createXML('<name><a id=\"1\" value=\"some values\">test xmlType</a></name>'))");
}
String sourceDDL =
String.format(
"CREATE TABLE test_xmltype_table ("
+ " ID INT,"
+ " T15VARCHAR STRING,"
+ " PRIMARY KEY (ID) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = 'XE',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
"dbzuser",
"dbz",
"debezium",
"xmltype_table");
String sinkDDL =
"CREATE TABLE test_xmltype_sink ("
+ " id INT,"
+ " T15VARCHAR STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '1'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql("INSERT INTO test_xmltype_sink SELECT * FROM test_xmltype_table");
waitForSnapshotStarted("test_xmltype_sink");
// waiting for change events finished.
waitForSinkSize("test_xmltype_sink", 1);
String lineSeparator = System.getProperty("line.separator");
String expectedResult =
String.format(
"+I[11, <name>%s"
+ " <a id=\"1\" value=\"some values\">test xmlType</a>%s"
+ "</name>]",
lineSeparator, lineSeparator);
List<String> expected = Arrays.asList(expectedResult);
List<String> actual = TestValuesTableFactory.getRawResults("test_xmltype_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
// ------------------------------------------------------------------------------------
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {

@ -62,6 +62,7 @@ under the License.
<include>org.antlr:antlr4-runtime</include>
<include>com.github.jsqlparser:jsqlparser</include>
<include>com.oracle.ojdbc:*</include>
<include>com.oracle.database.xml:*</include>
<include>org.apache.kafka:*</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>

Loading…
Cancel
Save