From 48fc652713b470a153fe854ac379eb65f615121b Mon Sep 17 00:00:00 2001 From: Nikita Date: Sun, 28 Dec 2014 11:19:37 +0300 Subject: [PATCH] Slot key calculation support. #27 --- .../org/redisson/RedissonHyperLogLog.java | 10 +++---- src/main/java/org/redisson/RedissonList.java | 26 ++++++++-------- src/main/java/org/redisson/RedissonQueue.java | 4 +-- src/main/java/org/redisson/RedissonSet.java | 18 +++++------ .../java/org/redisson/RedissonSortedSet.java | 30 +++++++++---------- .../MasterSlaveConnectionManager.java | 7 +++++ 6 files changed, 51 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/redisson/RedissonHyperLogLog.java b/src/main/java/org/redisson/RedissonHyperLogLog.java index c9e75c6ef..d9c213a60 100644 --- a/src/main/java/org/redisson/RedissonHyperLogLog.java +++ b/src/main/java/org/redisson/RedissonHyperLogLog.java @@ -58,7 +58,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future addAsync(final V obj) { - return connectionManager.writeAsync(new ResultOperation() { + return connectionManager.writeAsync(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.pfadd(getName(), obj); @@ -68,7 +68,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future addAllAsync(final Collection objects) { - return connectionManager.writeAsync(new ResultOperation() { + return connectionManager.writeAsync(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.pfadd(getName(), objects.toArray()); @@ -78,7 +78,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future countAsync() { - return connectionManager.writeAsync(new ResultOperation() { + return connectionManager.writeAsync(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.pfcount(getName()); @@ -88,7 +88,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future countWithAsync(final String... otherLogNames) { - return connectionManager.writeAsync(new ResultOperation() { + return connectionManager.writeAsync(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.pfcount(getName(), otherLogNames); @@ -98,7 +98,7 @@ public class RedissonHyperLogLog extends RedissonObject implements RHyperLogL @Override public Future mergeWithAsync(final String... otherLogNames) { - return connectionManager.writeAsync(new ResultOperation() { + return connectionManager.writeAsync(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.pfmerge(getName(), otherLogNames); diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index f95a82b1a..54d88ff06 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -50,7 +50,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public int size() { - return connectionManager.read(new ResultOperation() { + return connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.llen(getName()); @@ -96,7 +96,7 @@ public class RedissonList extends RedissonExpirable implements RList { } protected boolean remove(final Object o, final int count) { - return connectionManager.write(new ResultOperation() { + return connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lrem(getName(), count, o); @@ -114,7 +114,7 @@ public class RedissonList extends RedissonExpirable implements RList { int to = div(size(), batchSize); for (int i = 0; i < to; i++) { final int j = i; - List range = connectionManager.read(new ResultOperation, Object>() { + List range = connectionManager.read(getName(), new ResultOperation, Object>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1); @@ -134,7 +134,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public boolean addAll(final Collection c) { - connectionManager.write(new ResultOperation() { + connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.rpush(getName(), c.toArray()); @@ -147,7 +147,7 @@ public class RedissonList extends RedissonExpirable implements RList { public boolean addAll(final int index, final Collection coll) { checkPosition(index); if (index < size()) { - return connectionManager.write(new SyncOperation() { + return connectionManager.write(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { while (true) { @@ -181,7 +181,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public boolean removeAll(final Collection c) { - return connectionManager.write(new SyncOperation() { + return connectionManager.write(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { boolean result = false; @@ -212,7 +212,7 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public void clear() { - connectionManager.write(new ResultOperation() { + connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.del(getName()); @@ -227,7 +227,7 @@ public class RedissonList extends RedissonExpirable implements RList { } private V getValue(final int index) { - return connectionManager.read(new ResultOperation() { + return connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lindex(getName(), index); @@ -260,7 +260,7 @@ public class RedissonList extends RedissonExpirable implements RList { public V set(final int index, final V element) { checkIndex(index); - return connectionManager.write(new SyncOperation() { + return connectionManager.write(getName(), new SyncOperation() { @Override public V execute(RedisConnection conn) { while (true) { @@ -297,7 +297,7 @@ public class RedissonList extends RedissonExpirable implements RList { public V remove(final int index) { checkIndex(index); - return connectionManager.write(new SyncOperation() { + return connectionManager.write(getName(), new SyncOperation() { @Override public V execute(RedisConnection conn) { if (index == 0) { @@ -328,7 +328,7 @@ public class RedissonList extends RedissonExpirable implements RList { int to = div(size(), batchSize); for (int i = 0; i < to; i++) { final int j = i; - List range = connectionManager.read(new ResultOperation, Object>() { + List range = connectionManager.read(getName(), new ResultOperation, Object>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), j*batchSize, j*batchSize + batchSize - 1); @@ -354,7 +354,7 @@ public class RedissonList extends RedissonExpirable implements RList { for (int i = 1; i <= to; i++) { final int j = i; final int startIndex = -i*batchSize; - List range = connectionManager.read(new ResultOperation, Object>() { + List range = connectionManager.read(getName(), new ResultOperation, Object>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), startIndex, size - (j-1)*batchSize); @@ -479,7 +479,7 @@ public class RedissonList extends RedissonExpirable implements RList { throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex); } - return connectionManager.read(new ResultOperation, V>() { + return connectionManager.read(getName(), new ResultOperation, V>() { @Override protected Future> execute(RedisAsyncConnection async) { diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index ac721aa14..8f9cf74ab 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -44,7 +44,7 @@ public class RedissonQueue extends RedissonList implements RQueue { } public V getFirst() { - V value = connectionManager.read(new ResultOperation() { + V value = connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lindex(getName(), 0); @@ -71,7 +71,7 @@ public class RedissonQueue extends RedissonList implements RQueue { @Override public V poll() { - return connectionManager.write(new ResultOperation() { + return connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lpop(getName()); diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index 63af46463..c56d758a2 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -47,7 +47,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public int size() { - return connectionManager.read(new ResultOperation() { + return connectionManager.read(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { return async.scard(getName()); @@ -62,7 +62,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean contains(final Object o) { - return connectionManager.read(new ResultOperation() { + return connectionManager.read(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { return async.sismember(getName(), o); @@ -71,7 +71,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { } private ListScanResult scanIterator(final long startPos) { - return connectionManager.read(new ResultOperation, V>() { + return connectionManager.read(getName(), new ResultOperation, V>() { @Override public Future> execute(RedisAsyncConnection async) { return async.sscan(getName(), startPos); @@ -132,7 +132,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Object[] toArray() { - Set res = connectionManager.read(new ResultOperation, V>() { + Set res = connectionManager.read(getName(), new ResultOperation, V>() { @Override public Future> execute(RedisAsyncConnection async) { return async.smembers(getName()); @@ -143,7 +143,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public T[] toArray(T[] a) { - Set res = connectionManager.read(new ResultOperation, V>() { + Set res = connectionManager.read(getName(), new ResultOperation, V>() { @Override public Future> execute(RedisAsyncConnection async) { return async.smembers(getName()); @@ -159,7 +159,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future addAsync(final V e) { - return connectionManager.writeAsync(new AsyncOperation() { + return connectionManager.writeAsync(getName(), new AsyncOperation() { @Override public void execute(final Promise promise, RedisAsyncConnection async) { async.sadd(getName(), e).addListener(new OperationListener(promise, async, this) { @@ -174,7 +174,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future removeAsync(final V e) { - return connectionManager.writeAsync(new AsyncOperation() { + return connectionManager.writeAsync(getName(), new AsyncOperation() { @Override public void execute(final Promise promise, RedisAsyncConnection async) { async.srem(getName(), e).addListener(new OperationListener(promise, async, this) { @@ -204,7 +204,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean addAll(final Collection c) { - Long res = connectionManager.write(new ResultOperation() { + Long res = connectionManager.write(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { return async.sadd(getName(), c.toArray()); @@ -227,7 +227,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean removeAll(final Collection c) { - Long res = connectionManager.write(new ResultOperation() { + Long res = connectionManager.write(getName(), new ResultOperation() { @Override public Future execute(RedisAsyncConnection async) { return async.srem(getName(), c.toArray()); diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index 4aa21cd97..ab8651794 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -97,7 +97,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + connectionManager.write(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.setnx(getCurrentVersionKey(), 0L); @@ -106,7 +106,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + connectionManager.read(getName(), new SyncOperation() { @Override public Void execute(RedisConnection conn) { loadComparator(conn); @@ -160,7 +160,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { @@ -180,7 +180,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return connectionManager.read(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { return binarySearch((V)o, conn).getIndex() >= 0; @@ -251,7 +251,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + connectionManager.write(getName(), new SyncOperation() { @Override public V execute(RedisConnection conn) { if (index == 0) { @@ -278,7 +278,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lindex(getName(), index); @@ -288,7 +288,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet res = connectionManager.read(new ResultOperation, V>() { + List res = connectionManager.read(getName(), new ResultOperation, V>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), 0, -1); @@ -299,7 +299,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet T[] toArray(T[] a) { - List res = connectionManager.read(new ResultOperation, V>() { + List res = connectionManager.read(getName(), new ResultOperation, V>() { @Override protected Future> execute(RedisAsyncConnection async) { return async.lrange(getName(), 0, -1); @@ -309,7 +309,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet simpleConnection) { @@ -318,7 +318,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return connectionManager.write(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { return add(value, conn); @@ -485,7 +485,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + return connectionManager.write(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection conn) { return remove(value, conn); @@ -600,7 +600,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + V res = connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lindex(getName(), 0); @@ -614,7 +614,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { + V res = connectionManager.read(getName(), new ResultOperation() { @Override protected Future execute(RedisAsyncConnection async) { return async.lindex(getName(), -1); @@ -631,12 +631,12 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet comparator) { - return connectionManager.write(new SyncOperation() { + return connectionManager.write(getName(), new SyncOperation() { @Override public Boolean execute(RedisConnection connection) { connection.watch(getName(), getComparatorKeyName()); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9b3fd73d7..682d73f08 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -336,6 +336,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (entries.size() == 1) { return -1; } + + int start = key.indexOf('{'); + if (start != -1) { + int end = key.indexOf('}'); + key = key.substring(start+1, end); + } + int result = CRC16.crc16(key.getBytes()) % 16384; log.debug("slot {} for {}", result, key); return result;