refactoring

pull/4087/head
Nikita Koksharov 3 years ago
parent 07d606de7c
commit 36bafe4935

@ -32,11 +32,12 @@ import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.iterator.RedissonListIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import static org.redisson.client.protocol.RedisCommands.*;
@ -391,25 +392,21 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public RFuture<V> setAsync(int index, V element) {
RPromise<V> result = new RedissonPromise<V>();
RFuture<V> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v",
Collections.<Object>singletonList(getRawName()), index, encode(element));
future.onComplete((res, e) -> {
Collections.singletonList(getRawName()), index, encode(element));
CompletionStage<V> f = future.handle((res, e) -> {
if (e != null) {
if (e.getMessage().contains("ERR index out of range")) {
result.tryFailure(new IndexOutOfBoundsException("index out of range"));
return;
throw new CompletionException(new IndexOutOfBoundsException("index out of range"));
}
result.tryFailure(e);
return;
throw new CompletionException(e);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override

@ -32,7 +32,6 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonMapIterator;
import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -192,7 +191,6 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
newValuePromise.complete(value);
}
return newValuePromise
.exceptionally(e -> null)
.thenCompose(newValue -> {
RFuture<?> future;
if (newValue != null) {
@ -200,17 +198,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} else {
future = fastRemoveAsync(key);
}
return future.thenApply(res -> {
lock.unlockAsync(threadId);
return newValue;
});
return future.thenApply(res -> newValue);
});
});
});
f.whenComplete((r, e) -> {
if (e != null) {
}).whenComplete((c, e) -> {
lock.unlockAsync(threadId);
}
});
});
return new CompletableFutureWrapper<>(f);
}
@ -221,64 +213,53 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(ex);
return;
}
CompletionStage<V> f = lock.lockAsync(threadId)
.thenCompose(r -> {
RFuture<V> oldValueFuture = getAsync(key);
return oldValueFuture.thenCompose(oldValue -> {
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
result.completeExceptionally(exception);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue == null) {
if (oldValue != null) {
fastRemoveAsync(key).whenComplete((res, exc) -> {
if (exc != null) {
result.completeExceptionally(exc);
return;
}
if (newValue == null) {
if (oldValue != null) {
fastRemoveAsync(key).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
result.complete(newValue);
});
return;
}
} else {
fastPutAsync(key, newValue).whenComplete((res, exc) -> {
if (exc != null) {
result.tryFailure(exc);
result.completeExceptionally(exc);
return;
}
result.trySuccess(newValue);
result.complete(newValue);
});
return;
}
} else {
fastPutAsync(key, newValue).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(newValue);
});
return;
}
result.complete(newValue);
});
return result;
}).whenComplete((c, e) -> {
lock.unlockAsync(threadId);
result.trySuccess(newValue);
});
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
@ -312,56 +293,40 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Objects.requireNonNull(mappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(ex);
return;
}
if (oldValue != null) {
lock.unlockAsync(threadId);
result.trySuccess(oldValue);
return;
}
CompletionStage<V> f = lock.lockAsync(threadId)
.thenCompose(r -> {
RFuture<V> oldValueFuture = getAsync(key);
return oldValueFuture.thenCompose(oldValue -> {
if (oldValue != null) {
return CompletableFuture.completedFuture(oldValue);
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = mappingFunction.apply(key);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue != null) {
fastPutAsync(key, newValue).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = mappingFunction.apply(key);
} catch (Exception exception) {
result.completeExceptionally(exception);
return;
}
if (newValue != null) {
fastPutAsync(key, newValue).thenAccept(res -> {
result.complete(newValue);
});
return;
}
result.trySuccess(newValue);
result.complete(null);
});
return;
}
lock.unlockAsync(threadId);
result.trySuccess(null);
return result;
}).whenComplete((c, e) -> {
lock.unlockAsync(threadId);
});
});
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -393,63 +358,53 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(e);
return;
}
if (oldValue == null) {
lock.unlockAsync(threadId);
result.trySuccess(null);
return;
}
CompletionStage<V> f = lock.lockAsync(threadId)
.thenCompose(r -> {
RFuture<V> oldValueFuture = getAsync(key);
return oldValueFuture.thenCompose(oldValue -> {
if (oldValue == null) {
return CompletableFuture.completedFuture(null);
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue != null) {
RFuture<Boolean> fastPutFuture = fastPutAsync(key, newValue);
fastPutFuture.onComplete((re, ex1) -> {
lock.unlockAsync(threadId);
if (ex1 != null) {
result.tryFailure(ex1);
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
result.completeExceptionally(exception);
return;
}
if (newValue != null) {
RFuture<Boolean> fastPutFuture = fastPutAsync(key, newValue);
fastPutFuture.whenComplete((re, ex1) -> {
if (ex1 != null) {
result.completeExceptionally(ex1);
return;
}
result.trySuccess(newValue);
});
} else {
RFuture<Long> removeFuture = fastRemoveAsync(key);
removeFuture.onComplete((re, ex1) -> {
lock.unlockAsync(threadId);
if (ex1 != null) {
result.tryFailure(ex1);
return;
}
result.complete(newValue);
});
} else {
RFuture<Long> removeFuture = fastRemoveAsync(key);
removeFuture.whenComplete((re, ex1) -> {
if (ex1 != null) {
result.completeExceptionally(ex1);
return;
}
result.trySuccess(null);
result.complete(null);
});
}
});
}
return result;
}).whenComplete((c, e) -> {
lock.unlockAsync(threadId);
});
});
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -702,22 +657,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
protected final <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask task, Function<M, Boolean> condition) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.onComplete((res, e) -> {
CompletionStage<M> f = future.whenComplete((res, e) -> {
if (e == null && condition.apply(res)) {
writeBehindTask.addTask(task);
}
});
return future;
}
final RPromise<M> promise = new RedissonPromise<>();
future.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
return new CompletableFutureWrapper<>(f);
}
CompletionStage<M> f = future.thenCompose(res -> {
if (condition.apply(res)) {
CompletableFuture<M> promise = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
if (task instanceof MapWriterTask.Add) {
@ -726,17 +676,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
options.getWriter().delete(task.getKeys());
}
} catch (Exception ex) {
promise.tryFailure(ex);
promise.completeExceptionally(ex);
return;
}
promise.trySuccess(res);
promise.complete(res);
});
} else {
promise.trySuccess(res);
return promise;
}
return CompletableFuture.completedFuture(res);
});
return promise;
return new CompletableFutureWrapper<>(f);
}
protected RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map) {

Loading…
Cancel
Save