From 7583397694449599054f4bb9e5b55f584bd067c2 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 1 Feb 2016 12:14:25 +0300 Subject: [PATCH] RKeys.deleteAsync wasn't worked in cluster mode. Fixed. --- src/main/java/org/redisson/RedissonKeys.java | 74 +++++++++++++++++--- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redisson/RedissonKeys.java b/src/main/java/org/redisson/RedissonKeys.java index 148596659..e60637d08 100644 --- a/src/main/java/org/redisson/RedissonKeys.java +++ b/src/main/java/org/redisson/RedissonKeys.java @@ -18,20 +18,28 @@ package org.redisson; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandBatchService; import org.redisson.core.RKeys; import org.redisson.misc.CompositeIterable; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class RedissonKeys implements RKeys { @@ -228,7 +236,7 @@ public class RedissonKeys implements RKeys { * Delete multiple objects by name * * @param keys - object names - * @return + * @return number of removed keys */ @Override public long delete(String ... keys) { @@ -239,22 +247,68 @@ public class RedissonKeys implements RKeys { * Delete multiple objects by name in async mode * * @param keys - object names - * @return + * @return number of removed keys */ @Override public Future deleteAsync(String ... keys) { - return commandExecutor.writeAllAsync(RedisCommands.DEL, new SlotCallback() { - AtomicLong results = new AtomicLong(); - @Override - public void onSlotResult(Long result) { - results.addAndGet(result); + Map> range2key = new HashMap>(); + for (String key : keys) { + int slot = commandExecutor.getConnectionManager().calcSlot(key); + for (ClusterSlotRange range : commandExecutor.getConnectionManager().getEntries().keySet()) { + if (range.isOwn(slot)) { + List list = range2key.get(range); + if (list == null) { + list = new ArrayList(); + range2key.put(range, list); + } + list.add(key); + } } + } + final Promise result = commandExecutor.getConnectionManager().newPromise(); + final AtomicReference failed = new AtomicReference(); + final AtomicLong count = new AtomicLong(); + final AtomicLong executed = new AtomicLong(range2key.size()); + FutureListener> listener = new FutureListener>() { @Override - public Long onFinish() { - return results.get(); + public void operationComplete(Future> future) throws Exception { + if (future.isSuccess()) { + List result = (List) future.get(); + for (Long res : result) { + count.addAndGet(res); + } + } else { + failed.set(future.cause()); + } + + if (executed.decrementAndGet() == 0) { + if (failed.get() != null) { + if (count.get() > 0) { + RedisException ex = new RedisException("" + count.get() + " keys deleted. But one or more nodes has an error", failed.get()); + result.setFailure(ex); + } else { + result.setFailure(failed.get()); + } + } else { + result.setSuccess(count.get()); + } + } } - }, (Object[])keys); + }; + + for (Entry> entry : range2key.entrySet()) { + // executes in batch due to CROSSLOT error + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + for (String key : entry.getValue()) { + executorService.writeAsync(entry.getKey().getStartSlot(), null, RedisCommands.DEL, key); + } + + Future> future = executorService.executeAsync(); + future.addListener(listener); + } + + return result; } @Override