|
|
|
@ -52,9 +52,9 @@ import static org.junit.Assert.assertEquals;
|
|
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
|
|
|
|
|
|
|
/** End-to-end tests for tidb-cdc connector uber jar. */
|
|
|
|
|
public class TIDBE2eITCase extends FlinkContainerTestEnvironment {
|
|
|
|
|
public class TiDBE2eITCase extends FlinkContainerTestEnvironment {
|
|
|
|
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(TIDBE2eITCase.class);
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(TiDBE2eITCase.class);
|
|
|
|
|
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
|
|
|
|
|
|
|
|
|
|
public static final String PD_SERVICE_NAME = "pd0";
|
|
|
|
@ -137,16 +137,10 @@ public class TIDBE2eITCase extends FlinkContainerTestEnvironment {
|
|
|
|
|
|
|
|
|
|
@After
|
|
|
|
|
public void after() {
|
|
|
|
|
LOG.info("Stopping containers...");
|
|
|
|
|
Stream.of(TIDB, TIKV, PD).forEach(GenericContainer::stop);
|
|
|
|
|
log.info("Containers are stopped.");
|
|
|
|
|
super.after();
|
|
|
|
|
if (TIDB != null) {
|
|
|
|
|
TIDB.stop();
|
|
|
|
|
}
|
|
|
|
|
if (TIKV != null) {
|
|
|
|
|
TIDB.stop();
|
|
|
|
|
}
|
|
|
|
|
if (PD != null) {
|
|
|
|
|
TIDB.stop();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@ -161,11 +155,8 @@ public class TIDBE2eITCase extends FlinkContainerTestEnvironment {
|
|
|
|
|
" PRIMARY KEY (`id`) NOT ENFORCED",
|
|
|
|
|
") WITH (",
|
|
|
|
|
" 'connector' = 'tidb-cdc',",
|
|
|
|
|
" 'hostname' = '" + TIDB_SERVICE_NAME + "',",
|
|
|
|
|
" 'tikv.grpc.timeout_in_ms' = '20000',",
|
|
|
|
|
" 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',",
|
|
|
|
|
" 'username' = '" + TIDB_USER + "',",
|
|
|
|
|
" 'password' = '" + TIDB_PASSWORD + "',",
|
|
|
|
|
" 'database-name' = 'inventory',",
|
|
|
|
|
" 'table-name' = 'products'",
|
|
|
|
|
");",
|
|
|
|
@ -264,7 +255,7 @@ public class TIDBE2eITCase extends FlinkContainerTestEnvironment {
|
|
|
|
|
*/
|
|
|
|
|
protected void initializeTidbTable(String sqlFile) {
|
|
|
|
|
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
|
|
|
|
|
final URL ddlTestFile = TIDBE2eITCase.class.getClassLoader().getResource(ddlFile);
|
|
|
|
|
final URL ddlTestFile = TiDBE2eITCase.class.getClassLoader().getResource(ddlFile);
|
|
|
|
|
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
|
|
|
|
|
try (Connection connection = getTidbJdbcConnection("");
|
|
|
|
|
Statement statement = connection.createStatement()) {
|