Merge branch 'mrniko/master' into feature/travis-ci

pull/509/head
Rui Gu 9 years ago
commit 4380ead23d

@ -2,6 +2,21 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
####04-Apr-2016 - version 2.2.11 released
Since this version Redisson has __perfomance boost up to 43%__
Feature - __new object added__ `RGeo`
Feature - __new object added__ `RBuckets`
Feature - travis-ci integration (thanks to jackygurui)
Improvement - `RScoredSortedSet.removeAllAsync` & `removeAll` methods optimization
Improvement - `RemoteService` reliability tuned up
Improvement - Reattaching RBlockingQueue\Deque blocking commands (poll, take ...) after Redis failover process or channel reconnection
Fixed - iterator objects may skip results in some cases
Fixed - RTopic listeners hangs during synchronous commands execution inside it
Fixed - Redisson hangs during shutdown if `RBlockingQueue\Deque.take` or `RBlockingQueue\Deque.poll` methods were invoked
####23-Mar-2016 - version 2.2.10 released
Feature - __new object added__ `RRemoteService`

@ -82,12 +82,12 @@ Include the following to your dependency list:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.10</version>
<version>2.2.11</version>
</dependency>
### Gradle
compile 'org.redisson:redisson:2.2.10'
compile 'org.redisson:redisson:2.2.11'
### Supported by

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.11-SNAPSHOT</version>
<version>2.2.12-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Redisson</name>
@ -100,33 +100,33 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.35.Final</version>
<version>4.0.36.Final</version>
</dependency>
<dependency>
@ -261,7 +261,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<version>2.10</version>
<configuration>
<downloadSources>true</downloadSources>
<forceRecheck>true</forceRecheck>
@ -310,7 +310,7 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<version>3.5.1</version>
<configuration>
<source>${source.version}</source>
<target>${source.version}</target>
@ -336,7 +336,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<version>3.0.0</version>
<executions>
<execution>
<id>attach-sources</id>
@ -351,7 +351,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
<version>2.19.1</version>
</plugin>
<plugin>

@ -0,0 +1,96 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private long startPos = -1;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos;
do {
prevIterPos = nextIterPos;
ListScanResult<V> res = iterator(client, nextIterPos);
client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues) && res.getPos() == startPos) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos);
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element");
}
value = iter.next();
currentElementRemoved = false;
return value;
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
abstract void remove(V value);
}

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -32,10 +33,12 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
protected long iterPos = 0;
protected long nextIterPos;
protected long startPos = -1;
protected InetSocketAddress client;
private boolean finished;
private boolean currentElementRemoved;
private boolean removeExecuted;
protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
@ -44,26 +47,41 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
if (finished) {
return false;
}
if (iter == null || !iter.hasNext()) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstValues)) {
finished = true;
free(firstValues);
if (nextIterPos == -1) {
return false;
}
long prevIterPos;
do {
prevIterPos = nextIterPos;
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstValues)) {
finished = true;
free(firstValues);
free(newValues);
firstValues = null;
return false;
}
free(newValues);
firstValues = null;
return false;
}
free(newValues);
iter = res.getMap().entrySet().iterator();
nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
iter = res.getMap().entrySet().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
@ -90,7 +108,7 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
}
entry = iter.next();
removeExecuted = false;
currentElementRemoved = false;
return getValue(entry);
}
@ -108,14 +126,16 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
@Override
public void remove() {
if (removeExecuted) {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
// lazy init iterator
hasNext();
iter.remove();
removeKey();
currentElementRemoved = true;
removeExecuted = true;
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -23,7 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -90,55 +90,18 @@ public class RedissonKeys implements RKeys {
}
private Iterator<String> createKeysIterator(final int slot, final String pattern) {
return new Iterator<String>() {
private List<String> firstValues;
private Iterator<String> iter;
private long iterPos;
private boolean removeExecuted;
private String value;
return new RedissonBaseIterator<String>() {
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
ListScanResult<String> res = scanIterator(slot, iterPos, pattern);
if (iterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
ListScanResult<String> iterator(InetSocketAddress client, long nextIterPos) {
return RedissonKeys.this.scanIterator(slot, nextIterPos, pattern);
}
@Override
public String next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element");
}
value = iter.next();
removeExecuted = false;
return value;
void remove(String value) {
RedissonKeys.this.delete(value);
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
delete(value);
removeExecuted = true;
}
};
}

@ -29,7 +29,7 @@ public class RedissonMapIterator<K, V, M> extends RedissonBaseMapIterator<K, V,
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos);
return map.scanIterator(client, nextIterPos);
}
protected void removeKey() {

@ -29,7 +29,7 @@ public class RedissonMultiMapKeysIterator<K, V, M> extends RedissonBaseMapIterat
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos);
return map.scanIterator(client, nextIterPos);
}
protected void removeKey() {

@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScoredCodec;
@ -245,54 +244,18 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long iterPos;
private boolean removeExecuted;
private V value;
return new RedissonBaseIterator<V>() {
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
ListScanResult<V> res = scanIterator(client, iterPos);
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
iter.remove();
void remove(V value) {
RedissonScoredSortedSet.this.remove(value);
removeExecuted = true;
}
};
}

@ -82,66 +82,18 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
return new RedissonBaseIterator<V>() {
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
void remove(V value) {
RedissonSet.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
};
}

@ -161,66 +161,18 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
return new RedissonBaseIterator<V>() {
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
void remove(V value) {
RedissonSetCache.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
};
}

@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.client.codec.Codec;
@ -128,66 +127,18 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
return new RedissonBaseIterator<V>() {
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
void remove(V value) {
RedissonSetMultimapValues.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
};
}

@ -19,6 +19,7 @@ import java.util.List;
import java.util.Map;
/**
* Geospatial items holder
*
* @author Nikita Koksharov
*
@ -26,26 +27,140 @@ import java.util.Map;
*/
public interface RGeo<V> extends RExpirable, RGeoAsync<V> {
/**
* Adds geospatial member.
*
* @param entries
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
long add(double longitude, double latitude, V member);
/**
* Adds geospatial members.
*
* @param entries
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
long add(GeoEntry... entries);
/**
* Returns distance between members in <code>GeoUnit</code> units.
*
* @see {@link GeoUnit}
*
* @param firstMember
* @param secondMember
* @param geoUnit
* @return
*/
Double dist(V firstMember, V secondMember, GeoUnit geoUnit);
/**
* Returns 11 characters Geohash string mapped by defined member.
*
* @param members
* @return
*/
Map<V, String> hash(V... members);
/**
* Returns geo-position mapped by defined member.
*
* @param members
* @return
*/
Map<V, GeoPosition> pos(V... members);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
List<V> radius(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Map<V, Double> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Map<V, GeoPosition> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
List<V> radius(V member, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Map<V, Double> radiusWithDistance(V member, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Map<V, GeoPosition> radiusWithPosition(V member, double radius, GeoUnit geoUnit);
}

@ -28,26 +28,140 @@ import io.netty.util.concurrent.Future;
*/
public interface RGeoAsync<V> extends RExpirableAsync {
/**
* Adds geospatial member.
*
* @param entries
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
Future<Long> addAsync(double longitude, double latitude, V member);
/**
* Adds geospatial members.
*
* @param entries
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
Future<Long> addAsync(GeoEntry... entries);
/**
* Returns distance between members in <code>GeoUnit</code> units.
*
* @see {@link GeoUnit}
*
* @param firstMember
* @param secondMember
* @param geoUnit
* @return
*/
Future<Double> distAsync(V firstMember, V secondMember, GeoUnit geoUnit);
/**
* Returns 11 characters Geohash string mapped by defined member.
*
* @param members
* @return
*/
Future<Map<V, String>> hashAsync(V... members);
/**
* Returns geo-position mapped by defined member.
*
* @param members
* @return
*/
Future<Map<V, GeoPosition>> posAsync(V... members);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<List<V>> radiusAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<List<V>> radiusAsync(V member, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude
* @param latitude
* @param radius
* @param geoUnit
* @return
*/
Future<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit);
}

@ -15,6 +15,8 @@
*/
package org.redisson.reactive;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.RCollectionReactive;
@ -40,9 +42,10 @@ public class PublisherAdder<V> {
c.subscribe(new DefaultSubscriber<V>() {
volatile boolean completed;
AtomicLong values = new AtomicLong();
Subscription s;
Long lastSize = 0L;
V lastValue;
@Override
public void onSubscribe(Subscription s) {
@ -52,7 +55,7 @@ public class PublisherAdder<V> {
@Override
public void onNext(V o) {
lastValue = o;
values.getAndIncrement();
destination.add(o).subscribe(new DefaultSubscriber<Long>() {
@Override
@ -68,19 +71,18 @@ public class PublisherAdder<V> {
@Override
public void onNext(Long o) {
lastSize = sum(lastSize, o);
}
@Override
public void onComplete() {
lastValue = null;
s.request(1);
if (values.decrementAndGet() == 0 && completed) {
promise.onNext(lastSize);
}
}
});
}
@Override
public void onComplete() {
if (lastValue == null) {
completed = true;
if (values.get() == 0) {
promise.onNext(lastSize);
}
}

@ -16,7 +16,6 @@
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
@ -24,56 +23,36 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
public abstract class SetReactiveIterator<V> extends Stream<V> {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void doRequest(long n) {
protected void onRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
onNext(val);
}
}
protected void nextValues() {
final SubscriberBarrier<V, V> m = this;
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override

@ -23,6 +23,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.core.Predicate;
import org.redisson.core.RMap;
import org.redisson.core.RSet;
import io.netty.util.concurrent.Future;
@ -257,6 +258,26 @@ public class RedissonMapTest extends BaseTest {
assertThat(val2).isEqualTo(4);
}
@Test
public void testIteratorRemoveHighVolume() throws InterruptedException {
RMap<Integer, Integer> map = redisson.getMap("simpleMap");
for (int i = 0; i < 10000; i++) {
map.put(i, i*10);
}
int cnt = 0;
Iterator<Integer> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
Integer integer = iterator.next();
iterator.remove();
cnt++;
}
Assert.assertEquals(10000, cnt);
assertThat(map).isEmpty();
Assert.assertEquals(0, map.size());
}
@Test
public void testIterator() {
RMap<Integer, Integer> rMap = redisson.getMap("123");

Loading…
Cancel
Save