[FLINK-36285][doris] Fix unable to alter column type without default value specified

This closes #3691.
pull/3535/merge
yuxiqian 3 months ago committed by GitHub
parent 4bd3a2347b
commit 8e6c361f96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -27,7 +27,7 @@ limitations under the License.
<name>flink-cdc-pipeline-connector-doris</name> <name>flink-cdc-pipeline-connector-doris</name>
<properties> <properties>
<doris.connector.version>1.6.2</doris.connector.version> <doris.connector.version>24.0.1</doris.connector.version>
</properties> </properties>
<dependencies> <dependencies>

@ -193,6 +193,21 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19)))); tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
} }
private List<Event> generateAlterColumnTypeWithDefaultValueEvents(TableId tableId) {
Schema schema =
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null, "2.71828"))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null, "Alice"))
.primaryKey("id")
.build();
return Arrays.asList(
new CreateTableEvent(tableId, schema),
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("name", DataTypes.VARCHAR(19))));
}
private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) { private List<Event> generateNarrowingAlterColumnTypeEvents(TableId tableId) {
Schema schema = Schema schema =
Schema.newBuilder() Schema.newBuilder()
@ -386,6 +401,25 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
assertEqualsInOrder(expected, actual); assertEqualsInOrder(expected, actual);
} }
@Test
public void testDorisAlterColumnTypeWithDefaultValue() throws Exception {
TableId tableId =
TableId.tableId(
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
runJobWithEvents(generateAlterColumnTypeWithDefaultValueEvents(tableId));
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
Arrays.asList(
"id | INT | Yes | true | null",
"number | DOUBLE | Yes | false | 2.71828",
"name | VARCHAR(57) | Yes | false | Alice");
assertEqualsInOrder(expected, actual);
}
@Test(expected = JobExecutionException.class) @Test(expected = JobExecutionException.class)
public void testDorisNarrowingAlterColumnType() throws Exception { public void testDorisNarrowingAlterColumnType() throws Exception {
TableId tableId = TableId tableId =

@ -47,6 +47,7 @@ import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
@ -62,6 +63,7 @@ public class DorisPipelineITCase extends DorisSinkTestBase {
StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment.getExecutionEnvironment();
private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5; private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
private static final int DATA_FETCHING_TIMEOUT = 30;
@BeforeClass @BeforeClass
public static void before() { public static void before() {
@ -242,13 +244,21 @@ public class DorisPipelineITCase extends DorisSinkTestBase {
env.execute("Values to Doris Sink"); env.execute("Values to Doris Sink");
List<String> actual = fetchTableContent(tableId, 4);
List<String> expected = List<String> expected =
Arrays.asList( Arrays.asList(
"17 | 6.28 | Doris Day | 2023-01-01 00:00:00", "17 | 6.28 | Doris Day | 2023-01-01 00:00:00",
"21 | 1.732 | Disenchanted | 2023-01-01 00:00:00"); "21 | 1.732 | Disenchanted | 2023-01-01 00:00:00");
long timeout = System.currentTimeMillis() + DATA_FETCHING_TIMEOUT * 1000;
while (System.currentTimeMillis() < timeout) {
List<String> actual = fetchTableContent(tableId, 4);
if (actual.size() < expected.size()) {
Thread.sleep(1000L);
continue;
}
assertEqualsInAnyOrder(expected, actual); assertEqualsInAnyOrder(expected, actual);
return;
}
throw new TimeoutException("Failed to fetch enough records in time.");
} }
} }

@ -113,7 +113,7 @@ def check_jar_license(jar_file)
Zip::File.open(jar_file) do |jar| Zip::File.open(jar_file) do |jar|
jar.filter { |e| e.ftype == :file } jar.filter { |e| e.ftype == :file }
.filter { |e| !File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) } .filter { |e| !File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) }
.filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice' } .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice', 'third-party' }
.filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } } .filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } }
.map do |e| .map do |e|
content = e.get_input_stream.read.force_encoding('UTF-8') content = e.get_input_stream.read.force_encoding('UTF-8')

Loading…
Cancel
Save