[tidb][test] Fix tikv port conflicts

pull/898/head
gongzhongqiang 3 years ago committed by Leonard Xu
parent a728c89937
commit 22a468861a
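
This commit avoids host port collisions when test suites run in parallel: PD's in-container port 2379 stays fixed, but it is now published on a randomized host port (pdPort), and the tests look the address up through getMappedPort instead of hard-coding it. A minimal standalone sketch of that pattern, assuming Testcontainers and commons-lang3 on the classpath (the class below is illustrative, not the committed code):

import org.apache.commons.lang3.RandomUtils;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

public class PdPortSketch {
    // PD always listens on 2379 inside the container.
    static final int PD_PORT_ORIGIN = 2379;
    // Randomized host port, mirroring pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000)
    // from the diff, so concurrent runs rarely ask for the same host port.
    static final int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000);

    public static void main(String[] args) {
        GenericContainer<?> pd =
                new FixedHostPortGenericContainer<>("pingcap/pd:v5.3.1")
                        // bind host port pdPort to container port 2379
                        .withFixedExposedPort(pdPort, PD_PORT_ORIGIN);
        pd.start();
        // Host-side clients resolve the published port via the original container port.
        System.out.println("PD reachable at 127.0.0.1:" + pd.getMappedPort(PD_PORT_ORIGIN));
        pd.stop();
    }
}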

@@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.tidb;
import org.apache.flink.test.util.AbstractTestBase;
import com.alibaba.dcm.DnsCacheManipulator;
import org.apache.commons.lang3.RandomUtils;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
@@ -65,21 +66,22 @@ public class TiDBTestBase extends AbstractTestBase {
public static final String TIDB_PASSWORD = "";
public static final int TIDB_PORT = 4000;
public static final int TIKV_PORT = 20160;
public static final int PD_PORT = 2379;
public static final int TIKV_PORT_ORIGIN = 20160;
public static final int PD_PORT_ORIGIN = 2379;
public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000);
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final GenericContainer<?> PD =
new FixedHostPortGenericContainer<>("pingcap/pd:v5.3.1")
.withFixedExposedPort(PD_PORT, PD_PORT)
.withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml")
.withFixedExposedPort(pdPort, PD_PORT_ORIGIN)
.withCommand(
"--name=pd0",
"--client-urls=http://0.0.0.0:2379",
"--client-urls=http://0.0.0.0:" + pdPort + ",http://0.0.0.0:2379",
"--peer-urls=http://0.0.0.0:2380",
"--advertise-client-urls=http://pd0:2379",
"--advertise-client-urls=http://pd0:" + pdPort + ",http://pd0:2379",
"--advertise-peer-urls=http://pd0:2380",
"--initial-cluster=pd0=http://pd0:2380",
"--data-dir=/data/pd0",
@@ -93,7 +95,7 @@ public class TiDBTestBase extends AbstractTestBase {
@ClassRule
public static final GenericContainer<?> TIKV =
new FixedHostPortGenericContainer<>("pingcap/tikv:v5.3.1")
.withFixedExposedPort(TIKV_PORT, TIKV_PORT)
.withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN)
.withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml")
.withCommand(
"--addr=0.0.0.0:20160",
@@ -126,16 +128,18 @@ public class TiDBTestBase extends AbstractTestBase {
@BeforeClass
public static void startContainers() throws Exception {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join();
// Add jvm dns cache for flink to invoke pd interface.
DnsCacheManipulator.setDnsCache(PD_SERVICE_NAME, "127.0.0.1");
DnsCacheManipulator.setDnsCache(TIKV_SERVICE_NAME, "127.0.0.1");
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join();
LOG.info("Containers are started.");
}
@AfterClass
public static void stopContainers() {
DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME);
DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME);
Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop);
}
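
In the startContainers/stopContainers hunk above, the JVM DNS cache entries are installed before the containers start, so that the hostnames the cluster advertises (for example pd0 in --advertise-client-urls) resolve to 127.0.0.1 from the test JVM, and they are removed again in @AfterClass. The same lifecycle, reduced to a standalone sketch; the service-name values are assumptions, only pd0 is visible in this diff:

import com.alibaba.dcm.DnsCacheManipulator;

public class DnsCacheSketch {
    // Assumed hostnames advertised by the PD and TiKV containers.
    static final String PD_SERVICE_NAME = "pd0";
    static final String TIKV_SERVICE_NAME = "tikv0";

    public static void main(String[] args) {
        // Point the advertised hostnames at loopback before anything connects.
        DnsCacheManipulator.setDnsCache(PD_SERVICE_NAME, "127.0.0.1");
        DnsCacheManipulator.setDnsCache(TIKV_SERVICE_NAME, "127.0.0.1");
        try {
            // start the containers and run the test here
        } finally {
            // Remove the overrides so later code sees normal DNS resolution again.
            DnsCacheManipulator.removeDnsCache(PD_SERVICE_NAME);
            DnsCacheManipulator.removeDnsCache(TIKV_SERVICE_NAME);
        }
    }
}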

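The TiDBConnectorITCase hunks below rebuild the PD address passed into the table DDL from the mapped port of PD_PORT_ORIGIN instead of the old PD_PORT constant. Isolated into a small helper for illustration (the helper itself is not part of the commit):

import org.testcontainers.containers.GenericContainer;

public class PdAddressSketch {
    static final int PD_PORT_ORIGIN = 2379;

    // The host-visible PD address: container host plus the host port that
    // Testcontainers mapped for in-container port 2379.
    static String pdAddress(GenericContainer<?> pd) {
        return pd.getContainerIpAddress() + ":" + pd.getMappedPort(PD_PORT_ORIGIN);
    }
}
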
@@ -80,7 +80,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'table-name' = '%s'"
+ ")",
TIDB.getContainerIpAddress(),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
@@ -191,7 +191,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'table-name' = '%s'"
+ ")",
TIDB.getContainerIpAddress(),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
@@ -276,7 +276,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'table-name' = '%s'"
+ ")",
TIDB.getContainerIpAddress(),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
@@ -345,6 +345,41 @@ public class TiDBConnectorITCase extends TiDBTestBase {
result.getJobClient().get().cancel().get();
}
/* @Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource;
tidbTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = tidbTableSource.copy();
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA_WITH_METADATA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
OPTIONS);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
TiKVRichParallelSourceFunction<RowData> sourceFunction =
(TiKVRichParallelSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType);
}*/
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {

@@ -29,15 +29,11 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.junit.Test;
@@ -52,7 +48,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
/** Integration tests for TiDB table source factory. */
public class TiDBTableSourceFactoryITCase extends TiDBTestBase {
public class TiDBTableSourceFactoryITCase {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@@ -140,41 +136,6 @@ public class TiDBTableSourceFactoryITCase extends TiDBTestBase {
assertEquals(expectedSource, actualSource);
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource;
tidbTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = tidbTableSource.copy();
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA_WITH_METADATA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
OPTIONS);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
TiKVRichParallelSourceFunction<RowData> sourceFunction =
(TiKVRichParallelSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType);
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "tidb-cdc");
