diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 03c96369a..0a43108e5 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -689,7 +689,7 @@ public abstract class BaseRemoteService { byte[] id = new byte[17]; // TODO JDK UPGRADE replace to native ThreadLocalRandom PlatformDependent.threadLocalRandom().nextBytes(id); - id[0] = 0; + id[0] = 00; return new RequestId(id); } @@ -701,7 +701,7 @@ public abstract class BaseRemoteService { private void cancelExecution(RemoteInvocationOptions optionsCopy, boolean mayInterruptIfRunning, RemotePromise remotePromise) { RMap canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); - canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); + canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); // subscribe for async result if it's not expected before diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 3e7aa5647..f5c74af70 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -362,7 +362,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // could be removed not from future object if (future.getNow().isSendResponse()) { RMap map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); - map.putAsync(request.getId(), response); + map.fastPutAsync(request.getId(), response); map.expireAsync(60, TimeUnit.SECONDS); } } diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index ac7c22ed6..7c846f5cd 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -67,7 +67,7 @@ public class RedissonTopic implements RTopic { this.codec = codec; this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); } - + @Override public List getChannelNames() { return Collections.singletonList(name); @@ -264,4 +264,13 @@ public class RedissonTopic implements RTopic { } } + @Override + public int countListeners() { + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); + if (entry != null) { + return entry.countListeners(); + } + return 0; + } + } diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 6ff7d88af..63a0e67d5 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -84,4 +84,10 @@ public interface RTopic extends RTopicAsync { */ void removeAllListeners(); + /** + * Returns amount of registered listeners + * + * @return amount of listeners + */ + int countListeners(); } diff --git a/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java b/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java index 9aa92e81f..73a0b97e3 100644 --- a/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java @@ -49,5 +49,10 @@ public abstract class BaseCodec implements Codec { public ClassLoader getClassLoader() { return getClass().getClassLoader(); } + + @Override + public String toString() { + return getClass().getName(); + } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index a5e5c575f..5b3492432 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService { future = service.schedule(params); } try { - executeRunnable(params); + executeRunnable(params, nextStartDate); } catch (RuntimeException e) { // cancel task if it throws an exception if (future != null) { @@ -209,7 +209,7 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId()); + finish(params.getRequestId(), null); } } @@ -293,8 +293,7 @@ public class TasksRunnerService implements RemoteExecutorService { } } - @Override - public void executeRunnable(TaskParameters params) { + public void executeRunnable(TaskParameters params, Date nextDate) { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { renewRetryTime(params.getRequestId()); } @@ -309,9 +308,14 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId()); + finish(params.getRequestId(), nextDate); } } + + @Override + public void executeRunnable(TaskParameters params) { + executeRunnable(params, null); + } /** * Check shutdown state. If tasksCounter equals 0 @@ -323,22 +327,26 @@ public class TasksRunnerService implements RemoteExecutorService { * * @param requestId */ - private void finish(String requestId) { + private void finish(String requestId, Date nextDate) { + String script = ""; + if (nextDate == null) { + script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + + "if scheduled == false then " + + "redis.call('hdel', KEYS[4], ARGV[3]); " + + "end;"; + } + script += "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + + "if redis.call('decr', KEYS[1]) == 0 then " + + "redis.call('del', KEYS[1]);" + + "if redis.call('get', KEYS[2]) == ARGV[1] then " + + "redis.call('del', KEYS[6]);" + + "redis.call('set', KEYS[2], ARGV[2]);" + + "redis.call('publish', KEYS[3], ARGV[2]);" + + "end;" + + "end;"; + commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, - "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" - + "if scheduled == false then " - + "redis.call('hdel', KEYS[4], ARGV[3]); " - + "end;" + - - "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" - + "if redis.call('get', KEYS[2]) == ARGV[1] then " - + "redis.call('del', KEYS[6]);" - + "redis.call('set', KEYS[2], ARGV[2]);" - + "redis.call('publish', KEYS[3], ARGV[2]);" - + "end;" - + "end;", + script, Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); } diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index dd35c0756..5702a0445 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -53,6 +53,10 @@ public class PubSubConnectionEntry { this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } + public int countListeners() { + return channelListeners.size(); + } + public boolean hasListeners(ChannelName channelName) { return channelListeners.containsKey(channelName); } diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 5d2ea6627..19da8d3c2 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -107,6 +107,20 @@ public class RedissonTopicTest { } + @Test + public void testCountListeners() { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); + assertThat(topic1.countListeners()).isZero(); + int id = topic1.addListener(Long.class, (channel, msg) -> { + }); + assertThat(topic1.countListeners()).isOne(); + topic1.removeListener(id); + assertThat(topic1.countListeners()).isZero(); + + redisson.shutdown(); + } + @Test public void testPing() throws InterruptedException { Config config = BaseTest.createConfig(); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 87c007b0d..41961c4d6 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -118,6 +118,7 @@ public class RedissonExecutorServiceTest extends BaseTest { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node.shutdown(); node = RedissonNode.create(nodeConfig); node.start(); @@ -181,6 +182,7 @@ public class RedissonExecutorServiceTest extends BaseTest { RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node.shutdown(); node = RedissonNode.create(nodeConfig); node.start(); @@ -234,6 +236,7 @@ public class RedissonExecutorServiceTest extends BaseTest { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node.shutdown(); node = RedissonNode.create(nodeConfig); node.start(); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 54d8778bb..67ed44764 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -17,6 +17,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.redisson.BaseTest; +import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonNode; import org.redisson.api.CronSchedule; @@ -24,7 +25,9 @@ import org.redisson.api.ExecutorOptions; import org.redisson.api.RExecutorFuture; import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledFuture; +import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.annotation.RInject; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; @@ -55,6 +58,43 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { node.shutdown(); } + public static class TestTask implements Runnable { + + @RInject + RedissonClient redisson; + + @Override + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + redisson.getAtomicLong("counter").incrementAndGet(); + } + + } + + @Test + public void testSingleWorker() throws InterruptedException { + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.getExecutorServiceWorkers().put("JobA", 1); + RedissonNode node = RedissonNode.create(nodeConfig); + node.start(); + + RedissonClient client = Redisson.create(config); + RScheduledExecutorService executorService = client.getExecutorService("JobA"); + executorService.schedule(new TestTask() , CronSchedule.of("0/1 * * * * ?")); + + TimeUnit.MILLISECONDS.sleep(4800); + + assertThat(client.getAtomicLong("counter").get()).isEqualTo(4); + + client.shutdown(); + node.shutdown(); + } + @Test public void testDelay() throws InterruptedException { RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS)); @@ -106,6 +146,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node.shutdown(); node = RedissonNode.create(nodeConfig); node.start();