[FLINK-34689][MySQL][Feature] check binlog_row_value_options (#3148)

pull/3235/head
L 10 months ago committed by GitHub
parent 1b0b9f868d
commit af7665d338
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -50,6 +50,7 @@ public class MySqlValidator implements Validator {
private static final String BINLOG_FORMAT_ROW = "ROW";
private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";
private static final String DEFAULT_BINLOG_ROW_VALUE_OPTIONS = "";
private final Properties dbzProperties;
private final MySqlSourceConfig sourceConfig;
@ -70,6 +71,7 @@ public class MySqlValidator implements Validator {
checkVersion(connection);
checkBinlogFormat(connection);
checkBinlogRowImage(connection);
checkBinlogRowValueOptions(connection);
checkTimeZone(connection);
} catch (SQLException ex) {
throw new TableException(
@ -159,6 +161,30 @@ public class MySqlValidator implements Validator {
}
}
/** Check whether the binlog row value options is empty. */
private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLException {
String rowValueOptions =
connection
.queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'",
rs ->
rs.next()
? rs.getString(2)
: DEFAULT_BINLOG_ROW_VALUE_OPTIONS)
.trim()
.toUpperCase();
// This setting was introduced in MySQL 8.0+ with default of empty string ''
// For older versions, assume empty string ''
if (!DEFAULT_BINLOG_ROW_VALUE_OPTIONS.equals(rowValueOptions)) {
throw new ValidationException(
String.format(
"The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events "
+ "for the mysql cdc connector. Please remove the binlog_row_value_options setting in the MySQL server and rerun the job."
+ "See more details at https://dev.mysql.com/doc/refman/8.0/en/replication-features-json.html.",
rowValueOptions));
}
}
/** Check whether the server timezone matches the configured timezone. */
private void checkTimeZone(JdbcConnection connection) throws SQLException {
String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone");

Loading…
Cancel
Save