[oracle][mysql] Improve the Oracle all data types test and clean up debug logs

pull/1590/merge
Leonard Xu 2 years ago committed by Leonard Xu
parent 228fbe650a
commit e4dcd10ec8

@ -671,7 +671,6 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
formatResult( formatResult(
pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord), pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord),
dataType); dataType);
results.forEach(System.out::println);
actual.addAll(results); actual.addAll(results);
} }
return actual; return actual;

@ -51,7 +51,9 @@ import java.util.Objects;
import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare; import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static java.math.BigDecimal.ROUND_CEILING; import static java.math.BigDecimal.ROUND_CEILING;
/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ /**
* The {@code ChunkSplitter} used to split Oracle table into a set of chunks for JDBC data source.
*/
public class OracleChunkSplitter implements JdbcSourceChunkSplitter { public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(OracleChunkSplitter.class); private static final Logger LOG = LoggerFactory.getLogger(OracleChunkSplitter.class);

@ -33,6 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory to initialize {@link OracleSourceConfig}. */ /** A factory to initialize {@link OracleSourceConfig}. */
public class OracleSourceConfigFactory extends JdbcSourceConfigFactory { public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
private static final long serialVersionUID = 1L;
private static final String DATABASE_SERVER_NAME = "oracle_logminer"; private static final String DATABASE_SERVER_NAME = "oracle_logminer";
private static final String DRIVER_ClASS_NAME = "oracle.jdbc.OracleDriver"; private static final String DRIVER_ClASS_NAME = "oracle.jdbc.OracleDriver";

@ -40,7 +40,6 @@ import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

@ -17,10 +17,7 @@
package com.ververica.cdc.connectors.oracle.util; package com.ververica.cdc.connectors.oracle.util;
import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import oracle.sql.ROWID; import oracle.sql.ROWID;
@ -32,24 +29,11 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
/** Utilities to split chunks of table. */ /** Utilities to split chunks of table. */
public class ChunkUtils { public class ChunkUtils {
private ChunkUtils() {} private ChunkUtils() {}
public static RowType getChunkKeyColumnType(Table table, @Nullable String chunkKeyColumn) {
return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumn));
}
public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
return (RowType)
ROW(FIELD(chunkKeyColumn.name(), OracleTypeUtils.fromDbzColumn(chunkKeyColumn)))
.getLogicalType();
}
public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) { public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns(); List<Column> primaryKeys = table.primaryKeyColumns();
@ -69,15 +53,7 @@ public class ChunkUtils {
table.id())); table.id()));
} }
// use the ROWID columns as the chunk key column by default in oracle // Use the ROWID column as the chunk key column by default for oracle cdc connector
return Column.editor().jdbcType(Types.VARCHAR).name(ROWID.class.getSimpleName()).create(); return Column.editor().jdbcType(Types.VARCHAR).name(ROWID.class.getSimpleName()).create();
} }
/** Returns next meta group id according to received meta number and meta group size. */
public static int getNextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
Preconditions.checkState(metaGroupSize > 0);
return receivedMetaNum % metaGroupSize == 0
? (receivedMetaNum / metaGroupSize)
: (receivedMetaNum / metaGroupSize) + 1;
}
} }

@ -119,49 +119,4 @@ public class OracleChangeEventSourceExampleTest {
env.execute("Print Oracle Snapshot + RedoLog"); env.execute("Print Oracle Snapshot + RedoLog");
} }
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEventsByUserChunkKeyColumn() throws Exception {
LOG.info(
"getOraclePort:{},getUsername:{},getPassword:{}",
oracleContainer.getOraclePort(),
oracleContainer.getUsername(),
oracleContainer.getPassword());
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname(oracleContainer.getHost())
.port(oracleContainer.getOraclePort())
.databaseList("XE")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username(oracleContainer.getUsername())
.password(oracleContainer.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.chunkKeyColumn("ID")
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(DEFAULT_CHECKPOINT_INTERVAL);
// set the source parallelism to 4
env.fromSource(
oracleChangeEventSource,
WatermarkStrategy.noWatermarks(),
"OracleParallelSource")
.setParallelism(DEFAULT_PARALLELISM)
.print()
.setParallelism(1);
env.execute("Print Oracle Snapshot + RedoLog");
}
} }

@ -261,7 +261,6 @@ public class OracleSourceITCase extends OracleSourceTestBase {
while (size > 0 && iter.hasNext()) { while (size > 0 && iter.hasNext()) {
Row row = iter.next(); Row row = iter.next();
rows.add(row.toString()); rows.add(row.toString());
LOG.info("fetch row:{}", row);
size--; size--;
} }
return rows; return rows;
@ -289,7 +288,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS"); statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS");
statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS_1"); statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS_1");
} catch (Exception e) { } catch (Exception e) {
LOG.info("DEBEZIUM.CUSTOMERS DEBEZIUM.CUSTOMERS_1 NOT EXITS"); LOG.error("DEBEZIUM.CUSTOMERS DEBEZIUM.CUSTOMERS_1 NOT EXITS", e);
} }
final List<String> statements = final List<String> statements =

@ -204,7 +204,6 @@ public class OracleConnectorITCase extends AbstractTestBase {
}; };
List<String> actual = TestValuesTableFactory.getResults("sink"); List<String> actual = TestValuesTableFactory.getResults("sink");
LOG.info("actual:{}", actual);
assertEqualsInAnyOrder(Arrays.asList(expected), actual); assertEqualsInAnyOrder(Arrays.asList(expected), actual);
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
@ -370,7 +369,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112"); statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
statement.execute("DELETE FROM debezium.products WHERE ID=112"); statement.execute("DELETE FROM debezium.products WHERE ID=112");
} }
waitForSinkSize("sink", 16);
List<String> expected = List<String> expected =
Arrays.asList( Arrays.asList(
"+I[XE, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]", "+I[XE, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]",
@ -381,11 +380,15 @@ public class OracleConnectorITCase extends AbstractTestBase {
"+I[XE, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]", "+I[XE, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]",
"+I[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]", "+I[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]",
"+I[XE, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]", "+I[XE, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]",
"+I[XE, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]"); "+I[XE, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]",
"+I[XE, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]",
"+I[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]",
"+U[XE, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]",
"+U[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]",
"+U[XE, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]",
"+U[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
"-D[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
// TODO: we can't assert merged result for incremental-snapshot, because we can't add a
// keyby shuffle before "values" upsert sink. We should assert merged result once
// https://issues.apache.org/jira/browse/FLINK-24511 is fixed.
List<String> actual = TestValuesTableFactory.getRawResults("sink"); List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(expected); Collections.sort(expected);
Collections.sort(actual); Collections.sort(actual);

@ -14,6 +14,8 @@
-- ---------------------------------------------------------------------------------------------------------------- -- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test -- DATABASE: column_type_test
-- ---------------------------------------------------------------------------------------------------------------- -- ----------------------------------------------------------------------------------------------------------------
-- Set session timezone to fixed Asia/Shanghai for checking TIMESTAMP_LTZ type
ALTER SESSION SET TIME_ZONE='Asia/Shanghai';
create table DEBEZIUM.FULL_TYPES ( create table DEBEZIUM.FULL_TYPES (
ID NUMBER(9) not null, ID NUMBER(9) not null,

Loading…
Cancel
Save