diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java
index d6ba64c21..f30a258ca 100755
--- a/src/main/java/org/redisson/Redisson.java
+++ b/src/main/java/org/redisson/Redisson.java
@@ -368,14 +368,15 @@ public class Redisson implements RedissonClient {
}
@Override
- public RExecutorService getExecutorService() {
- return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this);
- }
-
public RExecutorService getExecutorService(String name) {
return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this, name);
}
+ @Override
+ public RExecutorService getExecutorService(Codec codec, String name) {
+ return new RedissonExecutorService(codec, commandExecutor, this, name);
+ }
+
@Override
public RRemoteService getRemoteSerivce() {
return new RedissonRemoteService(this, commandExecutor);
diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java
index 7cdfd3f12..0c38d248b 100755
--- a/src/main/java/org/redisson/RedissonClient.java
+++ b/src/main/java/org/redisson/RedissonClient.java
@@ -20,11 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.redisson.client.codec.Codec;
-import org.redisson.command.CommandExecutor;
-import org.redisson.config.Config;
-import org.redisson.liveobject.provider.CodecProvider;
-import org.redisson.api.ClusterNode;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
@@ -64,6 +59,9 @@ import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetMultimapCache;
import org.redisson.api.RSortedSet;
import org.redisson.api.RTopic;
+import org.redisson.client.codec.Codec;
+import org.redisson.config.Config;
+import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
/**
@@ -612,7 +610,21 @@ public interface RedissonClient {
*/
RScript getScript();
- RExecutorService getExecutorService();
+ /**
+ * Returns ExecutorService by name
+ * using SerializationCodec codec for task serialization and response/request
+ *
+ * @return
+ */
+ RExecutorService getExecutorService(String name);
+
+ /**
+ * Returns ExecutorService by name
+ * using provided codec for task serialization and response/request
+ *
+ * @return
+ */
+ RExecutorService getExecutorService(Codec codec, String name);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java
index cba86a6ac..5d5f489f0 100644
--- a/src/main/java/org/redisson/RedissonExecutorService.java
+++ b/src/main/java/org/redisson/RedissonExecutorService.java
@@ -85,10 +85,7 @@ public class RedissonExecutorService implements RExecutorService {
private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
private final String name;
-
- public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson) {
- this(codec, commandExecutor, redisson, "redisson_default_executor");
- }
+ private final String requestQueueName;
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) {
super();
@@ -98,7 +95,8 @@ public class RedissonExecutorService implements RExecutorService {
this.name = name;
this.redisson = redisson;
- String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
+ requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
+ String objectName = requestQueueName;
tasksCounter = redisson.getAtomicLong(objectName + ":counter");
status = redisson.getBucket(objectName + ":status");
topic = redisson.getTopic(objectName + ":topic");
@@ -114,9 +112,7 @@ public class RedissonExecutorService implements RExecutorService {
@Override
public void registerExecutors(int executors) {
- String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
-
- RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, objectName);
+ RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName);
service.setStatusName(status.getName());
service.setTasksCounterName(tasksCounter.getName());
service.setTopicName(topic.getChannelNames().get(0));
@@ -198,7 +194,7 @@ public class RedissonExecutorService implements RExecutorService {
@Override
public boolean delete() {
- return keys.delete(status.getName(), tasksCounter.getName()) > 0;
+ return keys.delete(requestQueueName, status.getName(), tasksCounter.getName()) > 0;
}
@Override
diff --git a/src/main/java/org/redisson/RedissonNode.java b/src/main/java/org/redisson/RedissonNode.java
new file mode 100644
index 000000000..b59277e78
--- /dev/null
+++ b/src/main/java/org/redisson/RedissonNode.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.redisson.config.RedissonNodeConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public class RedissonNode {
+
+ private static final Logger log = LoggerFactory.getLogger(RedissonNode.class);
+
+ public static void main(String[] args) {
+ if (args.length == 0) {
+ throw new IllegalArgumentException();
+ }
+
+ String configPath = args[0];
+ RedissonNodeConfig config = null;
+ try {
+ config = RedissonNodeConfig.fromJSON(new File(configPath));
+ } catch (IOException e) {
+ // trying next format
+ try {
+ config = RedissonNodeConfig.fromYAML(new File(configPath));
+ } catch (IOException e1) {
+ throw new IllegalArgumentException("Can't parse config file " + configPath);
+ }
+ }
+
+ if (config.getExecutors().isEmpty()) {
+ throw new IllegalArgumentException("Executor settings are empty");
+ }
+
+ start(config);
+ }
+
+ public static void start(RedissonNodeConfig config) {
+ final RedissonClient redisson = Redisson.create(config);
+ for (Entry entry : config.getExecutors().entrySet()) {
+ String name = entry.getKey();
+ int workers = entry.getValue();
+ redisson.getExecutorService(name).registerExecutors(workers);
+ log.info("{} worker(s) for '{}' ExecutorService registered", workers, name);
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ redisson.shutdown();
+ }
+ });
+
+ log.info("Redisson node started!");
+ }
+
+}
diff --git a/src/main/java/org/redisson/api/RExecutorService.java b/src/main/java/org/redisson/api/RExecutorService.java
index d1ede86ba..7cb2b4dc4 100644
--- a/src/main/java/org/redisson/api/RExecutorService.java
+++ b/src/main/java/org/redisson/api/RExecutorService.java
@@ -24,10 +24,25 @@ import java.util.concurrent.ExecutorService;
*/
public interface RExecutorService extends ExecutorService {
+ /**
+ * Returns executor name
+ *
+ * @return
+ */
String getName();
+ /**
+ * Deletes executor request queue and state objects
+ *
+ * @return true
if any of objects were deleted
+ */
boolean delete();
+ /**
+ * Register executor worker
+ *
+ * @param executors - workers amount
+ */
void registerExecutors(int executors);
}
diff --git a/src/main/java/org/redisson/config/Config.java b/src/main/java/org/redisson/config/Config.java
index 6850a3c20..a473fb1ae 100644
--- a/src/main/java/org/redisson/config/Config.java
+++ b/src/main/java/org/redisson/config/Config.java
@@ -379,7 +379,7 @@ public class Config {
*/
public static Config fromJSON(String content) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromJSON(content);
+ return support.fromJSON(content, Config.class);
}
/**
@@ -391,7 +391,7 @@ public class Config {
*/
public static Config fromJSON(InputStream inputStream) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromJSON(inputStream);
+ return support.fromJSON(inputStream, Config.class);
}
/**
@@ -403,7 +403,7 @@ public class Config {
*/
public static Config fromJSON(File file) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromJSON(file);
+ return support.fromJSON(file, Config.class);
}
/**
@@ -415,7 +415,7 @@ public class Config {
*/
public static Config fromJSON(URL url) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromJSON(url);
+ return support.fromJSON(url, Config.class);
}
/**
@@ -427,7 +427,7 @@ public class Config {
*/
public static Config fromJSON(Reader reader) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromJSON(reader);
+ return support.fromJSON(reader, Config.class);
}
/**
@@ -450,7 +450,7 @@ public class Config {
*/
public static Config fromYAML(String content) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromYAML(content);
+ return support.fromYAML(content, Config.class);
}
/**
@@ -462,7 +462,7 @@ public class Config {
*/
public static Config fromYAML(InputStream inputStream) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromYAML(inputStream);
+ return support.fromYAML(inputStream, Config.class);
}
/**
@@ -474,7 +474,7 @@ public class Config {
*/
public static Config fromYAML(File file) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromYAML(file);
+ return support.fromYAML(file, Config.class);
}
/**
@@ -486,7 +486,7 @@ public class Config {
*/
public static Config fromYAML(URL url) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromYAML(url);
+ return support.fromYAML(url, Config.class);
}
/**
@@ -498,7 +498,7 @@ public class Config {
*/
public static Config fromYAML(Reader reader) throws IOException {
ConfigSupport support = new ConfigSupport();
- return support.fromYAML(reader);
+ return support.fromYAML(reader, Config.class);
}
/**
diff --git a/src/main/java/org/redisson/config/ConfigSupport.java b/src/main/java/org/redisson/config/ConfigSupport.java
index f71a2b239..33ca5961d 100644
--- a/src/main/java/org/redisson/config/ConfigSupport.java
+++ b/src/main/java/org/redisson/config/ConfigSupport.java
@@ -108,48 +108,48 @@ public class ConfigSupport {
private final ObjectMapper jsonMapper = createMapper(null);
private final ObjectMapper yamlMapper = createMapper(new YAMLFactory());
- public Config fromJSON(String content) throws IOException {
- return jsonMapper.readValue(content, Config.class);
+ public T fromJSON(String content, Class configType) throws IOException {
+ return jsonMapper.readValue(content, configType);
}
- public Config fromJSON(File file) throws IOException {
- return jsonMapper.readValue(file, Config.class);
+ public T fromJSON(File file, Class configType) throws IOException {
+ return jsonMapper.readValue(file, configType);
}
- public Config fromJSON(URL url) throws IOException {
- return jsonMapper.readValue(url, Config.class);
+ public T fromJSON(URL url, Class configType) throws IOException {
+ return jsonMapper.readValue(url, configType);
}
- public Config fromJSON(Reader reader) throws IOException {
- return jsonMapper.readValue(reader, Config.class);
+ public T fromJSON(Reader reader, Class configType) throws IOException {
+ return jsonMapper.readValue(reader, configType);
}
- public Config fromJSON(InputStream inputStream) throws IOException {
- return jsonMapper.readValue(inputStream, Config.class);
+ public T fromJSON(InputStream inputStream, Class configType) throws IOException {
+ return jsonMapper.readValue(inputStream, configType);
}
public String toJSON(Config config) throws IOException {
return jsonMapper.writeValueAsString(config);
}
- public Config fromYAML(String content) throws IOException {
- return yamlMapper.readValue(content, Config.class);
+ public T fromYAML(String content, Class configType) throws IOException {
+ return yamlMapper.readValue(content, configType);
}
- public Config fromYAML(File file) throws IOException {
- return yamlMapper.readValue(file, Config.class);
+ public T fromYAML(File file, Class configType) throws IOException {
+ return yamlMapper.readValue(file, configType);
}
- public Config fromYAML(URL url) throws IOException {
- return yamlMapper.readValue(url, Config.class);
+ public T fromYAML(URL url, Class configType) throws IOException {
+ return yamlMapper.readValue(url, configType);
}
- public Config fromYAML(Reader reader) throws IOException {
- return yamlMapper.readValue(reader, Config.class);
+ public T fromYAML(Reader reader, Class configType) throws IOException {
+ return yamlMapper.readValue(reader, configType);
}
- public Config fromYAML(InputStream inputStream) throws IOException {
- return yamlMapper.readValue(inputStream, Config.class);
+ public T fromYAML(InputStream inputStream, Class configType) throws IOException {
+ return yamlMapper.readValue(inputStream, configType);
}
public String toYAML(Config config) throws IOException {
@@ -175,7 +175,6 @@ public class ConfigSupport {
} else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
-
}
private static void validate(SingleServerConfig config) {
diff --git a/src/main/java/org/redisson/config/RedissonNodeConfig.java b/src/main/java/org/redisson/config/RedissonNodeConfig.java
new file mode 100644
index 000000000..86319e8d2
--- /dev/null
+++ b/src/main/java/org/redisson/config/RedissonNodeConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.config;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public class RedissonNodeConfig extends Config {
+
+ private Map executors = new HashMap();
+
+ public RedissonNodeConfig() {
+ super();
+ }
+
+ public RedissonNodeConfig(Config oldConf) {
+ super(oldConf);
+ }
+
+ public RedissonNodeConfig setExecutors(Map executors) {
+ this.executors = executors;
+ return this;
+ }
+
+ public Map getExecutors() {
+ return executors;
+ }
+
+ public static RedissonNodeConfig fromJSON(File file) throws IOException {
+ ConfigSupport support = new ConfigSupport();
+ return support.fromJSON(file, RedissonNodeConfig.class);
+ }
+
+ public static RedissonNodeConfig fromYAML(File file) throws IOException {
+ ConfigSupport support = new ConfigSupport();
+ return support.fromYAML(file, RedissonNodeConfig.class);
+ }
+
+}
diff --git a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java
index 4d44c2f75..a0f84d1c0 100644
--- a/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java
+++ b/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java
@@ -1,7 +1,10 @@
package org.redisson.executor;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -10,40 +13,29 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.BaseTest;
-import org.redisson.Redisson;
-import org.redisson.RedissonClient;
+import org.redisson.RedissonNode;
import org.redisson.api.RExecutorService;
import org.redisson.config.Config;
-
-import static org.assertj.core.api.Assertions.*;
+import org.redisson.config.RedissonNodeConfig;
public class RedissonExecutorServiceTest extends BaseTest {
- private static RedissonClient redissonClient;
-
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
BaseTest.beforeClass();
Config config = createConfig();
- redissonClient = Redisson.create(config);
- redissonClient.getExecutorService().registerExecutors(1);
- }
-
- @AfterClass
- public static void afterClass() throws IOException, InterruptedException {
- BaseTest.afterClass();
-
- redissonClient.shutdown();
+ RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
+ nodeConfig.setExecutors(Collections.singletonMap("test", 1));
+ RedissonNode.start(nodeConfig);
}
@Test
public void testMultipleTasks() throws InterruptedException, ExecutionException, TimeoutException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
Future> f = e.submit(new RunnableTask2());
f.get();
@@ -77,7 +69,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = RejectedExecutionException.class)
public void testRejectExecute() throws InterruptedException, ExecutionException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
Future> f1 = e.submit(new RunnableTask2());
Future f2 = e.submit(new CallableTask());
@@ -93,7 +85,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = RejectedExecutionException.class)
public void testRejectSubmitRunnable() throws InterruptedException, ExecutionException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
Future> f1 = e.submit(new RunnableTask2());
Future f2 = e.submit(new CallableTask());
@@ -109,7 +101,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = RejectedExecutionException.class)
public void testRejectSubmitCallable() throws InterruptedException, ExecutionException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
Future> f1 = e.submit(new RunnableTask2());
Future f2 = e.submit(new CallableTask());
@@ -125,7 +117,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = RejectedExecutionException.class)
public void testEmptyRejectSubmitRunnable() throws InterruptedException, ExecutionException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.shutdown();
assertThat(e.isShutdown()).isTrue();
@@ -135,7 +127,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test
public void testShutdown() throws InterruptedException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
assertThat(e.isShutdown()).isFalse();
assertThat(e.isTerminated()).isFalse();
e.execute(new RunnableTask());
@@ -147,7 +139,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test
public void testShutdownEmpty() throws InterruptedException {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
assertThat(e.isShutdown()).isFalse();
assertThat(e.isTerminated()).isFalse();
e.shutdown();
@@ -159,7 +151,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test
public void testResetShutdownState() throws InterruptedException, ExecutionException {
for (int i = 0; i < 100; i++) {
- RExecutorService e = redisson.getExecutorService();
+ RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
assertThat(e.isShutdown()).isFalse();
e.shutdown();
@@ -176,10 +168,10 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test
public void testRedissonInjected() throws InterruptedException, ExecutionException {
- Future s1 = redisson.getExecutorService().submit(new CallableRedissonTask(1L));
- Future s2 = redisson.getExecutorService().submit(new CallableRedissonTask(2L));
- Future s3 = redisson.getExecutorService().submit(new CallableRedissonTask(30L));
- Future s4 = (Future) redisson.getExecutorService().submit(new RunnableRedissonTask());
+ Future s1 = redisson.getExecutorService("test").submit(new CallableRedissonTask(1L));
+ Future s2 = redisson.getExecutorService("test").submit(new CallableRedissonTask(2L));
+ Future s3 = redisson.getExecutorService("test").submit(new CallableRedissonTask(30L));
+ Future s4 = (Future) redisson.getExecutorService("test").submit(new RunnableRedissonTask());
List results = Arrays.asList(s1.get(), s2.get(), s3.get());
assertThat(results).containsOnlyOnce(33L);
@@ -190,7 +182,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = IllegalArgumentException.class)
public void testAnonymousRunnable() {
- redisson.getExecutorService().submit(new Runnable() {
+ redisson.getExecutorService("test").submit(new Runnable() {
@Override
public void run() {
}
@@ -199,7 +191,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test(expected = IllegalArgumentException.class)
public void testAnonymousCallable() {
- redisson.getExecutorService().submit(new Callable