From 305522d8646a033b033e474bf89d0b9757f75bbd Mon Sep 17 00:00:00 2001 From: Nikita Koksharov <nkoksharov@redisson.pro> Date: Tue, 18 Jan 2022 13:18:32 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonBaseLock.java | 17 +-- .../RedissonBoundedBlockingQueue.java | 52 ++++----- .../org/redisson/RedissonExecutorService.java | 45 +++----- .../main/java/org/redisson/RedissonKeys.java | 108 ++++++++---------- .../java/org/redisson/RedissonMapCache.java | 65 ++++++----- .../java/org/redisson/RedissonMultimap.java | 21 ++-- .../java/org/redisson/RedissonObject.java | 20 ++-- 7 files changed, 140 insertions(+), 188 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 9075cfd0a..a413dec1c 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -31,7 +31,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +145,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc } RFuture<Boolean> future = renewExpirationAsync(threadId); - future.onComplete((res, e) -> { + future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); @@ -305,28 +304,24 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc @Override public RFuture<Void> unlockAsync(long threadId) { - RPromise<Void> result = new RedissonPromise<>(); RFuture<Boolean> future = unlockInnerAsync(threadId); - future.onComplete((opStatus, e) -> { + CompletionStage<Void> f = future.handle((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null) { - result.tryFailure(e); - return; + throw new CompletionException(e); } - if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); - result.tryFailure(cause); - return; + throw new CompletionException(cause); } - result.trySuccess(null); + return null; }); - return result; + return new CompletableFutureWrapper<>(f); } protected abstract RFuture<Boolean> unlockInnerAsync(long threadId); diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 5416faefc..5007a9d99 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -24,13 +24,16 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; -import org.redisson.misc.RPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedissonPromise; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -59,22 +62,18 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements @Override public RFuture<Boolean> addAsync(V e) { - RPromise<Boolean> result = new RedissonPromise<Boolean>(); RFuture<Boolean> future = offerAsync(e); - future.onComplete((res, ex) -> { + CompletionStage<Boolean> f = future.handle((res, ex) -> { if (ex != null) { - result.tryFailure(ex); - return; + throw new CompletionException(ex); } - + if (!res) { - result.tryFailure(new IllegalStateException("Queue is full")); - return; + throw new CompletionException(new IllegalStateException("Queue is full")); } - - result.trySuccess(res); + return true; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -120,30 +119,19 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements return wrapTakeFuture(takeFuture); } - private RPromise<V> wrapTakeFuture(RFuture<V> takeFuture) { - RPromise<V> result = new RedissonPromise<V>() { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - super.cancel(mayInterruptIfRunning); - return takeFuture.cancel(mayInterruptIfRunning); - }; - }; - - takeFuture.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + private RFuture<V> wrapTakeFuture(RFuture<V> takeFuture) { + CompletableFuture<V> f = takeFuture.toCompletableFuture().thenCompose(res -> { if (res == null) { - result.trySuccess(res); - return; + return CompletableFuture.completedFuture(null); + } + return createSemaphore(null).releaseAsync().handle((r, ex) -> res); + }); + f.whenComplete((r, e) -> { + if (f.isCancelled()) { + takeFuture.cancel(false); } - createSemaphore(null).releaseAsync().onComplete((r, ex) -> { - result.trySuccess(res); - }); }); - return result; + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 0261fa46b..933240a08 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -45,7 +45,6 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; /** * @@ -647,7 +646,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { result.add(executorFuture); } - executorRemoteService.executeAddAsync().onComplete((res, e) -> { + executorRemoteService.executeAddAsync().whenComplete((res, e) -> { if (e != null) { for (RExecutorFuture<?> executorFuture : result) { executorFuture.toCompletableFuture().completeExceptionally(e); @@ -757,7 +756,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { result.add(executorFuture); } - executorRemoteService.executeAddAsync().onComplete((res, e) -> { + executorRemoteService.executeAddAsync().whenComplete((res, e) -> { if (e != null) { for (RExecutorFuture<?> executorFuture : result) { executorFuture.toCompletableFuture().completeExceptionally(e); @@ -1044,24 +1043,17 @@ public class RedissonExecutorService implements RScheduledExecutorService { return executorRemoteService.cancelExecutionAsync(new RequestId(taskId)); } - private <T> RFuture<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference<RFuture<T>> result = new AtomicReference<>(); - for (Future<?> future : futures) { - RFuture<T> f = (RFuture<T>) future; - f.onComplete((r, e) -> { - latch.countDown(); - result.compareAndSet(null, f); - }); - } - - if (timeout == -1) { - latch.await(); - } else { - latch.await(timeout, timeUnit); + private <T> T poll(List<CompletableFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { + CompletableFuture<Object> future = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])); + try { + if (timeout == -1) { + return (T) future.get(); + } else { + return (T) future.get(timeout, timeUnit); + } + } catch (ExecutionException e) { + throw commandExecutor.convertException(e); } - - return result.get(); } @Override @@ -1080,20 +1072,17 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new NullPointerException(); } - List<RExecutorFuture<?>> futures = new ArrayList<>(); + List<CompletableFuture<?>> futures = new ArrayList<>(); for (Callable<T> callable : tasks) { RExecutorFuture<T> future = submit(callable); - futures.add(future); + futures.add(future.toCompletableFuture()); } - RFuture<T> result = poll(futures, timeout, unit); - if (result == null) { - throw new TimeoutException(); - } - for (RExecutorFuture<?> f : futures) { + T result = poll(futures, timeout, unit); + for (CompletableFuture<?> f : futures) { f.cancel(true); } - return result.getNow(); + return result; } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index f02f5c446..b27f0dee2 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -29,17 +29,17 @@ import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.iterator.RedissonBaseIterator; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompositeIterable; -import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.reactive.CommandReactiveBatchService; import org.redisson.rx.CommandRxBatchService; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -239,55 +239,57 @@ public class RedissonKeys implements RKeys { } int batchSize = 500; - RPromise<Long> result = new RedissonPromise<Long>(); - AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); - AtomicLong count = new AtomicLong(); - Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet(); - AtomicLong executed = new AtomicLong(entries.size()); - BiConsumer<Long, Throwable> listener = (res, e) -> { - if (e == null) { - count.addAndGet(res); - } else { - failed.set(e); - } - - checkExecution(result, failed, count, executed); - }; - - for (MasterSlaveEntry entry : entries) { - commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { - @Override - public void run() { - long count = 0; - try { - Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize); - List<String> keys = new ArrayList<String>(); - while (keysIterator.hasNext()) { - String key = keysIterator.next(); - keys.add(key); - - if (keys.size() % batchSize == 0) { - count += delete(keys.toArray(new String[keys.size()])); - keys.clear(); - } - } - - if (!keys.isEmpty()) { - count += delete(keys.toArray(new String[keys.size()])); + List<CompletableFuture<Long>> futures = new ArrayList<>(); + for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + long count = 0; + try { + Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize); + List<String> keys = new ArrayList<>(); + while (keysIterator.hasNext()) { + String key = keysIterator.next(); + keys.add(key); + + if (keys.size() % batchSize == 0) { + count += delete(keys.toArray(new String[0])); keys.clear(); } + } - RFuture<Long> future = RedissonPromise.newSucceededFuture(count); - future.onComplete(listener); - } catch (Exception e) { - RFuture<Long> future = RedissonPromise.newFailedFuture(e); - future.onComplete(listener); + if (!keys.isEmpty()) { + count += delete(keys.toArray(new String[0])); + keys.clear(); } + + RFuture<Long> future = RedissonPromise.newSucceededFuture(count); + futures.add(future.toCompletableFuture()); + } catch (Exception e) { + CompletableFuture<Long> future = new CompletableFuture<>(); + future.completeExceptionally(e); + futures.add(future); } }); } - return result; + CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture<Long> res = future.handle((r, e) -> { + long cc = futures.stream() + .filter(f -> f.isDone()) + .mapToLong(f -> f.getNow(0L)) + .sum(); + if (e != null) { + if (cc > 0) { + RedisException ex = new RedisException( + cc + " keys have been deleted. But one or more nodes has an error", e); + throw new CompletionException(ex); + } else { + throw new CompletionException(e); + } + } + + return cc; + }); + return new CompletableFutureWrapper<>(res); } @Override @@ -419,24 +421,6 @@ public class RedissonKeys implements RKeys { return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL); } - private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed, AtomicLong count, - AtomicLong executed) { - if (executed.decrementAndGet() == 0) { - if (failed.get() != null) { - if (count.get() > 0) { - RedisException ex = new RedisException( - "" + count.get() + " keys has been deleted. But one or more nodes has an error", - failed.get()); - result.tryFailure(ex); - } else { - result.tryFailure(failed.get()); - } - } else { - result.trySuccess(count.get()); - } - } - } - @Override public long remainTimeToLive(String name) { return commandExecutor.get(remainTimeToLiveAsync(name)); diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 9cc4c27b1..2e1c33149 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -1532,7 +1532,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac } params.add(count); - RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, SCAN, + RFuture<MapCacheScanResult<Object, Object>> future = commandExecutor.evalReadAsync(client, name, codec, SCAN, "local result = {}; " + "local idleKeys = {}; " + "local res; " @@ -1572,41 +1572,40 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac Arrays.asList(name, getTimeoutSetName(name), getIdleSetName(name)), params.toArray()); - f.onComplete((res, e) -> { - if (res != null) { - if (res.getIdleKeys().isEmpty()) { - return; - } - - List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1); - args.add(System.currentTimeMillis()); - encodeMapKeys(args, res.getIdleKeys()); - - commandExecutor.evalWriteAsync(name, codec, new RedisCommand<Map<Object, Object>>("EVAL", - new MapValueDecoder(new MapGetAllDecoder(args, 1))), - "local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter - + "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); " - + "for i = #map, 1, -1 do " - + "local value = map[i]; " - + "if value ~= false then " - + "local key = ARGV[i]; " - + "local t, val = struct.unpack('dLc0', value); " - - + "if t ~= 0 then " - + "local expireIdle = redis.call('zscore', KEYS[2], key); " - + "if expireIdle ~= false then " - + "if tonumber(expireIdle) > currentTime then " - + "redis.call('zadd', KEYS[2], t + currentTime, key); " - + "end; " - + "end; " - + "end; " - + "end; " - + "end; ", - Arrays.asList(name, getIdleSetName(name)), args.toArray()); + CompletionStage<MapCacheScanResult<Object, Object>> f = future.thenApply(res -> { + if (res.getIdleKeys().isEmpty()) { + return res; } + + List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1); + args.add(System.currentTimeMillis()); + encodeMapKeys(args, res.getIdleKeys()); + + commandExecutor.evalWriteAsync(name, codec, new RedisCommand<Map<Object, Object>>("EVAL", + new MapValueDecoder(new MapGetAllDecoder(args, 1))), + "local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter + + "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); " + + "for i = #map, 1, -1 do " + + "local value = map[i]; " + + "if value ~= false then " + + "local key = ARGV[i]; " + + "local t, val = struct.unpack('dLc0', value); " + + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[2], key); " + + "if expireIdle ~= false then " + + "if tonumber(expireIdle) > currentTime then " + + "redis.call('zadd', KEYS[2], t + currentTime, key); " + + "end; " + + "end; " + + "end; " + + "end; " + + "end; ", + Arrays.asList(name, getIdleSetName(name)), args.toArray()); + return res; }); - return (RFuture<ScanResult<Map.Entry<Object, Object>>>) (Object) f; + return new CompletableFutureWrapper<>((CompletionStage<ScanResult<Map.Entry<Object, Object>>>) (Object) f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index d5ea9900f..769fd7649 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -27,11 +27,13 @@ import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.iterator.RedissonBaseMapIterator; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; import org.redisson.misc.RedissonPromise; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** @@ -269,7 +271,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement @Override public RFuture<Void> renameAsync(String newName) { String newPrefix = suffixName(newName, ""); - RFuture<Void> f = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local entries = redis.call('hgetall', KEYS[1]); " + "local keys = {}; " + "for i, v in ipairs(entries) do " + @@ -283,18 +285,14 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement + "redis.call('rename', ARGV[1] .. keys[i], ARGV[2] .. keys[i]); " + "end; ", Arrays.asList(getRawName()), prefix, newPrefix, newName); - f.onComplete((r, e) -> { - if (e == null) { - setName(newName); - } - }); - return f; + CompletionStage<Void> f = future.thenAccept(r -> setName(newName)); + return new CompletableFutureWrapper<>(f); } @Override public RFuture<Boolean> renamenxAsync(String newName) { String newPrefix = suffixName(newName, ""); - RFuture<Boolean> f = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local entries = redis.call('hgetall', KEYS[1]); " + "local keys = {}; " + "for i, v in ipairs(entries) do " + @@ -320,12 +318,13 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement + "end; " + "return 1; ", Arrays.asList(getRawName()), prefix, newPrefix, newName); - f.onComplete((value, e) -> { - if (e == null && value) { + CompletionStage<Boolean> f = future.thenApply(value -> { + if (value) { setName(newName); } + return value; }); - return f; + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index f1abd0d6c..99dd89a37 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -27,6 +27,7 @@ import org.redisson.misc.Hash; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Stream; @@ -140,13 +141,9 @@ public abstract class RedissonObject implements RObject { @Override public RFuture<Void> renameAsync(String newName) { - RFuture<Void> f = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAME, getRawName(), newName); - f.onComplete((r, e) -> { - if (e == null) { - setName(newName); - } - }); - return f; + RFuture<Void> future = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAME, getRawName(), newName); + CompletionStage<Void> f = future.thenAccept(r -> setName(newName)); + return new CompletableFutureWrapper<>(f); } @Override @@ -186,13 +183,14 @@ public abstract class RedissonObject implements RObject { @Override public RFuture<Boolean> renamenxAsync(String newName) { - RFuture<Boolean> f = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getRawName(), newName); - f.onComplete((value, e) -> { - if (e == null && value) { + RFuture<Boolean> future = commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getRawName(), newName); + CompletionStage<Boolean> f = future.thenApply(value -> { + if (value) { setName(newName); } + return value; }); - return f; + return new CompletableFutureWrapper<>(f); }