diff --git a/tools/mig-test/conf/pipeline.yaml b/tools/mig-test/conf/pipeline.yaml deleted file mode 100644 index 290b899b5..000000000 --- a/tools/mig-test/conf/pipeline.yaml +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -source: - type: mysql - name: MySQL Source - hostname: localhost - port: 3306 - username: root - password: "" - tables: fallen.\.* - server-id: 5400-5500 - server-time-zone: UTC - -sink: - type: values - name: Values Sink - -pipeline: - name: ${PIPELINE_NAME} - parallelism: 1 diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb index 614d713ab..d9f8d908e 100644 --- a/tools/mig-test/run_migration_test.rb +++ b/tools/mig-test/run_migration_test.rb @@ -40,6 +40,12 @@ def ensure_mystery_data(mystery) throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery end +def extract_job_id(output) + current_job_id = output.split("\n").filter { _1.start_with?('Job ID: ') }.first&.split&.last + raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32 + current_job_id +end + puts ' Waiting for source to start up...' next until exec_sql_source("SELECT '1';") == "1\n1\n" @@ -53,8 +59,7 @@ def test_migration_chore(from_version, to_version) `rm -rf savepoints` # Prepare for current YAML file - test_route = !%w[3.0.0 3.0.1].include?(from_version) - yaml_job_template_file = test_route ? 'conf/pipeline-route.yaml' : 'conf/pipeline.yaml' + yaml_job_template_file = 'conf/pipeline-route.yaml' yaml_job_file = 'conf/temp.yaml' yaml_content = File.open(yaml_job_template_file).read.gsub('${PIPELINE_NAME}', "Pipeline Migration Job (#{from_version} -> #{to_version})") @@ -63,8 +68,7 @@ def test_migration_chore(from_version, to_version) # Submit current pipeline job file submit_job_output = `bash ./cdc-versions/#{from_version}/bin/flink-cdc.sh --flink-home #{FLINK_HOME} #{yaml_job_file}` puts " #{submit_job_output}" - current_job_id = submit_job_output.split("\n")[1].split.last - raise StandardError, 'Failed to submit Flink job' unless current_job_id.length == 32 + current_job_id = extract_job_id(submit_job_output) puts " Current Job ID: #{current_job_id}" @@ -81,9 +85,8 @@ def test_migration_chore(from_version, to_version) # Migrate to a newer CDC version puts " Submitting CDC jobs at #{to_version}..." submit_job_output = `bash ./cdc-versions/#{to_version}/bin/flink-cdc.sh --from-savepoint #{Dir.pwd}/savepoints/#{savepoint_file} --allow-nonRestored-state --flink-home #{FLINK_HOME} #{yaml_job_file}` - puts " #{submit_job_output}" - new_job_id = submit_job_output.split("\n")[1].split.last - raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32 + puts "#{submit_job_output}" + new_job_id = extract_job_id(submit_job_output) puts " Upgraded Job ID: #{new_job_id}" @@ -135,7 +138,7 @@ version_list.each_with_index do |old_version, old_index| result = test_migration old_version, new_version version_result[old_version + new_version] = result ? '✅' : '❌' - @failures << [old_version, new_version] unless result + @failures << "#{old_version} => #{new_version}" unless result end if @failures.any?