[tidb] Fix unstable TiDB region changed test. (#1702)

Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
pull/1710/head
gongzhongqiang 2 years ago committed by GitHub
parent 086634666f
commit d8542cdec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -47,37 +47,6 @@ under the License.
<version>${tikv.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.35.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.35.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.35.0</version>
</dependency>
<!-- Logging API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- test dependencies on Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
@ -99,6 +68,7 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>

@ -20,7 +20,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import org.tikv.kvproto.Cdcpb.Event.Row;
import java.io.Serializable;
@ -35,6 +34,6 @@ import java.io.Serializable;
/**
 * Deserialization schema that turns raw TiKV change events ({@link Row}) into objects of type
 * {@code T} emitted via the supplied {@link Collector}.
 *
 * @param <T> the type produced by this deserialization schema
 */
public interface TiKVChangeEventDeserializationSchema<T>
        extends Serializable, ResultTypeQueryable<T> {
    /**
     * Deserialize the TiDB change-event record.
     *
     * @param record the raw TiKV CDC row event
     * @param out collector to emit zero or more deserialized results
     * @throws Exception if the record cannot be deserialized
     */
    void deserialize(Row record, Collector<T> out) throws Exception;
}

@ -20,7 +20,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import java.io.Serializable;
@ -35,6 +34,6 @@ import java.io.Serializable;
/**
 * Deserialization schema that turns raw TiKV snapshot key-value pairs ({@link KvPair}) into
 * objects of type {@code T} emitted via the supplied {@link Collector}.
 *
 * @param <T> the type produced by this deserialization schema
 */
public interface TiKVSnapshotEventDeserializationSchema<T>
        extends Serializable, ResultTypeQueryable<T> {
    /**
     * Deserialize the TiDB snapshot record.
     *
     * @param record the raw TiKV key-value pair read during the snapshot phase
     * @param out collector to emit zero or more deserialized results
     * @throws Exception if the record cannot be deserialized
     */
    void deserialize(KvPair record, Collector<T> out) throws Exception;
}

@ -80,22 +80,19 @@ public class TiDBConnectorRegionITCase extends TiDBTestBase {
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
// + " 'sink-expected-messages-num' = '121010'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// Not waiting for the snapshot phase to finish reproduces the scenario described in issue
// https://github.com/ververica/flink-cdc-connectors/issues/1206 .
// waitForSinkSize("sink", 1);
waitForSinkSize("sink", 1);
int count = 0;
try (Connection connection = getJdbcConnection("region_switch_test");
Statement statement = connection.createStatement()) {
for (int i = 0; i < 20; i++) {
for (int i = 0; i < 15; i++) {
statement.execute(
"INSERT INTO t1 SELECT NULL, FLOOR(RAND()*1000), RANDOM_BYTES(1024), RANDOM_BYTES"
+ "(1024), RANDOM_BYTES(1024) FROM t1 a JOIN t1 b JOIN t1 c LIMIT 10000;");

Loading…
Cancel
Save