[FLINK-35354] Support host mapping in Flink tikv cdc (#3336)

* [FLINK-35337][cdc] Keep up with the latest version of tikv client

* [FLINK-35354]Support host mapping in Flink tikv cdc

* [FLINK-35354] Add doc for host mapping feature

* [FLINK-35354] fixed annotation import
pull/3469/head
ouyangwulin 7 months ago committed by GitHub
parent ca1470d5dd
commit 302a691225
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -124,6 +124,13 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>TiKV cluster's PD address.</td>
</tr>
<tr>
<td>host-mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.</td>
</tr>
<tr>
<td>tikv.grpc.timeout_in_ms</td>

@ -125,6 +125,13 @@ Connector Options
<td>String</td>
<td>TiKV cluster's PD address.</td>
</tr>
<tr>
<td>host-mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.</td>
</tr>
<tr>
<td>tikv.grpc.timeout_in_ms</td>
<td>optional</td>

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.tidb;
import org.apache.flink.cdc.connectors.tidb.table.utils.UriHostMapping;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@ -25,6 +26,7 @@ import org.tikv.common.ConfigUtils;
import org.tikv.common.TiConfiguration;
import java.util.Map;
import java.util.Optional;
/** Configurations for {@link TiDBSource}. */
public class TDBSourceOptions {
@ -57,6 +59,12 @@ public class TDBSourceOptions {
.noDefaultValue()
.withDescription("TiKV cluster's PD address");
public static final ConfigOption<String> HOST_MAPPING =
ConfigOptions.key("host-mapping")
.stringType()
.noDefaultValue()
.withDescription(
"TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.");
public static final ConfigOption<Long> TIKV_GRPC_TIMEOUT =
ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
.longType()
@ -82,10 +90,11 @@ public class TDBSourceOptions {
.withDescription("TiKV GRPC batch scan concurrency");
public static TiConfiguration getTiConfiguration(
final String pdAddrsStr, final Map<String, String> options) {
final String pdAddrsStr, final String hostMapping, final Map<String, String> options) {
final Configuration configuration = Configuration.fromMap(options);
final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping);
configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
configuration

@ -33,6 +33,8 @@ import org.apache.flink.types.RowKind;
import org.tikv.common.TiConfiguration;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -52,6 +54,7 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
private final String database;
private final String tableName;
private final String pdAddresses;
@Nullable private final String hostMapping;
private final StartupOptions startupOptions;
private final Map<String, String> options;
@ -70,12 +73,14 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
String database,
String tableName,
String pdAddresses,
String hostMapping,
StartupOptions startupOptions,
Map<String, String> options) {
this.physicalSchema = physicalSchema;
this.database = checkNotNull(database);
this.tableName = checkNotNull(tableName);
this.pdAddresses = checkNotNull(pdAddresses);
this.hostMapping = hostMapping;
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.options = options;
@ -93,7 +98,8 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
final TiConfiguration tiConf = TDBSourceOptions.getTiConfiguration(pdAddresses, options);
final TiConfiguration tiConf =
TDBSourceOptions.getTiConfiguration(pdAddresses, hostMapping, options);
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
@ -132,7 +138,13 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
public DynamicTableSource copy() {
TiDBTableSource source =
new TiDBTableSource(
physicalSchema, database, tableName, pdAddresses, startupOptions, options);
physicalSchema,
database,
tableName,
pdAddresses,
hostMapping,
startupOptions,
options);
source.producedDataType = producedDataType;
source.metadataKeys = metadataKeys;
return source;

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.DATABASE_NAME;
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.HOST_MAPPING;
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.PD_ADDRESSES;
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.tidb.TDBSourceOptions.TABLE_NAME;
@ -56,6 +57,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String pdAddresses = config.get(PD_ADDRESSES);
String hostMapping = config.get(HOST_MAPPING);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
@ -67,6 +69,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
databaseName,
tableName,
pdAddresses,
hostMapping,
startupOptions,
TiKVOptions.getTiKVOptions(context.getCatalogTable().getOptions()));
}
@ -89,6 +92,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN_STARTUP_MODE);
options.add(HOST_MAPPING);
options.add(TIKV_GRPC_TIMEOUT);
options.add(TIKV_GRPC_SCAN_TIMEOUT);
options.add(TIKV_BATCH_GET_CONCURRENCY);

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.tidb.table.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.HostMapping;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/** Get TiKV Host Mapping Function. {@link HostMapping}.* */
public class UriHostMapping implements HostMapping {
private static final Logger LOG = LoggerFactory.getLogger(UriHostMapping.class);
private final ConcurrentMap<String, String> hostMapping;
public UriHostMapping(String hostMappingString) {
if (hostMappingString == null || hostMappingString.isEmpty()) {
hostMapping = null;
return;
}
try {
this.hostMapping =
Arrays.stream(hostMappingString.split(";"))
.map(
s -> {
String[] hostAndPort = s.split(":");
return new ConcurrentHashMap.SimpleEntry<>(
hostAndPort[0], hostAndPort[1]);
})
.collect(
Collectors.toConcurrentMap(
ConcurrentHashMap.SimpleEntry::getKey,
ConcurrentHashMap.SimpleEntry::getValue));
} catch (Exception e) {
LOG.error("Invalid host mapping string: {}", hostMappingString, e);
throw new IllegalArgumentException("Invalid host mapping string: " + hostMappingString);
}
}
public ConcurrentMap<String, String> getHostMapping() {
return hostMapping;
}
@Override
public URI getMappedURI(URI uri) {
if (hostMapping != null && hostMapping.containsKey(uri.getHost())) {
try {
return new URI(
uri.getScheme(),
uri.getUserInfo(),
hostMapping.get(uri.getHost()),
uri.getPort(),
uri.getPath(),
uri.getQuery(),
uri.getFragment());
} catch (URISyntaxException ex) {
LOG.error("Failed to get mapped URI", ex);
throw new IllegalArgumentException(ex);
}
}
return uri;
}
}

@ -71,6 +71,7 @@ public class TiDBTableSourceFactoryTest {
private static final String MY_DATABASE = "inventory";
private static final String MY_TABLE = "products";
private static final String PD_ADDRESS = "pd0:2379";
private static final String HOST_MAPPING = "host1:1;host2:2;host3:3";
private static final Map<String, String> OPTIONS = new HashMap<>();
@Test
@ -85,6 +86,7 @@ public class TiDBTableSourceFactoryTest {
MY_DATABASE,
MY_TABLE,
PD_ADDRESS,
HOST_MAPPING,
StartupOptions.latest(),
OPTIONS);
assertEquals(expectedSource, actualSource);
@ -93,6 +95,7 @@ public class TiDBTableSourceFactoryTest {
@Test
public void testOptionalProperties() {
Map<String, String> properties = getAllOptions();
properties.put("host-mapping", "host1:1;host2:2;host3:3");
properties.put("tikv.grpc.timeout_in_ms", "20000");
properties.put("tikv.grpc.scan_timeout_in_ms", "20000");
properties.put("tikv.batch_get_concurrency", "4");
@ -115,6 +118,7 @@ public class TiDBTableSourceFactoryTest {
MY_DATABASE,
MY_TABLE,
PD_ADDRESS,
HOST_MAPPING,
StartupOptions.latest(),
options);
assertEquals(expectedSource, actualSource);

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.tidb.table.utils;
import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
/** Unit test for {@link UriHostMapping}. * */
public class UriHostMappingTest {
@Test
public void uriHostMappingTest() {
final TiConfiguration tiConf =
TDBSourceOptions.getTiConfiguration(
"http://0.0.0.0:2347", "host1:1;host2:2;host3:3", new HashMap<>());
UriHostMapping uriHostMapping = (UriHostMapping) tiConf.getHostMapping();
assertEquals(uriHostMapping.getHostMapping().size(), 3);
assertEquals(uriHostMapping.getHostMapping().get("host1"), "1");
}
@Test
public void uriHostMappingEmpty() {
final TiConfiguration tiConf =
TDBSourceOptions.getTiConfiguration("http://0.0.0.0:2347", "", new HashMap<>());
UriHostMapping uriHostMapping = (UriHostMapping) tiConf.getHostMapping();
assertEquals(uriHostMapping.getHostMapping(), null);
}
@Test
public void uriHostMappingError() {
try {
final TiConfiguration tiConf =
TDBSourceOptions.getTiConfiguration(
"http://0.0.0.0:2347", "host1=1;host2=2;host3=3", new HashMap<>());
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Invalid host mapping string: host1=1;host2=2;host3=3");
}
}
}
Loading…
Cancel
Save