@ -39,8 +39,6 @@ import java.sql.DriverManager;
import java.sql.SQLException ;
import java.sql.Statement ;
import java.time.Duration ;
import java.util.Arrays ;
import java.util.List ;
import java.util.concurrent.TimeoutException ;
/** End-to-end tests for mysql cdc pipeline job. */
@ -104,11 +102,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1 ",
+ " parallelism: %d ",
INTER_CONTAINER_MYSQL_ALIAS ,
MYSQL_TEST_USER ,
MYSQL_TEST_PASSWORD ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ;
mysqlInventoryDatabase . getDatabaseName ( ) ,
parallelism ) ;
Path mysqlCdcJar = TestUtils . getResource ( "mysql-cdc-pipeline-connector.jar" ) ;
Path valuesCdcJar = TestUtils . getResource ( "values-cdc-pipeline-connector.jar" ) ;
Path mysqlDriverJar = TestUtils . getResource ( "mysql-driver.jar" ) ;
@ -123,54 +122,24 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
List < String > expectedEvents =
Arrays . asList (
String . format (
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
validateResult ( expectedEvents ) ;
validateResult (
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ) ;
LOG . info ( "Begin incremental reading stage." ) ;
// generate binlogs
String mysqlJdbcUrl =
@ -212,38 +181,160 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
expectedEvents =
Arrays . asList (
String . format (
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ,
String . format (
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
validateResult ( expectedEvents ) ;
validateResult (
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}" ,
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}" ) ;
}
@Test
public void testSchemaChangeEvents ( ) throws Exception {
String pipelineJob =
String . format (
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d" ,
INTER_CONTAINER_MYSQL_ALIAS ,
MYSQL_TEST_USER ,
MYSQL_TEST_PASSWORD ,
mysqlInventoryDatabase . getDatabaseName ( ) ,
parallelism ) ;
Path mysqlCdcJar = TestUtils . getResource ( "mysql-cdc-pipeline-connector.jar" ) ;
Path valuesCdcJar = TestUtils . getResource ( "values-cdc-pipeline-connector.jar" ) ;
Path mysqlDriverJar = TestUtils . getResource ( "mysql-driver.jar" ) ;
submitPipelineJob ( pipelineJob , mysqlCdcJar , valuesCdcJar , mysqlDriverJar ) ;
waitUntilJobRunning ( Duration . ofSeconds ( 30 ) ) ;
LOG . info ( "Pipeline job is running" ) ;
waitUntilSpecificEvent (
String . format (
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
waitUntilSpecificEvent (
String . format (
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
validateResult (
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}" ,
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}" ) ;
LOG . info ( "Begin incremental reading stage." ) ;
// generate binlogs
String mysqlJdbcUrl =
String . format (
"jdbc:mysql://%s:%s/%s" ,
MYSQL . getHost ( ) ,
MYSQL . getDatabasePort ( ) ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ;
try ( Connection conn =
DriverManager . getConnection (
mysqlJdbcUrl , MYSQL_TEST_USER , MYSQL_TEST_PASSWORD ) ;
Statement stat = conn . createStatement ( ) ) {
stat . execute ( "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;" ) ;
stat . execute ( "UPDATE products SET weight='5.1' WHERE id=107;" ) ;
// Perform DDL changes after the binlog is generated
waitUntilSpecificEvent (
String . format (
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
LOG . info ( "Begin schema evolution stage." ) ;
// Test AddColumnEvent
stat . execute ( "ALTER TABLE products ADD COLUMN new_col INT;" ) ;
stat . execute (
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);" ) ; // 110
stat . execute (
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);" ) ; // 111
stat . execute (
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;" ) ;
stat . execute ( "UPDATE products SET weight='5.17' WHERE id=111;" ) ;
stat . execute ( "DELETE FROM products WHERE id=111;" ) ;
// Test AlterColumnTypeEvent
stat . execute ( "ALTER TABLE products MODIFY COLUMN new_col BIGINT;" ) ;
stat . execute (
"INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);" ) ; // 112
// Test RenameColumnEvent
stat . execute ( "ALTER TABLE products RENAME COLUMN new_col TO new_column;" ) ;
stat . execute (
"INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);" ) ; // 113
// Test DropColumnEvent
stat . execute ( "ALTER TABLE products DROP COLUMN new_column;" ) ;
stat . execute (
"INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);" ) ; // 114
// Test TruncateTableEvent
stat . execute ( "TRUNCATE TABLE products;" ) ;
// Test DropTableEvent. It's all over.
stat . execute ( "DROP TABLE products;" ) ;
} catch ( SQLException e ) {
LOG . error ( "Update table for CDC failed." , e ) ;
throw e ;
}
waitUntilSpecificEvent (
String . format (
"DropTableEvent{tableId=%s.products}" ,
mysqlInventoryDatabase . getDatabaseName ( ) ) ) ;
validateResult (
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}" ,
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}" ,
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}" ,
"AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}" ,
"RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}" ,
"DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}" ,
"DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}" ,
"TruncateTableEvent{tableId=%s.products}" ,
"DropTableEvent{tableId=%s.products}" ) ;
}
private void validateResult ( List < String > expectedEvents ) throws Exception {
private void validateResult ( String . . . expectedEvents ) throws Exception {
String dbName = mysqlInventoryDatabase . getDatabaseName ( ) ;
for ( String event : expectedEvents ) {
waitUntilSpecificEvent ( event ) ;
waitUntilSpecificEvent ( String. format ( event, dbName , dbName ) ) ;
}
}