From c1bebaf6e245dc9b0588c9b05ff6c4b6eef21380 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 14 Sep 2018 11:15:58 +0300 Subject: [PATCH 1/2] refactoring --- .../org/redisson/RedissonExecutorService.java | 184 +++--------------- 1 file changed, 27 insertions(+), 157 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index c75084d61..6b1ff7f70 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -25,11 +25,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -929,77 +927,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { return commandExecutor.get(scheduledFuture); } - - private T doInvokeAny(Collection> tasks, - boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException { - if (tasks == null) { - throw new NullPointerException(); - } - - int ntasks = tasks.size(); - if (ntasks == 0) { - throw new IllegalArgumentException(); - } - - List> futures = new ArrayList>(ntasks); - - try { - ExecutionException ee = null; - long lastTime = timed ? System.currentTimeMillis() : 0; - Iterator> it = tasks.iterator(); - - // Start one task for sure; the rest incrementally - futures.add(submit(it.next())); - --ntasks; - int active = 1; - - for (;;) { - Future f = poll(futures); - if (f == null) { - if (ntasks > 0) { - --ntasks; - futures.add(submit(it.next())); - ++active; - } - else if (active == 0) - break; - else if (timed) { - f = poll(futures, millis, TimeUnit.MILLISECONDS); - if (f == null) - throw new TimeoutException(); - long now = System.currentTimeMillis(); - millis -= now - lastTime; - lastTime = now; - } - else - f = poll(futures, -1, null); - } - if (f != null) { - --active; - try { - return f.get(); - } catch (ExecutionException eex) { - ee = eex; - } catch (RuntimeException rex) { - ee = new ExecutionException(rex); - } - } - } - - if (ee == null) - ee = new ExecutionException("No tasks were finised", null); - throw ee; - - } finally { - for (Future f : futures) { - f.cancel(true); - } - } - } - private Future poll(List> futures, long timeout, TimeUnit timeUnit) throws InterruptedException { + private io.netty.util.concurrent.Future poll(List> futures, long timeout, TimeUnit timeUnit) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference> result = new AtomicReference>(); + final AtomicReference> result = new AtomicReference>(); FutureListener listener = new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -1007,7 +938,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { result.compareAndSet(null, future); } }; - for (Future future : futures) { + for (Future future : futures) { RFuture f = (RFuture) future; f.addListener(listener); } @@ -1018,7 +949,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { latch.await(timeout, timeUnit); } - for (Future future : futures) { + for (Future future : futures) { RFuture f = (RFuture) future; f.removeListener(listener); } @@ -1026,20 +957,11 @@ public class RedissonExecutorService implements RScheduledExecutorService { return result.get(); } - private Future poll(List> futures) { - for (Future future : futures) { - if (future.isDone()) { - return future; - } - } - return null; - } - @Override public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { try { - return doInvokeAny(tasks, false, 0); + return invokeAny(tasks, -1, null); } catch (TimeoutException cannotHappen) { return null; } @@ -1048,8 +970,20 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return doInvokeAny(tasks, true, unit.toMillis(timeout)); + throws InterruptedException, ExecutionException, TimeoutException { + if (tasks == null) { + throw new NullPointerException(); + } + + RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); + io.netty.util.concurrent.Future result = poll(future.getTaskFutures(), timeout, unit); + if (result == null) { + throw new TimeoutException(); + } + for (RExecutorFuture f : future.getTaskFutures()) { + f.cancel(true); + } + return result.getNow(); } @Override @@ -1058,29 +992,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new NullPointerException(); } - List> futures = new ArrayList>(tasks.size()); - boolean done = false; - try { - for (Callable t : tasks) { - Future future = submit(t); - futures.add(future); - } - for (Future f : futures) { - if (!f.isDone()) { - try { - f.get(); - } catch (CancellationException ignore) { - } catch (ExecutionException ignore) { - } - } - } - done = true; - return futures; - } finally { - if (!done) - for (Future f : futures) - f.cancel(true); - } + RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); + future.await(); + List futures = future.getTaskFutures(); + return (List>)futures; } @Override @@ -1090,55 +1005,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new NullPointerException(); } - long millis = unit.toMillis(timeout); - List> futures = new ArrayList>(tasks.size()); - boolean done = false; - - try { - long lastTime = System.currentTimeMillis(); - - for (Callable task : tasks) { - Future future = submit(task); - futures.add(future); - - long now = System.currentTimeMillis(); - millis -= now - lastTime; - lastTime = now; - if (millis <= 0) { - int remainFutures = tasks.size() - futures.size(); - for (int i = 0; i < remainFutures; i++) { - RPromise cancelledFuture = new RedissonPromise(); - cancelledFuture.cancel(true); - futures.add(cancelledFuture); - - } - return futures; - } - } - - for (Future f : futures) { - if (!f.isDone()) { - if (millis <= 0) - return futures; - try { - f.get(millis, TimeUnit.MILLISECONDS); - } catch (CancellationException ignore) { - } catch (ExecutionException ignore) { - } catch (TimeoutException toe) { - return futures; - } - long now = System.currentTimeMillis(); - millis -= now - lastTime; - lastTime = now; - } - } - done = true; - return futures; - } finally { - if (!done) - for (Future f : futures) - f.cancel(true); - } + RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); + future.await(timeout, unit); + List futures = future.getTaskFutures(); + return (List>)futures; } } From a50eb9e0d7f59c3d2b2046b1520e545020c64d37 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 14 Sep 2018 11:16:05 +0300 Subject: [PATCH 2/2] tests added --- .../org/redisson/executor/DelayedTask.java | 40 +++++++++++++++++++ .../executor/RedissonExecutorServiceTest.java | 19 +++++++++ 2 files changed, 59 insertions(+) create mode 100644 redisson/src/test/java/org/redisson/executor/DelayedTask.java diff --git a/redisson/src/test/java/org/redisson/executor/DelayedTask.java b/redisson/src/test/java/org/redisson/executor/DelayedTask.java new file mode 100644 index 000000000..dd4ae4b08 --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/DelayedTask.java @@ -0,0 +1,40 @@ +package org.redisson.executor; + +import java.util.concurrent.Callable; + +import org.redisson.api.RAtomicLong; +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class DelayedTask implements Callable { + + @RInject + private RedissonClient redisson; + + private long delay; + private String counterName; + + public DelayedTask() { + // TODO Auto-generated constructor stub + } + + public DelayedTask(long delay, String counter) { + super(); + this.delay = delay; + this.counterName = counter; + } + + @Override + public Object call() throws Exception { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + RAtomicLong counter = redisson.getAtomicLong(counterName); + counter.incrementAndGet(); + return null; + } + +} diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 0b1379f80..87c007b0d 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -374,6 +374,25 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(redisson.getKeys().count()).isZero(); } + @Test + public void testInvokeAll() throws InterruptedException { + RExecutorService e = redisson.getExecutorService("test"); + List> futures = e.invokeAll(Arrays.asList(new CallableTask(), new CallableTask())); + for (Future future : futures) { + assertThat(future.isDone()); + } + e.shutdown(); + } + + @Test + public void testInvokeAny() throws InterruptedException, ExecutionException { + RExecutorService e = redisson.getExecutorService("test"); + Object res = e.invokeAny(Arrays.asList((Callable)(Object)new CallableTask(), new DelayedTask(20000, "counter"))); + assertThat(res).isEqualTo(CallableTask.RESULT); + e.shutdown(); + } + + @Test(expected = RejectedExecutionException.class) public void testEmptyRejectSubmitRunnable() throws InterruptedException, ExecutionException { RExecutorService e = redisson.getExecutorService("test");