diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml index 8aa2644b3..24974cb6c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml @@ -27,7 +27,7 @@ limitations under the License. flink-cdc-pipeline-connector-doris - 1.6.2 + 24.0.1 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 84eeebbca..4d28b8f77 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -193,6 +193,21 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase { tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); } + private List generateAlterColumnTypeWithDefaultValueEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null, "2.71828")) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null, "Alice")) + .primaryKey("id") + .build(); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); + } + private List generateNarrowingAlterColumnTypeEvents(TableId tableId) { Schema schema = Schema.newBuilder() @@ -386,6 +401,25 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase { assertEqualsInOrder(expected, actual); } + @Test + public void testDorisAlterColumnTypeWithDefaultValue() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + runJobWithEvents(generateAlterColumnTypeWithDefaultValueEvents(tableId)); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "number | DOUBLE | Yes | false | 2.71828", + "name | VARCHAR(57) | Yes | false | Alice"); + + assertEqualsInOrder(expected, actual); + } + @Test(expected = JobExecutionException.class) public void testDorisNarrowingAlterColumnType() throws Exception { TableId tableId = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java index 361cad9c5..e060968cc 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisPipelineITCase.java @@ -47,6 +47,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeoutException; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; @@ -62,6 +63,7 @@ public class DorisPipelineITCase extends DorisSinkTestBase { StreamExecutionEnvironment.getExecutionEnvironment(); private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5; + private static final int DATA_FETCHING_TIMEOUT = 30; @BeforeClass public static void before() { @@ -242,13 +244,21 @@ public class DorisPipelineITCase extends DorisSinkTestBase { env.execute("Values to Doris Sink"); - List actual = fetchTableContent(tableId, 4); - List expected = Arrays.asList( "17 | 6.28 | Doris Day | 2023-01-01 00:00:00", "21 | 1.732 | Disenchanted | 2023-01-01 00:00:00"); - - assertEqualsInAnyOrder(expected, actual); + long timeout = System.currentTimeMillis() + DATA_FETCHING_TIMEOUT * 1000; + + while (System.currentTimeMillis() < timeout) { + List actual = fetchTableContent(tableId, 4); + if (actual.size() < expected.size()) { + Thread.sleep(1000L); + continue; + } + assertEqualsInAnyOrder(expected, actual); + return; + } + throw new TimeoutException("Failed to fetch enough records in time."); } } diff --git a/tools/ci/license_check.rb b/tools/ci/license_check.rb index 3ba68da89..ee7d5de48 100755 --- a/tools/ci/license_check.rb +++ b/tools/ci/license_check.rb @@ -113,7 +113,7 @@ def check_jar_license(jar_file) Zip::File.open(jar_file) do |jar| jar.filter { |e| e.ftype == :file } .filter { |e| !File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) } - .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice' } + .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice', 'third-party' } .filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } } .map do |e| content = e.get_input_stream.read.force_encoding('UTF-8')