[cdc-connector][debezium] Supports conversion from binary bytes to base64 encoded string (#2435)

This closes #2435.
pull/2799/head
Xin Gong 1 year ago committed by GitHub
parent d1c3f8500b
commit 5f318346ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -324,6 +324,17 @@ Flink SQL> SELECT * FROM orders;
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>debezium.binary.handling.mode 参数可以设置为以下值:
none:不进行任何处理,直接将二进制数据类型作为字节数组(byte array)传输。
base64:将二进制数据类型转换为 Base64 编码的字符串,然后传输。
hex:将二进制数据类型转换为十六进制编码的字符串,然后传输。
默认值为 none。根据您的需求和数据类型,您可以选择合适的处理模式。如果您的数据库中包含大量二进制数据类型,建议使用 base64 或 hex 模式,以便在传输过程中更容易处理。</td>
</tr>
</tbody>
</table>
</div>
@ -709,6 +720,36 @@ $ ./bin/flink run \
* 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
* 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。
### 关于二进制类型数据转换为base64编码数据
```sql
CREATE TABLE products (
    -- Metadata columns supplied by the connector
    db_name       STRING           METADATA FROM 'database_name' VIRTUAL,
    table_name    STRING           METADATA FROM 'table_name' VIRTUAL,
    operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    -- Physical columns of the captured table
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    product_id    INT,
    order_status  BOOLEAN,
    -- VARBINARY(N) in MySQL, declared STRING here to receive the base64 text
    binary_data   STRING,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'test_tb',
    -- Emit binary columns as base64-encoded strings
    'debezium.binary.handling.mode' = 'base64'
);
```
`binary_data` 字段在数据库中的类型是 VARBINARY(N)。在有些场景下,我们需要将二进制数据转换为 base64 编码的字符串数据,可以通过添加参数 'debezium.binary.handling.mode' = 'base64' 来开启这个功能。
添加此参数后,我们就可以在 Flink SQL 中将该字段类型映射为 `STRING`,从而获取 base64 编码的字符串数据。
数据类型映射
----------------

@ -329,6 +329,17 @@ During a snapshot operation, the connector will query each included table to pro
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. This feature requires Flink 1.14 or later, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' must be set to true.</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>debezium.binary.handling.mode can be set to one of the following values:
none: No processing is performed, and the binary data type is transmitted as a byte array.
base64: The binary data type is converted to a Base64-encoded string and transmitted.
hex: The binary data type is converted to a hexadecimal string and transmitted.
The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
</tr>
</tbody>
</table>
</div>
@ -720,6 +731,36 @@ There are two places that need to be taken care of.
* If no update operation is performed on the specified column, the exactly-once semantics is ensured.
* If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness.
### About converting binary type data to base64 encoded data
```sql
CREATE TABLE products (
    -- Metadata columns supplied by the connector
    db_name       STRING           METADATA FROM 'database_name' VIRTUAL,
    table_name    STRING           METADATA FROM 'table_name' VIRTUAL,
    operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    -- Physical columns of the captured table
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    product_id    INT,
    order_status  BOOLEAN,
    -- VARBINARY(N) in MySQL, declared STRING here to receive the base64 text
    binary_data   STRING,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'test_tb',
    -- Emit binary columns as base64-encoded strings
    'debezium.binary.handling.mode' = 'base64'
);
```
The `binary_data` field is of type VARBINARY(N) in the database. In some scenarios, we need to convert binary data to base64-encoded string data. This feature can be enabled by adding the parameter `'debezium.binary.handling.mode' = 'base64'`.
With this parameter set, the binary field can be mapped to `STRING` in Flink SQL, thereby obtaining the base64-encoded string data.
Data Type Mapping
----------------

@ -47,6 +47,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -544,6 +545,10 @@ public final class RowDataDebeziumDeserializeSchema
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else if (dbzObj instanceof String) {
// debezium.binary.handling.mode = base64
String data = (String) dbzObj;
return data.getBytes(StandardCharsets.UTF_8);
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());

@ -2269,4 +2269,85 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
Thread.sleep(100);
}
}
/**
 * Checks that a MySQL {@code varbinary(8)} column is delivered as a base64-encoded string when
 * the source table sets {@code 'debezium.binary.handling.mode' = 'base64'}.
 *
 * <p>The test reads the snapshot rows, then applies two inserts, one update and one delete on
 * the MySQL side and expects the matching changelog rows. It returns immediately unless
 * {@code incrementalSnapshot} is enabled.
 */
@Test
public void testBinaryHandlingModeWithBase64() throws Exception {
    if (!incrementalSnapshot) {
        return;
    }
    inventoryDatabase.createAndInitialize();

    final String tableName = "varbinary_base64_table";
    // The varbinary column (order_id) is declared as STRING on the Flink side.
    String createSourceTable =
            String.format(
                    "CREATE TABLE varbinary_base64_table ("
                            + " id INT,"
                            + " order_id STRING,"
                            + " order_date DATE,"
                            + " quantity INT,"
                            + " product_id INT,"
                            + " purchaser STRING,"
                            + " PRIMARY KEY(id) NOT ENFORCED"
                            + ") WITH ("
                            + " 'connector' = 'mysql-cdc',"
                            + " 'hostname' = '%s',"
                            + " 'port' = '%s',"
                            + " 'username' = '%s',"
                            + " 'password' = '%s',"
                            + " 'database-name' = '%s',"
                            + " 'table-name' = '%s',"
                            + " 'server-time-zone' = 'UTC',"
                            + " 'server-id' = '%s',"
                            + " 'scan.incremental.snapshot.chunk.size' = '%s',"
                            + " 'debezium.binary.handling.mode' = 'base64'"
                            + ")",
                    MYSQL_CONTAINER.getHost(),
                    MYSQL_CONTAINER.getDatabasePort(),
                    TEST_USER,
                    TEST_PASSWORD,
                    inventoryDatabase.getDatabaseName(),
                    tableName,
                    getServerId(),
                    getSplitSize());
    tEnv.executeSql(createSourceTable);

    // The query is submitted asynchronously.
    TableResult queryResult = tEnv.executeSql("SELECT * FROM varbinary_base64_table");

    // No better readiness signal is available, so poll the job status until it is RUNNING.
    Thread.sleep(5000L);
    while (queryResult.getJobClient().get().getJobStatus().get() != RUNNING) {
        Thread.sleep(5000L);
    }

    CloseableIterator<Row> rows = queryResult.collect();

    // Changes applied on the MySQL side, executed one statement at a time in this order.
    String[] binlogStatements =
            new String[] {
                "INSERT INTO varbinary_base64_table VALUES "
                        + "(6, b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', "
                        + "30, 500, 'flink');",
                "INSERT INTO varbinary_base64_table VALUES "
                        + "(7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', "
                        + "30, 500, 'flink-sql');",
                "UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;",
                "DELETE FROM varbinary_base64_table WHERE id= 7;"
            };
    try (Connection conn = inventoryDatabase.getJdbcConnection();
            Statement stmt = conn.createStatement()) {
        for (String dml : binlogStatements) {
            stmt.execute(dml);
        }
    }

    String[] expectedRows =
            new String[] {
                // rows read during the snapshot phase
                "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]",
                "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]",
                "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]",
                "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]",
                "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]",
                // rows produced from the binlog
                "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
                "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]",
                "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
                "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]",
                "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]"
            };
    assertEqualsInAnyOrder(
            Arrays.asList(expectedRows), fetchRows(rows, expectedRows.length));

    queryResult.getJobClient().get().cancel().get();
}
}

@ -148,4 +148,21 @@ VALUES ('', 0, 'flink'),
('E', 1, 'flink'),
('E', 2, 'flink'),
('e', 4, 'flink'),
('E', 3, 'flink');
('E', 3, 'flink');
-- Fixture table whose order_id column holds raw binary data (up to 8 bytes)
CREATE TABLE `varbinary_base64_table` (
    `id`         int(11)      NOT NULL,
    `order_id`   varbinary(8) NOT NULL,
    `order_date` date         NOT NULL,
    `quantity`   int(11)      NOT NULL,
    `product_id` int(11)      NOT NULL,
    `purchaser`  varchar(512) NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Seed rows for varbinary_base64_table. Each order_id is a 64-bit literal, i.e. 8 bytes.
-- Columns are listed explicitly so the rows do not depend on the table's column order.
INSERT INTO varbinary_base64_table (id, order_id, order_date, quantity, product_id, purchaser)
VALUES (1, b'0000010000000100000001000000010000000100000001000000010000000000', '2021-03-08', 0, 0, 'flink'),
       (2, b'0000010000000100000001000000010000000100000001000000010000000001', '2021-03-08', 10, 100, 'flink'),
       (3, b'0000010000000100000001000000010000000100000001000000010000000010', '2021-03-08', 20, 200, 'flink'),
       (4, b'0000010000000100000001000000010000000100000001000000010000000011', '2021-03-08', 30, 300, 'flink'),
       (5, b'0000010000000100000001000000010000000100000001000000010000000100', '2021-03-08', 40, 400, 'flink');
Loading…
Cancel
Save