diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index db480403b..ccde73895 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -413,12 +413,12 @@ public class Redisson implements RedissonClient { @Override public RSortedSet getSortedSet(String name) { - return new RedissonSortedSet(commandExecutor, name); + return new RedissonSortedSet(commandExecutor, name, this); } @Override public RSortedSet getSortedSet(String name, Codec codec) { - return new RedissonSortedSet(codec, commandExecutor, name); + return new RedissonSortedSet(codec, commandExecutor, name, this); } @Override diff --git a/src/main/java/org/redisson/RedissonListMultimapValues.java b/src/main/java/org/redisson/RedissonListMultimapValues.java index 3cf698a8e..3e40b3308 100644 --- a/src/main/java/org/redisson/RedissonListMultimapValues.java +++ b/src/main/java/org/redisson/RedissonListMultimapValues.java @@ -433,8 +433,6 @@ public class RedissonListMultimapValues extends RedissonExpirable implements @Override public V remove(int index) { - checkIndex(index); - if (index == 0) { Future f = commandExecutor.writeAsync(getName(), codec, LPOP, getName()); return get(f); @@ -442,18 +440,26 @@ public class RedissonListMultimapValues extends RedissonExpirable implements Future f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" + - "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + - "if #tail > 0 then " + - "for i=1, #tail, 5000 do " - + "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); " - + "end " - + "end;" + - "return v", + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + + "return v", Collections.singletonList(getName()), index); return get(f); } + @Override + public void fastRemove(int index) { + get(fastRemoveAsync(index)); + } + + @Override + public Future fastRemoveAsync(int index) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", + Collections.singletonList(getName()), index); + } + @Override public int indexOf(Object o) { return get(indexOfAsync(o)); diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index e0dd93bd7..8f1b3edb3 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -19,28 +19,25 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.math.BigDecimal; import java.math.BigInteger; import java.security.MessageDigest; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; import java.util.SortedSet; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; +import org.redisson.core.RBucket; +import org.redisson.core.RLock; import org.redisson.core.RSortedSet; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; /** @@ -95,34 +92,36 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet comparator = NaturalComparator.NATURAL_ORDER; CommandExecutor commandExecutor; + + private RLock lock; + private RedissonList list; + private RBucket comparatorHolder; - protected RedissonSortedSet(CommandExecutor commandExecutor, String name) { + protected RedissonSortedSet(CommandExecutor commandExecutor, String name, Redisson redisson) { super(commandExecutor, name); this.commandExecutor = commandExecutor; + comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE); + lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}"); + list = (RedissonList) redisson.getList(getName()); + loadComparator(); } - public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name) { + public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) { super(codec, commandExecutor, name); this.commandExecutor = commandExecutor; + comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE); + lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}"); + list = (RedissonList) redisson.getList(getName()); + loadComparator(); } private void loadComparator() { - commandExecutor.read(getName(), codec, new SyncOperation() { - @Override - public Void execute(Codec codec, RedisConnection conn) { - loadComparator(conn); - return null; - } - }); - } - - private void loadComparator(RedisConnection connection) { try { - String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); + String comparatorSign = comparatorHolder.get(); if (comparatorSign != null) { String[] parts = comparatorSign.split(":"); String className = parts[0]; @@ -136,6 +135,8 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet clazz = Class.forName(className); comparator = (Comparator) clazz.newInstance(); } + } catch (IllegalStateException e) { + throw e; } catch (Exception e) { throw new IllegalStateException(e); } @@ -163,26 +164,17 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return binarySearch((V)o, codec, conn).getIndex() >= 0; - } - }); + return binarySearch((V)o, codec).getIndex() >= 0; } public Iterator iterator() { @@ -206,7 +198,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); - return res.toArray(); + return list.toArray(); } @Override public T[] toArray(T[] a) { - List res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); - return res.toArray(a); + return list.toArray(a); } @Override - public boolean add(final V value) { - return commandExecutor.write(getName(), codec, new SyncOperation() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return add(value, codec, conn); - } - }); - } - - public Future addAsync(final V value) { - final Promise promise = commandExecutor.getConnectionManager().newPromise(); - commandExecutor.getConnectionManager().getGroup().execute(new Runnable() { - public void run() { - try { - boolean res = add(value); - promise.setSuccess(res); - } catch (Exception e) { - promise.setFailure(e); - } - } - }); - return promise; - } - - boolean add(V value, Codec codec, RedisConnection connection) { - while (true) { - connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName()); - - checkComparator(connection); - - BinarySearchResult res = binarySearch(value, codec, connection); - if (res.getIndex() == null) { - continue; - } + public boolean add(V value) { + lock.lock(); + + try { + checkComparator(); + + BinarySearchResult res = binarySearch(value, codec); if (res.getIndex() < 0) { int index = -(res.getIndex() + 1); @@ -284,45 +243,47 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet re = connection.sync(codec, RedisCommands.EXEC); - if (re.size() == 1) { - return true; - } + commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID, + "local len = redis.call('llen', KEYS[1]);" + + "if tonumber(ARGV[1]) < len then " + + "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);" + + "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);" + + "return;" + + "end;" + + "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.asList(getName()), index, encodedValue); + return true; } else { - connection.sync(RedisCommands.UNWATCH); return false; } + } finally { + lock.unlock(); } } - private void checkComparator(RedisConnection connection) { - String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); + private void checkComparator() { + String comparatorSign = comparatorHolder.get(); if (comparatorSign != null) { String[] vals = comparatorSign.split(":"); String className = vals[0]; if (!comparator.getClass().getName().equals(className)) { - loadComparator(connection); + loadComparator(); } } } - public static double calcIncrement(double value) { - BigDecimal b = BigDecimal.valueOf(value); - BigDecimal r = b.remainder(BigDecimal.ONE); - if (r.compareTo(BigDecimal.ZERO) == 0) { - return 1; - } - double res = 1/Math.pow(10, r.scale()); - return res; + public Future addAsync(final V value) { + final Promise promise = newPromise(); + commandExecutor.getConnectionManager().getGroup().execute(new Runnable() { + public void run() { + try { + boolean res = add(value); + promise.setSuccess(res); + } catch (Exception e) { + promise.setFailure(e); + } + } + }); + return promise; } @Override @@ -346,44 +307,21 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return remove(value, codec, conn); - } - }); - } + public boolean remove(Object value) { + lock.lock(); - boolean remove(Object value, Codec codec, RedisConnection conn) { - while (true) { - conn.sync(RedisCommands.WATCH, getName()); - BinarySearchResult res = binarySearch((V) value, codec, conn); - if (res.getIndex() == null) { - conn.sync(RedisCommands.UNWATCH); - continue; - } + try { + checkComparator(); + + BinarySearchResult res = binarySearch((V) value, codec); if (res.getIndex() < 0) { - conn.sync(RedisCommands.UNWATCH); return false; } - conn.sync(RedisCommands.MULTI); - if (res.getIndex() == 0) { - conn.sync(codec, RedisCommands.LPOP, getName()); - } else { - conn.sync(RedisCommands.EVAL_VOID, - "local len = redis.call('llen', KEYS[1]);" - + "local tail = redis.call('lrange', KEYS[1], tonumber(ARGV[1]) + 1, len);" - + "redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[1]) - 1);" - + "if #tail > 0 then " - + "redis.call('rpush', KEYS[1], unpack(tail)); " - + "end;", 1, getName(), res.getIndex()); - } - - if (((List)conn.sync(codec, RedisCommands.EXEC)).size() == 1) { - return true; - } + list.remove((int)res.getIndex()); + return true; + } finally { + lock.unlock(); } } @@ -460,7 +398,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet binarySearch(V value, Codec codec, RedisConnection connection) { - int size = size(connection); + + public BinarySearchResult binarySearch(V value, Codec codec) { + int size = list.size(); int upperIndex = size - 1; int lowerIndex = 0; while (lowerIndex <= upperIndex) { int index = lowerIndex + (upperIndex - lowerIndex) / 2; - V res = connection.sync(codec, RedisCommands.LINDEX, getName(), index); + V res = list.getValue(index); if (res == null) { return new BinarySearchResult(); } diff --git a/src/main/java/org/redisson/SyncOperation.java b/src/main/java/org/redisson/SyncOperation.java deleted file mode 100644 index d6fc5449d..000000000 --- a/src/main/java/org/redisson/SyncOperation.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson; - -import org.redisson.client.RedisConnection; -import org.redisson.client.codec.Codec; - -public interface SyncOperation { - - R execute(Codec codec, RedisConnection conn); - -} diff --git a/src/main/java/org/redisson/command/CommandSyncExecutor.java b/src/main/java/org/redisson/command/CommandSyncExecutor.java index a022cd4c8..4e60b8def 100644 --- a/src/main/java/org/redisson/command/CommandSyncExecutor.java +++ b/src/main/java/org/redisson/command/CommandSyncExecutor.java @@ -18,7 +18,6 @@ package org.redisson.command; import java.net.InetSocketAddress; import java.util.List; -import org.redisson.SyncOperation; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -42,10 +41,6 @@ public interface CommandSyncExecutor { R read(String key, RedisCommand command, Object ... params); - R read(String key, Codec codec, SyncOperation operation); - - R write(String key, Codec codec, SyncOperation operation); - R read(String key, Codec codec, RedisCommand command, Object ... params); R evalRead(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandSyncService.java b/src/main/java/org/redisson/command/CommandSyncService.java index 738ac94ae..007ea6ab1 100644 --- a/src/main/java/org/redisson/command/CommandSyncService.java +++ b/src/main/java/org/redisson/command/CommandSyncService.java @@ -18,23 +18,13 @@ package org.redisson.command; import java.net.InetSocketAddress; import java.util.List; -import org.redisson.SyncOperation; -import org.redisson.client.RedisAskException; -import org.redisson.client.RedisConnection; -import org.redisson.client.RedisException; -import org.redisson.client.RedisLoadingException; -import org.redisson.client.RedisMovedException; -import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.NodeSource; -import org.redisson.connection.NodeSource.Redirect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; /** * @@ -100,70 +90,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx return get(res); } - @Override - public R write(String key, Codec codec, SyncOperation operation) { - int slot = connectionManager.calcSlot(key); - return sync(false, codec, new NodeSource(slot), operation, 0); - } - - @Override - public R read(String key, Codec codec, SyncOperation operation) { - int slot = connectionManager.calcSlot(key); - return sync(true, codec, new NodeSource(slot), operation, 0); - } - - R sync(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation operation, int attempt) { - if (!connectionManager.getShutdownLatch().acquire()) { - throw new IllegalStateException("Redisson is shutdown"); - } - - try { - Future connectionFuture; - if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(source, null); - } else { - connectionFuture = connectionManager.connectionWriteOp(source, null); - } - connectionFuture.syncUninterruptibly(); - - RedisConnection connection = connectionFuture.getNow(); - - try { - return operation.execute(codec, connection); - } catch (RedisMovedException e) { - return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt); - } catch (RedisAskException e) { - return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt); - } catch (RedisLoadingException e) { - return sync(readOnlyMode, codec, source, operation, attempt); - } catch (RedisTimeoutException e) { - if (attempt == connectionManager.getConfig().getRetryAttempts()) { - throw e; - } - attempt++; - return sync(readOnlyMode, codec, source, operation, attempt); - } finally { - connectionManager.getShutdownLatch().release(); - if (readOnlyMode) { - connectionManager.releaseRead(source, connection); - } else { - connectionManager.releaseWrite(source, connection); - } - } - } catch (RedisException e) { - if (attempt == connectionManager.getConfig().getRetryAttempts()) { - throw e; - } - try { - Thread.sleep(connectionManager.getConfig().getRetryInterval()); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - attempt++; - return sync(readOnlyMode, codec, source, operation, attempt); - } - } - @Override public R write(String key, Codec codec, RedisCommand command, Object ... params) { Future res = writeAsync(key, codec, command, params); diff --git a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java index 3dff1f5ab..18db0a135 100644 --- a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java +++ b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java @@ -3,7 +3,6 @@ package org.redisson; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random;