Support Redisson instance injection into ExecutorService task. Refactoring. #208

pull/574/merge
Nikita 9 years ago
parent 8f8060937b
commit 93b73e8c86

@ -18,6 +18,8 @@ package org.redisson;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -39,9 +41,9 @@ import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RExecutorService; import org.redisson.api.RExecutorService;
import org.redisson.api.RKeys; import org.redisson.api.RKeys;
import org.redisson.api.RRemoteService;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -57,6 +59,11 @@ import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorService implements RExecutorService { public class RedissonExecutorService implements RExecutorService {
public static final int SHUTDOWN_STATE = 1; public static final int SHUTDOWN_STATE = 1;
@ -97,24 +104,52 @@ public class RedissonExecutorService implements RExecutorService {
topic = redisson.getTopic(objectName + ":topic"); topic = redisson.getTopic(objectName + ":topic");
keys = redisson.getKeys(); keys = redisson.getKeys();
RRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); ExecutorRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
remoteService.setTasksCounterName(tasksCounter.getName());
remoteService.setStatusName(status.getName());
asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
} }
@Override @Override
public void registerExecutors(int executors) { public void registerExecutors(int executors) {
RemoteExecutorService service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, name); String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}";
RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, objectName);
service.setStatusName(status.getName());
service.setTasksCounterName(tasksCounter.getName());
service.setTopicName(topic.getChannelNames().get(0));
redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, service, executors); redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, service, executors);
} }
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
check(task);
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state);
execute(promise);
}
private byte[] encode(Object task) {
// erase RedissonClient field to avoid its serialization
Field[] fields = task.getClass().getDeclaredFields();
for (Field field : fields) {
if (RedissonClient.class.isAssignableFrom(field.getType())
&& field.isAnnotationPresent(RInject.class)) {
field.setAccessible(true);
try {
field.set(task, null);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
try { try {
byte[] state = codec.getValueEncoder().encode(task); return codec.getValueEncoder().encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state);
check(promise);
} catch (IOException e) { } catch (IOException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
@ -210,18 +245,24 @@ public class RedissonExecutorService implements RExecutorService {
@Override @Override
public <T> Future<T> submit(Callable<T> task) { public <T> Future<T> submit(Callable<T> task) {
check(task);
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
try { byte[] state = encode(task);
byte[] state = codec.getValueEncoder().encode(task); RemotePromise<T> promise = (RemotePromise<T>)asyncService.execute(task.getClass().getName(), classBody, state);
RemotePromise<T> promise = (RemotePromise<T>)asyncService.execute(task.getClass().getName(), classBody, state); execute(promise);
check(promise); return promise;
return promise; }
} catch (IOException e) {
throw new IllegalArgumentException(e); private void check(Object task) {
if (task.getClass().isAnonymousClass()) {
throw new IllegalArgumentException("Task can't be created using anonymous class");
}
if (!Serializable.class.isAssignableFrom(task.getClass())) {
throw new IllegalArgumentException("Task class should implement Serializable interface");
} }
} }
private <T> void check(RemotePromise<T> promise) { private <T> void execute(RemotePromise<T> promise) {
io.netty.util.concurrent.Future<Boolean> addFuture = promise.getAddFuture(); io.netty.util.concurrent.Future<Boolean> addFuture = promise.getAddFuture();
addFuture.syncUninterruptibly(); addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow(); Boolean res = addFuture.getNow();
@ -249,15 +290,12 @@ public class RedissonExecutorService implements RExecutorService {
@Override @Override
public Future<?> submit(Runnable task) { public Future<?> submit(Runnable task) {
check(task);
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
try { byte[] state = encode(task);
byte[] state = codec.getValueEncoder().encode(task); RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeVoid(task.getClass().getName(), classBody, state);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeVoid(task.getClass().getName(), classBody, state); execute(promise);
check(promise); return promise;
return promise;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
} }
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,

@ -68,7 +68,7 @@ public class RedissonRemoteService implements RRemoteService {
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap(); private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Codec codec; protected final Codec codec;
protected final Redisson redisson; protected final Redisson redisson;
protected final String name; protected final String name;
protected final CommandExecutor commandExecutor; protected final CommandExecutor commandExecutor;

@ -17,6 +17,11 @@ package org.redisson.api;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/**
*
* @author Nikita Koksharov
*
*/
public interface RExecutorService extends ExecutorService { public interface RExecutorService extends ExecutorService {
String getName(); String getName();

@ -0,0 +1,32 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*
* @author Nikita Koksharov
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface RInject {
}

@ -20,6 +20,11 @@ import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.util.Enumeration; import java.util.Enumeration;
/**
*
* @author Nikita Koksharov
*
*/
public class ClassLoaderDelegator extends ClassLoader { public class ClassLoaderDelegator extends ClassLoader {
private final ThreadLocal<ClassLoader> threadLocalClassLoader = new ThreadLocal<ClassLoader>(); private final ThreadLocal<ClassLoader> threadLocalClassLoader = new ThreadLocal<ClassLoader>();

@ -19,7 +19,6 @@ import java.util.Arrays;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.RedissonRemoteService; import org.redisson.RedissonRemoteService;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
@ -31,17 +30,26 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
/**
*
* @author Nikita Koksharov
*
*/
public class ExecutorRemoteService extends RedissonRemoteService { public class ExecutorRemoteService extends RedissonRemoteService {
private final RAtomicLong tasksCounter; private String tasksCounterName;
private final RAtomicLong status; private String statusName;
public ExecutorRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) { public ExecutorRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) {
super(codec, redisson, name, commandExecutor); super(codec, redisson, name, commandExecutor);
}
String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}";
tasksCounter = redisson.getAtomicLong(objectName + ":counter"); public void setStatusName(String statusName) {
status = redisson.getAtomicLong(objectName + ":status"); this.statusName = statusName;
}
public void setTasksCounterName(String tasksCounterName) {
this.tasksCounterName = tasksCounterName;
} }
@Override @Override
@ -55,7 +63,7 @@ public class ExecutorRemoteService extends RedissonRemoteService {
+ "return 1;" + "return 1;"
+ "end;" + "end;"
+ "return 0;", + "return 0;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), requestQueue.getName()), Arrays.<Object>asList(tasksCounterName, statusName, requestQueue.getName()),
encode(request)); encode(request));
result.setAddFuture(future); result.setAddFuture(future);

@ -15,6 +15,11 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonClassLoader extends ClassLoader { public class RedissonClassLoader extends ClassLoader {
public RedissonClassLoader(ClassLoader parent) { public RedissonClassLoader(ClassLoader parent) {

@ -15,6 +15,11 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
/**
*
* @author Nikita Koksharov
*
*/
public interface RemoteExecutorService { public interface RemoteExecutorService {
Object execute(String className, byte[] classBody, byte[] state); Object execute(String className, byte[] classBody, byte[] state);

@ -19,6 +19,11 @@ import org.redisson.remote.RRemoteAsync;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
*/
@RRemoteAsync(RemoteExecutorService.class) @RRemoteAsync(RemoteExecutorService.class)
public interface RemoteExecutorServiceAsync { public interface RemoteExecutorServiceAsync {

@ -15,14 +15,14 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import org.redisson.RedissonClient; import org.redisson.RedissonClient;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.api.RAtomicLong; import org.redisson.api.annotation.RInject;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -31,6 +31,11 @@ import org.redisson.command.CommandExecutor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteExecutorServiceImpl implements RemoteExecutorService { public class RemoteExecutorServiceImpl implements RemoteExecutorService {
private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator(); private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator();
@ -39,17 +44,15 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
private final String name; private final String name;
private final CommandExecutor commandExecutor; private final CommandExecutor commandExecutor;
private final RAtomicLong tasksCounter; private final RedissonClient redisson;
private final RBucket<Integer> status; private String tasksCounterName;
private final RTopic<Integer> topic; private String statusName;
private String topicName;
public RemoteExecutorServiceImpl(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) { public RemoteExecutorServiceImpl(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) {
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.name = name;
this.name = name + ":{"+ RemoteExecutorService.class.getName() + "}"; this.redisson = redisson;
tasksCounter = redisson.getAtomicLong(this.name + ":counter");
status = redisson.getBucket(this.name + ":status");
topic = redisson.getTopic(this.name + ":topic");
try { try {
this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader);
@ -57,6 +60,18 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
public void setTasksCounterName(String tasksCounterName) {
this.tasksCounterName = tasksCounterName;
}
public void setStatusName(String statusName) {
this.statusName = statusName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
@Override @Override
public Object execute(String className, byte[] classBody, byte[] state) { public Object execute(String className, byte[] classBody, byte[] state) {
@ -68,7 +83,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
cl.loadClass(className, classBody); cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl); classLoader.setCurrentClassLoader(cl);
Callable<?> callable = (Callable<?>) codec.getValueDecoder().decode(buf, null); Callable<?> callable = decode(buf);
return callable.call(); return callable.call();
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
@ -78,6 +93,23 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
} }
} }
private <T> T decode(ByteBuf buf) throws IOException {
T task = (T) codec.getValueDecoder().decode(buf, null);
Field[] fields = task.getClass().getDeclaredFields();
for (Field field : fields) {
if (RedissonClient.class.isAssignableFrom(field.getType())
&& field.isAnnotationPresent(RInject.class)) {
field.setAccessible(true);
try {
field.set(task, redisson);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
return task;
}
@Override @Override
public void executeVoid(String className, byte[] classBody, byte[] state) { public void executeVoid(String className, byte[] classBody, byte[] state) {
ByteBuf buf = null; ByteBuf buf = null;
@ -88,7 +120,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
cl.loadClass(className, classBody); cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl); classLoader.setCurrentClassLoader(cl);
Runnable runnable = (Runnable) codec.getValueDecoder().decode(buf, null); Runnable runnable = decode(buf);
runnable.run(); runnable.run();
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
@ -106,7 +138,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService {
+ "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;", + "end;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)), Arrays.<Object>asList(tasksCounterName, statusName, topicName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
} }

@ -20,6 +20,11 @@ import org.redisson.misc.PromiseDelegator;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
/**
*
* @author Nikita Koksharov
*
*/
public class RemotePromise<T> extends PromiseDelegator<T> { public class RemotePromise<T> extends PromiseDelegator<T> {
private Future<Boolean> addFuture; private Future<Boolean> addFuture;

@ -0,0 +1,30 @@
package org.redisson.executor;
import java.io.Serializable;
import java.util.concurrent.Callable;
import org.redisson.RedissonClient;
import org.redisson.api.annotation.RInject;
public class CallableRedissonTask implements Callable<Long>, Serializable {
private static final long serialVersionUID = 8875732248655428049L;
private Long incrementValue;
@RInject
private RedissonClient redissonClient;
public CallableRedissonTask() {
}
public CallableRedissonTask(Long incrementValue) {
this.incrementValue = incrementValue;
}
@Override
public Long call() throws Exception {
return redissonClient.getAtomicLong("counter").addAndGet(incrementValue);
}
}

@ -3,7 +3,7 @@ package org.redisson.executor;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
public class RedissonCallableTask implements Callable<String>, Serializable { public class CallableTask implements Callable<String>, Serializable {
public static final String RESULT = "callable"; public static final String RESULT = "callable";

@ -3,6 +3,7 @@ package org.redisson.executor;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
@ -41,35 +42,35 @@ public class RedissonExecutorServiceTest extends BaseTest {
} }
@Test @Test
public void test2() throws InterruptedException, ExecutionException, TimeoutException { public void testMultipleTasks() throws InterruptedException, ExecutionException, TimeoutException {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
Future<?> f = e.submit(new RedissonRunnableTask2()); Future<?> f = e.submit(new RunnableTask2());
f.get(); f.get();
Future<String> fs = e.submit(new RedissonCallableTask()); Future<String> fs = e.submit(new CallableTask());
assertThat(fs.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(fs.get()).isEqualTo(CallableTask.RESULT);
Future<Integer> f2 = e.submit(new RedissonRunnableTask(), 12); Future<Integer> f2 = e.submit(new RunnableTask(), 12);
assertThat(f2.get()).isEqualTo(12); assertThat(f2.get()).isEqualTo(12);
String invokeResult = e.invokeAny(Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask())); String invokeResult = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()));
assertThat(invokeResult).isEqualTo(RedissonCallableTask.RESULT); assertThat(invokeResult).isEqualTo(CallableTask.RESULT);
String a = e.invokeAny(Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()), 1, TimeUnit.SECONDS); String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 1, TimeUnit.SECONDS);
assertThat(a).isEqualTo(RedissonCallableTask.RESULT); assertThat(a).isEqualTo(CallableTask.RESULT);
List<RedissonCallableTask> invokeAllParams = Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()); List<CallableTask> invokeAllParams = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
List<Future<String>> allResult = e.invokeAll(invokeAllParams); List<Future<String>> allResult = e.invokeAll(invokeAllParams);
assertThat(allResult).hasSize(invokeAllParams.size()); assertThat(allResult).hasSize(invokeAllParams.size());
for (Future<String> future : allResult) { for (Future<String> future : allResult) {
assertThat(future.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(future.get()).isEqualTo(CallableTask.RESULT);
} }
List<RedissonCallableTask> invokeAllParams1 = Arrays.asList(new RedissonCallableTask(), new RedissonCallableTask(), new RedissonCallableTask()); List<CallableTask> invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 1, TimeUnit.SECONDS); List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 1, TimeUnit.SECONDS);
assertThat(allResult1).hasSize(invokeAllParams.size()); assertThat(allResult1).hasSize(invokeAllParams.size());
for (Future<String> future : allResult1) { for (Future<String> future : allResult1) {
assertThat(future.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(future.get()).isEqualTo(CallableTask.RESULT);
} }
} }
@ -77,49 +78,49 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = RejectedExecutionException.class) @Test(expected = RejectedExecutionException.class)
public void testRejectExecute() throws InterruptedException, ExecutionException { public void testRejectExecute() throws InterruptedException, ExecutionException {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
Future<?> f1 = e.submit(new RedissonRunnableTask2()); Future<?> f1 = e.submit(new RunnableTask2());
Future<String> f2 = e.submit(new RedissonCallableTask()); Future<String> f2 = e.submit(new CallableTask());
e.shutdown(); e.shutdown();
f1.get(); f1.get();
assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(f2.get()).isEqualTo(CallableTask.RESULT);
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
} }
@Test(expected = RejectedExecutionException.class) @Test(expected = RejectedExecutionException.class)
public void testRejectSubmitRunnable() throws InterruptedException, ExecutionException { public void testRejectSubmitRunnable() throws InterruptedException, ExecutionException {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
Future<?> f1 = e.submit(new RedissonRunnableTask2()); Future<?> f1 = e.submit(new RunnableTask2());
Future<String> f2 = e.submit(new RedissonCallableTask()); Future<String> f2 = e.submit(new CallableTask());
e.shutdown(); e.shutdown();
f1.get(); f1.get();
assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(f2.get()).isEqualTo(CallableTask.RESULT);
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
e.submit(new RedissonRunnableTask2()); e.submit(new RunnableTask2());
} }
@Test(expected = RejectedExecutionException.class) @Test(expected = RejectedExecutionException.class)
public void testRejectSubmitCallable() throws InterruptedException, ExecutionException { public void testRejectSubmitCallable() throws InterruptedException, ExecutionException {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
Future<?> f1 = e.submit(new RedissonRunnableTask2()); Future<?> f1 = e.submit(new RunnableTask2());
Future<String> f2 = e.submit(new RedissonCallableTask()); Future<String> f2 = e.submit(new CallableTask());
e.shutdown(); e.shutdown();
f1.get(); f1.get();
assertThat(f2.get()).isEqualTo(RedissonCallableTask.RESULT); assertThat(f2.get()).isEqualTo(CallableTask.RESULT);
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
e.submit(new RedissonCallableTask()); e.submit(new CallableTask());
} }
@Test(expected = RejectedExecutionException.class) @Test(expected = RejectedExecutionException.class)
@ -128,7 +129,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
e.shutdown(); e.shutdown();
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
e.submit(new RedissonRunnableTask2()); e.submit(new RunnableTask2());
} }
@ -137,7 +138,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
assertThat(e.isShutdown()).isFalse(); assertThat(e.isShutdown()).isFalse();
assertThat(e.isTerminated()).isFalse(); assertThat(e.isTerminated()).isFalse();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
e.shutdown(); e.shutdown();
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
assertThat(e.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); assertThat(e.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
@ -159,7 +160,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
public void testResetShutdownState() throws InterruptedException, ExecutionException { public void testResetShutdownState() throws InterruptedException, ExecutionException {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
RExecutorService e = redisson.getExecutorService(); RExecutorService e = redisson.getExecutorService();
e.execute(new RedissonRunnableTask()); e.execute(new RunnableTask());
assertThat(e.isShutdown()).isFalse(); assertThat(e.isShutdown()).isFalse();
e.shutdown(); e.shutdown();
assertThat(e.isShutdown()).isTrue(); assertThat(e.isShutdown()).isTrue();
@ -168,9 +169,51 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.delete()).isTrue(); assertThat(e.delete()).isTrue();
assertThat(e.isShutdown()).isFalse(); assertThat(e.isShutdown()).isFalse();
assertThat(e.isTerminated()).isFalse(); assertThat(e.isTerminated()).isFalse();
Future<?> future = e.submit(new RedissonRunnableTask()); Future<?> future = e.submit(new RunnableTask());
future.get(); future.get();
} }
} }
@Test
public void testRedissonInjected() throws InterruptedException, ExecutionException {
Future<Long> s1 = redisson.getExecutorService().submit(new CallableRedissonTask(1L));
Future<Long> s2 = redisson.getExecutorService().submit(new CallableRedissonTask(2L));
Future<Long> s3 = redisson.getExecutorService().submit(new CallableRedissonTask(30L));
Future<Void> s4 = (Future<Void>) redisson.getExecutorService().submit(new RunnableRedissonTask());
List<Long> results = Arrays.asList(s1.get(), s2.get(), s3.get());
assertThat(results).containsOnlyOnce(33L);
s4.get();
assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L);
}
@Test(expected = IllegalArgumentException.class)
public void testAnonymousRunnable() {
redisson.getExecutorService().submit(new Runnable() {
@Override
public void run() {
}
});
}
@Test(expected = IllegalArgumentException.class)
public void testAnonymousCallable() {
redisson.getExecutorService().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
}
@Test(expected = IllegalArgumentException.class)
public void testAnonymousRunnableExecute() {
redisson.getExecutorService().execute(new Runnable() {
@Override
public void run() {
}
});
}
} }

@ -0,0 +1,20 @@
package org.redisson.executor;
import java.io.Serializable;
import org.redisson.RedissonClient;
import org.redisson.api.annotation.RInject;
public class RunnableRedissonTask implements Runnable, Serializable {
private static final long serialVersionUID = 4165626916136893351L;
@RInject
private RedissonClient redissonClient;
@Override
public void run() {
redissonClient.getAtomicLong("runnableCounter").addAndGet(100);
}
}

@ -2,7 +2,7 @@ package org.redisson.executor;
import java.io.Serializable; import java.io.Serializable;
public class RedissonRunnableTask implements Runnable, Serializable { public class RunnableTask implements Runnable, Serializable {
private static final long serialVersionUID = 2105094575950438867L; private static final long serialVersionUID = 2105094575950438867L;

@ -2,7 +2,7 @@ package org.redisson.executor;
import java.io.Serializable; import java.io.Serializable;
public class RedissonRunnableTask2 implements Runnable, Serializable { public class RunnableTask2 implements Runnable, Serializable {
private static final long serialVersionUID = 2105094575950438867L; private static final long serialVersionUID = 2105094575950438867L;
Loading…
Cancel
Save