[tidb] Fix data lost when region changed (#1632)

* [tidb] Fix data lost when region changed.

* [tidb] Add tidb cdc region change unit test and polish TiKVRichParallelSourceFunction.

Co-authored-by: liangzhili2 <liangzhili2@joyy.com>
Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
pull/1690/head
leozlliang 2 years ago committed by GitHub
parent aee88f947c
commit 1111f0270e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ververica.cdc.connectors.tidb.table.StartupMode;
import com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils;
import org.slf4j.Logger;
@ -48,6 +50,12 @@ import org.tikv.txn.KVClient;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* The source implementation for TiKV that read snapshot events first and then read the change
@ -68,19 +76,26 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
private final String database;
private final String tableName;
// Task local variables
/** Task local variables. */
private transient TiSession session = null;
private transient Coprocessor.KeyRange keyRange = null;
private transient CDCClient cdcClient = null;
private transient SourceContext<T> sourceContext = null;
private transient volatile long resolvedTs = -1L;
private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> prewrites = null;
private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> commits = null;
private transient BlockingQueue<Cdcpb.Event.Row> committedEvents = null;
private transient OutputCollector<T> outputCollector;
// offset state
private transient boolean running = true;
private transient ExecutorService executorService;
/** offset state. */
private transient ListState<Long> offsetState;
private static final long CLOSE_TIMEOUT = 30L;
public TiKVRichParallelSourceFunction(
TiKVSnapshotEventDeserializationSchema<T> snapshotEventDeserializationSchema,
TiKVChangeEventDeserializationSchema<T> changeEventDeserializationSchema,
@ -114,11 +129,22 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
cdcClient = new CDCClient(session, keyRange);
prewrites = new TreeMap<>();
commits = new TreeMap<>();
// CDC events would be lost if pulling them blocks while a region splits.
// A queue decouples reading from writing so that pulling never blocks.
// Since a JDBC sink is slow, a capacity of 50 million entries would be a safe
// queue size; here an unbounded LinkedBlockingQueue is used.
committedEvents = new LinkedBlockingQueue<>();
outputCollector = new OutputCollector<>();
resolvedTs =
startupMode == StartupMode.INITIAL
? SNAPSHOT_VERSION_EPOCH
: STREAMING_VERSION_START_EPOCH;
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat(
"tidb-source-function-"
+ getRuntimeContext().getIndexOfThisSubtask())
.build();
executorService = Executors.newSingleThreadExecutor(threadFactory);
}
@Override
@ -137,6 +163,7 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
LOG.info("start read change events");
cdcClient.start(resolvedTs);
running = true;
readChangeEvents();
}
@ -167,9 +194,8 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
protected void readSnapshotEvents() throws Exception {
LOG.info("read snapshot events");
final KVClient scanClient = session.createKVClient();
long startTs = session.getTimestamp().getVersion();
try {
try (KVClient scanClient = session.createKVClient()) {
long startTs = session.getTimestamp().getVersion();
ByteString start = keyRange.getStart();
while (true) {
final List<Kvrpcpb.KvPair> segment =
@ -191,13 +217,24 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
.next()
.toByteString();
}
} finally {
scanClient.close();
}
}
protected void readChangeEvents() throws Exception {
LOG.info("read change event from resolvedTs:{}", resolvedTs);
// Child thread that takes committed rows off the queue, deserializes them and emits them to the sink.
executorService.execute(
() -> {
while (running) {
try {
Cdcpb.Event.Row committedRow = committedEvents.take();
changeEventDeserializationSchema.deserialize(
committedRow, outputCollector);
} catch (Exception e) {
e.printStackTrace();
}
}
});
while (resolvedTs >= STREAMING_VERSION_START_EPOCH) {
for (int i = 0; i < 1000; i++) {
final Cdcpb.Event.Row row = cdcClient.get();
@ -220,7 +257,8 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
final Cdcpb.Event.Row commitRow = commits.pollFirstEntry().getValue();
final Cdcpb.Event.Row prewriteRow =
prewrites.remove(RowKeyWithTs.ofStart(commitRow));
changeEventDeserializationSchema.deserialize(prewriteRow, outputCollector);
// If pulling CDC events blocked while a region splits, events would be lost; hand the row to the queue instead of deserializing inline.
committedEvents.offer(prewriteRow);
}
}
}
@ -228,9 +266,18 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
@Override
public void cancel() {
try {
running = false;
if (cdcClient != null) {
cdcClient.close();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the tidb source function in {} seconds.",
CLOSE_TIMEOUT);
}
}
} catch (final Exception e) {
LOG.error("Unable to close cdcClient", e);
}
@ -293,14 +340,6 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
this(timestamp, RowKey.decode(key));
}
public long getTimestamp() {
return timestamp;
}
public RowKey getRowKey() {
return rowKey;
}
@Override
public int compareTo(final RowKeyWithTs that) {
int res = Long.compare(this.timestamp, that.timestamp);

@ -0,0 +1,250 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.cdc;
import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Range;
import org.apache.flink.shaded.guava30.com.google.common.collect.TreeMultiset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.RangeSplitter;
import org.tikv.common.util.RangeSplitter.RegionTask;
import org.tikv.kvproto.Cdcpb.Event.Row;
import org.tikv.kvproto.Coprocessor.KeyRange;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.io.grpc.ManagedChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
/**
* Copied from https://github.com/tikv/client-java project to fix
* https://github.com/tikv/client-java/issues/600 for 3.2.0 version.
*/
/**
 * Copied from https://github.com/tikv/client-java project to fix
 * https://github.com/tikv/client-java/issues/600 for 3.2.0 version.
 *
 * <p>Streams change-data-capture events for a key range by keeping one {@link RegionCDCClient}
 * per overlapping TiKV region and funnelling their events into a single bounded buffer that the
 * caller drains via {@link #get()}.
 */
public class CDCClient implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class);

    private final TiSession session;
    private final KeyRange keyRange;
    private final CDCConfig config;
    // Buffer decoupling per-region gRPC callbacks from the consumer thread calling get().
    private final BlockingQueue<CDCEvent> eventsBuffer;
    private final ConcurrentHashMap<Long, RegionCDCClient> regionClients =
            new ConcurrentHashMap<>();
    // Latest resolved ts per region, mirrored in a multiset so the global minimum is cheap.
    private final Map<Long, Long> regionToResolvedTs = new HashMap<>();
    private final TreeMultiset<Long> resolvedTsSet = TreeMultiset.create();

    private boolean started = false;
    private Consumer<CDCEvent> eventConsumer;

    public CDCClient(final TiSession session, final KeyRange keyRange) {
        this(session, keyRange, new CDCConfig());
    }

    /**
     * Creates a CDC client over {@code keyRange}.
     *
     * @throws IllegalStateException if the session is not configured for SI isolation
     */
    public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) {
        Preconditions.checkState(
                session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI),
                "Unsupported Isolation Level"); // only support SI for now
        this.session = session;
        this.keyRange = keyRange;
        this.config = config;
        eventsBuffer = new LinkedBlockingQueue<>(config.getEventBufferSize());
        // fix: use queue.put() instead of queue.offer(), otherwise will lose event
        eventConsumer =
                (event) -> {
                    // Fast path: try a non-blocking offer twice before falling back to a
                    // blocking put, so no event is ever dropped when the buffer is full.
                    for (int i = 0; i < 2; i++) {
                        if (eventsBuffer.offer(event)) {
                            return;
                        }
                    }
                    try {
                        eventsBuffer.put(event);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of swallowing it (the
                        // original printed the stack trace and lost the interrupt).
                        Thread.currentThread().interrupt();
                        LOGGER.error("Interrupted while buffering CDC event", e);
                    }
                };
    }

    /** Starts streaming from every region overlapping the key range at {@code startTs}. */
    public synchronized void start(final long startTs) {
        Preconditions.checkState(!started, "Client is already started");
        applyKeyRange(keyRange, startTs);
        started = true;
    }

    /**
     * Polls one buffered event. Returns a data row, or {@code null} when the buffer is empty
     * or the polled event was a resolved-ts / error event that is handled internally.
     */
    public synchronized Row get() throws InterruptedException {
        final CDCEvent event = eventsBuffer.poll();
        if (event != null) {
            switch (event.eventType) {
                case ROW:
                    return event.row;
                case RESOLVED_TS:
                    handleResolvedTs(event.regionId, event.resolvedTs);
                    break;
                case ERROR:
                    handleErrorEvent(event.regionId, event.error, event.resolvedTs);
                    break;
            }
        }
        return null;
    }

    /**
     * Minimum resolved ts across all tracked regions.
     *
     * <p>NOTE(review): throws NPE if no region is tracked yet ({@code firstEntry()} is null) —
     * callers are assumed to invoke this only after {@link #start(long)}.
     */
    public synchronized long getMinResolvedTs() {
        return resolvedTsSet.firstEntry().getElement();
    }

    public synchronized void close() {
        // ConcurrentHashMap's keySet view is weakly consistent, so removing entries
        // while iterating inside removeRegions is safe.
        removeRegions(regionClients.keySet());
    }

    /**
     * Reconciles the set of per-region clients with the current region layout of the key range:
     * dead clients are refreshed, vanished regions removed, new regions added at {@code timestamp}.
     */
    private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) {
        final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager());

        // Both iterators are sorted by region id so they can be merged in one pass.
        final Iterator<TiRegion> newRegionsIterator =
                splitter.splitRangeByRegion(Arrays.asList(keyRange)).stream()
                        .map(RegionTask::getRegion)
                        .sorted((a, b) -> Long.compare(a.getId(), b.getId()))
                        .iterator();
        final Iterator<RegionCDCClient> oldRegionsIterator = regionClients.values().iterator();

        final ArrayList<TiRegion> regionsToAdd = new ArrayList<>();
        final ArrayList<Long> regionsToRemove = new ArrayList<>();

        TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
        RegionCDCClient oldRegionClient =
                oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;

        // Merge-join of new regions vs existing clients on region id.
        while (newRegion != null && oldRegionClient != null) {
            if (newRegion.getId() == oldRegionClient.getRegion().getId()) {
                // Same region on both sides: restart its client only if it has died.
                if (!oldRegionClient.isRunning()) {
                    regionsToRemove.add(newRegion.getId());
                    regionsToAdd.add(newRegion);
                }
                newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
                oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
            } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) {
                // Region exists in the new layout only: add it.
                regionsToAdd.add(newRegion);
                newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
            } else {
                // Client exists for a region no longer in the layout: remove it.
                regionsToRemove.add(oldRegionClient.getRegion().getId());
                oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
            }
        }

        // Drain whichever side is left over.
        while (newRegion != null) {
            regionsToAdd.add(newRegion);
            newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null;
        }
        while (oldRegionClient != null) {
            regionsToRemove.add(oldRegionClient.getRegion().getId());
            oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null;
        }

        removeRegions(regionsToRemove);
        addRegions(regionsToAdd, timestamp);
        LOGGER.info("keyRange applied");
    }

    /** Creates and starts a RegionCDCClient for each region overlapping the key range. */
    private synchronized void addRegions(final Iterable<TiRegion> regions, final long timestamp) {
        LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp);
        for (final TiRegion region : regions) {
            if (overlapWithRegion(region)) {
                final String address =
                        session.getRegionManager()
                                .getStoreById(region.getLeader().getStoreId())
                                .getStore()
                                .getAddress();
                final ManagedChannel channel =
                        session.getChannelFactory()
                                .getChannel(address, session.getPDClient().getHostMapping());
                try {
                    final RegionCDCClient client =
                            new RegionCDCClient(region, keyRange, channel, eventConsumer, config);
                    regionClients.put(region.getId(), client);
                    regionToResolvedTs.put(region.getId(), timestamp);
                    resolvedTsSet.add(timestamp);
                    client.start(timestamp);
                } catch (final Exception e) {
                    LOGGER.error(
                            "failed to add region(regionId: {}, reason: {})", region.getId(), e);
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /** Closes and untracks the clients of the given regions. */
    private synchronized void removeRegions(final Iterable<Long> regionIds) {
        LOGGER.info("remove regions: {}", regionIds);
        for (final long regionId : regionIds) {
            final RegionCDCClient regionClient = regionClients.remove(regionId);
            if (regionClient != null) {
                try {
                    regionClient.close();
                } catch (final Exception e) {
                    LOGGER.error(
                            "failed to close region client, region id: {}, error: {}",
                            regionId,
                            e);
                } finally {
                    // Map.remove both untracks the region and yields the ts to drop from
                    // the multiset (the original called remove(regionId) a second,
                    // redundant time afterwards).
                    resolvedTsSet.remove(regionToResolvedTs.remove(regionId));
                }
            }
        }
    }

    /** True if the region's key span intersects this client's key range. */
    private boolean overlapWithRegion(final TiRegion region) {
        final Range<Key> regionRange =
                Range.closedOpen(
                        Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey()));
        final Range<Key> clientRange =
                Range.closedOpen(
                        Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd()));
        final Range<Key> intersection = regionRange.intersection(clientRange);
        return !intersection.isEmpty();
    }

    /** Replaces the region's tracked resolved ts with the new value in map and multiset. */
    private void handleResolvedTs(final long regionId, final long resolvedTs) {
        LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId);
        resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs));
        resolvedTsSet.add(resolvedTs);
    }

    /**
     * Handles a region error: invalidates the region cache, drops its client and re-applies the
     * whole key range starting from the region's last resolved ts.
     *
     * <p>NOTE(review): assumes the failing region is still present in {@code regionClients};
     * a concurrent removal would make {@code get(regionId)} return null — TODO confirm.
     */
    public void handleErrorEvent(final long regionId, final Throwable error, long resolvedTs) {
        LOGGER.info("handle error: {}, regionId: {}", error, regionId);
        final TiRegion region = regionClients.get(regionId).getRegion();
        session.getRegionManager()
                .onRequestFail(region); // invalidate cache for corresponding region

        removeRegions(Arrays.asList(regionId));
        applyKeyRange(keyRange, resolvedTs); // reapply the whole keyRange
    }
}

@ -0,0 +1,85 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.cdc;
import org.tikv.kvproto.Cdcpb.Event.Row;
/** Immutable value object describing one event emitted by a region CDC stream. */
class CDCEvent {

    /** Discriminator for the three kinds of events a region stream can produce. */
    enum CDCEventType {
        ROW,
        RESOLVED_TS,
        ERROR
    }

    /** Id of the TiKV region this event originated from. */
    public final long regionId;

    /** Kind of event; determines which of the payload fields below is meaningful. */
    public final CDCEventType eventType;

    /** Resolved timestamp payload; meaningful for RESOLVED_TS (and the ts-carrying ERROR). */
    public final long resolvedTs;

    /** Data row payload; non-null only for ROW events. */
    public final Row row;

    /** Error payload; non-null only for ERROR events. */
    public final Throwable error;

    private CDCEvent(
            final long regionId,
            final CDCEventType eventType,
            final long resolvedTs,
            final Row row,
            final Throwable error) {
        this.regionId = regionId;
        this.eventType = eventType;
        this.resolvedTs = resolvedTs;
        this.row = row;
        this.error = error;
    }

    /** Creates a ROW event carrying a data row. */
    public static CDCEvent rowEvent(final long regionId, final Row row) {
        return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null);
    }

    /** Creates a RESOLVED_TS event carrying the region's new resolved timestamp. */
    public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) {
        return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null);
    }

    /** Creates an ERROR event with no resolved timestamp attached. */
    public static CDCEvent error(final long regionId, final Throwable error) {
        return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error);
    }

    // add new CDCEvent constructor
    /** Creates an ERROR event that also records the resolved ts to restart from. */
    public static CDCEvent error(final long regionId, final Throwable error, long resolvedTs) {
        return new CDCEvent(regionId, CDCEventType.ERROR, resolvedTs, null, error);
    }

    @Override
    public String toString() {
        // Render the type-specific payload first, then assemble the final string.
        final String detail;
        if (eventType == CDCEventType.ERROR) {
            detail = "error=" + error.getMessage();
        } else if (eventType == CDCEventType.RESOLVED_TS) {
            detail = "resolvedTs=" + resolvedTs;
        } else {
            detail = "row=" + row;
        }
        return "CDCEvent[" + eventType.toString() + "] {" + detail + "}";
    }
}

@ -0,0 +1,261 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.cdc;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Cdcpb.ChangeDataEvent;
import org.tikv.kvproto.Cdcpb.ChangeDataRequest;
import org.tikv.kvproto.Cdcpb.Event.LogType;
import org.tikv.kvproto.Cdcpb.Event.Row;
import org.tikv.kvproto.Cdcpb.Header;
import org.tikv.kvproto.Cdcpb.ResolvedTs;
import org.tikv.kvproto.ChangeDataGrpc;
import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub;
import org.tikv.kvproto.Coprocessor.KeyRange;
import org.tikv.shade.io.grpc.ManagedChannel;
import org.tikv.shade.io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Copied from https://github.com/tikv/client-java project to fix
* https://github.com/tikv/client-java/issues/600 for 3.2.0 version.
*/
/**
 * gRPC stream client for the CDC feed of a single TiKV region. Acts as the
 * {@link StreamObserver} for the server's ChangeDataEvent stream and forwards filtered
 * events to the supplied consumer as {@link CDCEvent}s.
 */
public class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class);
    private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0);
    // Only these row log types are forwarded downstream; everything else is filtered out.
    private static final Set<LogType> ALLOWED_LOGTYPE =
            ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK);

    private TiRegion region;
    private final KeyRange keyRange;
    // The region's own key span, used to decide whether key filtering is needed at all.
    private final KeyRange regionKeyRange;
    private final ManagedChannel channel;
    private final ChangeDataStub asyncStub;
    private final Consumer<CDCEvent> eventConsumer;
    private final CDCConfig config;
    // Drops rows outside the requested key range when the region is wider than the range.
    private final Predicate<Row> rowFilter;
    private final AtomicBoolean running = new AtomicBoolean(false);
    // NOTE(review): this flag is final and always false, so the checkState in start()
    // can never fire — it does not actually detect a double start. TODO confirm intent.
    private final boolean started = false;

    // Last resolved ts observed from the stream; reported on error as the restart point.
    private long resolvedTs = 0;

    /**
     * Builds the client for {@code region} but does not open the stream yet
     * (see {@link #start(long)}).
     */
    public RegionCDCClient(
            final TiRegion region,
            final KeyRange keyRange,
            final ManagedChannel channel,
            final Consumer<CDCEvent> eventConsumer,
            final CDCConfig config) {
        this.region = region;
        this.keyRange = keyRange;
        this.channel = channel;
        this.asyncStub = ChangeDataGrpc.newStub(channel);
        this.eventConsumer = eventConsumer;
        this.config = config;
        this.regionKeyRange =
                KeyRange.newBuilder()
                        .setStart(region.getStartKey())
                        .setEnd(region.getEndKey())
                        .build();

        // If the requested range fully encloses the region every row qualifies;
        // otherwise compare each row key against [start, end) of the requested range.
        this.rowFilter =
                regionEnclosed()
                        ? ((row) -> true)
                        : new Predicate<Row>() {
                            // Scratch buffer reused across calls to avoid per-row allocation.
                            final byte[] buffer = new byte[config.getMaxRowKeySize()];
                            final byte[] start = keyRange.getStart().toByteArray();
                            final byte[] end = keyRange.getEnd().toByteArray();

                            @Override
                            public boolean test(final Row row) {
                                final int len = row.getKey().size();
                                row.getKey().copyTo(buffer, 0);
                                // start <= key < end (lexicographic byte comparison)
                                return (FastByteComparisons.compareTo(
                                                        buffer, 0, len, start, 0, start.length)
                                                >= 0)
                                        && (FastByteComparisons.compareTo(
                                                        buffer, 0, len, end, 0, end.length)
                                                < 0);
                            }
                        };
    }

    /**
     * Opens the ChangeData stream for this region, checkpointed at {@code startTs}.
     * This object itself receives the server's responses via the StreamObserver methods.
     */
    public synchronized void start(final long startTs) {
        Preconditions.checkState(!started, "RegionCDCClient has already started");
        resolvedTs = startTs;
        running.set(true);
        LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get());
        final ChangeDataRequest request =
                ChangeDataRequest.newBuilder()
                        .setRequestId(REQ_ID_COUNTER.incrementAndGet())
                        .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build())
                        .setRegionId(region.getId())
                        .setCheckpointTs(startTs)
                        .setStartKey(keyRange.getStart())
                        .setEndKey(keyRange.getEnd())
                        .setRegionEpoch(region.getRegionEpoch())
                        .setExtraOp(config.getExtraOp())
                        .build();
        final StreamObserver<ChangeDataRequest> requestObserver = asyncStub.eventFeed(this);
        // NOTE(review): this params map is built but never logged or used — dead code?
        HashMap<String, Object> params = new HashMap<>();
        params.put("requestId", request.getRequestId());
        params.put("header", request.getHeader());
        params.put("regionId", request.getRegionId());
        params.put("checkpointTs", request.getCheckpointTs());
        params.put("startKey", request.getStartKey().toString());
        params.put("endKey", request.getEndKey().toString());
        params.put("regionEpoch", request.getRegionEpoch());
        params.put("extraOp", request.getExtraOp());
        requestObserver.onNext(request);
    }

    public TiRegion getRegion() {
        return region;
    }

    public void setRegion(TiRegion region) {
        this.region = region;
    }

    public KeyRange getKeyRange() {
        return keyRange;
    }

    public KeyRange getRegionKeyRange() {
        return regionKeyRange;
    }

    /** True if the requested key range fully encloses this region's key span. */
    public boolean regionEnclosed() {
        return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd())
                .encloses(
                        KeyRangeUtils.makeRange(
                                regionKeyRange.getStart(), regionKeyRange.getEnd()));
    }

    public boolean isRunning() {
        return running.get();
    }

    /**
     * Marks the client stopped. Intentionally does NOT shut down the channel:
     * the channel is shared via ChannelFactory's pool (see the commented-out block).
     */
    @Override
    public void close() throws Exception {
        LOGGER.info("close (region: {})", region.getId());
        running.set(false);
        // fix: close grpc channel will make client threadpool shutdown.
        /*
        synchronized (this) {
          channel.shutdown();
        }
        try {
          LOGGER.debug("awaitTermination (region: {})", region.getId());
          channel.awaitTermination(60, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
          LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId());
          Thread.currentThread().interrupt();
          synchronized (this) {
            channel.shutdownNow();
          }
        }
        */
        LOGGER.info("terminated (region: {})", region.getId());
    }

    @Override
    public void onCompleted() {
        // should never been called
        onError(new IllegalStateException("RegionCDCClient should never complete"));
    }

    @Override
    public void onError(final Throwable error) {
        onError(error, this.resolvedTs);
    }

    /** Stops this client and forwards an ERROR event carrying the restart ts. */
    private void onError(final Throwable error, long resolvedTs) {
        LOGGER.error(
                "region CDC error: region: {}, resolvedTs:{}, error: {}",
                region.getId(),
                resolvedTs,
                error);
        running.set(false);
        eventConsumer.accept(CDCEvent.error(region.getId(), error, resolvedTs));
    }

    /**
     * Handles one batch from the server: surfaces embedded errors, forwards allowed
     * rows inside the key range, and emits resolved-ts events addressed to this region.
     */
    @Override
    public void onNext(final ChangeDataEvent event) {
        try {
            if (running.get()) {
                // fix: miss to process error event
                onErrorEventHandle(event);
                event.getEventsList().stream()
                        .flatMap(ev -> ev.getEntries().getEntriesList().stream())
                        .filter(row -> ALLOWED_LOGTYPE.contains(row.getType()))
                        .filter(this.rowFilter)
                        .map(row -> CDCEvent.rowEvent(region.getId(), row))
                        .forEach(this::submitEvent);

                if (event.hasResolvedTs()) {
                    final ResolvedTs resolvedTs = event.getResolvedTs();
                    this.resolvedTs = resolvedTs.getTs();
                    // Only emit if this region is listed in the resolved-ts notification.
                    if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) {
                        submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs()));
                    }
                }
            }
        } catch (final Exception e) {
            onError(e, resolvedTs);
        }
    }

    // error event handle
    /** Raises an error if any event in the batch carries a server-side error payload. */
    private void onErrorEventHandle(final ChangeDataEvent event) {
        List<Cdcpb.Event> errorEvents =
                event.getEventsList().stream()
                        .filter(errEvent -> errEvent.hasError())
                        .collect(Collectors.toList());
        if (errorEvents != null && errorEvents.size() > 0) {
            onError(
                    new RuntimeException(
                            "regionCDC error:" + errorEvents.get(0).getError().toString()),
                    this.resolvedTs);
        }
    }

    /** Hands one event to the shared consumer (which buffers it for CDCClient.get()). */
    private void submitEvent(final CDCEvent event) {
        LOGGER.debug("submit event: {}", event);
        eventConsumer.accept(event);
    }
}

@ -0,0 +1,189 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.HostMapping;
import org.tikv.common.pd.PDUtils;
import org.tikv.shade.io.grpc.ManagedChannel;
import org.tikv.shade.io.grpc.netty.GrpcSslContexts;
import org.tikv.shade.io.grpc.netty.NettyChannelBuilder;
import org.tikv.shade.io.netty.handler.ssl.SslContext;
import org.tikv.shade.io.netty.handler.ssl.SslContextBuilder;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.security.KeyStore;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* Copied from https://github.com/tikv/client-java project to fix
* https://github.com/tikv/client-java/issues/600 for 3.2.0 version.
*/
/**
 * Copied from https://github.com/tikv/client-java project to fix
 * https://github.com/tikv/client-java/issues/600 for 3.2.0 version.
 *
 * <p>Creates and pools one gRPC {@link ManagedChannel} per target address, optionally
 * secured with TLS configured from PEM files or JKS stores.
 */
public class ChannelFactory implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelFactory.class);

    private final int maxFrameSize;
    private final int keepaliveTime;
    private final int keepaliveTimeout;
    private final int idleTimeout;
    // One shared channel per address string; channels are built lazily on first request.
    private final ConcurrentHashMap<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
    // Null when TLS is not configured; getChannel then builds plaintext channels.
    private final SslContextBuilder sslContextBuilder;

    private static final String PUB_KEY_INFRA = "PKIX";

    /** Plaintext factory (no TLS). */
    public ChannelFactory(
            int maxFrameSize, int keepaliveTime, int keepaliveTimeout, int idleTimeout) {
        this.maxFrameSize = maxFrameSize;
        this.keepaliveTime = keepaliveTime;
        this.keepaliveTimeout = keepaliveTimeout;
        this.idleTimeout = idleTimeout;
        this.sslContextBuilder = null;
    }

    /** TLS factory configured from PEM trust/cert/key files. */
    public ChannelFactory(
            int maxFrameSize,
            int keepaliveTime,
            int keepaliveTimeout,
            int idleTimeout,
            String trustCertCollectionFilePath,
            String keyCertChainFilePath,
            String keyFilePath) {
        this.maxFrameSize = maxFrameSize;
        this.keepaliveTime = keepaliveTime;
        this.keepaliveTimeout = keepaliveTimeout;
        this.idleTimeout = idleTimeout;
        this.sslContextBuilder =
                getSslContextBuilder(
                        trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath);
    }

    /** TLS factory configured from JKS key and trust stores. */
    public ChannelFactory(
            int maxFrameSize,
            int keepaliveTime,
            int keepaliveTimeout,
            int idleTimeout,
            String jksKeyPath,
            String jksKeyPassword,
            String jksTrustPath,
            String jksTrustPassword) {
        this.maxFrameSize = maxFrameSize;
        this.keepaliveTime = keepaliveTime;
        this.keepaliveTimeout = keepaliveTimeout;
        this.idleTimeout = idleTimeout;
        this.sslContextBuilder =
                getSslContextBuilder(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword);
    }

    /**
     * Builds an SslContextBuilder from JKS stores. Failures are logged and a default
     * client builder is returned (preserves the original best-effort behavior).
     */
    private SslContextBuilder getSslContextBuilder(
            String jksKeyPath,
            String jksKeyPassword,
            String jksTrustPath,
            String jksTrustPassword) {
        SslContextBuilder builder = GrpcSslContexts.forClient();
        try {
            if (jksKeyPath != null && jksKeyPassword != null) {
                KeyStore keyStore = KeyStore.getInstance("JKS");
                // try-with-resources: the original leaked this FileInputStream.
                try (FileInputStream keyStream = new FileInputStream(jksKeyPath)) {
                    keyStore.load(keyStream, jksKeyPassword.toCharArray());
                }
                KeyManagerFactory keyManagerFactory =
                        KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
                keyManagerFactory.init(keyStore, jksKeyPassword.toCharArray());
                builder.keyManager(keyManagerFactory);
            }
            if (jksTrustPath != null && jksTrustPassword != null) {
                KeyStore trustStore = KeyStore.getInstance("JKS");
                // try-with-resources: the original leaked this FileInputStream too.
                try (FileInputStream trustStream = new FileInputStream(jksTrustPath)) {
                    trustStore.load(trustStream, jksTrustPassword.toCharArray());
                }
                TrustManagerFactory trustManagerFactory =
                        TrustManagerFactory.getInstance(PUB_KEY_INFRA);
                trustManagerFactory.init(trustStore);
                builder.trustManager(trustManagerFactory);
            }
        } catch (Exception e) {
            logger.error("JKS SSL context builder failed!", e);
        }
        return builder;
    }

    /** Builds an SslContextBuilder from PEM files; null paths leave the defaults in place. */
    private SslContextBuilder getSslContextBuilder(
            String trustCertCollectionFilePath, String keyCertChainFilePath, String keyFilePath) {
        SslContextBuilder builder = GrpcSslContexts.forClient();
        if (trustCertCollectionFilePath != null) {
            builder.trustManager(new File(trustCertCollectionFilePath));
        }
        if (keyCertChainFilePath != null && keyFilePath != null) {
            builder.keyManager(new File(keyCertChainFilePath), new File(keyFilePath));
        }
        return builder;
    }

    /**
     * Returns the pooled channel for {@code addressStr}, creating it on first use.
     *
     * <p>NOTE(review): on SSLException the mapping function returns null, so
     * computeIfAbsent stores nothing and this method returns null — callers must
     * tolerate that (preserved from the original).
     */
    public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) {
        return connPool.computeIfAbsent(
                addressStr,
                key -> {
                    URI address;
                    URI mappedAddr;
                    try {
                        address = PDUtils.addrToUri(key);
                    } catch (Exception e) {
                        throw new IllegalArgumentException("failed to form address " + key, e);
                    }
                    try {
                        mappedAddr = hostMapping.getMappedURI(address);
                    } catch (Exception e) {
                        throw new IllegalArgumentException(
                                "failed to get mapped address " + address, e);
                    }

                    // Channel should be lazy without actual connection until first call
                    // So a coarse grain lock is ok here
                    NettyChannelBuilder builder =
                            NettyChannelBuilder.forAddress(
                                            mappedAddr.getHost(), mappedAddr.getPort())
                                    .maxInboundMessageSize(maxFrameSize)
                                    .keepAliveTime(keepaliveTime, TimeUnit.SECONDS)
                                    .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS)
                                    .keepAliveWithoutCalls(true)
                                    .idleTimeout(idleTimeout, TimeUnit.SECONDS);

                    if (sslContextBuilder == null) {
                        return builder.usePlaintext().build();
                    } else {
                        SslContext sslContext = null;
                        try {
                            sslContext = sslContextBuilder.build();
                        } catch (SSLException e) {
                            logger.error("create ssl context failed!", e);
                            return null;
                        }
                        return builder.sslContext(sslContext).build();
                    }
                });
    }

    /** Initiates shutdown of every pooled channel and clears the pool. */
    public void close() {
        for (ManagedChannel ch : connPool.values()) {
            ch.shutdown();
        }
        connPool.clear();
    }
}

@ -27,6 +27,8 @@ import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Statement;
@ -42,6 +44,7 @@ import static org.junit.Assert.assertTrue;
/** Integration tests for TiDB change stream event SQL source. */
public class TiDBConnectorITCase extends TiDBTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TiDBConnectorITCase.class);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
private final StreamTableEnvironment tEnv =

@ -0,0 +1,151 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * Integration test verifying that the TiDB CDC source does not lose data when TiKV regions
 * change (split) while the job is running — regression coverage for
 * https://github.com/ververica/flink-cdc-connectors/issues/1206.
 */
public class TiDBConnectorRegionITCase extends TiDBTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TiDBConnectorRegionITCase.class);

    // Parallelism 1 keeps the sink bookkeeping deterministic for waitForSinkSize().
    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

    @Before
    public void before() {
        // Reset the in-memory 'values' sink so sinkSize() starts from zero for each test.
        TestValuesTableFactory.clearAllData();
        env.setParallelism(1);
    }

    @Test
    public void testRegionChange() throws Exception {
        // Creates the region_switch_test database and seed table from the bundled DDL script.
        initializeTidbTable("region_switch_test");
        String sourceDDL =
                String.format(
                        "CREATE TABLE tidb_source ("
                                + " `id` INT NOT NULL,"
                                + " b INT,"
                                + " PRIMARY KEY (`id`) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'tidb-cdc',"
                                + " 'tikv.grpc.timeout_in_ms' = '20000',"
                                + " 'pd-addresses' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
                        "region_switch_test",
                        "t1");
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " `id` INT NOT NULL,"
                        + " b INT,"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        // + " 'sink-expected-messages-num' = '121010'"
                        + ")";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // Submit the streaming job asynchronously; executeSql returns once the job is launched.
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");

        // Deliberately do NOT wait for the snapshot phase: the data-loss bug in
        // https://github.com/ververica/flink-cdc-connectors/issues/1206 only reproduces when
        // regions change while the source is still reading.
        // waitForSinkSize("sink", 1);

        int count = 0;
        try (Connection connection = getJdbcConnection("region_switch_test");
                Statement statement = connection.createStatement()) {
            // Grow the table via self-join inserts of ~3 KB rows so TiKV must split regions
            // while the CDC job is running.
            for (int i = 0; i < 20; i++) {
                statement.execute(
                        "INSERT INTO t1 SELECT NULL, FLOOR(RAND()*1000), RANDOM_BYTES(1024), RANDOM_BYTES"
                                + "(1024), RANDOM_BYTES(1024) FROM t1 a JOIN t1 b JOIN t1 c LIMIT 10000;");
            }
            // Log the region layout so a failing run shows whether splits actually occurred.
            ResultSet resultSet = statement.executeQuery("SHOW TABLE t1 REGIONS;");
            while (resultSet.next()) {
                String regionId = resultSet.getString(1);
                String leaderStoreId = resultSet.getString(2);
                String peerStoreIds = resultSet.getString(3);
                String regionState = resultSet.getString(4);
                String regionRows = resultSet.getString(5);
                String regionSize = resultSet.getString(6);
                String regionKeys = resultSet.getString(7);
                LOG.info(
                        "regionId: {}, leaderStoreId: {}, peerStoreIds: {}, regionState: {}, regionRows: {}, regionSize: {}, regionKeys: {}",
                        regionId,
                        leaderStoreId,
                        peerStoreIds,
                        regionState,
                        regionRows,
                        regionSize,
                        regionKeys);
            }
            ResultSet resultSetCount = statement.executeQuery("select count(*) from t1;");
            resultSetCount.next();
            count = resultSetCount.getInt(1);
            LOG.info("count: {}", count);
        }
        // Every upstream row must eventually reach the sink; if data is lost during the region
        // changes this wait never completes and the test times out.
        waitForSinkSize("sink", count);
        result.getJobClient().get().cancel().get();
    }

    /**
     * Blocks (polling every 100 ms) until the named 'values' sink has received at least
     * {@code expectedSize} raw results. No timeout here — the surrounding test harness is
     * expected to enforce one.
     */
    private static void waitForSinkSize(String sinkName, int expectedSize)
            throws InterruptedException {
        while (sinkSize(sinkName) < expectedSize) {
            Thread.sleep(100);
        }
    }

    /** Returns the number of raw results collected by the named 'values' sink so far. */
    private static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(sinkName).size();
            } catch (IllegalArgumentException e) {
                // Thrown while the job has not registered the sink yet — treat as empty.
                return 0;
            }
        }
    }
}

@ -0,0 +1,30 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: region_switch_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE region_switch_test;
USE region_switch_test;
-- Table with ~3 KB of VARBINARY padding per row so that a modest number of rows spans
-- several TiKV regions, forcing region splits during the region-change test.
CREATE TABLE t1 (
    id INT NOT NULL PRIMARY KEY auto_increment,
    b INT NOT NULL,
    pad1 VARBINARY(1024),
    pad2 VARBINARY(1024),
    pad3 VARBINARY(1024)
);
-- Seed a single random row; the test then grows the table via self-join inserts.
INSERT INTO t1 SELECT NULL, FLOOR(RAND()*1000), RANDOM_BYTES(1024), RANDOM_BYTES(1024), RANDOM_BYTES(1024) FROM dual;
Loading…
Cancel
Save