Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java
pull/1821/head
Nikita 7 years ago
commit c1f3fa8140

@ -93,13 +93,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override @Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) { return get(scanIteratorAsync(name, client, startPos, pattern, count));
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count);
return get(f);
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return get(f);
} }
@Override @Override
@ -577,7 +571,11 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override @Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern, int count) { String pattern, int count) {
throw new UnsupportedOperationException(); if (pattern == null) {
return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count);
}
return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
} }
} }

@ -141,18 +141,27 @@ public interface RKeysReactive {
Publisher<RType> getType(String key); Publisher<RType> getType(String key);
/** /**
* Load keys in incrementally iterate mode. * Load keys in incrementally iterate mode. Keys traversed with SCAN operation.
* Each SCAN operation loads up to 10 keys per request.
* *
* Uses <code>SCAN</code> Redis command. * @return keys
*
* @return all keys
*/ */
Publisher<String> getKeys(); Publisher<String> getKeys();
/** /**
* Find keys by pattern and load it in incrementally iterate mode. * Load keys in incrementally iterate mode. Keys traversed with SCAN operation.
* Each SCAN operation loads up to <code>count</code> keys per request.
* *
* Uses <code>SCAN</code> Redis command. * @param count - keys loaded per request to Redis
* @return keys
*/
Publisher<String> getKeys(int count);
/**
* Find keys by pattern and load it in incrementally iterate mode.
* Keys traversed with SCAN operation.
* Each SCAN operation loads up to 10 keys per request.
* <p>
* *
* Supported glob-style patterns: * Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo * h?llo subscribes to hello, hallo and hxllo
@ -164,6 +173,25 @@ public interface RKeysReactive {
*/ */
Publisher<String> getKeysByPattern(String pattern); Publisher<String> getKeysByPattern(String pattern);
/**
* Get all keys by pattern using iterator.
* Keys traversed with SCAN operation. Each SCAN operation loads
* up to <code>count</code> keys per request.
* <p>
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @param count - keys loaded per request to Redis
* @return keys
*/
Publisher<String> getKeysByPattern(String pattern, int count);
/** /**
* Get hash slot identifier for key. * Get hash slot identifier for key.
* Available for cluster nodes only. * Available for cluster nodes only.

@ -15,6 +15,7 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -28,6 +29,34 @@ import org.reactivestreams.Publisher;
*/ */
public interface RSetReactive<V> extends RCollectionReactive<V> { public interface RSetReactive<V> extends RCollectionReactive<V> {
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Publisher<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Publisher<V> iterator(String pattern, int count);
/**
* Returns iterator over elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return iterator
*/
Publisher<V> iterator(String pattern);
/** /**
* Removes and returns random elements from set * Removes and returns random elements from set
* in async mode * in async mode

@ -67,10 +67,15 @@ public class RedissonKeysReactive implements RKeysReactive {
} }
@Override @Override
public Publisher<String> getKeysByPattern(final String pattern) { public Publisher<String> getKeysByPattern(String pattern) {
return getKeysByPattern(pattern, 10);
}
@Override
public Publisher<String> getKeysByPattern(String pattern, int count) {
List<Publisher<String>> publishers = new ArrayList<Publisher<String>>(); List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
publishers.add(createKeysIterator(entry, pattern)); publishers.add(createKeysIterator(entry, pattern, count));
} }
return Flux.merge(publishers); return Flux.merge(publishers);
} }
@ -80,14 +85,19 @@ public class RedissonKeysReactive implements RKeysReactive {
return getKeysByPattern(null); return getKeysByPattern(null);
} }
private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) { @Override
public Publisher<String> getKeys(int count) {
return getKeysByPattern(null, count);
}
private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) { if (pattern == null) {
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos); return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
} }
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern); return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
} }
private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern) { private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
return Flux.create(new Consumer<FluxSink<String>>() { return Flux.create(new Consumer<FluxSink<String>>() {
@Override @Override
@ -105,7 +115,7 @@ public class RedissonKeysReactive implements RKeysReactive {
} }
protected void nextValues(FluxSink<String> emitter) { protected void nextValues(FluxSink<String> emitter) {
scanIterator(entry, nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>() { scanIterator(entry, nextIterPos, pattern, count).subscribe(new Subscriber<ListScanResult<String>>() {
@Override @Override
public void onSubscribe(Subscription s) { public void onSubscribe(Subscription s) {

@ -110,8 +110,13 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
}); });
} }
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) { private Publisher<ListScanResult<Object>> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); return reactive(new Supplier<RFuture<ListScanResult<Object>>>() {
@Override
public RFuture<ListScanResult<Object>> get() {
return ((RedissonSet)instance).scanIteratorAsync(getName(), client, startPos, pattern, count);
}
});
} }
@Override @Override
@ -252,13 +257,28 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
} }
@Override @Override
public Publisher<V> iterator() { public Publisher<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Publisher<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Publisher<V> iterator(final String pattern, final int count) {
return Flux.create(new SetReactiveIterator<V>() { return Flux.create(new SetReactiveIterator<V>() {
@Override @Override
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) { protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos, pattern, count);
} }
}); });
} }
@Override
public Publisher<V> iterator() {
return iterator(null, 10);
}
} }

Loading…
Cancel
Save