RedissonCompletionService implementation added

pull/816/head
Nikita 8 years ago
parent 6af05f4c57
commit ecf44e6d9c

@ -23,7 +23,6 @@ import java.lang.reflect.Modifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -35,7 +34,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -48,7 +46,6 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject; import org.redisson.api.annotation.RInject;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
@ -66,8 +63,6 @@ import org.redisson.misc.RPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -338,7 +333,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public <T> Future<T> submit(Callable<T> task) { public <T> RFuture<T> submit(Callable<T> task) {
RemotePromise<T> promise = (RemotePromise<T>) submitAsync(task); RemotePromise<T> promise = (RemotePromise<T>) submitAsync(task);
execute(promise); execute(promise);
return promise; return promise;
@ -411,7 +406,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public Future<?> submit(Runnable task) { public RFuture<?> submit(Runnable task) {
RemotePromise<Void> promise = (RemotePromise<Void>) submitAsync(task); RemotePromise<Void> promise = (RemotePromise<Void>) submitAsync(task);
execute(promise); execute(promise);
return promise; return promise;
@ -428,7 +423,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise()); execute((RemotePromise<?>)future.getInnerPromise());
return future; return future;
@ -446,7 +441,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public <V> ScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) { public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit); RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
execute((RemotePromise<V>)future.getInnerPromise()); execute((RemotePromise<V>)future.getInnerPromise());
return future; return future;
@ -464,7 +459,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
execute((RemotePromise<?>)future.getInnerPromise()); execute((RemotePromise<?>)future.getInnerPromise());
return future; return future;
@ -505,7 +500,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise()); execute((RemotePromise<?>)future.getInnerPromise());
return future; return future;

@ -15,6 +15,7 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
@ -25,6 +26,43 @@ import java.util.concurrent.ExecutorService;
*/ */
public interface RExecutorService extends ExecutorService, RExecutorServiceAsync { public interface RExecutorService extends ExecutorService, RExecutorServiceAsync {
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
*/
@Override
<T> RFuture<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
*/
@Override
<T> RFuture<T> submit(Runnable task, T result);;
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
*/
@Override
RFuture<?> submit(Runnable task);
/** /**
* Returns executor name * Returns executor name
* *

@ -15,8 +15,10 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* Distributed implementation of {@link java.util.concurrent.ScheduledExecutorService} * Distributed implementation of {@link java.util.concurrent.ScheduledExecutorService}
@ -26,6 +28,86 @@ import java.util.concurrent.ScheduledFuture;
*/ */
public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync { public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync {
/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
*
* @param command the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture representing pending completion of
* the task and whose {@code get()} method will return
* {@code null} upon completion
*/
@Override
RScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay.
*
* @param callable the function to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @param <V> the type of the callable's result
* @return a ScheduledFuture that can be used to extract result or cancel
*/
@Override
<V> RScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is executions will commence after
* {@code initialDelay} then {@code initialDelay+period}, then
* {@code initialDelay + 2 * period}, and so on.
* If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. If any execution of this task
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose {@code get()} method will throw an
* exception upon cancellation
*/
@Override
RScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the
* given delay between the termination of one execution and the
* commencement of the next. If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
* execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose {@code get()} method will throw an
* exception upon cancellation
*/
@Override
RScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
/** /**
* Cancels scheduled task by id * Cancels scheduled task by id
* *

@ -0,0 +1,112 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RScheduledExecutorService;
import io.netty.util.concurrent.FutureListener;
/**
* A {@link CompletionService} that uses a supplied {@link Executor}
* to execute tasks. This class arranges that submitted tasks are,
* upon completion, placed on a queue accessible using {@code take}.
* The class is lightweight enough to be suitable for transient use
* when processing groups of tasks.
*
* @author Nikita Koksharov
*
* @param <V>
*/
public class RedissonCompletionService<V> implements CompletionService<V> {
protected final RScheduledExecutorService executorService;
protected final BlockingQueue<RFuture<V>> completionQueue;
public RedissonCompletionService(RScheduledExecutorService executorService) {
this(executorService, null);
}
public RedissonCompletionService(RScheduledExecutorService executorService, BlockingQueue<RFuture<V>> completionQueue) {
if (executorService == null) {
throw new NullPointerException("executorService can't be null");
}
this.executorService = executorService;
if (completionQueue == null) {
completionQueue = new LinkedBlockingQueue<RFuture<V>>();
}
this.completionQueue = completionQueue;
}
@Override
public Future<V> submit(Callable<V> task) {
if (task == null) {
throw new NullPointerException("taks can't be null");
}
final RFuture<V> f = executorService.submit(task);
f.addListener(new FutureListener<V>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<V> future) throws Exception {
completionQueue.add(f);
}
});
return f;
}
@Override
public Future<V> submit(Runnable task, V result) {
if (task == null) {
throw new NullPointerException("taks can't be null");
}
final RFuture<V> f = executorService.submit(task, result);
f.addListener(new FutureListener<V>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<V> future) throws Exception {
completionQueue.add(f);
}
});
return f;
}
@Override
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
@Override
public Future<V> poll() {
return completionQueue.poll();
}
@Override
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
Loading…
Cancel
Save