|
|
@ -42,7 +42,7 @@ import org.junit.Test;
|
|
|
|
import org.junit.rules.TemporaryFolder;
|
|
|
|
import org.junit.rules.TemporaryFolder;
|
|
|
|
import org.junit.rules.Timeout;
|
|
|
|
import org.junit.rules.Timeout;
|
|
|
|
|
|
|
|
|
|
|
|
import java.lang.reflect.Method;
|
|
|
|
import java.lang.reflect.Field;
|
|
|
|
import java.sql.SQLException;
|
|
|
|
import java.sql.SQLException;
|
|
|
|
import java.time.ZoneId;
|
|
|
|
import java.time.ZoneId;
|
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.ArrayList;
|
|
|
@ -557,9 +557,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
|
|
|
|
Class<?> clazz =
|
|
|
|
Class<?> clazz =
|
|
|
|
classLoader.loadClass(
|
|
|
|
classLoader.loadClass(
|
|
|
|
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
|
|
|
|
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
|
|
|
|
Method getConfigurationMethod = clazz.getDeclaredMethod("getConfiguration");
|
|
|
|
Field field = clazz.getDeclaredField("configuration");
|
|
|
|
getConfigurationMethod.setAccessible(true);
|
|
|
|
field.setAccessible(true);
|
|
|
|
Configuration configuration = (Configuration) getConfigurationMethod.invoke(env);
|
|
|
|
Configuration configuration = (Configuration) field.get(env);
|
|
|
|
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
|
|
|
|
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
env.setParallelism(parallelism);
|
|
|
|
env.setParallelism(parallelism);
|
|
|
|