diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 94addf097..b57a82eb6 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -23,7 +23,6 @@ import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -35,7 +34,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -48,7 +46,6 @@ import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.annotation.RInject; -import org.redisson.api.listener.BaseStatusListener; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -66,8 +63,6 @@ import org.redisson.misc.RPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; @@ -338,7 +333,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public Future submit(Callable task) { + public RFuture submit(Callable task) { RemotePromise promise = (RemotePromise) submitAsync(task); execute(promise); return promise; @@ -411,7 +406,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public Future submit(Runnable task) { + public RFuture submit(Runnable task) { RemotePromise promise = (RemotePromise) submitAsync(task); execute(promise); return promise; @@ -428,7 +423,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public ScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) { + public RScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); execute((RemotePromise)future.getInnerPromise()); return future; @@ -446,7 +441,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public ScheduledFuture schedule(Callable task, long delay, TimeUnit unit) { + public RScheduledFuture schedule(Callable task, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); execute((RemotePromise)future.getInnerPromise()); return future; @@ -464,7 +459,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + public RScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAtFixedRateAsync(task, initialDelay, period, unit); execute((RemotePromise)future.getInnerPromise()); return future; @@ -505,7 +500,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + public RScheduledFuture scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit); execute((RemotePromise)future.getInnerPromise()); return future; diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index 591365a86..6cd0fdbc3 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; /** @@ -25,6 +26,43 @@ import java.util.concurrent.ExecutorService; */ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync { + /** + * Submits a value-returning task for execution and returns a + * Future representing the pending results of the task. The + * Future's {@code get} method will return the task's result upon + * successful completion. + * + * @param task the task to submit + * @param the type of the task's result + * @return a Future representing pending completion of the task + */ + @Override + RFuture submit(Callable task); + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's {@code get} method will + * return the given result upon successful completion. + * + * @param task the task to submit + * @param result the result to return + * @param the type of the result + * @return a Future representing pending completion of the task + */ + @Override + RFuture submit(Runnable task, T result);; + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's {@code get} method will + * return {@code null} upon successful completion. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + */ + @Override + RFuture submit(Runnable task); + /** * Returns executor name * diff --git a/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java b/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java index 35bb5a845..637018779 100644 --- a/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java @@ -15,8 +15,10 @@ */ package org.redisson.api; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Distributed implementation of {@link java.util.concurrent.ScheduledExecutorService} @@ -26,6 +28,86 @@ import java.util.concurrent.ScheduledFuture; */ public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync { + /** + * Creates and executes a one-shot action that becomes enabled + * after the given delay. + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of + * the task and whose {@code get()} method will return + * {@code null} upon completion + */ + @Override + RScheduledFuture schedule(Runnable command, + long delay, TimeUnit unit); + + /** + * Creates and executes a ScheduledFuture that becomes enabled after the + * given delay. + * + * @param callable the function to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param the type of the callable's result + * @return a ScheduledFuture that can be used to extract result or cancel + */ + @Override + RScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the given + * period; that is executions will commence after + * {@code initialDelay} then {@code initialDelay+period}, then + * {@code initialDelay + 2 * period}, and so on. + * If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. If any execution of this task + * takes longer than its period, then subsequent executions + * may start late, but will not concurrently execute. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param period the period between successive executions + * @param unit the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose {@code get()} method will throw an + * exception upon cancellation + */ + @Override + RScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, + TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose {@code get()} method will throw an + * exception upon cancellation + */ + @Override + RScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, + TimeUnit unit); + /** * Cancels scheduled task by id * diff --git a/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java b/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java new file mode 100644 index 000000000..3d46278a0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java @@ -0,0 +1,112 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RFuture; +import org.redisson.api.RScheduledExecutorService; + +import io.netty.util.concurrent.FutureListener; + +/** + * A {@link CompletionService} that uses a supplied {@link Executor} + * to execute tasks. This class arranges that submitted tasks are, + * upon completion, placed on a queue accessible using {@code take}. + * The class is lightweight enough to be suitable for transient use + * when processing groups of tasks. + * + * @author Nikita Koksharov + * + * @param + */ +public class RedissonCompletionService implements CompletionService { + + protected final RScheduledExecutorService executorService; + + protected final BlockingQueue> completionQueue; + + public RedissonCompletionService(RScheduledExecutorService executorService) { + this(executorService, null); + } + + public RedissonCompletionService(RScheduledExecutorService executorService, BlockingQueue> completionQueue) { + if (executorService == null) { + throw new NullPointerException("executorService can't be null"); + } + + this.executorService = executorService; + if (completionQueue == null) { + completionQueue = new LinkedBlockingQueue>(); + } + + this.completionQueue = completionQueue; + } + + @Override + public Future submit(Callable task) { + if (task == null) { + throw new NullPointerException("taks can't be null"); + } + + final RFuture f = executorService.submit(task); + f.addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + completionQueue.add(f); + } + }); + return f; + } + + @Override + public Future submit(Runnable task, V result) { + if (task == null) { + throw new NullPointerException("taks can't be null"); + } + + final RFuture f = executorService.submit(task, result); + f.addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + completionQueue.add(f); + } + }); + return f; + } + + @Override + public Future take() throws InterruptedException { + return completionQueue.take(); + } + + @Override + public Future poll() { + return completionQueue.poll(); + } + + @Override + public Future poll(long timeout, TimeUnit unit) throws InterruptedException { + return completionQueue.poll(timeout, unit); + } + +}