|
|
|
@ -26,10 +26,12 @@ import org.apache.flink.runtime.client.JobStatusMessage;
|
|
|
|
|
import org.apache.flink.table.api.ValidationException;
|
|
|
|
|
import org.apache.flink.util.TestLogger;
|
|
|
|
|
|
|
|
|
|
import com.github.dockerjava.api.DockerClient;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
|
|
|
|
|
import org.junit.After;
|
|
|
|
|
import org.junit.AfterClass;
|
|
|
|
|
import org.junit.Before;
|
|
|
|
|
import org.junit.ClassRule;
|
|
|
|
|
import org.junit.Rule;
|
|
|
|
@ -38,6 +40,7 @@ import org.junit.runner.RunWith;
|
|
|
|
|
import org.junit.runners.Parameterized;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.testcontainers.DockerClientFactory;
|
|
|
|
|
import org.testcontainers.containers.Container.ExecResult;
|
|
|
|
|
import org.testcontainers.containers.GenericContainer;
|
|
|
|
|
import org.testcontainers.containers.Network;
|
|
|
|
@ -174,6 +177,45 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
|
|
|
|
|
mysqlInventoryDatabase.dropDatabase();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@AfterClass
|
|
|
|
|
public static void afterClass() {
|
|
|
|
|
DockerClient dockerClient = DockerClientFactory.instance().client();
|
|
|
|
|
|
|
|
|
|
// List all containers and remove the ones that are not testcontainers related.
|
|
|
|
|
dockerClient.listContainersCmd().exec().stream()
|
|
|
|
|
.filter(container -> !container.getImage().startsWith("testcontainers"))
|
|
|
|
|
.forEach(
|
|
|
|
|
container -> {
|
|
|
|
|
dockerClient.stopContainerCmd(container.getId()).exec();
|
|
|
|
|
dockerClient.removeContainerCmd(container.getId()).exec();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// List all images and remove the ones that are not flink、mysql、testcontainers related.
|
|
|
|
|
dockerClient.listImagesCmd().exec().stream()
|
|
|
|
|
.filter(
|
|
|
|
|
image ->
|
|
|
|
|
image.getRepoTags() != null
|
|
|
|
|
&& Arrays.stream(image.getRepoTags())
|
|
|
|
|
.anyMatch(
|
|
|
|
|
tag ->
|
|
|
|
|
!tag.startsWith("flink:")
|
|
|
|
|
&& !tag.startsWith(
|
|
|
|
|
"testcontainers")
|
|
|
|
|
&& !tag.equals(
|
|
|
|
|
MYSQL
|
|
|
|
|
.getDockerImageName())))
|
|
|
|
|
.forEach(
|
|
|
|
|
image -> {
|
|
|
|
|
try {
|
|
|
|
|
dockerClient.removeImageCmd(image.getId()).exec();
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
LOG.warn(
|
|
|
|
|
"Failed to remove image: {}",
|
|
|
|
|
String.join(",", image.getRepoTags()));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Allow overriding the default flink properties. */
|
|
|
|
|
public void overrideFlinkProperties(String properties) {
|
|
|
|
|
jobManager.withEnv("FLINK_PROPERTIES", properties);
|
|
|
|
|