Limit the table size to avoid printing too much logs

pull/3656/head
Runkang He 3 months ago
parent f685ed7404
commit 0ffcf6cd9a

@ -32,11 +32,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** The split to describe the binlog of MySql table(s). */ /** The split to describe the binlog of MySql table(s). */
public class MySqlBinlogSplit extends MySqlSplit { public class MySqlBinlogSplit extends MySqlSplit {
private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplit.class); private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplit.class);
private static final int TABLES_LENGTH_FOR_LOG = 3;
private final BinlogOffset startingOffset; private final BinlogOffset startingOffset;
private final BinlogOffset endingOffset; private final BinlogOffset endingOffset;
@ -44,6 +46,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
private final Map<TableId, TableChange> tableSchemas; private final Map<TableId, TableChange> tableSchemas;
private final int totalFinishedSplitSize; private final int totalFinishedSplitSize;
private final boolean isSuspended; private final boolean isSuspended;
private final String tablesForLog;
@Nullable transient byte[] serializedFormCache; @Nullable transient byte[] serializedFormCache;
public MySqlBinlogSplit( public MySqlBinlogSplit(
@ -61,6 +64,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
this.tableSchemas = tableSchemas; this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize; this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = isSuspended; this.isSuspended = isSuspended;
this.tablesForLog = getTablesForLog();
} }
public MySqlBinlogSplit( public MySqlBinlogSplit(
@ -77,6 +81,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
this.tableSchemas = tableSchemas; this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize; this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = false; this.isSuspended = false;
this.tablesForLog = getTablesForLog();
} }
public BinlogOffset getStartingOffset() { public BinlogOffset getStartingOffset() {
@ -108,14 +113,18 @@ public class MySqlBinlogSplit extends MySqlSplit {
return totalFinishedSplitSize == finishedSnapshotSplitInfos.size(); return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
} }
public String getTables() { private String getTablesForLog() {
String tables; List<TableId> tablesForLog = new ArrayList<>();
if (tableSchemas != null) { if (tableSchemas != null) {
tables = tableSchemas.keySet().toString(); List<TableId> tableIds = new ArrayList<>(new TreeSet(tableSchemas.keySet()));
} else { // Truncate tables length to avoid printing too much log
tables = "[]"; tablesForLog = tableIds.subList(0, Math.min(tableIds.size(), TABLES_LENGTH_FOR_LOG));
} }
return tables; return tablesForLog.toString();
}
public String getTables() {
return tablesForLog;
} }
@Override @Override
@ -152,13 +161,12 @@ public class MySqlBinlogSplit extends MySqlSplit {
@Override @Override
public String toString() { public String toString() {
String tables = getTables();
return "MySqlBinlogSplit{" return "MySqlBinlogSplit{"
+ "splitId='" + "splitId='"
+ splitId + splitId
+ '\'' + '\''
+ ", tables=" + ", tables="
+ tables + tablesForLog
+ ", offset=" + ", offset="
+ startingOffset + startingOffset
+ ", endOffset=" + ", endOffset="

@ -67,6 +67,8 @@ public class MySqlBinlogSplitTest {
tableSchemas, tableSchemas,
0, 0,
false); false);
String expectedTables = "[catalog1.table1, catalog2.table2]";
Assert.assertEquals(expectedTables, binlogSplit.getTables());
// case 1: only include table1 // case 1: only include table1
Tables.TableFilter currentTableFilter = tableId -> tableId.table().equals("table1"); Tables.TableFilter currentTableFilter = tableId -> tableId.table().equals("table1");
@ -77,6 +79,8 @@ public class MySqlBinlogSplitTest {
mySqlBinlogSplit.getTableSchemas(); mySqlBinlogSplit.getTableSchemas();
Assert.assertEquals(1, filterTableSchemas.size()); Assert.assertEquals(1, filterTableSchemas.size());
Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1)); Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1));
String expectedTables1 = "[catalog1.table1]";
Assert.assertEquals(expectedTables1, mySqlBinlogSplit.getTables());
// case 2: include all tables // case 2: include all tables
currentTableFilter = tableId -> tableId.table().startsWith("table"); currentTableFilter = tableId -> tableId.table().startsWith("table");
@ -87,6 +91,61 @@ public class MySqlBinlogSplitTest {
Assert.assertEquals(2, filterTableSchemas.size()); Assert.assertEquals(2, filterTableSchemas.size());
Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1)); Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1));
Assert.assertEquals(tableChange2, filterTableSchemas.get(tableId2)); Assert.assertEquals(tableChange2, filterTableSchemas.get(tableId2));
String expectedTables2 = "[catalog1.table1, catalog2.table2]";
Assert.assertEquals(expectedTables2, mySqlBinlogSplit.getTables());
}
@Test
public void testTruncatedTablesForLog() {
Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
// mock table1
TableId tableId1 = new TableId("catalog1", null, "table1");
TableChanges.TableChange tableChange1 =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE,
new MockTable(TableId.parse("catalog1.table1")));
// mock table2
TableId tableId2 = new TableId("catalog2", null, "table2");
TableChanges.TableChange tableChange2 =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE,
new MockTable(TableId.parse("catalog2.table2")));
// mock table3
TableId tableId3 = new TableId("catalog3", null, "table3");
TableChanges.TableChange tableChange3 =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE,
new MockTable(TableId.parse("catalog3.table3")));
// mock table4
TableId tableId4 = new TableId("catalog4", null, "table4");
TableChanges.TableChange tableChange4 =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE,
new MockTable(TableId.parse("catalog4.table4")));
tableSchemas.put(tableId1, tableChange1);
tableSchemas.put(tableId2, tableChange2);
tableSchemas.put(tableId3, tableChange3);
tableSchemas.put(tableId4, tableChange4);
MySqlBinlogSplit binlogSplit =
new MySqlBinlogSplit(
"binlog-split",
BinlogOffset.ofLatest(),
null,
new ArrayList<>(),
tableSchemas,
0,
false);
String expectedTables = "[catalog1.table1, catalog2.table2, catalog3.table3]";
Assert.assertEquals(expectedTables, binlogSplit.getTables());
} }
/** A mock implementation for {@link Table} which is used for unit tests. */ /** A mock implementation for {@link Table} which is used for unit tests. */

Loading…
Cancel
Save