From 682472af3b46b594a7fa39d37fdc0604f427e26e Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 3 Aug 2016 18:17:05 +0300 Subject: [PATCH] Redisson node launch class implemented. #208 --- src/main/java/org/redisson/Redisson.java | 9 ++- .../java/org/redisson/RedissonClient.java | 24 ++++-- .../org/redisson/RedissonExecutorService.java | 14 ++-- src/main/java/org/redisson/RedissonNode.java | 79 +++++++++++++++++++ .../org/redisson/api/RExecutorService.java | 15 ++++ src/main/java/org/redisson/config/Config.java | 20 ++--- .../org/redisson/config/ConfigSupport.java | 41 +++++----- .../redisson/config/RedissonNodeConfig.java | 59 ++++++++++++++ .../executor/RedissonExecutorServiceTest.java | 54 ++++++------- 9 files changed, 234 insertions(+), 81 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonNode.java create mode 100644 src/main/java/org/redisson/config/RedissonNodeConfig.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index d6ba64c21..f30a258ca 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -368,14 +368,15 @@ public class Redisson implements RedissonClient { } @Override - public RExecutorService getExecutorService() { - return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this); - } - public RExecutorService getExecutorService(String name) { return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this, name); } + @Override + public RExecutorService getExecutorService(Codec codec, String name) { + return new RedissonExecutorService(codec, commandExecutor, this, name); + } + @Override public RRemoteService getRemoteSerivce() { return new RedissonRemoteService(this, commandExecutor); diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 7cdfd3f12..0c38d248b 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -20,11 +20,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandExecutor; -import org.redisson.config.Config; -import org.redisson.liveobject.provider.CodecProvider; -import org.redisson.api.ClusterNode; import org.redisson.api.ClusterNodesGroup; import org.redisson.api.Node; import org.redisson.api.NodesGroup; @@ -64,6 +59,9 @@ import org.redisson.api.RSetMultimap; import org.redisson.api.RSetMultimapCache; import org.redisson.api.RSortedSet; import org.redisson.api.RTopic; +import org.redisson.client.codec.Codec; +import org.redisson.config.Config; +import org.redisson.liveobject.provider.CodecProvider; import org.redisson.liveobject.provider.ResolverProvider; /** @@ -612,7 +610,21 @@ public interface RedissonClient { */ RScript getScript(); - RExecutorService getExecutorService(); + /** + * Returns ExecutorService by name + * using SerializationCodec codec for task serialization and response/request + * + * @return + */ + RExecutorService getExecutorService(String name); + + /** + * Returns ExecutorService by name + * using provided codec for task serialization and response/request + * + * @return + */ + RExecutorService getExecutorService(Codec codec, String name); /** * Returns object for remote operations prefixed with the default name (redisson_remote_service) diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java index cba86a6ac..5d5f489f0 100644 --- a/src/main/java/org/redisson/RedissonExecutorService.java +++ b/src/main/java/org/redisson/RedissonExecutorService.java @@ -85,10 +85,7 @@ public class RedissonExecutorService implements RExecutorService { private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); private final String name; - - public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson) { - this(codec, commandExecutor, redisson, "redisson_default_executor"); - } + private final String requestQueueName; public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) { super(); @@ -98,7 +95,8 @@ public class RedissonExecutorService implements RExecutorService { this.name = name; this.redisson = redisson; - String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; + requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; + String objectName = requestQueueName; tasksCounter = redisson.getAtomicLong(objectName + ":counter"); status = redisson.getBucket(objectName + ":status"); topic = redisson.getTopic(objectName + ":topic"); @@ -114,9 +112,7 @@ public class RedissonExecutorService implements RExecutorService { @Override public void registerExecutors(int executors) { - String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; - - RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, objectName); + RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName); service.setStatusName(status.getName()); service.setTasksCounterName(tasksCounter.getName()); service.setTopicName(topic.getChannelNames().get(0)); @@ -198,7 +194,7 @@ public class RedissonExecutorService implements RExecutorService { @Override public boolean delete() { - return keys.delete(status.getName(), tasksCounter.getName()) > 0; + return keys.delete(requestQueueName, status.getName(), tasksCounter.getName()) > 0; } @Override diff --git a/src/main/java/org/redisson/RedissonNode.java b/src/main/java/org/redisson/RedissonNode.java new file mode 100644 index 000000000..b59277e78 --- /dev/null +++ b/src/main/java/org/redisson/RedissonNode.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.File; +import java.io.IOException; +import java.util.Map.Entry; + +import org.redisson.config.RedissonNodeConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonNode { + + private static final Logger log = LoggerFactory.getLogger(RedissonNode.class); + + public static void main(String[] args) { + if (args.length == 0) { + throw new IllegalArgumentException(); + } + + String configPath = args[0]; + RedissonNodeConfig config = null; + try { + config = RedissonNodeConfig.fromJSON(new File(configPath)); + } catch (IOException e) { + // trying next format + try { + config = RedissonNodeConfig.fromYAML(new File(configPath)); + } catch (IOException e1) { + throw new IllegalArgumentException("Can't parse config file " + configPath); + } + } + + if (config.getExecutors().isEmpty()) { + throw new IllegalArgumentException("Executor settings are empty"); + } + + start(config); + } + + public static void start(RedissonNodeConfig config) { + final RedissonClient redisson = Redisson.create(config); + for (Entry entry : config.getExecutors().entrySet()) { + String name = entry.getKey(); + int workers = entry.getValue(); + redisson.getExecutorService(name).registerExecutors(workers); + log.info("{} worker(s) for '{}' ExecutorService registered", workers, name); + } + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + redisson.shutdown(); + } + }); + + log.info("Redisson node started!"); + } + +} diff --git a/src/main/java/org/redisson/api/RExecutorService.java b/src/main/java/org/redisson/api/RExecutorService.java index d1ede86ba..7cb2b4dc4 100644 --- a/src/main/java/org/redisson/api/RExecutorService.java +++ b/src/main/java/org/redisson/api/RExecutorService.java @@ -24,10 +24,25 @@ import java.util.concurrent.ExecutorService; */ public interface RExecutorService extends ExecutorService { + /** + * Returns executor name + * + * @return + */ String getName(); + /** + * Deletes executor request queue and state objects + * + * @return true if any of objects were deleted + */ boolean delete(); + /** + * Register executor worker + * + * @param executors - workers amount + */ void registerExecutors(int executors); } diff --git a/src/main/java/org/redisson/config/Config.java b/src/main/java/org/redisson/config/Config.java index 6850a3c20..a473fb1ae 100644 --- a/src/main/java/org/redisson/config/Config.java +++ b/src/main/java/org/redisson/config/Config.java @@ -379,7 +379,7 @@ public class Config { */ public static Config fromJSON(String content) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromJSON(content); + return support.fromJSON(content, Config.class); } /** @@ -391,7 +391,7 @@ public class Config { */ public static Config fromJSON(InputStream inputStream) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromJSON(inputStream); + return support.fromJSON(inputStream, Config.class); } /** @@ -403,7 +403,7 @@ public class Config { */ public static Config fromJSON(File file) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromJSON(file); + return support.fromJSON(file, Config.class); } /** @@ -415,7 +415,7 @@ public class Config { */ public static Config fromJSON(URL url) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromJSON(url); + return support.fromJSON(url, Config.class); } /** @@ -427,7 +427,7 @@ public class Config { */ public static Config fromJSON(Reader reader) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromJSON(reader); + return support.fromJSON(reader, Config.class); } /** @@ -450,7 +450,7 @@ public class Config { */ public static Config fromYAML(String content) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromYAML(content); + return support.fromYAML(content, Config.class); } /** @@ -462,7 +462,7 @@ public class Config { */ public static Config fromYAML(InputStream inputStream) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromYAML(inputStream); + return support.fromYAML(inputStream, Config.class); } /** @@ -474,7 +474,7 @@ public class Config { */ public static Config fromYAML(File file) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromYAML(file); + return support.fromYAML(file, Config.class); } /** @@ -486,7 +486,7 @@ public class Config { */ public static Config fromYAML(URL url) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromYAML(url); + return support.fromYAML(url, Config.class); } /** @@ -498,7 +498,7 @@ public class Config { */ public static Config fromYAML(Reader reader) throws IOException { ConfigSupport support = new ConfigSupport(); - return support.fromYAML(reader); + return support.fromYAML(reader, Config.class); } /** diff --git a/src/main/java/org/redisson/config/ConfigSupport.java b/src/main/java/org/redisson/config/ConfigSupport.java index f71a2b239..33ca5961d 100644 --- a/src/main/java/org/redisson/config/ConfigSupport.java +++ b/src/main/java/org/redisson/config/ConfigSupport.java @@ -108,48 +108,48 @@ public class ConfigSupport { private final ObjectMapper jsonMapper = createMapper(null); private final ObjectMapper yamlMapper = createMapper(new YAMLFactory()); - public Config fromJSON(String content) throws IOException { - return jsonMapper.readValue(content, Config.class); + public T fromJSON(String content, Class configType) throws IOException { + return jsonMapper.readValue(content, configType); } - public Config fromJSON(File file) throws IOException { - return jsonMapper.readValue(file, Config.class); + public T fromJSON(File file, Class configType) throws IOException { + return jsonMapper.readValue(file, configType); } - public Config fromJSON(URL url) throws IOException { - return jsonMapper.readValue(url, Config.class); + public T fromJSON(URL url, Class configType) throws IOException { + return jsonMapper.readValue(url, configType); } - public Config fromJSON(Reader reader) throws IOException { - return jsonMapper.readValue(reader, Config.class); + public T fromJSON(Reader reader, Class configType) throws IOException { + return jsonMapper.readValue(reader, configType); } - public Config fromJSON(InputStream inputStream) throws IOException { - return jsonMapper.readValue(inputStream, Config.class); + public T fromJSON(InputStream inputStream, Class configType) throws IOException { + return jsonMapper.readValue(inputStream, configType); } public String toJSON(Config config) throws IOException { return jsonMapper.writeValueAsString(config); } - public Config fromYAML(String content) throws IOException { - return yamlMapper.readValue(content, Config.class); + public T fromYAML(String content, Class configType) throws IOException { + return yamlMapper.readValue(content, configType); } - public Config fromYAML(File file) throws IOException { - return yamlMapper.readValue(file, Config.class); + public T fromYAML(File file, Class configType) throws IOException { + return yamlMapper.readValue(file, configType); } - public Config fromYAML(URL url) throws IOException { - return yamlMapper.readValue(url, Config.class); + public T fromYAML(URL url, Class configType) throws IOException { + return yamlMapper.readValue(url, configType); } - public Config fromYAML(Reader reader) throws IOException { - return yamlMapper.readValue(reader, Config.class); + public T fromYAML(Reader reader, Class configType) throws IOException { + return yamlMapper.readValue(reader, configType); } - public Config fromYAML(InputStream inputStream) throws IOException { - return yamlMapper.readValue(inputStream, Config.class); + public T fromYAML(InputStream inputStream, Class configType) throws IOException { + return yamlMapper.readValue(inputStream, configType); } public String toYAML(Config config) throws IOException { @@ -175,7 +175,6 @@ public class ConfigSupport { } else { throw new IllegalArgumentException("server(s) address(es) not defined!"); } - } private static void validate(SingleServerConfig config) { diff --git a/src/main/java/org/redisson/config/RedissonNodeConfig.java b/src/main/java/org/redisson/config/RedissonNodeConfig.java new file mode 100644 index 000000000..86319e8d2 --- /dev/null +++ b/src/main/java/org/redisson/config/RedissonNodeConfig.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.config; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonNodeConfig extends Config { + + private Map executors = new HashMap(); + + public RedissonNodeConfig() { + super(); + } + + public RedissonNodeConfig(Config oldConf) { + super(oldConf); + } + + public RedissonNodeConfig setExecutors(Map executors) { + this.executors = executors; + return this; + } + + public Map getExecutors() { + return executors; + } + + public static RedissonNodeConfig fromJSON(File file) throws IOException { + ConfigSupport support = new ConfigSupport(); + return support.fromJSON(file, RedissonNodeConfig.class); + } + + public static RedissonNodeConfig fromYAML(File file) throws IOException { + ConfigSupport support = new ConfigSupport(); + return support.fromYAML(file, RedissonNodeConfig.class); + } + +} diff --git a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 4d44c2f75..a0f84d1c0 100644 --- a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -1,7 +1,10 @@ package org.redisson.executor; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -10,40 +13,29 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.redisson.BaseTest; -import org.redisson.Redisson; -import org.redisson.RedissonClient; +import org.redisson.RedissonNode; import org.redisson.api.RExecutorService; import org.redisson.config.Config; - -import static org.assertj.core.api.Assertions.*; +import org.redisson.config.RedissonNodeConfig; public class RedissonExecutorServiceTest extends BaseTest { - private static RedissonClient redissonClient; - @BeforeClass public static void beforeClass() throws IOException, InterruptedException { BaseTest.beforeClass(); Config config = createConfig(); - redissonClient = Redisson.create(config); - redissonClient.getExecutorService().registerExecutors(1); - } - - @AfterClass - public static void afterClass() throws IOException, InterruptedException { - BaseTest.afterClass(); - - redissonClient.shutdown(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutors(Collections.singletonMap("test", 1)); + RedissonNode.start(nodeConfig); } @Test public void testMultipleTasks() throws InterruptedException, ExecutionException, TimeoutException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); Future f = e.submit(new RunnableTask2()); f.get(); @@ -77,7 +69,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = RejectedExecutionException.class) public void testRejectExecute() throws InterruptedException, ExecutionException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); Future f1 = e.submit(new RunnableTask2()); Future f2 = e.submit(new CallableTask()); @@ -93,7 +85,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = RejectedExecutionException.class) public void testRejectSubmitRunnable() throws InterruptedException, ExecutionException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); Future f1 = e.submit(new RunnableTask2()); Future f2 = e.submit(new CallableTask()); @@ -109,7 +101,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = RejectedExecutionException.class) public void testRejectSubmitCallable() throws InterruptedException, ExecutionException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); Future f1 = e.submit(new RunnableTask2()); Future f2 = e.submit(new CallableTask()); @@ -125,7 +117,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = RejectedExecutionException.class) public void testEmptyRejectSubmitRunnable() throws InterruptedException, ExecutionException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.shutdown(); assertThat(e.isShutdown()).isTrue(); @@ -135,7 +127,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test public void testShutdown() throws InterruptedException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); assertThat(e.isShutdown()).isFalse(); assertThat(e.isTerminated()).isFalse(); e.execute(new RunnableTask()); @@ -147,7 +139,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test public void testShutdownEmpty() throws InterruptedException { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); assertThat(e.isShutdown()).isFalse(); assertThat(e.isTerminated()).isFalse(); e.shutdown(); @@ -159,7 +151,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test public void testResetShutdownState() throws InterruptedException, ExecutionException { for (int i = 0; i < 100; i++) { - RExecutorService e = redisson.getExecutorService(); + RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); assertThat(e.isShutdown()).isFalse(); e.shutdown(); @@ -176,10 +168,10 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test public void testRedissonInjected() throws InterruptedException, ExecutionException { - Future s1 = redisson.getExecutorService().submit(new CallableRedissonTask(1L)); - Future s2 = redisson.getExecutorService().submit(new CallableRedissonTask(2L)); - Future s3 = redisson.getExecutorService().submit(new CallableRedissonTask(30L)); - Future s4 = (Future) redisson.getExecutorService().submit(new RunnableRedissonTask()); + Future s1 = redisson.getExecutorService("test").submit(new CallableRedissonTask(1L)); + Future s2 = redisson.getExecutorService("test").submit(new CallableRedissonTask(2L)); + Future s3 = redisson.getExecutorService("test").submit(new CallableRedissonTask(30L)); + Future s4 = (Future) redisson.getExecutorService("test").submit(new RunnableRedissonTask()); List results = Arrays.asList(s1.get(), s2.get(), s3.get()); assertThat(results).containsOnlyOnce(33L); @@ -190,7 +182,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = IllegalArgumentException.class) public void testAnonymousRunnable() { - redisson.getExecutorService().submit(new Runnable() { + redisson.getExecutorService("test").submit(new Runnable() { @Override public void run() { } @@ -199,7 +191,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = IllegalArgumentException.class) public void testAnonymousCallable() { - redisson.getExecutorService().submit(new Callable() { + redisson.getExecutorService("test").submit(new Callable() { @Override public Object call() throws Exception { return null; @@ -209,7 +201,7 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test(expected = IllegalArgumentException.class) public void testAnonymousRunnableExecute() { - redisson.getExecutorService().execute(new Runnable() { + redisson.getExecutorService("test").execute(new Runnable() { @Override public void run() { }