refactoring

pull/4087/head
Nikita Koksharov 3 years ago
parent 871b060af9
commit 305522d864

@ -31,7 +31,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -146,7 +145,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
@ -305,28 +304,24 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
throw new CompletionException(cause);
}
result.trySuccess(null);
return null;
});
return result;
return new CompletableFutureWrapper<>(f);
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);

@ -24,13 +24,16 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.RPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -59,22 +62,18 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<Boolean> addAsync(V e) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
RFuture<Boolean> future = offerAsync(e);
future.onComplete((res, ex) -> {
CompletionStage<Boolean> f = future.handle((res, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
throw new CompletionException(ex);
}
if (!res) {
result.tryFailure(new IllegalStateException("Queue is full"));
return;
throw new CompletionException(new IllegalStateException("Queue is full"));
}
result.trySuccess(res);
return true;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -120,30 +119,19 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return wrapTakeFuture(takeFuture);
}
private RPromise<V> wrapTakeFuture(RFuture<V> takeFuture) {
RPromise<V> result = new RedissonPromise<V>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
return takeFuture.cancel(mayInterruptIfRunning);
};
};
takeFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
private RFuture<V> wrapTakeFuture(RFuture<V> takeFuture) {
CompletableFuture<V> f = takeFuture.toCompletableFuture().thenCompose(res -> {
if (res == null) {
result.trySuccess(res);
return;
return CompletableFuture.completedFuture(null);
}
return createSemaphore(null).releaseAsync().handle((r, ex) -> res);
});
f.whenComplete((r, e) -> {
if (f.isCancelled()) {
takeFuture.cancel(false);
}
createSemaphore(null).releaseAsync().onComplete((r, ex) -> {
result.trySuccess(res);
});
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override

@ -45,7 +45,6 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
*
@ -647,7 +646,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
result.add(executorFuture);
}
executorRemoteService.executeAddAsync().onComplete((res, e) -> {
executorRemoteService.executeAddAsync().whenComplete((res, e) -> {
if (e != null) {
for (RExecutorFuture<?> executorFuture : result) {
executorFuture.toCompletableFuture().completeExceptionally(e);
@ -757,7 +756,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
result.add(executorFuture);
}
executorRemoteService.executeAddAsync().onComplete((res, e) -> {
executorRemoteService.executeAddAsync().whenComplete((res, e) -> {
if (e != null) {
for (RExecutorFuture<?> executorFuture : result) {
executorFuture.toCompletableFuture().completeExceptionally(e);
@ -1044,24 +1043,17 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return executorRemoteService.cancelExecutionAsync(new RequestId(taskId));
}
private <T> RFuture<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<RFuture<T>> result = new AtomicReference<>();
for (Future<?> future : futures) {
RFuture<T> f = (RFuture<T>) future;
f.onComplete((r, e) -> {
latch.countDown();
result.compareAndSet(null, f);
});
}
if (timeout == -1) {
latch.await();
} else {
latch.await(timeout, timeUnit);
private <T> T poll(List<CompletableFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
CompletableFuture<Object> future = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
try {
if (timeout == -1) {
return (T) future.get();
} else {
return (T) future.get(timeout, timeUnit);
}
} catch (ExecutionException e) {
throw commandExecutor.convertException(e);
}
return result.get();
}
@Override
@ -1080,20 +1072,17 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException();
}
List<RExecutorFuture<?>> futures = new ArrayList<>();
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Callable<T> callable : tasks) {
RExecutorFuture<T> future = submit(callable);
futures.add(future);
futures.add(future.toCompletableFuture());
}
RFuture<T> result = poll(futures, timeout, unit);
if (result == null) {
throw new TimeoutException();
}
for (RExecutorFuture<?> f : futures) {
T result = poll(futures, timeout, unit);
for (CompletableFuture<?> f : futures) {
f.cancel(true);
}
return result.getNow();
return result;
}
@Override

@ -29,17 +29,17 @@ import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.CompositeIterable;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.reactive.CommandReactiveBatchService;
import org.redisson.rx.CommandRxBatchService;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -239,55 +239,57 @@ public class RedissonKeys implements RKeys {
}
int batchSize = 500;
RPromise<Long> result = new RedissonPromise<Long>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
AtomicLong count = new AtomicLong();
Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet();
AtomicLong executed = new AtomicLong(entries.size());
BiConsumer<Long, Throwable> listener = (res, e) -> {
if (e == null) {
count.addAndGet(res);
} else {
failed.set(e);
}
checkExecution(result, failed, count, executed);
};
for (MasterSlaveEntry entry : entries) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
long count = 0;
try {
Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize);
List<String> keys = new ArrayList<String>();
while (keysIterator.hasNext()) {
String key = keysIterator.next();
keys.add(key);
if (keys.size() % batchSize == 0) {
count += delete(keys.toArray(new String[keys.size()]));
keys.clear();
}
}
if (!keys.isEmpty()) {
count += delete(keys.toArray(new String[keys.size()]));
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
long count = 0;
try {
Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize);
List<String> keys = new ArrayList<>();
while (keysIterator.hasNext()) {
String key = keysIterator.next();
keys.add(key);
if (keys.size() % batchSize == 0) {
count += delete(keys.toArray(new String[0]));
keys.clear();
}
}
RFuture<Long> future = RedissonPromise.newSucceededFuture(count);
future.onComplete(listener);
} catch (Exception e) {
RFuture<Long> future = RedissonPromise.newFailedFuture(e);
future.onComplete(listener);
if (!keys.isEmpty()) {
count += delete(keys.toArray(new String[0]));
keys.clear();
}
RFuture<Long> future = RedissonPromise.newSucceededFuture(count);
futures.add(future.toCompletableFuture());
} catch (Exception e) {
CompletableFuture<Long> future = new CompletableFuture<>();
future.completeExceptionally(e);
futures.add(future);
}
});
}
return result;
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<Long> res = future.handle((r, e) -> {
long cc = futures.stream()
.filter(f -> f.isDone())
.mapToLong(f -> f.getNow(0L))
.sum();
if (e != null) {
if (cc > 0) {
RedisException ex = new RedisException(
cc + " keys have been deleted. But one or more nodes has an error", e);
throw new CompletionException(ex);
} else {
throw new CompletionException(e);
}
}
return cc;
});
return new CompletableFutureWrapper<>(res);
}
@Override
@ -419,24 +421,6 @@ public class RedissonKeys implements RKeys {
return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL);
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed, AtomicLong count,
AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException(
"" + count.get() + " keys has been deleted. But one or more nodes has an error",
failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
@Override
public long remainTimeToLive(String name) {
return commandExecutor.get(remainTimeToLiveAsync(name));

@ -1532,7 +1532,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
params.add(count);
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, SCAN,
RFuture<MapCacheScanResult<Object, Object>> future = commandExecutor.evalReadAsync(client, name, codec, SCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res; "
@ -1572,41 +1572,40 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.asList(name, getTimeoutSetName(name), getIdleSetName(name)),
params.toArray());
f.onComplete((res, e) -> {
if (res != null) {
if (res.getIdleKeys().isEmpty()) {
return;
}
List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1);
args.add(System.currentTimeMillis());
encodeMapKeys(args, res.getIdleKeys());
commandExecutor.evalWriteAsync(name, codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(args, 1))),
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
+ "if value ~= false then "
+ "local key = ARGV[i]; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[2], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > currentTime then "
+ "redis.call('zadd', KEYS[2], t + currentTime, key); "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "end; ",
Arrays.asList(name, getIdleSetName(name)), args.toArray());
CompletionStage<MapCacheScanResult<Object, Object>> f = future.thenApply(res -> {
if (res.getIdleKeys().isEmpty()) {
return res;
}
List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1);
args.add(System.currentTimeMillis());
encodeMapKeys(args, res.getIdleKeys());
commandExecutor.evalWriteAsync(name, codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(args, 1))),
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
+ "if value ~= false then "
+ "local key = ARGV[i]; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[2], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > currentTime then "
+ "redis.call('zadd', KEYS[2], t + currentTime, key); "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "end; ",
Arrays.asList(name, getIdleSetName(name)), args.toArray());
return res;
});
return (RFuture<ScanResult<Map.Entry<Object, Object>>>) (Object) f;
return new CompletableFutureWrapper<>((CompletionStage<ScanResult<Map.Entry<Object, Object>>>) (Object) f);
}
@Override

@ -27,11 +27,13 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
@ -269,7 +271,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
@Override
public RFuture<Void> renameAsync(String newName) {
String newPrefix = suffixName(newName, "");
RFuture<Void> f = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local entries = redis.call('hgetall', KEYS[1]); " +
"local keys = {}; " +
"for i, v in ipairs(entries) do " +
@ -283,18 +285,14 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
+ "redis.call('rename', ARGV[1] .. keys[i], ARGV[2] .. keys[i]); "
+ "end; ",
Arrays.asList(getRawName()), prefix, newPrefix, newName);
f.onComplete((r, e) -> {
if (e == null) {
setName(newName);
}
});
return f;
CompletionStage<Void> f = future.thenAccept(r -> setName(newName));
return new CompletableFutureWrapper<>(f);
}
@Override
public RFuture<Boolean> renamenxAsync(String newName) {
String newPrefix = suffixName(newName, "");
RFuture<Boolean> f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local entries = redis.call('hgetall', KEYS[1]); " +
"local keys = {}; " +
"for i, v in ipairs(entries) do " +
@ -320,12 +318,13 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
+ "end; " +
"return 1; ",
Arrays.asList(getRawName()), prefix, newPrefix, newName);
f.onComplete((value, e) -> {
if (e == null && value) {
CompletionStage<Boolean> f = future.thenApply(value -> {
if (value) {
setName(newName);
}
return value;
});
return f;
return new CompletableFutureWrapper<>(f);
}
@Override

@ -27,6 +27,7 @@ import org.redisson.misc.Hash;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
@ -140,13 +141,9 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Void> renameAsync(String newName) {
RFuture<Void> f = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAME, getRawName(), newName);
f.onComplete((r, e) -> {
if (e == null) {
setName(newName);
}
});
return f;
RFuture<Void> future = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAME, getRawName(), newName);
CompletionStage<Void> f = future.thenAccept(r -> setName(newName));
return new CompletableFutureWrapper<>(f);
}
@Override
@ -186,13 +183,14 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Boolean> renamenxAsync(String newName) {
RFuture<Boolean> f = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getRawName(), newName);
f.onComplete((value, e) -> {
if (e == null && value) {
RFuture<Boolean> future = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getRawName(), newName);
CompletionStage<Boolean> f = future.thenApply(value -> {
if (value) {
setName(newName);
}
return value;
});
return f;
return new CompletableFutureWrapper<>(f);
}

Loading…
Cancel
Save