refactoring

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java
#	redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java
#	redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java
pull/1827/head
Nikita Koksharov 6 years ago
parent 96d4110a63
commit e89cce3b3e

@ -146,7 +146,7 @@ public interface RedisCommands {
RedisCommand<Set<Object>> ZRANGEBYSCORE = new RedisCommand<Set<Object>>("ZRANGEBYSCORE", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGEBYSCORE_LIST = new RedisCommand<List<Object>>("ZRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGE = new RedisCommand<List<Object>>("ZREVRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGEBYSCORE = new RedisCommand<List<Object>>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> ZREVRANGEBYSCORE = new RedisCommand<Set<Object>>("ZREVRANGEBYSCORE", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());

@ -58,6 +58,8 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
@ -70,8 +72,12 @@ public interface CommandAsyncExecutor {
<R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params);
<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
@ -90,8 +96,6 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);
<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, RedisCommand<T> command, Object ... params);
<R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params);

@ -245,13 +245,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, codec, command, params);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, command, params);
return readAllAsync(results, connectionManager.getCodec(), command, params);
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(final Collection<R> results, RedisCommand<T> command, Object... params) {
public <T, R> RFuture<Collection<R>> readAllAsync(final Collection<R> results, Codec codec, RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = createPromise();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size());
@ -284,11 +291,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null);
}
return mainPromise;
}
@Override
public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();

@ -138,7 +138,7 @@ public class PubSubConnectionEntry {
return false;
}
private void removeListener(ChannelName channelName, RedisPubSubListener<?> listener) {
public void removeListener(ChannelName channelName, RedisPubSubListener<?> listener) {
Queue<RedisPubSubListener<?>> queue = channelListeners.get(channelName);
synchronized (queue) {
if (queue.remove(listener) && queue.isEmpty()) {

@ -21,6 +21,7 @@ import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
@ -37,13 +38,13 @@ import reactor.rx.subscription.ReactiveSubscription;
* @param <V> value type
* @param <M> entry type
*/
public class RedissonMapReactiveIterator<K, V, M> {
public class MapReactiveIterator<K, V, M> {
private final RedissonMap<K, V> map;
private final String pattern;
private final int count;
public RedissonMapReactiveIterator(RedissonMap<K, V> map, String pattern, int count) {
public MapReactiveIterator(RedissonMap<K, V> map, String pattern, int count) {
this.map = map;
this.pattern = pattern;
this.count = count;
@ -69,7 +70,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
protected void nextValues() {
final ReactiveSubscription<M> m = this;
map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener<MapScanResult<Object, Object>>() {
scanIterator(client, nextIterPos).addListener(new FutureListener<MapScanResult<Object, Object>>() {
@Override
public void operationComplete(Future<MapScanResult<Object, Object>> future)
@ -129,4 +130,8 @@ public class RedissonMapReactiveIterator<K, V, M> {
};
}
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
return map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count);
}
}

@ -38,11 +38,11 @@ import reactor.rx.subscription.ReactiveSubscription;
*/
public class RedissonKeysReactive {
private final CommandReactiveService commandExecutor;
private final CommandReactiveExecutor commandExecutor;
private final RedissonKeys instance;
public RedissonKeysReactive(CommandReactiveService commandExecutor) {
public RedissonKeysReactive(CommandReactiveExecutor commandExecutor) {
super();
instance = new RedissonKeys(commandExecutor);
this.commandExecutor = commandExecutor;

@ -50,7 +50,7 @@ public class RedissonMapCacheReactive<K, V> {
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) mapCache, pattern, count).stream();
return new MapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) mapCache, pattern, count).stream();
}
public Publisher<V> valueIterator() {
@ -66,7 +66,7 @@ public class RedissonMapCacheReactive<K, V> {
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) mapCache, pattern, count) {
return new MapReactiveIterator<K, V, V>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
@ -87,7 +87,7 @@ public class RedissonMapCacheReactive<K, V> {
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) mapCache, pattern, count) {
return new MapReactiveIterator<K, V, K>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();

@ -60,7 +60,7 @@ public class RedissonMapReactive<K, V> {
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count).stream();
return new MapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count).stream();
}
public Publisher<V> valueIterator() {
@ -76,7 +76,7 @@ public class RedissonMapReactive<K, V> {
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
return new MapReactiveIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
@ -97,7 +97,7 @@ public class RedissonMapReactive<K, V> {
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
return new MapReactiveIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();

Loading…
Cancel
Save