From 246f1d7d3ce3b2e51e31f9fd928011e8f60a0431 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 30 Oct 2019 13:00:26 +0300 Subject: [PATCH] Feature - RExecutorService.getTaskCount method added. #2350 --- .../org/redisson/RedissonExecutorService.java | 87 +++++-------------- .../org/redisson/api/RExecutorService.java | 9 +- .../redisson/api/RExecutorServiceAsync.java | 7 ++ .../client/protocol/RedisCommands.java | 2 +- .../executor/RedissonExecutorServiceTest.java | 17 ++++ .../RedissonScheduledExecutorServiceTest.java | 26 ++++-- 6 files changed, 74 insertions(+), 74 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 493636a2f..5d8c694d3 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -15,72 +15,17 @@ */ package org.redisson; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.lang.invoke.SerializedLambda; -import java.lang.ref.ReferenceQueue; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import org.redisson.api.CronSchedule; -import org.redisson.api.ExecutorOptions; -import org.redisson.api.RAtomicLong; -import org.redisson.api.RExecutorBatchFuture; -import org.redisson.api.RExecutorFuture; -import org.redisson.api.RFuture; -import org.redisson.api.RRemoteService; -import org.redisson.api.RScheduledExecutorService; -import org.redisson.api.RScheduledFuture; -import org.redisson.api.RSemaphore; -import org.redisson.api.RTopic; -import org.redisson.api.RemoteInvocationOptions; -import org.redisson.api.WorkerOptions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.connection.ConnectionManager; -import org.redisson.executor.RedissonExecutorBatchFuture; -import org.redisson.executor.RedissonExecutorFuture; -import org.redisson.executor.RedissonExecutorFutureReference; -import org.redisson.executor.RedissonExecutorRemoteService; -import org.redisson.executor.RedissonScheduledFuture; -import org.redisson.executor.RemoteExecutorService; -import org.redisson.executor.RemoteExecutorServiceAsync; -import org.redisson.executor.RemotePromise; -import org.redisson.executor.ScheduledTasksService; -import org.redisson.executor.TasksBatchService; -import org.redisson.executor.TasksRunnerService; -import org.redisson.executor.TasksService; -import org.redisson.executor.params.ScheduledAtFixedRateParameters; -import org.redisson.executor.params.ScheduledCronExpressionParameters; -import org.redisson.executor.params.ScheduledParameters; -import org.redisson.executor.params.ScheduledWithFixedDelayParameters; -import org.redisson.executor.params.TaskParameters; +import org.redisson.executor.*; +import org.redisson.executor.params.*; import org.redisson.misc.Injector; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; @@ -91,8 +36,14 @@ import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; +import java.io.*; +import java.lang.invoke.SerializedLambda; +import java.lang.ref.ReferenceQueue; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; /** * @@ -215,7 +166,17 @@ public class RedissonExecutorService implements RScheduledExecutorService { ThreadLocalRandom.current().nextBytes(id); return ByteBufUtil.hexDump(id); } - + + @Override + public int getTaskCount() { + return commandExecutor.get(getTaskCountAsync()); + } + + @Override + public RFuture getTaskCountAsync() { + return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, tasksCounterName); + } + @Override public int countActiveWorkers() { String id = generateRequestId(); diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index 1ca6c722f..484ab925c 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -122,7 +122,14 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync * @param options - worker options */ void registerWorkers(WorkerOptions options); - + + /** + * Returns amount of tasks awaiting for execution and/or currently in execution. + * + * @return amount of tasks + */ + int getTaskCount(); + /** * Returns active workers amount available for tasks execution. * diff --git a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java index 9cad6c90c..81ae93027 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java @@ -25,6 +25,13 @@ import java.util.concurrent.Callable; */ public interface RExecutorServiceAsync { + /** + * Returns amount of tasks awaiting for execution and/or currently in execution. + * + * @return amount of tasks + */ + RFuture getTaskCountAsync(); + /** * Deletes executor request queue and state objects * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 2461567e2..21e9a307d 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -321,7 +321,7 @@ public interface RedisCommands { RedisCommand GET = new RedisCommand("GET"); RedisStrictCommand GET_LONG = new RedisStrictCommand("GET", new LongReplayConvertor()); - RedisStrictCommand GET_INTEGER = new RedisStrictCommand("GET", new IntegerReplayConvertor()); + RedisStrictCommand GET_INTEGER = new RedisStrictCommand("GET", new IntegerReplayConvertor(0)); RedisStrictCommand GET_DOUBLE = new RedisStrictCommand("GET", new DoubleNullSafeReplayConvertor()); RedisCommand GETSET = new RedisCommand("GETSET"); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor()); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 9a9cd6d7e..25a18a052 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -71,6 +71,23 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(canceled).isTrue(); } + @Test + public void testTaskCount() throws InterruptedException { + RExecutorService e = redisson.getExecutorService("test"); + assertThat(e.getTaskCount()).isEqualTo(0); + + e.submit(new DelayedTask(1000, "testcounter")); + e.submit(new DelayedTask(1000, "testcounter")); + for (int i = 0; i < 20; i++) { + e.submit(new RunnableTask()); + } + assertThat(e.getTaskCount()).isEqualTo(22); + + Thread.sleep(1500); + + assertThat(e.getTaskCount()).isEqualTo(21); + } + @Test public void testBatchSubmitRunnable() throws InterruptedException, ExecutionException, TimeoutException { RExecutorService e = redisson.getExecutorService("test"); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 7ef1deef3..6a1d2d438 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -22,14 +22,7 @@ import org.redisson.BaseTest; import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonNode; -import org.redisson.api.CronSchedule; -import org.redisson.api.ExecutorOptions; -import org.redisson.api.RExecutorFuture; -import org.redisson.api.RScheduledExecutorService; -import org.redisson.api.RScheduledFuture; -import org.redisson.api.RedissonClient; -import org.redisson.api.RemoteInvocationOptions; -import org.redisson.api.WorkerOptions; +import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; @@ -96,7 +89,22 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { client.shutdown(); node.shutdown(); } - + + @Test + public void testTaskCount() throws InterruptedException { + RScheduledExecutorService e = redisson.getExecutorService("test"); + e.schedule(new RunnableTask(), 1, TimeUnit.SECONDS); + e.schedule(new RunnableTask(), 2, TimeUnit.SECONDS); + assertThat(e.getTaskCount()).isEqualTo(2); + + Thread.sleep(1100); + assertThat(e.getTaskCount()).isEqualTo(1); + + Thread.sleep(1100); + assertThat(e.getTaskCount()).isEqualTo(0); + } + + @Test public void testDelay() throws InterruptedException { RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));