Fixed - ExecutorService tasks aren't reloaded properly. #1468

pull/1499/head
Nikita 7 years ago
parent c6a9e1cab8
commit 670ccd4b77

@ -1,84 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
/**
*
* @author Nikita Koksharov
*
*/
public class ClassLoaderDelegator extends ClassLoader {
private final ThreadLocal<ClassLoader> threadLocalClassLoader = new ThreadLocal<ClassLoader>();
public void setCurrentClassLoader(ClassLoader classLoader) {
threadLocalClassLoader.set(classLoader);
}
public int hashCode() {
return threadLocalClassLoader.get().hashCode();
}
public boolean equals(Object obj) {
return threadLocalClassLoader.get().equals(obj);
}
public String toString() {
return threadLocalClassLoader.get().toString();
}
public Class<?> loadClass(String name) throws ClassNotFoundException {
return threadLocalClassLoader.get().loadClass(name);
}
public URL getResource(String name) {
return threadLocalClassLoader.get().getResource(name);
}
public Enumeration<URL> getResources(String name) throws IOException {
return threadLocalClassLoader.get().getResources(name);
}
public InputStream getResourceAsStream(String name) {
return threadLocalClassLoader.get().getResourceAsStream(name);
}
public void setDefaultAssertionStatus(boolean enabled) {
threadLocalClassLoader.get().setDefaultAssertionStatus(enabled);
}
public void setPackageAssertionStatus(String packageName, boolean enabled) {
threadLocalClassLoader.get().setPackageAssertionStatus(packageName, enabled);
}
public void setClassAssertionStatus(String className, boolean enabled) {
threadLocalClassLoader.get().setClassAssertionStatus(className, enabled);
}
public void clearAssertionStatus() {
threadLocalClassLoader.get().clearAssertionStatus();
}
public void clearCurrentClassLoader() {
threadLocalClassLoader.remove();
}
}

@ -53,10 +53,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class TasksRunnerService implements RemoteExecutorService {
private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator();
private final Codec codec;
private final ClassLoader codecClassLoader;
private final String name;
private final CommandExecutor commandExecutor;
@ -77,12 +74,7 @@ public class TasksRunnerService implements RemoteExecutorService {
this.redisson = redisson;
this.responses = responses;
try {
this.codecClassLoader = codec.getClassLoader();
this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
this.codec = codec;
}
public void setTasksRetryIntervalName(String tasksRetryInterval) {
@ -184,11 +176,10 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Callable<?> callable = decode(buf);
Callable<?> callable = decode(cl, buf);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
@ -248,12 +239,17 @@ public class TasksRunnerService implements RemoteExecutorService {
});
}
@SuppressWarnings("unchecked")
private <T> T decode(ByteBuf buf) throws IOException {
T task = (T) codec.getValueDecoder().decode(buf, null);
Injector.inject(task, redisson);
return task;
private <T> T decode(RedissonClassLoader cl, ByteBuf buf) throws IOException {
try {
Codec codec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
T task = (T) codec.getValueDecoder().decode(buf, null);
Injector.inject(task, redisson);
return task;
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
}
@Override
@ -266,11 +262,10 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Runnable runnable = decode(buf);
Runnable runnable = decode(cl, buf);
runnable.run();
} catch (RedissonShutdownException e) {
// skip
@ -295,8 +290,6 @@ public class TasksRunnerService implements RemoteExecutorService {
* @param requestId
*/
private void finish(String requestId) {
classLoader.clearCurrentClassLoader();
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "

Loading…
Cancel
Save