RKeys.deleteAsync wasn't worked in cluster mode. Fixed.

pull/395/head
Nikita 9 years ago
parent 1e492d8269
commit 7583397694

@ -18,20 +18,28 @@ package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.core.RKeys;
import org.redisson.misc.CompositeIterable;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class RedissonKeys implements RKeys {
@ -228,7 +236,7 @@ public class RedissonKeys implements RKeys {
* Delete multiple objects by name
*
* @param keys - object names
* @return
* @return number of removed keys
*/
@Override
public long delete(String ... keys) {
@ -239,22 +247,68 @@ public class RedissonKeys implements RKeys {
* Delete multiple objects by name in async mode
*
* @param keys - object names
* @return
* @return number of removed keys
*/
@Override
public Future<Long> deleteAsync(String ... keys) {
return commandExecutor.writeAllAsync(RedisCommands.DEL, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
Map<ClusterSlotRange, List<String>> range2key = new HashMap<ClusterSlotRange, List<String>>();
for (String key : keys) {
int slot = commandExecutor.getConnectionManager().calcSlot(key);
for (ClusterSlotRange range : commandExecutor.getConnectionManager().getEntries().keySet()) {
if (range.isOwn(slot)) {
List<String> list = range2key.get(range);
if (list == null) {
list = new ArrayList<String>();
range2key.put(range, list);
}
list.add(key);
}
}
}
final Promise<Long> result = commandExecutor.getConnectionManager().newPromise();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() {
@Override
public Long onFinish() {
return results.get();
public void operationComplete(Future<List<?>> future) throws Exception {
if (future.isSuccess()) {
List<Long> result = (List<Long>) future.get();
for (Long res : result) {
count.addAndGet(res);
}
} else {
failed.set(future.cause());
}
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys deleted. But one or more nodes has an error", failed.get());
result.setFailure(ex);
} else {
result.setFailure(failed.get());
}
} else {
result.setSuccess(count.get());
}
}
}
}, (Object[])keys);
};
for (Entry<ClusterSlotRange, List<String>> entry : range2key.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (String key : entry.getValue()) {
executorService.writeAsync(entry.getKey().getStartSlot(), null, RedisCommands.DEL, key);
}
Future<List<?>> future = executorService.executeAsync();
future.addListener(listener);
}
return result;
}
@Override

Loading…
Cancel
Save