Feature - RExecutorService.getTaskCount method added. #2350

pull/2400/head
Nikita Koksharov 5 years ago
parent 4ed58193df
commit 246f1d7d3c

@ -15,72 +15,17 @@
*/
package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.WorkerOptions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemotePromise;
import org.redisson.executor.ScheduledTasksService;
import org.redisson.executor.TasksBatchService;
import org.redisson.executor.TasksRunnerService;
import org.redisson.executor.TasksService;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
import org.redisson.executor.*;
import org.redisson.executor.params.*;
import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
@ -91,8 +36,14 @@ import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.io.*;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
*
@ -215,7 +166,17 @@ public class RedissonExecutorService implements RScheduledExecutorService {
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public int getTaskCount() {
return commandExecutor.get(getTaskCountAsync());
}
@Override
public RFuture<Integer> getTaskCountAsync() {
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, tasksCounterName);
}
@Override
public int countActiveWorkers() {
String id = generateRequestId();

@ -122,7 +122,14 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
* @param options - worker options
*/
void registerWorkers(WorkerOptions options);
/**
* Returns amount of tasks awaiting for execution and/or currently in execution.
*
* @return amount of tasks
*/
int getTaskCount();
/**
* Returns active workers amount available for tasks execution.
*

@ -25,6 +25,13 @@ import java.util.concurrent.Callable;
*/
public interface RExecutorServiceAsync {
/**
* Returns amount of tasks awaiting for execution and/or currently in execution.
*
* @return amount of tasks
*/
RFuture<Integer> getTaskCountAsync();
/**
* Deletes executor request queue and state objects
*

@ -321,7 +321,7 @@ public interface RedisCommands {
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor(0));
RedisStrictCommand<Double> GET_DOUBLE = new RedisStrictCommand<Double>("GET", new DoubleNullSafeReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET");
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor());

@ -71,6 +71,23 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(canceled).isTrue();
}
@Test
public void testTaskCount() throws InterruptedException {
RExecutorService e = redisson.getExecutorService("test");
assertThat(e.getTaskCount()).isEqualTo(0);
e.submit(new DelayedTask(1000, "testcounter"));
e.submit(new DelayedTask(1000, "testcounter"));
for (int i = 0; i < 20; i++) {
e.submit(new RunnableTask());
}
assertThat(e.getTaskCount()).isEqualTo(22);
Thread.sleep(1500);
assertThat(e.getTaskCount()).isEqualTo(21);
}
@Test
public void testBatchSubmitRunnable() throws InterruptedException, ExecutionException, TimeoutException {
RExecutorService e = redisson.getExecutorService("test");

@ -22,14 +22,7 @@ import org.redisson.BaseTest;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.WorkerOptions;
import org.redisson.api.*;
import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
@ -96,7 +89,22 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
client.shutdown();
node.shutdown();
}
@Test
public void testTaskCount() throws InterruptedException {
RScheduledExecutorService e = redisson.getExecutorService("test");
e.schedule(new RunnableTask(), 1, TimeUnit.SECONDS);
e.schedule(new RunnableTask(), 2, TimeUnit.SECONDS);
assertThat(e.getTaskCount()).isEqualTo(2);
Thread.sleep(1100);
assertThat(e.getTaskCount()).isEqualTo(1);
Thread.sleep(1100);
assertThat(e.getTaskCount()).isEqualTo(0);
}
@Test
public void testDelay() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));

Loading…
Cancel
Save