Slot key calculation support. #27

pull/110/head
Nikita 10 years ago
parent 971e99e79a
commit 48fc652713

@ -58,7 +58,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> addAsync(final V obj) {
return connectionManager.writeAsync(new ResultOperation<Long, V>() {
return connectionManager.writeAsync(getName(), new ResultOperation<Long, V>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.pfadd(getName(), obj);
@ -68,7 +68,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> addAllAsync(final Collection<V> objects) {
return connectionManager.writeAsync(new ResultOperation<Long, Object>() {
return connectionManager.writeAsync(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.pfadd(getName(), objects.toArray());
@ -78,7 +78,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> countAsync() {
return connectionManager.writeAsync(new ResultOperation<Long, Object>() {
return connectionManager.writeAsync(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.pfcount(getName());
@ -88,7 +88,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> countWithAsync(final String... otherLogNames) {
return connectionManager.writeAsync(new ResultOperation<Long, Object>() {
return connectionManager.writeAsync(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.pfcount(getName(), otherLogNames);
@ -98,7 +98,7 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> mergeWithAsync(final String... otherLogNames) {
return connectionManager.writeAsync(new ResultOperation<Long, Object>() {
return connectionManager.writeAsync(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.pfmerge(getName(), otherLogNames);

@ -50,7 +50,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public int size() {
return connectionManager.read(new ResultOperation<Long, V>() {
return connectionManager.read(getName(), new ResultOperation<Long, V>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.llen(getName());
@ -96,7 +96,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
protected boolean remove(final Object o, final int count) {
return connectionManager.write(new ResultOperation<Long, Object>() {
return connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrem(getName(), count, o);
@ -114,7 +114,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
final int j = i;
List<Object> range = connectionManager.read(new ResultOperation<List<Object>, Object>() {
List<Object> range = connectionManager.read(getName(), new ResultOperation<List<Object>, Object>() {
@Override
protected Future<List<Object>> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1);
@ -134,7 +134,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public boolean addAll(final Collection<? extends V> c) {
connectionManager.write(new ResultOperation<Long, Object>() {
connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.rpush(getName(), c.toArray());
@ -147,7 +147,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public boolean addAll(final int index, final Collection<? extends V> coll) {
checkPosition(index);
if (index < size()) {
return connectionManager.write(new SyncOperation<Object, Boolean>() {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
while (true) {
@ -181,7 +181,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public boolean removeAll(final Collection<?> c) {
return connectionManager.write(new SyncOperation<Object, Boolean>() {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
boolean result = false;
@ -212,7 +212,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public void clear() {
connectionManager.write(new ResultOperation<Long, V>() {
connectionManager.write(getName(), new ResultOperation<Long, V>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.del(getName());
@ -227,7 +227,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
private V getValue(final int index) {
return connectionManager.read(new ResultOperation<V, V>() {
return connectionManager.read(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lindex(getName(), index);
@ -260,7 +260,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public V set(final int index, final V element) {
checkIndex(index);
return connectionManager.write(new SyncOperation<V, V>() {
return connectionManager.write(getName(), new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
while (true) {
@ -297,7 +297,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public V remove(final int index) {
checkIndex(index);
return connectionManager.write(new SyncOperation<Object, V>() {
return connectionManager.write(getName(), new SyncOperation<Object, V>() {
@Override
public V execute(RedisConnection<Object, Object> conn) {
if (index == 0) {
@ -328,7 +328,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
final int j = i;
List<Object> range = connectionManager.read(new ResultOperation<List<Object>, Object>() {
List<Object> range = connectionManager.read(getName(), new ResultOperation<List<Object>, Object>() {
@Override
protected Future<List<Object>> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1);
@ -354,7 +354,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
for (int i = 1; i <= to; i++) {
final int j = i;
final int startIndex = -i*batchSize;
List<Object> range = connectionManager.read(new ResultOperation<List<Object>, Object>() {
List<Object> range = connectionManager.read(getName(), new ResultOperation<List<Object>, Object>() {
@Override
protected Future<List<Object>> execute(RedisAsyncConnection<Object, Object> async) {
return async.lrange(getName(), startIndex, size - (j-1)*batchSize);
@ -479,7 +479,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
}
return connectionManager.read(new ResultOperation<List<V>, V>() {
return connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {

@ -44,7 +44,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
}
public V getFirst() {
V value = connectionManager.read(new ResultOperation<V, V>() {
V value = connectionManager.read(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lindex(getName(), 0);
@ -71,7 +71,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
@Override
public V poll() {
return connectionManager.write(new ResultOperation<V, V>() {
return connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lpop(getName());

@ -47,7 +47,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public int size() {
return connectionManager.read(new ResultOperation<Long, V>() {
return connectionManager.read(getName(), new ResultOperation<Long, V>() {
@Override
public Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.scard(getName());
@ -62,7 +62,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public boolean contains(final Object o) {
return connectionManager.read(new ResultOperation<Boolean, Object>() {
return connectionManager.read(getName(), new ResultOperation<Boolean, Object>() {
@Override
public Future<Boolean> execute(RedisAsyncConnection<Object, Object> async) {
return async.sismember(getName(), o);
@ -71,7 +71,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
}
private ListScanResult<V> scanIterator(final long startPos) {
return connectionManager.read(new ResultOperation<ListScanResult<V>, V>() {
return connectionManager.read(getName(), new ResultOperation<ListScanResult<V>, V>() {
@Override
public Future<ListScanResult<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.sscan(getName(), startPos);
@ -132,7 +132,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Object[] toArray() {
Set<V> res = connectionManager.read(new ResultOperation<Set<V>, V>() {
Set<V> res = connectionManager.read(getName(), new ResultOperation<Set<V>, V>() {
@Override
public Future<Set<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.smembers(getName());
@ -143,7 +143,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public <T> T[] toArray(T[] a) {
Set<V> res = connectionManager.read(new ResultOperation<Set<V>, V>() {
Set<V> res = connectionManager.read(getName(), new ResultOperation<Set<V>, V>() {
@Override
public Future<Set<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.smembers(getName());
@ -159,7 +159,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> addAsync(final V e) {
return connectionManager.writeAsync(new AsyncOperation<V, Boolean>() {
return connectionManager.writeAsync(getName(), new AsyncOperation<V, Boolean>() {
@Override
public void execute(final Promise<Boolean> promise, RedisAsyncConnection<Object, V> async) {
async.sadd(getName(), e).addListener(new OperationListener<V, Boolean, Long>(promise, async, this) {
@ -174,7 +174,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> removeAsync(final V e) {
return connectionManager.writeAsync(new AsyncOperation<V, Boolean>() {
return connectionManager.writeAsync(getName(), new AsyncOperation<V, Boolean>() {
@Override
public void execute(final Promise<Boolean> promise, RedisAsyncConnection<Object, V> async) {
async.srem(getName(), e).addListener(new OperationListener<V, Boolean, Long>(promise, async, this) {
@ -204,7 +204,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public boolean addAll(final Collection<? extends V> c) {
Long res = connectionManager.write(new ResultOperation<Long, Object>() {
Long res = connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
public Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.sadd(getName(), c.toArray());
@ -227,7 +227,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public boolean removeAll(final Collection<?> c) {
Long res = connectionManager.write(new ResultOperation<Long, Object>() {
Long res = connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
public Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.srem(getName(), c.toArray());

@ -97,7 +97,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
loadComparator();
connectionManager.write(new ResultOperation<Boolean, Object>() {
connectionManager.write(getName(), new ResultOperation<Boolean, Object>() {
@Override
protected Future<Boolean> execute(RedisAsyncConnection<Object, Object> async) {
return async.setnx(getCurrentVersionKey(), 0L);
@ -106,7 +106,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private void loadComparator() {
connectionManager.read(new SyncOperation<V, Void>() {
connectionManager.read(getName(), new SyncOperation<V, Void>() {
@Override
public Void execute(RedisConnection<Object, V> conn) {
loadComparator(conn);
@ -160,7 +160,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public int size() {
return connectionManager.read(new ResultOperation<Long, V>() {
return connectionManager.read(getName(), new ResultOperation<Long, V>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
@ -180,7 +180,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean contains(final Object o) {
return connectionManager.read(new SyncOperation<V, Boolean>() {
return connectionManager.read(getName(), new SyncOperation<V, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, V> conn) {
return binarySearch((V)o, conn).getIndex() >= 0;
@ -251,7 +251,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private void remove(final int index) {
connectionManager.write(new SyncOperation<Object, V>() {
connectionManager.write(getName(), new SyncOperation<Object, V>() {
@Override
public V execute(RedisConnection<Object, Object> conn) {
if (index == 0) {
@ -278,7 +278,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private V get(final int index) {
return connectionManager.read(new ResultOperation<V, V>() {
return connectionManager.read(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lindex(getName(), index);
@ -288,7 +288,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public Object[] toArray() {
List<V> res = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> res = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), 0, -1);
@ -299,7 +299,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public <T> T[] toArray(T[] a) {
List<V> res = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> res = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), 0, -1);
@ -309,7 +309,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private String getCurrentVersionKey() {
return "redisson__sortedset__version__" + getName();
return "redisson__sortedset__version__{" + getName() + "}";
}
private Long getCurrentVersion(RedisConnection<Object, Object> simpleConnection) {
@ -318,7 +318,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean add(final V value) {
return connectionManager.write(new SyncOperation<V, Boolean>() {
return connectionManager.write(getName(), new SyncOperation<V, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, V> conn) {
return add(value, conn);
@ -485,7 +485,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public boolean remove(final Object value) {
return connectionManager.write(new SyncOperation<V, Boolean>() {
return connectionManager.write(getName(), new SyncOperation<V, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, V> conn) {
return remove(value, conn);
@ -600,7 +600,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V first() {
V res = connectionManager.read(new ResultOperation<V, V>() {
V res = connectionManager.read(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lindex(getName(), 0);
@ -614,7 +614,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V last() {
V res = connectionManager.read(new ResultOperation<V, V>() {
V res = connectionManager.read(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.lindex(getName(), -1);
@ -631,12 +631,12 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private String getComparatorKeyName() {
return "redisson__sortedset__comparator__" + getName();
return "redisson__sortedset__comparator__{" + getName() + "}";
}
@Override
public boolean trySetComparator(final Comparator<? super V> comparator) {
return connectionManager.write(new SyncOperation<String, Boolean>() {
return connectionManager.write(getName(), new SyncOperation<String, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, String> connection) {
connection.watch(getName(), getComparatorKeyName());

@ -336,6 +336,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entries.size() == 1) {
return -1;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % 16384;
log.debug("slot {} for {}", result, key);
return result;

Loading…
Cancel
Save