[hotfix][test] Make pipeline migration tests more robust

This closes #3866
pull/3868/head
yuxiqian 2 weeks ago committed by GitHub
parent 2fa215e5c4
commit cf95dcab64

@@ -1,33 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source:
-  type: mysql
-  name: MySQL Source
-  hostname: localhost
-  port: 3306
-  username: root
-  password: ""
-  tables: fallen.\.*
-  server-id: 5400-5500
-  server-time-zone: UTC
-
-sink:
-  type: values
-  name: Values Sink
-
-pipeline:
-  name: ${PIPELINE_NAME}
-  parallelism: 1
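The template deleted above is the route-less variant; after this commit the script always renders conf/pipeline-route.yaml (see the hunk below), so this plain template is no longer referenced. For context, a minimal sketch of how such a template could be sanity-checked before the placeholder substitution (the helper name and checks are illustrative, not part of this commit):

# Illustrative sketch only -- not part of this commit.
require 'yaml'

# Parse the job template and verify the three top-level sections the
# migration test relies on are present before substituting ${PIPELINE_NAME}.
def validate_template(path)
  config = YAML.safe_load(File.read(path))
  %w[source sink pipeline].each do |section|
    raise StandardError, "#{path} is missing the '#{section}' section" unless config.key?(section)
  end
  config
end

validate_template('conf/pipeline-route.yaml')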

@@ -40,6 +40,12 @@ def ensure_mystery_data(mystery)
   throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
 end
+
+def extract_job_id(output)
+  current_job_id = output.split("\n").filter { _1.start_with?('Job ID: ') }.first&.split&.last
+  raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32
+  current_job_id
+end
 puts ' Waiting for source to start up...'
 next until exec_sql_source("SELECT '1';") == "1\n1\n"
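The new helper replaces positional parsing of the CLI output (split("\n")[1]) with a search for the line that actually starts with 'Job ID: ', and it echoes the full output when extraction fails. A quick usage sketch, with a purely illustrative sample of what flink-cdc.sh might print (extra banner or warning lines are exactly what broke the old index-based parsing):

# The sample output below is made up for illustration; real flink-cdc.sh
# output may differ, which is why the helper searches for the 'Job ID: '
# prefix instead of assuming it sits on the second line.
sample_output = <<~OUT
  SLF4J: Class path contains multiple SLF4J bindings.
  Pipeline has been submitted to cluster.
  Job ID: ca7ca6b74b2c7a4b1f0f43bbb9ed2e95
  Job Description: Pipeline Migration Job (3.1.0 -> 3.2.0)
OUT

job_id = extract_job_id(sample_output)
puts job_id # => "ca7ca6b74b2c7a4b1f0f43bbb9ed2e95"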
@@ -53,8 +59,7 @@ def test_migration_chore(from_version, to_version)
   `rm -rf savepoints`
 
   # Prepare for current YAML file
-  test_route = !%w[3.0.0 3.0.1].include?(from_version)
-  yaml_job_template_file = test_route ? 'conf/pipeline-route.yaml' : 'conf/pipeline.yaml'
+  yaml_job_template_file = 'conf/pipeline-route.yaml'
   yaml_job_file = 'conf/temp.yaml'
   yaml_content = File.open(yaml_job_template_file).read.gsub('${PIPELINE_NAME}',
                                                              "Pipeline Migration Job (#{from_version} -> #{to_version})")
@@ -63,8 +68,7 @@ def test_migration_chore(from_version, to_version)
 
   # Submit current pipeline job file
   submit_job_output = `bash ./cdc-versions/#{from_version}/bin/flink-cdc.sh --flink-home #{FLINK_HOME} #{yaml_job_file}`
   puts " #{submit_job_output}"
 
-  current_job_id = submit_job_output.split("\n")[1].split.last
-  raise StandardError, 'Failed to submit Flink job' unless current_job_id.length == 32
+  current_job_id = extract_job_id(submit_job_output)
   puts " Current Job ID: #{current_job_id}"
@@ -81,9 +85,8 @@ def test_migration_chore(from_version, to_version)
 
   # Migrate to a newer CDC version
   puts " Submitting CDC jobs at #{to_version}..."
   submit_job_output = `bash ./cdc-versions/#{to_version}/bin/flink-cdc.sh --from-savepoint #{Dir.pwd}/savepoints/#{savepoint_file} --allow-nonRestored-state --flink-home #{FLINK_HOME} #{yaml_job_file}`
   puts " #{submit_job_output}"
 
-  new_job_id = submit_job_output.split("\n")[1].split.last
-  raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+  new_job_id = extract_job_id(submit_job_output)
   puts " Upgraded Job ID: #{new_job_id}"
@@ -135,7 +138,7 @@ version_list.each_with_index do |old_version, old_index|
     result = test_migration old_version, new_version
     version_result[old_version + new_version] = result ? '✅' : '❌'
-    @failures << [old_version, new_version] unless result
+    @failures << "#{old_version} => #{new_version}" unless result
   end
 
 if @failures.any?
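Collecting failures as "old => new" strings (instead of version pairs) lets the summary print them directly. The reporting branch itself is outside this hunk; a sketch of what it might look like:

# Sketch only: report every failed migration path and fail the run.
if @failures.any?
  puts 'The following migration paths failed:'
  @failures.each { |failure| puts "  #{failure}" }
  exit 1
end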
