Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 6 years ago
commit 76b8311981

@ -25,11 +25,9 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -930,76 +928,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, private <T> io.netty.util.concurrent.Future<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) {
throw new NullPointerException();
}
int ntasks = tasks.size();
if (ntasks == 0) {
throw new IllegalArgumentException();
}
List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
try {
ExecutionException ee = null;
long lastTime = timed ? System.currentTimeMillis() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = poll(futures);
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = poll(futures, millis, TimeUnit.MILLISECONDS);
if (f == null)
throw new TimeoutException();
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
}
else
f = poll(futures, -1, null);
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException("No tasks were finised", null);
throw ee;
} finally {
for (Future<T> f : futures) {
f.cancel(true);
}
}
}
private <T> Future<T> poll(List<Future<T>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>(); final AtomicReference<io.netty.util.concurrent.Future<T>> result = new AtomicReference<io.netty.util.concurrent.Future<T>>();
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
@ -1007,7 +938,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
result.compareAndSet(null, future); result.compareAndSet(null, future);
} }
}; };
for (Future<T> future : futures) { for (Future<?> future : futures) {
RFuture<T> f = (RFuture<T>) future; RFuture<T> f = (RFuture<T>) future;
f.addListener(listener); f.addListener(listener);
} }
@ -1018,7 +949,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
latch.await(timeout, timeUnit); latch.await(timeout, timeUnit);
} }
for (Future<T> future : futures) { for (Future<?> future : futures) {
RFuture<T> f = (RFuture<T>) future; RFuture<T> f = (RFuture<T>) future;
f.removeListener(listener); f.removeListener(listener);
} }
@ -1026,20 +957,11 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return result.get(); return result.get();
} }
private <T> Future<T> poll(List<Future<T>> futures) {
for (Future<T> future : futures) {
if (future.isDone()) {
return future;
}
}
return null;
}
@Override @Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
try { try {
return doInvokeAny(tasks, false, 0); return invokeAny(tasks, -1, null);
} catch (TimeoutException cannotHappen) { } catch (TimeoutException cannotHappen) {
return null; return null;
} }
@ -1048,8 +970,20 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toMillis(timeout)); if (tasks == null) {
throw new NullPointerException();
}
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()]));
io.netty.util.concurrent.Future<T> result = poll(future.getTaskFutures(), timeout, unit);
if (result == null) {
throw new TimeoutException();
}
for (RExecutorFuture<?> f : future.getTaskFutures()) {
f.cancel(true);
}
return result.getNow();
} }
@Override @Override
@ -1058,29 +992,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException(); throw new NullPointerException();
} }
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()]));
boolean done = false; future.await();
try { List<?> futures = future.getTaskFutures();
for (Callable<T> t : tasks) { return (List<Future<T>>)futures;
Future<T> future = submit(t);
futures.add(future);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
} }
@Override @Override
@ -1090,55 +1005,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException(); throw new NullPointerException();
} }
long millis = unit.toMillis(timeout); RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()]));
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); future.await(timeout, unit);
boolean done = false; List<?> futures = future.getTaskFutures();
return (List<Future<T>>)futures;
try {
long lastTime = System.currentTimeMillis();
for (Callable<T> task : tasks) {
Future<T> future = submit(task);
futures.add(future);
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
if (millis <= 0) {
int remainFutures = tasks.size() - futures.size();
for (int i = 0; i < remainFutures; i++) {
RPromise<T> cancelledFuture = new RedissonPromise<T>();
cancelledFuture.cancel(true);
futures.add(cancelledFuture);
}
return futures;
}
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (millis <= 0)
return futures;
try {
f.get(millis, TimeUnit.MILLISECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
} }
} }

@ -0,0 +1,40 @@
package org.redisson.executor;
import java.util.concurrent.Callable;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
public class DelayedTask implements Callable<Object> {
@RInject
private RedissonClient redisson;
private long delay;
private String counterName;
public DelayedTask() {
// TODO Auto-generated constructor stub
}
public DelayedTask(long delay, String counter) {
super();
this.delay = delay;
this.counterName = counter;
}
@Override
public Object call() throws Exception {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
RAtomicLong counter = redisson.getAtomicLong(counterName);
counter.incrementAndGet();
return null;
}
}

@ -374,6 +374,25 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(redisson.getKeys().count()).isZero(); assertThat(redisson.getKeys().count()).isZero();
} }
@Test
public void testInvokeAll() throws InterruptedException {
RExecutorService e = redisson.getExecutorService("test");
List<Future<String>> futures = e.invokeAll(Arrays.asList(new CallableTask(), new CallableTask()));
for (Future<String> future : futures) {
assertThat(future.isDone());
}
e.shutdown();
}
@Test
public void testInvokeAny() throws InterruptedException, ExecutionException {
RExecutorService e = redisson.getExecutorService("test");
Object res = e.invokeAny(Arrays.asList((Callable<Object>)(Object)new CallableTask(), new DelayedTask(20000, "counter")));
assertThat(res).isEqualTo(CallableTask.RESULT);
e.shutdown();
}
@Test(expected = RejectedExecutionException.class) @Test(expected = RejectedExecutionException.class)
public void testEmptyRejectSubmitRunnable() throws InterruptedException, ExecutionException { public void testEmptyRejectSubmitRunnable() throws InterruptedException, ExecutionException {
RExecutorService e = redisson.getExecutorService("test"); RExecutorService e = redisson.getExecutorService("test");

Loading…
Cancel
Save