|
|
|
@ -42,12 +42,14 @@ import java.util.Arrays;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
|
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
|
import static org.junit.Assert.assertThat;
|
|
|
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
|
|
|
|
/** Integration tests for MongoDB change stream event SQL source. */
|
|
|
|
|
public class MongoDBConnectorITCase extends MongoDBTestBase {
|
|
|
|
@ -476,7 +478,20 @@ public class MongoDBConnectorITCase extends MongoDBTestBase {
|
|
|
|
|
|
|
|
|
|
private static void waitForSinkSize(String sinkName, int expectedSize)
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
waitForSinkSize(sinkName, expectedSize, 10, TimeUnit.MINUTES);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static void waitForSinkSize(
|
|
|
|
|
String sinkName, int expectedSize, long timeout, TimeUnit timeUnit)
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
|
|
|
|
|
while (sinkSize(sinkName) < expectedSize) {
|
|
|
|
|
if (System.nanoTime() > deadline) {
|
|
|
|
|
fail(
|
|
|
|
|
"Wait for sink size timeout, raw results: \n"
|
|
|
|
|
+ String.join(
|
|
|
|
|
"\n", TestValuesTableFactory.getRawResults(sinkName)));
|
|
|
|
|
}
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|