refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent 336967c799
commit f52a5ffe51

@ -15,23 +15,18 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RLockAsync;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
/**
* Groups multiple independent locks and manages them as one lock.
@ -88,17 +83,15 @@ public class RedissonMultiLock implements RLock {
}
RLock lock = iterator.next();
RPromise<Boolean> lockAcquiredFuture = new RedissonPromise<Boolean>();
RFuture<Boolean> lockAcquiredFuture;
if (waitTime == -1 && leaseTime == -1) {
lock.tryLockAsync(threadId)
.onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
lockAcquiredFuture = lock.tryLockAsync(threadId);
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId)
.onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
lockAcquiredFuture = lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId);
}
lockAcquiredFuture.onComplete((res, e) -> {
lockAcquiredFuture.whenComplete((res, e) -> {
boolean lockAcquired = false;
if (res != null) {
lockAcquired = res;

@ -15,10 +15,6 @@
*/
package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.*;
import org.redisson.Redisson;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
@ -29,6 +25,10 @@ import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.*;
/**
*
* @author Nikita Koksharov
@ -121,10 +121,10 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
return null;
}
SubTasksExecutor reduceExecutor = new SubTasksExecutor(executor, workersAmount, startTime, timeout);
SubTasksExecutor reduceExecutor = new SubTasksExecutor(executor, startTime, timeout);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
Runnable runnable = new ReducerTask<>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
reduceExecutor.submit(runnable);
}

@ -21,15 +21,15 @@ import org.redisson.api.mapreduce.RMapReduceExecutor;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.CompletableFutureWrapper;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*
@ -88,37 +88,35 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
@Override
public RFuture<Map<KOut, VOut>> executeAsync() {
RPromise<Map<KOut, VOut>> promise = new RedissonPromise<Map<KOut, VOut>>();
RFuture<Void> future = executeMapperAsync(resultMapName, null);
if (timeout > 0) {
commandExecutor.getConnectionManager().newTimeout(task -> {
promise.tryFailure(new MapReduceTimeoutException());
}, timeout, TimeUnit.MILLISECONDS);
}
addCancelHandling(promise, future);
future.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
AtomicReference<RFuture<BatchResult<?>>> batchRef = new AtomicReference<>();
RFuture<Void> mapperFuture = executeMapperAsync(resultMapName, null);
CompletableFuture<Map<KOut, VOut>> f = mapperFuture.thenCompose(res -> {
RBatch batch = redisson.createBatch();
RMapAsync<KOut, VOut> resultMap = batch.getMap(resultMapName, objectCodec);
resultMap.readAllMapAsync().onComplete(new TransferListener<>(promise));
RFuture<Map<KOut, VOut>> future = resultMap.readAllMapAsync();
resultMap.deleteAsync();
RFuture<BatchResult<?>> batchFuture = batch.executeAsync();
addCancelHandling(promise, batchFuture);
});
return promise;
}
private <T> void addCancelHandling(RPromise<T> promise, RFuture<?> future) {
promise.onComplete((res, e) -> {
if (promise.isCancelled()) {
future.cancel(true);
batchRef.set(batchFuture);
return future;
}).toCompletableFuture();
f.whenComplete((r, e) -> {
if (f.isCancelled()) {
if (batchRef.get() != null) {
batchRef.get().cancel(true);
}
mapperFuture.cancel(true);
}
});
if (timeout > 0) {
commandExecutor.getConnectionManager().newTimeout(task -> {
f.completeExceptionally(new MapReduceTimeoutException());
}, timeout, TimeUnit.MILLISECONDS);
}
return new CompletableFutureWrapper<>(f);
}
@Override

@ -15,14 +15,15 @@
*/
package org.redisson.mapreduce;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import java.util.concurrent.TimeoutException;
/**
*
@ -31,46 +32,24 @@ import org.redisson.api.RFuture;
*/
public class SubTasksExecutor {
public static class LatchListener implements BiConsumer<Object, Throwable> {
private CountDownLatch latch;
public LatchListener() {
}
public LatchListener(CountDownLatch latch) {
super();
this.latch = latch;
}
@Override
public void accept(Object t, Throwable u) {
latch.countDown();
}
}
private final List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
private final CountDownLatch latch;
private final List<CompletableFuture<?>> futures = new ArrayList<>();
private final RExecutorService executor;
private final long startTime;
private final long timeout;
public SubTasksExecutor(RExecutorService executor, int workersAmount, long startTime, long timeout) {
public SubTasksExecutor(RExecutorService executor, long startTime, long timeout) {
this.executor = executor;
this.latch = new CountDownLatch(workersAmount);
this.startTime = startTime;
this.timeout = timeout;
}
public void submit(Runnable runnable) {
RFuture<?> future = executor.submitAsync(runnable);
future.onComplete(new LatchListener(latch));
futures.add(future);
futures.add(future.toCompletableFuture());
}
private void cancel(List<RFuture<?>> futures) {
for (RFuture<?> future : futures) {
private void cancel(List<CompletableFuture<?>> futures) {
for (CompletableFuture<?> future : futures) {
future.cancel(true);
}
}
@ -90,23 +69,29 @@ public class SubTasksExecutor {
cancel(futures);
throw new MapReduceTimeoutException();
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) {
cancel(futures);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
latch.await();
if (timeout > 0) {
try {
future.get(timeout - timeSpent, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
// skip
} catch (TimeoutException e) {
cancel(futures);
throw new MapReduceTimeoutException();
}
} else if (timeout == 0) {
try {
future.get();
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
}
} catch (InterruptedException e) {
cancel(futures);
return false;
}
for (RFuture<?> rFuture : futures) {
if (!rFuture.isSuccess()) {
throw (Exception) rFuture.cause();
}
}
return true;
}

@ -1,59 +0,0 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class TransferListener<T> implements BiConsumer<Object, Throwable> {
private final RPromise<T> promise;
private final T value;
public TransferListener(RPromise<T> promise) {
this(promise, null);
}
public TransferListener(RPromise<T> promise, T value) {
super();
this.promise = promise;
this.value = value;
}
@Override
public void accept(Object t, Throwable u) {
if (u != null) {
if (u instanceof CompletionException) {
u = u.getCause();
}
promise.tryFailure(u);
return;
}
if (value != null) {
promise.trySuccess(value);
} else {
promise.trySuccess((T) t);
}
}
}

@ -31,8 +31,6 @@ import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@ -344,7 +342,6 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
return CompletableFuture.completedFuture(false);
}
RPromise<Boolean> result = new RedissonPromise<>();
if (optionsCopy.isAckExpected()) {
String ackName = remoteService.getAckName(requestId);
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(responseQueueName, LongCodec.INSTANCE,

Loading…
Cancel
Save