RSortedSet performance boost up to 4 times!

pull/537/head
Nikita 9 years ago
parent fb8cae7705
commit 4fcd54ec8c

@ -413,12 +413,12 @@ public class Redisson implements RedissonClient {
@Override @Override
public <V> RSortedSet<V> getSortedSet(String name) { public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(commandExecutor, name); return new RedissonSortedSet<V>(commandExecutor, name, this);
} }
@Override @Override
public <V> RSortedSet<V> getSortedSet(String name, Codec codec) { public <V> RSortedSet<V> getSortedSet(String name, Codec codec) {
return new RedissonSortedSet<V>(codec, commandExecutor, name); return new RedissonSortedSet<V>(codec, commandExecutor, name, this);
} }
@Override @Override

@ -433,8 +433,6 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
@Override @Override
public V remove(int index) { public V remove(int index) {
checkIndex(index);
if (index == 0) { if (index == 0) {
Future<V> f = commandExecutor.writeAsync(getName(), codec, LPOP, getName()); Future<V> f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
return get(f); return get(f);
@ -442,18 +440,26 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
Future<V> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT, Future<V> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"if #tail > 0 then " + "return v",
"for i=1, #tail, 5000 do "
+ "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); "
+ "end "
+ "end;" +
"return v",
Collections.<Object>singletonList(getName()), index); Collections.<Object>singletonList(getName()), index);
return get(f); return get(f);
} }
@Override
public void fastRemove(int index) {
get(fastRemoveAsync(index));
}
@Override
public Future<Void> fastRemoveAsync(int index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
Collections.<Object>singletonList(getName()), index);
}
@Override @Override
public int indexOf(Object o) { public int indexOf(Object o) {
return get(indexOfAsync(o)); return get(indexOfAsync(o));

@ -19,28 +19,25 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.SortedSet; import java.util.SortedSet;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.core.RBucket;
import org.redisson.core.RLock;
import org.redisson.core.RSortedSet; import org.redisson.core.RSortedSet;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
/** /**
@ -95,34 +92,36 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER; private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER;
CommandExecutor commandExecutor; CommandExecutor commandExecutor;
private RLock lock;
private RedissonList<V> list;
private RBucket<String> comparatorHolder;
protected RedissonSortedSet(CommandExecutor commandExecutor, String name) { protected RedissonSortedSet(CommandExecutor commandExecutor, String name, Redisson redisson) {
super(commandExecutor, name); super(commandExecutor, name);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
list = (RedissonList<V>) redisson.getList(getName());
loadComparator(); loadComparator();
} }
public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name) { public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
list = (RedissonList<V>) redisson.getList(getName());
loadComparator(); loadComparator();
} }
private void loadComparator() { private void loadComparator() {
commandExecutor.read(getName(), codec, new SyncOperation<Void>() {
@Override
public Void execute(Codec codec, RedisConnection conn) {
loadComparator(conn);
return null;
}
});
}
private void loadComparator(RedisConnection connection) {
try { try {
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); String comparatorSign = comparatorHolder.get();
if (comparatorSign != null) { if (comparatorSign != null) {
String[] parts = comparatorSign.split(":"); String[] parts = comparatorSign.split(":");
String className = parts[0]; String className = parts[0];
@ -136,6 +135,8 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
Class<?> clazz = Class.forName(className); Class<?> clazz = Class.forName(className);
comparator = (Comparator<V>) clazz.newInstance(); comparator = (Comparator<V>) clazz.newInstance();
} }
} catch (IllegalStateException e) {
throw e;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -163,26 +164,17 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override @Override
public int size() { public int size() {
return commandExecutor.read(getName(), codec, RedisCommands.LLEN_INT, getName()); return list.size();
}
private int size(RedisConnection connection) {
return connection.sync(RedisCommands.LLEN_INT, getName()).intValue();
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
return size() == 0; return list.isEmpty();
} }
@Override @Override
public boolean contains(final Object o) { public boolean contains(final Object o) {
return commandExecutor.read(getName(), codec, new SyncOperation<Boolean>() { return binarySearch((V)o, codec).getIndex() >= 0;
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return binarySearch((V)o, codec, conn).getIndex() >= 0;
}
});
} }
public Iterator<V> iterator() { public Iterator<V> iterator() {
@ -206,7 +198,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} }
currentIndex++; currentIndex++;
removeExecuted = false; removeExecuted = false;
currentElement = RedissonSortedSet.this.get(currentIndex); currentElement = RedissonSortedSet.this.list.getValue(currentIndex);
return currentElement; return currentElement;
} }
@ -223,57 +215,24 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}; };
} }
private V get(int index) {
return commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), index);
}
@Override @Override
public Object[] toArray() { public Object[] toArray() {
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); return list.toArray();
return res.toArray();
} }
@Override @Override
public <T> T[] toArray(T[] a) { public <T> T[] toArray(T[] a) {
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); return list.toArray(a);
return res.toArray(a);
} }
@Override @Override
public boolean add(final V value) { public boolean add(V value) {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() { lock.lock();
@Override
public Boolean execute(Codec codec, RedisConnection conn) { try {
return add(value, codec, conn); checkComparator();
}
}); BinarySearchResult<V> res = binarySearch(value, codec);
}
public Future<Boolean> addAsync(final V value) {
final Promise<Boolean> promise = commandExecutor.getConnectionManager().newPromise();
commandExecutor.getConnectionManager().getGroup().execute(new Runnable() {
public void run() {
try {
boolean res = add(value);
promise.setSuccess(res);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
boolean add(V value, Codec codec, RedisConnection connection) {
while (true) {
connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName());
checkComparator(connection);
BinarySearchResult<V> res = binarySearch(value, codec, connection);
if (res.getIndex() == null) {
continue;
}
if (res.getIndex() < 0) { if (res.getIndex() < 0) {
int index = -(res.getIndex() + 1); int index = -(res.getIndex() + 1);
@ -284,45 +243,47 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
connection.sync(RedisCommands.MULTI); commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID,
connection.sync(RedisCommands.EVAL_VOID, "local len = redis.call('llen', KEYS[1]);"
"local len = redis.call('llen', KEYS[1]);" + "if tonumber(ARGV[1]) < len then "
+ "if tonumber(ARGV[1]) < len then " + "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);" + "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"
+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);" + "return;"
+ "return;" + "end;"
+ "end;" + "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getName()), index, encodedValue);
+ "redis.call('rpush', KEYS[1], ARGV[2]);", 1, getName(), index, encodedValue); return true;
List<Object> re = connection.sync(codec, RedisCommands.EXEC);
if (re.size() == 1) {
return true;
}
} else { } else {
connection.sync(RedisCommands.UNWATCH);
return false; return false;
} }
} finally {
lock.unlock();
} }
} }
private void checkComparator(RedisConnection connection) { private void checkComparator() {
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); String comparatorSign = comparatorHolder.get();
if (comparatorSign != null) { if (comparatorSign != null) {
String[] vals = comparatorSign.split(":"); String[] vals = comparatorSign.split(":");
String className = vals[0]; String className = vals[0];
if (!comparator.getClass().getName().equals(className)) { if (!comparator.getClass().getName().equals(className)) {
loadComparator(connection); loadComparator();
} }
} }
} }
public static double calcIncrement(double value) { public Future<Boolean> addAsync(final V value) {
BigDecimal b = BigDecimal.valueOf(value); final Promise<Boolean> promise = newPromise();
BigDecimal r = b.remainder(BigDecimal.ONE); commandExecutor.getConnectionManager().getGroup().execute(new Runnable() {
if (r.compareTo(BigDecimal.ZERO) == 0) { public void run() {
return 1; try {
} boolean res = add(value);
double res = 1/Math.pow(10, r.scale()); promise.setSuccess(res);
return res; } catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
} }
@Override @Override
@ -346,44 +307,21 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} }
@Override @Override
public boolean remove(final Object value) { public boolean remove(Object value) {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() { lock.lock();
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return remove(value, codec, conn);
}
});
}
boolean remove(Object value, Codec codec, RedisConnection conn) { try {
while (true) { checkComparator();
conn.sync(RedisCommands.WATCH, getName());
BinarySearchResult<V> res = binarySearch((V) value, codec, conn); BinarySearchResult<V> res = binarySearch((V) value, codec);
if (res.getIndex() == null) {
conn.sync(RedisCommands.UNWATCH);
continue;
}
if (res.getIndex() < 0) { if (res.getIndex() < 0) {
conn.sync(RedisCommands.UNWATCH);
return false; return false;
} }
conn.sync(RedisCommands.MULTI); list.remove((int)res.getIndex());
if (res.getIndex() == 0) { return true;
conn.sync(codec, RedisCommands.LPOP, getName()); } finally {
} else { lock.unlock();
conn.sync(RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "local tail = redis.call('lrange', KEYS[1], tonumber(ARGV[1]) + 1, len);"
+ "redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[1]) - 1);"
+ "if #tail > 0 then "
+ "redis.call('rpush', KEYS[1], unpack(tail)); "
+ "end;", 1, getName(), res.getIndex());
}
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
return true;
}
} }
} }
@ -460,7 +398,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override @Override
public V first() { public V first() {
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), 0); V res = list.getValue(0);
if (res == null) { if (res == null) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
@ -469,7 +407,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override @Override
public V last() { public V last() {
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), -1); V res = list.getValue(-1);
if (res == null) { if (res == null) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
@ -477,7 +415,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} }
private String getComparatorKeyName() { private String getComparatorKeyName() {
return "redisson__sortedset__comparator__{" + getName() + "}"; return "redisson_sortedset_comparator:{" + getName() + "}";
} }
@Override @Override
@ -498,15 +436,15 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} }
return res; return res;
} }
public BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection) { public BinarySearchResult<V> binarySearch(V value, Codec codec) {
int size = size(connection); int size = list.size();
int upperIndex = size - 1; int upperIndex = size - 1;
int lowerIndex = 0; int lowerIndex = 0;
while (lowerIndex <= upperIndex) { while (lowerIndex <= upperIndex) {
int index = lowerIndex + (upperIndex - lowerIndex) / 2; int index = lowerIndex + (upperIndex - lowerIndex) / 2;
V res = connection.sync(codec, RedisCommands.LINDEX, getName(), index); V res = list.getValue(index);
if (res == null) { if (res == null) {
return new BinarySearchResult<V>(); return new BinarySearchResult<V>();
} }

@ -1,25 +0,0 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
public interface SyncOperation<R> {
R execute(Codec codec, RedisConnection conn);
}

@ -18,7 +18,6 @@ package org.redisson.command;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.redisson.SyncOperation;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -42,10 +41,6 @@ public interface CommandSyncExecutor {
<T, R> R read(String key, RedisCommand<T> command, Object ... params); <T, R> R read(String key, RedisCommand<T> command, Object ... params);
<R> R read(String key, Codec codec, SyncOperation<R> operation);
<R> R write(String key, Codec codec, SyncOperation<R> operation);
<T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

@ -18,23 +18,13 @@ package org.redisson.command;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.redisson.SyncOperation;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
/** /**
* *
@ -100,70 +90,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res); return get(res);
} }
@Override
public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return sync(false, codec, new NodeSource(slot), operation, 0);
}
@Override
public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return sync(true, codec, new NodeSource(slot), operation, 0);
}
<R> R sync(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
throw new IllegalStateException("Redisson is shutdown");
}
try {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
connectionFuture.syncUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
try {
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt);
} catch (RedisAskException e) {
return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt);
} catch (RedisLoadingException e) {
return sync(readOnlyMode, codec, source, operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return sync(readOnlyMode, codec, source, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);
}
}
} catch (RedisException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
try {
Thread.sleep(connectionManager.getConfig().getRetryInterval());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
attempt++;
return sync(readOnlyMode, codec, source, operation, attempt);
}
}
@Override @Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, codec, command, params); Future<R> res = writeAsync(key, codec, command, params);

@ -3,7 +3,6 @@ package org.redisson;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;

Loading…
Cancel
Save