[mysql-cdc] Supports MYSQL_TYPE_TYPED_ARRAY column type when parsing the table map event
This closes #2001pull/2203/head
parent
892f2d6258
commit
bb60bfede4
@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright 2022 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.github.shyiko.mysql.binlog.event.deserialization;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
|
||||
*
|
||||
* <p>Line 57: Add support for mysql data type: MYSQL_TYPE_TYPED_ARRAY. Its type code is changed to
|
||||
* 20 in <a
|
||||
* href="https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855">MySql
|
||||
* Bug#29948925</a> since mysql 8.0.18+.
|
||||
*
|
||||
* <p>Remove this file once <a
|
||||
* href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a>
|
||||
* fixed.
|
||||
*/
|
||||
public enum ColumnType {
|
||||
DECIMAL(0),
|
||||
TINY(1),
|
||||
SHORT(2),
|
||||
LONG(3),
|
||||
FLOAT(4),
|
||||
DOUBLE(5),
|
||||
NULL(6),
|
||||
TIMESTAMP(7),
|
||||
LONGLONG(8),
|
||||
INT24(9),
|
||||
DATE(10),
|
||||
TIME(11),
|
||||
DATETIME(12),
|
||||
YEAR(13),
|
||||
NEWDATE(14),
|
||||
VARCHAR(15),
|
||||
BIT(16),
|
||||
// (TIMESTAMP|DATETIME|TIME)_V2 data types appeared in MySQL 5.6.4
|
||||
// @see http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
|
||||
TIMESTAMP_V2(17),
|
||||
DATETIME_V2(18),
|
||||
TIME_V2(19),
|
||||
TYPED_ARRAY(20),
|
||||
JSON(245),
|
||||
NEWDECIMAL(246),
|
||||
ENUM(247),
|
||||
SET(248),
|
||||
TINY_BLOB(249),
|
||||
MEDIUM_BLOB(250),
|
||||
LONG_BLOB(251),
|
||||
BLOB(252),
|
||||
VAR_STRING(253),
|
||||
STRING(254),
|
||||
GEOMETRY(255);
|
||||
|
||||
private int code;
|
||||
|
||||
private ColumnType(int code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
private static final Map<Integer, ColumnType> INDEX_BY_CODE;
|
||||
|
||||
static {
|
||||
INDEX_BY_CODE = new HashMap<Integer, ColumnType>();
|
||||
for (ColumnType columnType : values()) {
|
||||
INDEX_BY_CODE.put(columnType.code, columnType);
|
||||
}
|
||||
}
|
||||
|
||||
public static ColumnType byCode(int code) {
|
||||
return INDEX_BY_CODE.get(code);
|
||||
}
|
||||
}
|
@ -0,0 +1,136 @@
|
||||
/*
|
||||
* Copyright 2022 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.github.shyiko.mysql.binlog.event.deserialization;
|
||||
|
||||
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
|
||||
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
|
||||
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY;
|
||||
|
||||
/**
|
||||
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
|
||||
*
|
||||
* <p>Line 93 ~ 98: process MYSQL_TYPE_TYPED_ARRAY metadata, imitated the code in canal <a
|
||||
* href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>.
|
||||
*
|
||||
* <p>Remove this file once <a
|
||||
* href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a>
|
||||
* fixed.
|
||||
*/
|
||||
public class TableMapEventDataDeserializer implements EventDataDeserializer<TableMapEventData> {
|
||||
|
||||
private final TableMapEventMetadataDeserializer metadataDeserializer =
|
||||
new TableMapEventMetadataDeserializer();
|
||||
|
||||
@Override
|
||||
public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
|
||||
TableMapEventData eventData = new TableMapEventData();
|
||||
eventData.setTableId(inputStream.readLong(6));
|
||||
inputStream.skip(3); // 2 bytes reserved for future use + 1 for the length of database name
|
||||
eventData.setDatabase(inputStream.readZeroTerminatedString());
|
||||
inputStream.skip(1); // table name
|
||||
eventData.setTable(inputStream.readZeroTerminatedString());
|
||||
int numberOfColumns = inputStream.readPackedInteger();
|
||||
eventData.setColumnTypes(inputStream.read(numberOfColumns));
|
||||
inputStream.readPackedInteger(); // metadata length
|
||||
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
|
||||
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
|
||||
int metadataLength = inputStream.available();
|
||||
TableMapEventMetadata metadata = null;
|
||||
if (metadataLength > 0) {
|
||||
metadata =
|
||||
metadataDeserializer.deserialize(
|
||||
new ByteArrayInputStream(inputStream.read(metadataLength)),
|
||||
eventData.getColumnTypes().length,
|
||||
numericColumnCount(eventData.getColumnTypes()));
|
||||
}
|
||||
eventData.setEventMetadata(metadata);
|
||||
return eventData;
|
||||
}
|
||||
|
||||
private int numericColumnCount(byte[] types) {
|
||||
int count = 0;
|
||||
for (int i = 0; i < types.length; i++) {
|
||||
switch (ColumnType.byCode(types[i] & 0xff)) {
|
||||
case TINY:
|
||||
case SHORT:
|
||||
case INT24:
|
||||
case LONG:
|
||||
case LONGLONG:
|
||||
case NEWDECIMAL:
|
||||
case FLOAT:
|
||||
case DOUBLE:
|
||||
count++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
|
||||
throws IOException {
|
||||
int[] metadata = new int[columnTypes.length];
|
||||
for (int i = 0; i < columnTypes.length; i++) {
|
||||
ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF);
|
||||
if (columnType == TYPED_ARRAY) {
|
||||
byte[] arrayType = inputStream.read(1);
|
||||
columnType = ColumnType.byCode(arrayType[0] & 0xFF);
|
||||
}
|
||||
switch (columnType) {
|
||||
case FLOAT:
|
||||
case DOUBLE:
|
||||
case BLOB:
|
||||
case JSON:
|
||||
case GEOMETRY:
|
||||
metadata[i] = inputStream.readInteger(1);
|
||||
break;
|
||||
case BIT:
|
||||
case VARCHAR:
|
||||
case NEWDECIMAL:
|
||||
metadata[i] = inputStream.readInteger(2);
|
||||
break;
|
||||
case SET:
|
||||
case ENUM:
|
||||
case STRING:
|
||||
metadata[i] = bigEndianInteger(inputStream.read(2), 0, 2);
|
||||
break;
|
||||
case TIME_V2:
|
||||
case DATETIME_V2:
|
||||
case TIMESTAMP_V2:
|
||||
metadata[i] = inputStream.readInteger(1); // fsp (@see {@link ColumnType})
|
||||
break;
|
||||
default:
|
||||
metadata[i] = 0;
|
||||
}
|
||||
}
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private static int bigEndianInteger(byte[] bytes, int offset, int length) {
|
||||
int result = 0;
|
||||
for (int i = offset; i < (offset + length); i++) {
|
||||
byte b = bytes[i];
|
||||
result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Copyright 2022 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.github.shyiko.mysql.binlog.event.deserialization;
|
||||
|
||||
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
|
||||
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
|
||||
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.BitSet;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/** Tests for the copied class {@link TableMapEventDataDeserializer}. */
|
||||
public class TableMapEventDataDeserializerTest {
|
||||
@Test
|
||||
public void testDeserialize() throws IOException {
|
||||
TableMapEventDataDeserializer deserializer = new TableMapEventDataDeserializer();
|
||||
// The Table_map_event data. See its format at
|
||||
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html
|
||||
byte[] data = {
|
||||
// table_id : 6 bytes
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
// flags : 2 bytes
|
||||
1,
|
||||
0,
|
||||
// database_name string length : 1 byte
|
||||
6,
|
||||
// database_name null-terminated string, end with 0
|
||||
116,
|
||||
101,
|
||||
115,
|
||||
116,
|
||||
68,
|
||||
98,
|
||||
0,
|
||||
// table_name string length : 1 byte
|
||||
9,
|
||||
// table_name null-terminated string, end with 0
|
||||
116,
|
||||
101,
|
||||
115,
|
||||
116,
|
||||
84,
|
||||
97,
|
||||
98,
|
||||
108,
|
||||
101,
|
||||
0,
|
||||
// column_count
|
||||
3,
|
||||
// column_type list
|
||||
8,
|
||||
1,
|
||||
20,
|
||||
// metadata_length
|
||||
1,
|
||||
// metadata
|
||||
8,
|
||||
// null_bits
|
||||
80,
|
||||
// optional metadata fields stored in Type, Length, Value(TLV) format.
|
||||
// Type takes 1 byte. Length is a packed integer value. Values takes Length bytes.
|
||||
|
||||
// SIGNEDNESS
|
||||
1,
|
||||
1,
|
||||
0,
|
||||
// DEFAULT_CHARSET
|
||||
2,
|
||||
1,
|
||||
45
|
||||
};
|
||||
TableMapEventData eventData = deserializer.deserialize(new ByteArrayInputStream(data));
|
||||
assertThat(eventData.toString()).isEqualTo(getExpectedEventData().toString());
|
||||
}
|
||||
|
||||
private TableMapEventData getExpectedEventData() {
|
||||
TableMapEventData eventData = new TableMapEventData();
|
||||
// table_id
|
||||
eventData.setTableId(1);
|
||||
// database_name
|
||||
eventData.setDatabase("testDb");
|
||||
// table_name
|
||||
eventData.setTable("testTable");
|
||||
|
||||
// column_type
|
||||
// 3 column types: MYSQL_TYPE_LONGLONG, MYSQL_TYPE_TINY, MYSQL_TYPE_TYPED_ARRAY<LONGLONG>
|
||||
eventData.setColumnTypes(new byte[] {8, 1, 20});
|
||||
|
||||
// metadata of the column types
|
||||
eventData.setColumnMetadata(new int[] {0, 0, 0});
|
||||
|
||||
// null_bits
|
||||
eventData.setColumnNullability(new BitSet());
|
||||
|
||||
// optional metadata fields
|
||||
TableMapEventMetadata metadata = new TableMapEventMetadata();
|
||||
metadata.setSignedness(new BitSet());
|
||||
TableMapEventMetadata.DefaultCharset charset = new TableMapEventMetadata.DefaultCharset();
|
||||
charset.setDefaultCharsetCollation(45);
|
||||
metadata.setDefaultCharset(charset);
|
||||
eventData.setEventMetadata(metadata);
|
||||
return eventData;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue