RedissonKeysReactive refactored

pull/395/head
Nikita 9 years ago
parent b728aff3d7
commit 4e8450361a

@ -25,6 +25,8 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
@ -32,6 +34,8 @@ import org.redisson.connection.ConnectionManager;
*/
public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Future<R> future);
ConnectionManager getConnectionManager();
<T, R> Publisher<R> evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType,

@ -45,6 +45,10 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
return new NettyFuturePublisher<R>(f);
}
public <R> Publisher<R> reactive(Future<R> future) {
return new NettyFuturePublisher<R>(future);
}
@Override
public <T, R> Publisher<Collection<R>> readAllReactive(RedisCommand<T> command, Object ... params) {
Future<Collection<R>> f = readAllAsync(command, params);

@ -18,14 +18,12 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.SlotCallback;
import org.redisson.RedissonKeys;
import org.redisson.api.RKeysReactive;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
@ -41,14 +39,17 @@ public class RedissonKeysReactive implements RKeysReactive {
private final CommandReactiveService commandExecutor;
RedissonKeys instance;
public RedissonKeysReactive(CommandReactiveService commandExecutor) {
super();
instance = new RedissonKeys(commandExecutor);
this.commandExecutor = commandExecutor;
}
@Override
public Publisher<Integer> getSlot(String key) {
return commandExecutor.readReactive(null, RedisCommands.KEYSLOT, key);
return commandExecutor.reactive(instance.getSlotAsync(key));
}
@Override
@ -153,59 +154,32 @@ public class RedissonKeysReactive implements RKeysReactive {
@Override
public Publisher<Collection<String>> findKeysByPattern(String pattern) {
return commandExecutor.readAllReactive(RedisCommands.KEYS, pattern);
return commandExecutor.reactive(instance.findKeysByPatternAsync(pattern));
}
@Override
public Publisher<String> randomKey() {
return commandExecutor.readRandomReactive(RedisCommands.RANDOM_KEY);
return commandExecutor.reactive(instance.randomKeyAsync());
}
@Override
public Publisher<Long> deleteByPattern(String pattern) {
return commandExecutor.evalWriteAllReactive(RedisCommands.EVAL_LONG, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
}
@Override
public Long onFinish() {
return results.get();
}
}, "local keys = redis.call('keys', ARGV[1]) "
+ "local n = 0 "
+ "for i=1, table.getn(keys),5000 do "
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end "
+ "return n;",Collections.emptyList(), pattern);
return commandExecutor.reactive(instance.deleteByPatternAsync(pattern));
}
@Override
public Publisher<Long> delete(String ... keys) {
return commandExecutor.writeAllReactive(RedisCommands.DEL, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
results.addAndGet(result);
}
@Override
public Long onFinish() {
return results.get();
}
}, (Object[])keys);
return commandExecutor.reactive(instance.deleteAsync(keys));
}
@Override
public Publisher<Void> flushdb() {
return commandExecutor.writeAllReactive(RedisCommands.FLUSHDB);
return commandExecutor.reactive(instance.flushdbAsync());
}
@Override
public Publisher<Void> flushall() {
return commandExecutor.writeAllReactive(RedisCommands.FLUSHALL);
return commandExecutor.reactive(instance.flushallAsync());
}
}

Loading…
Cancel
Save