From 2bcddb1d3bf0c96df57ec5ef53a3fbfd8b9cd3fc Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 26 Aug 2019 14:05:47 +0300 Subject: [PATCH] Feature - allow to use Spring's @Autowired, @Value and JSR-330 @Inject annotations in ExecutorService tasks. #1657 --- .../org/redisson/RedissonExecutorService.java | 24 +++- .../main/java/org/redisson/RedissonNode.java | 15 ++- .../org/redisson/api/RExecutorService.java | 22 ++-- .../java/org/redisson/api/WorkerOptions.java | 89 +++++++++++++ .../redisson/config/RedissonNodeConfig.java | 16 +++ .../redisson/executor/TasksRunnerService.java | 14 ++ .../RedissonExecutorServiceSpringTest.java | 123 ++++++++++++++++++ 7 files changed, 289 insertions(+), 14 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/WorkerOptions.java create mode 100644 redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceSpringTest.java diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 1fcfb83a8..97c1855b2 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -57,6 +57,7 @@ import org.redisson.api.RScheduledFuture; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.WorkerOptions; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -237,11 +238,15 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public void registerWorkers(int workers) { - registerWorkers(workers, commandExecutor.getConnectionManager().getExecutor()); + registerWorkers(WorkerOptions.defaults().workers(workers)); } @Override - public void registerWorkers(int workers, ExecutorService executor) { + public void registerWorkers(WorkerOptions options) { + if (options.getWorkers() == 0) { + throw new IllegalArgumentException("workers amount can't be zero"); + } + QueueTransferTask task = new QueueTransferTask(connectionManager) { @Override protected RTopic getTopic() { @@ -306,17 +311,28 @@ public class RedissonExecutorService implements RScheduledExecutorService { service.setSchedulerChannelName(schedulerChannelName); service.setSchedulerQueueName(schedulerQueueName); service.setTasksRetryIntervalName(tasksRetryIntervalName); + service.setBeanFactory(options.getBeanFactory()); + + ExecutorService es = commandExecutor.getConnectionManager().getExecutor(); + if (options.getExecutorService() != null) { + es = options.getExecutorService(); + } - remoteService.register(RemoteExecutorService.class, service, workers, executor); + remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es); workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener() { @Override public void onMessage(CharSequence channel, String id) { - redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(workers); + redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(options.getWorkers()); redisson.getSemaphore(workersSemaphoreName + ":" + id).release(); } }); } + @Override + public void registerWorkers(int workers, ExecutorService executor) { + registerWorkers(WorkerOptions.defaults().workers(workers).executorService(executor)); + } + @Override public void execute(Runnable task) { check(task); diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index 37af94c1d..032e8876d 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; +import org.redisson.api.WorkerOptions; import org.redisson.client.RedisConnection; import org.redisson.config.RedissonNodeConfig; import org.redisson.connection.ConnectionManager; @@ -139,14 +140,24 @@ public final class RedissonNode { if (mapReduceWorkers == 0) { mapReduceWorkers = Runtime.getRuntime().availableProcessors(); } - redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(mapReduceWorkers); + + WorkerOptions options = WorkerOptions.defaults() + .workers(mapReduceWorkers) + .beanFactory(config.getBeanFactory()); + + redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(options); log.info("{} map reduce worker(s) registered", mapReduceWorkers); } for (Entry entry : config.getExecutorServiceWorkers().entrySet()) { String name = entry.getKey(); int workers = entry.getValue(); - redisson.getExecutorService(name).registerWorkers(workers); + + WorkerOptions options = WorkerOptions.defaults() + .workers(workers) + .beanFactory(config.getBeanFactory()); + + redisson.getExecutorService(name).registerWorkers(options); log.info("{} worker(s) registered for ExecutorService with '{}' name", workers, name); } diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index 2aabd83b6..99c158263 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -102,21 +102,27 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync */ boolean delete(); - /** - * Register workers + /* + * Use {@link #registerWorkers(WorkerOptions)} setting instead * - * @param workers - workers amount */ + @Deprecated void registerWorkers(int workers); - - /** - * Register workers with custom executor + + /* + * Use {@link #registerWorkers(WorkerOptions)} setting instead * - * @param workers - workers amount - * @param executor - executor instance */ + @Deprecated void registerWorkers(int workers, ExecutorService executor); + /** + * Register workers + * + * @param options - worker options + */ + void registerWorkers(WorkerOptions options); + /** * Returns active worker groups * diff --git a/redisson/src/main/java/org/redisson/api/WorkerOptions.java b/redisson/src/main/java/org/redisson/api/WorkerOptions.java new file mode 100644 index 000000000..0c5a493d3 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/WorkerOptions.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.ExecutorService; + +import org.redisson.config.Config; +import org.springframework.beans.factory.BeanFactory; + +/** + * Configuration for RExecutorService workers. + * + * @author Nikita Koksharov + * + */ +public final class WorkerOptions { + + private int workers; + private ExecutorService executorService; + private BeanFactory beanFactory; + + private WorkerOptions() { + } + + public static WorkerOptions defaults() { + return new WorkerOptions(); + } + + public int getWorkers() { + return workers; + } + + /** + * Defines workers amount used to execute tasks. + * + * @param workers - workers amount + * @return self instance + */ + public WorkerOptions workers(int workers) { + this.workers = workers; + return this; + } + + public BeanFactory getBeanFactory() { + return beanFactory; + } + + /** + * Defines Spring BeanFactory instance to execute tasks with Spring's '@Autowired', + * '@Value' or JSR-330's '@Inject' annotation. + * + * @param beanFactory - Spring BeanFactory instance + * @return self instance + */ + public WorkerOptions beanFactory(BeanFactory beanFactory) { + this.beanFactory = beanFactory; + return this; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + /** + * Defines custom ExecutorService to execute tasks. + * {@link Config#setExecutor(ExecutorService)} is used by default. + * + * @param executorService - custom ExecutorService + * @return self instance + */ + public WorkerOptions executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + +} diff --git a/redisson/src/main/java/org/redisson/config/RedissonNodeConfig.java b/redisson/src/main/java/org/redisson/config/RedissonNodeConfig.java index a6fa90fac..d3af64af0 100644 --- a/redisson/src/main/java/org/redisson/config/RedissonNodeConfig.java +++ b/redisson/src/main/java/org/redisson/config/RedissonNodeConfig.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.redisson.api.RedissonNodeInitializer; +import org.springframework.beans.factory.BeanFactory; /** * Redisson Node configuration @@ -32,6 +33,7 @@ public class RedissonNodeConfig extends Config { private int mapReduceWorkers = 0; private RedissonNodeInitializer redissonNodeInitializer; + private BeanFactory beanFactory; private Map executorServiceWorkers = new HashMap(); public RedissonNodeConfig() { @@ -47,6 +49,7 @@ public class RedissonNodeConfig extends Config { this.executorServiceWorkers = new HashMap(oldConf.executorServiceWorkers); this.redissonNodeInitializer = oldConf.redissonNodeInitializer; this.mapReduceWorkers = oldConf.mapReduceWorkers; + this.beanFactory = oldConf.beanFactory; } /** @@ -98,6 +101,19 @@ public class RedissonNodeConfig extends Config { return redissonNodeInitializer; } + public BeanFactory getBeanFactory() { + return beanFactory; + } + + /** + * Defines Spring Bean Factory instance to execute tasks with Spring's '@Autowired', + * '@Value' or JSR-330's '@Inject' annotation. + * + * @param beanFactory - Spring BeanFactory instance + */ + public void setBeanFactory(BeanFactory beanFactory) { + this.beanFactory = beanFactory; + } /** * Read config object stored in JSON format from File diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index a47877715..18533b348 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -50,6 +50,8 @@ import org.redisson.misc.HashValue; import org.redisson.misc.Injector; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -79,6 +81,7 @@ public class TasksRunnerService implements RemoteExecutorService { private String schedulerQueueName; private String schedulerChannelName; private String tasksRetryIntervalName; + private BeanFactory beanFactory; private ConcurrentMap responses; public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap responses) { @@ -90,6 +93,10 @@ public class TasksRunnerService implements RemoteExecutorService { this.codec = codec; } + public void setBeanFactory(BeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + public void setTasksRetryIntervalName(String tasksRetryInterval) { this.tasksRetryIntervalName = tasksRetryInterval; } @@ -292,6 +299,13 @@ public class TasksRunnerService implements RemoteExecutorService { } Injector.inject(task, redisson); + + if (beanFactory != null) { + AutowiredAnnotationBeanPostProcessor bpp = new AutowiredAnnotationBeanPostProcessor(); + bpp.setBeanFactory(beanFactory); + bpp.processInjection(task); + } + return task; } catch (Exception e) { throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceSpringTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceSpringTest.java new file mode 100644 index 000000000..61067507f --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceSpringTest.java @@ -0,0 +1,123 @@ +package org.redisson.executor; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.concurrent.Callable; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.redisson.BaseTest; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.RedissonNode; +import org.redisson.api.RExecutorFuture; +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; +import org.redisson.config.Config; +import org.redisson.config.RedissonNodeConfig; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Service; + +public class RedissonExecutorServiceSpringTest extends BaseTest { + + public static class SampleRunnable implements Runnable, Serializable { + + @Autowired + private SampleBean bean; + + @RInject + private RedissonClient redisson; + + public SampleRunnable() { + } + + @Override + public void run() { + String res = bean.myMethod("runnable"); + redisson.getBucket("result").set(res); + } + + } + + public static class SampleCallable implements Callable, Serializable { + + @Autowired + private SampleBean bean; + + public SampleCallable() { + } + + @Override + public String call() throws Exception { + return bean.myMethod("callable"); + } + + } + + @Service + public static class SampleBean { + + public String myMethod(String key) { + return "hello " + key; + } + + } + + private static final String EXECUTOR_NAME = "spring_test"; + + @Configuration + @ComponentScan + public static class Application { + + @Bean(destroyMethod = "shutdown") + RedissonNode redissonNode(BeanFactory beanFactory) { + Config config = BaseTest.createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap(EXECUTOR_NAME, 1)); + nodeConfig.setBeanFactory(beanFactory); + RedissonNode node = RedissonNode.create(nodeConfig); + node.start(); + return node; + } + + } + + private static AnnotationConfigApplicationContext context; + + @BeforeClass + public static void beforeTest() throws FailedToStartRedisException, IOException, InterruptedException { + context = new AnnotationConfigApplicationContext(Application.class); + } + + @AfterClass + public static void afterTest() { + context.close(); + } + + @Test + public void testRunnable() throws InterruptedException { + redisson.getExecutorService(EXECUTOR_NAME).execute(new SampleRunnable()); + + Thread.sleep(500); + + assertThat(redisson.getBucket("result").get()).isEqualTo("hello runnable"); + } + + @Test + public void testCallable() throws InterruptedException { + RExecutorFuture future = redisson.getExecutorService(EXECUTOR_NAME).submit(new SampleCallable()); + + Thread.sleep(500); + + assertThat(future.sync().getNow()).isEqualTo("hello callable"); + } + +}