Merge branch 'mrniko/master' into feature/travis-ci

pull/456/head^2
Rui Gu 9 years ago
commit 4b87837111

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>2.2.10-SNAPSHOT</version> <version>2.2.11-SNAPSHOT</version>
<packaging>bundle</packaging> <packaging>bundle</packaging>
<name>Redisson</name> <name>Redisson</name>

@ -58,10 +58,10 @@ import org.redisson.core.RListMultimapCache;
import org.redisson.core.RLock; import org.redisson.core.RLock;
import org.redisson.core.RMap; import org.redisson.core.RMap;
import org.redisson.core.RMapCache; import org.redisson.core.RMapCache;
import org.redisson.core.RMultimapCache;
import org.redisson.core.RPatternTopic; import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue; import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock; import org.redisson.core.RReadWriteLock;
import org.redisson.core.RRemoteService;
import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript; import org.redisson.core.RScript;
import org.redisson.core.RSemaphore; import org.redisson.core.RSemaphore;
@ -369,6 +369,10 @@ public class Redisson implements RedissonClient {
return new RedissonScript(commandExecutor); return new RedissonScript(commandExecutor);
} }
public RRemoteService getRemoteSerivce() {
return new RedissonRemoteService(this);
}
@Override @Override
public <V> RSortedSet<V> getSortedSet(String name) { public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(commandExecutor, name); return new RedissonSortedSet<V>(commandExecutor, name);

@ -200,7 +200,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override @Override
public Future<Boolean> deleteAsync() { public Future<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getConfigName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getConfigName());
} }
private void readConfig() { private void readConfig() {

@ -45,6 +45,7 @@ import org.redisson.core.RMapCache;
import org.redisson.core.RPatternTopic; import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue; import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock; import org.redisson.core.RReadWriteLock;
import org.redisson.core.RRemoteService;
import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript; import org.redisson.core.RScript;
import org.redisson.core.RSemaphore; import org.redisson.core.RSemaphore;
@ -585,6 +586,13 @@ public interface RedissonClient {
*/ */
RScript getScript(); RScript getScript();
/**
* Returns object for remote operations
*
* @return
*/
RRemoteService getRemoteSerivce();
/** /**
* Return batch object which executes group of * Return batch object which executes group of
* command in pipeline. * command in pipeline.

@ -22,10 +22,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RListMultimapCache; import org.redisson.core.RListMultimapCache;
@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V> implements RListMultimapCache<K, V> { public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V> implements RListMultimapCache<K, V> {
private static final RedisCommand<Boolean> EVAL_EXPIRE_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); private final RedissonMultimapCache<K> baseCache;
RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name); super(connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
} }
RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name); super(codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
} }
public Future<Boolean> containsKeyAsync(Object key) { public Future<Boolean> containsKeyAsync(Object key) {
@ -207,20 +206,27 @@ public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V>
@Override @Override
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); return baseCache.expireKeyAsync(key, timeToLive, timeUnit);
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " @Override
+ "if tonumber(ARGV[1]) > 0 then " public Future<Boolean> deleteAsync() {
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + return baseCache.deleteAsync();
"else " + }
"redis.call('zrem', KEYS[2], ARGV[2]); "
+ "end; " @Override
+ "return 1; " public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
+ "else " return baseCache.expireAsync(timeToLive, timeUnit);
+ "return 0; " }
+ "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), ttlTimeout, key); @Override
public Future<Boolean> expireAtAsync(long timestamp) {
return baseCache.expireAtAsync(timestamp);
}
@Override
public Future<Boolean> clearExpireAsync() {
return baseCache.clearExpireAsync();
} }
} }

@ -655,7 +655,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override @Override
public Future<Boolean> deleteAsync() { public Future<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName());
} }
@Override @Override

@ -20,15 +20,18 @@ import java.net.InetSocketAddress;
import java.util.AbstractCollection; import java.util.AbstractCollection;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set; import java.util.Set;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -152,10 +155,12 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
return new EntrySet(); return new EntrySet();
} }
@Override
public long fastRemove(K ... keys) { public long fastRemove(K ... keys) {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }
@Override
public Future<Long> fastRemoveAsync(K ... keys) { public Future<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) { if (keys == null || keys.length == 0) {
return newSucceededFuture(0L); return newSucceededFuture(0L);
@ -184,6 +189,69 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT,
"local entries = redis.call('hgetall', KEYS[1]); " +
"local keys = {KEYS[1]}; " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"table.insert(keys, name); " +
"end;" +
"end; " +
"local n = 0 "
+ "for i=1, #keys,5000 do "
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end; "
+ "return n;",
Arrays.<Object>asList(getName()));
}
@Override
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('pexpire', name, ARGV[1]); " +
"end;" +
"end; " +
"return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName()), timeUnit.toMillis(timeToLive));
}
@Override
public Future<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('pexpireat', name, ARGV[1]); " +
"end;" +
"end; " +
"return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName()), timestamp);
}
@Override
public Future<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('persist', name); " +
"end;" +
"end; " +
"return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getName()));
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);

@ -0,0 +1,129 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import io.netty.util.concurrent.Future;
public class RedissonMultimapCache<K> {
private static final RedisCommand<Boolean> EVAL_EXPIRE_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY);
private final CommandAsyncExecutor commandExecutor;
private final String name;
private final Codec codec;
private final String timeoutSetName;
public RedissonMultimapCache(CommandAsyncExecutor commandExecutor, String name, Codec codec, String timeoutSetName) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
this.timeoutSetName = timeoutSetName;
}
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive);
return commandExecutor.evalWriteAsync(name, codec, EVAL_EXPIRE_KEY,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " +
"else " +
"redis.call('zrem', KEYS[2], ARGV[2]); "
+ "end; "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(name, timeoutSetName), ttlTimeout, key);
}
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT,
"local entries = redis.call('hgetall', KEYS[1]); " +
"local keys = {KEYS[1], KEYS[2]}; " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"table.insert(keys, name); " +
"end;" +
"end; " +
"local n = 0 "
+ "for i=1, #keys,5000 do "
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end; "
+ "return n;",
Arrays.<Object>asList(name, timeoutSetName));
}
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag'); " +
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('pexpire', name, ARGV[1]); " +
"end;" +
"end; " +
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
"return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(name, timeoutSetName), timeUnit.toMillis(timeToLive));
}
public Future<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('pexpireat', name, ARGV[1]); " +
"end;" +
"end; " +
"redis.call('pexpireat', KEYS[2], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(name, timeoutSetName), timestamp);
}
public Future<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"local entries = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(entries) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"redis.call('persist', name); " +
"end;" +
"end; " +
"redis.call('persist', KEYS[2]); " +
"return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(name, timeoutSetName));
}
}

@ -0,0 +1,165 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.core.MessageListener;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RRemoteService;
import org.redisson.core.RTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
public class RedissonRemoteService implements RRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Redisson redisson;
public RedissonRemoteService(Redisson redisson) {
this.redisson = redisson;
}
@Override
public <T> void register(Class<T> remoteInterface, T object) {
register(remoteInterface, object, 1);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int executorsAmount) {
if (executorsAmount < 1) {
throw new IllegalArgumentException("executorsAmount can't be lower than 1");
}
for (Method method : remoteInterface.getMethods()) {
RemoteServiceMethod value = new RemoteServiceMethod(method, object);
RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName());
if (beans.put(key, value) != null) {
return;
}
}
for (int i = 0; i < executorsAmount; i++) {
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
subscribe(remoteInterface, requestQueue);
}
}
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
take.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RemoteServiceRequest request = future.getNow();
RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId();
RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
RemoteServiceResponse response;
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
response = new RemoteServiceResponse(result);
} catch (Exception e) {
response = new RemoteServiceResponse(e.getCause());
log.error("Can't execute: " + request, e);
}
long clients = topic.publish(response);
if (clients == 0) {
log.error("None of clients has not received a response: {} for request: {}", response, request);
}
subscribe(remoteInterface, requestQueue);
}
});
}
@Override
public <T> T get(Class<T> remoteInterface) {
return get(remoteInterface, -1, null);
}
@Override
public <T> T get(final Class<T> remoteInterface, final int timeout, final TimeUnit timeUnit) {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestId = generateRequestId();
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args);
requestQueue.add(request);
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>();
int listenerId = topic.addListener(new MessageListener<RemoteServiceResponse>() {
@Override
public void onMessage(String channel, RemoteServiceResponse msg) {
response.set(msg);
latch.countDown();
}
});
if (timeout == -1) {
latch.await();
} else {
if (!latch.await(timeout, timeUnit)) {
topic.removeListener(listenerId);
throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request);
}
}
topic.removeListener(listenerId);
RemoteServiceResponse msg = response.get();
if (msg.getError() != null) {
throw msg.getError();
}
return msg.getResult();
}
};
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler);
}
private String generateRequestId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
}

@ -474,7 +474,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override @Override
public Future<Boolean> deleteAsync() { public Future<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName());
} }
@Override @Override

@ -22,10 +22,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSetMultimapCache; import org.redisson.core.RSetMultimapCache;
@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> implements RSetMultimapCache<K, V> { public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> implements RSetMultimapCache<K, V> {
private static final RedisCommand<Boolean> EVAL_EXPIRE_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); private final RedissonMultimapCache<K> baseCache;
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name); super(connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
} }
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name); super(codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
} }
public Future<Boolean> containsKeyAsync(Object key) { public Future<Boolean> containsKeyAsync(Object key) {
@ -199,20 +198,27 @@ public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> im
@Override @Override
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); return baseCache.expireKeyAsync(key, timeToLive, timeUnit);
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " @Override
+ "if tonumber(ARGV[1]) > 0 then " public Future<Boolean> deleteAsync() {
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + return baseCache.deleteAsync();
"else " + }
"redis.call('zrem', KEYS[2], ARGV[2]); "
+ "end; " @Override
+ "return 1; " public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
+ "else " return baseCache.expireAsync(timeToLive, timeUnit);
+ "return 0; " }
+ "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), ttlTimeout, key); @Override
public Future<Boolean> expireAtAsync(long timestamp) {
return baseCache.expireAtAsync(timestamp);
}
@Override
public Future<Boolean> clearExpireAsync() {
return baseCache.clearExpireAsync();
} }
} }

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
public class RemoteServiceKey {
private final Class<?> serviceInterface;
private final String methodName;
public RemoteServiceKey(Class<?> serviceInterface, String methodName) {
super();
this.serviceInterface = serviceInterface;
this.methodName = methodName;
}
public String getMethodName() {
return methodName;
}
public Class<?> getServiceInterface() {
return serviceInterface;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((methodName == null) ? 0 : methodName.hashCode());
result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RemoteServiceKey other = (RemoteServiceKey) obj;
if (methodName == null) {
if (other.methodName != null)
return false;
} else if (!methodName.equals(other.methodName))
return false;
if (serviceInterface == null) {
if (other.serviceInterface != null)
return false;
} else if (!serviceInterface.equals(other.serviceInterface))
return false;
return true;
}
}

@ -0,0 +1,39 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.lang.reflect.Method;
public class RemoteServiceMethod {
private final Object bean;
private final Method method;
public RemoteServiceMethod(Method method, Object bean) {
super();
this.method = method;
this.bean = bean;
}
public Object getBean() {
return bean;
}
public Method getMethod() {
return method;
}
}

@ -0,0 +1,54 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
public class RemoteServiceRequest {
private String requestId;
private String methodName;
private Object[] args;
public RemoteServiceRequest() {
}
public RemoteServiceRequest(String requestId, String methodName, Object[] args) {
super();
this.requestId = requestId;
this.methodName = methodName;
this.args = args;
}
public String getRequestId() {
return requestId;
}
public Object[] getArgs() {
return args;
}
public String getMethodName() {
return methodName;
}
@Override
public String toString() {
return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + "]";
}
}

@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
public class RemoteServiceResponse {
private Object result;
private Throwable error;
public RemoteServiceResponse() {
}
public RemoteServiceResponse(Object result) {
this.result = result;
}
public RemoteServiceResponse(Throwable error) {
this.error = error;
}
public Throwable getError() {
return error;
}
public Object getResult() {
return result;
}
@Override
public String toString() {
return "RemoteServiceResponse [result=" + result + ", error=" + error + "]";
}
}

@ -174,8 +174,8 @@ public class RedisConnection implements RedisCommands {
return closed; return closed;
} }
public void forceReconnect() { public ChannelFuture forceReconnectAsync() {
channel.close(); return channel.close();
} }
/** /**

@ -84,12 +84,12 @@ public class CommandDecoder extends ReplayingDecoder<State> {
currentDecoder = StringCodec.INSTANCE.getValueDecoder(); currentDecoder = StringCodec.INSTANCE.getValueDecoder();
} }
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
if (state() == null) { if (state() == null) {
state(new State()); state(new State());
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
} }
state().setDecoderState(null); state().setDecoderState(null);
@ -139,7 +139,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
cmd.getPromise().tryFailure(e); cmd.getPromise().tryFailure(e);
} }
if (!cmd.getPromise().isSuccess()) { if (!cmd.getPromise().isSuccess()) {
error = (RedisException) cmd.getPromise().cause(); if (!(cmd.getPromise().cause() instanceof RedisMovedException
|| cmd.getPromise().cause() instanceof RedisAskException
|| cmd.getPromise().cause() instanceof RedisLoadingException)) {
error = (RedisException) cmd.getPromise().cause();
}
} }
} }

@ -208,6 +208,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> DEL = new RedisStrictCommand<Long>("DEL"); RedisStrictCommand<Long> DEL = new RedisStrictCommand<Long>("DEL");
RedisStrictCommand<Long> DBSIZE = new RedisStrictCommand<Long>("DBSIZE"); RedisStrictCommand<Long> DBSIZE = new RedisStrictCommand<Long>("DBSIZE");
RedisStrictCommand<Boolean> DEL_BOOL = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> DEL_BOOL = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> DEL_OBJECTS = new RedisStrictCommand<Boolean>("DEL", new BooleanAmountReplayConvertor());
RedisStrictCommand<Void> DEL_VOID = new RedisStrictCommand<Void>("DEL", new VoidReplayConvertor()); RedisStrictCommand<Void> DEL_VOID = new RedisStrictCommand<Void>("DEL", new VoidReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET"); RedisCommand<Object> GET = new RedisCommand<Object>("GET");

@ -23,8 +23,15 @@ import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.Encoder;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.JsonIdentityReference;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.MapperFeature;
@ -32,6 +39,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder; import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -49,6 +57,12 @@ public class JsonJacksonCodec implements Codec {
public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec();
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY)
public static class ThrowableMixIn {
}
private final ObjectMapper mapObjectMapper = initObjectMapper(); private final ObjectMapper mapObjectMapper = initObjectMapper();
protected ObjectMapper initObjectMapper() { protected ObjectMapper initObjectMapper() {
@ -111,6 +125,7 @@ public class JsonJacksonCodec implements Codec {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class);
} }
@Override @Override

@ -462,6 +462,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
int timeoutTime = connectionManager.getConfig().getTimeout(); int timeoutTime = connectionManager.getConfig().getTimeout();
if (skipTimeout.contains(details.getCommand().getName())) { if (skipTimeout.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection);
if (popTimeout == 0) { if (popTimeout == 0) {
return; return;
} }
@ -482,6 +483,29 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setTimeout(timeout); details.setTimeout(timeout);
} }
private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection) {
final FutureListener<Boolean> listener = new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
details.getMainPromise().cancel(true);
}
};
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
connectionManager.getShutdownPromise().removeListener(listener);
if (!future.isCancelled()) {
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
connectionManager.getShutdownPromise().addListener(listener);
}
private <R, V> void checkConnectionFuture(final NodeSource source, private <R, V> void checkConnectionFuture(final NodeSource source,
final AsyncDetails<V, R> details) { final AsyncDetails<V, R> details) {
if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) { if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) {

@ -107,5 +107,7 @@ public interface ConnectionManager {
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
InfinitySemaphoreLatch getShutdownLatch(); InfinitySemaphoreLatch getShutdownLatch();
Future<Boolean> getShutdownPromise();
} }

@ -122,6 +122,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); protected final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
private final Promise<Boolean> shutdownPromise;
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap()); private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap());
@ -156,6 +158,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.socketChannelClass = NioSocketChannel.class; this.socketChannelClass = NioSocketChannel.class;
} }
this.codec = cfg.getCodec(); this.codec = cfg.getCodec();
this.shutdownPromise = group.next().newPromise();
this.isClusterMode = cfg.isClusterConfig(); this.isClusterMode = cfg.isClusterConfig();
} }
@ -674,6 +677,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void shutdown() { public void shutdown() {
shutdownPromise.trySuccess(true);
shutdownLatch.closeAndAwaitUninterruptibly(); shutdownLatch.closeAndAwaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) { for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown(); entry.shutdown();
@ -731,6 +735,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public InfinitySemaphoreLatch getShutdownLatch() { public InfinitySemaphoreLatch getShutdownLatch() {
return shutdownLatch; return shutdownLatch;
} }
@Override
public Future<Boolean> getShutdownPromise() {
return shutdownPromise;
}
@Override @Override
public ConnectionEventsHub getConnectionEventsHub() { public ConnectionEventsHub getConnectionEventsHub() {

@ -0,0 +1,58 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.TimeUnit;
public interface RRemoteService {
/**
* Register remote service with single executor
*
* @param remoteInterface
* @param object
*/
<T> void register(Class<T> remoteInterface, T object);
/**
* Register remote service with custom executors amount
*
* @param remoteInterface
* @param object
* @param executorsAmount
*/
<T> void register(Class<T> remoteInterface, T object, int executorsAmount);
/**
* Get remote service object for remote invocations
*
* @param remoteInterface
* @return
*/
<T> T get(Class<T> remoteInterface);
/**
* Get remote service object for remote invocations
* with specified invocation timeout
*
* @param remoteInterface
* @param timeout - invocation timeout
* @param timeUnit
* @return
*/
<T> T get(Class<T> remoteInterface, int timeout, TimeUnit timeUnit);
}

@ -361,7 +361,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
@Override @Override
public Publisher<Boolean> delete() { public Publisher<Boolean> delete() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName());
} }
@Override @Override

@ -305,7 +305,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
@Override @Override
public Publisher<Boolean> delete() { public Publisher<Boolean> delete() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName());
} }
@Override @Override

@ -1,10 +1,8 @@
package org.redisson; package org.redisson;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.MsgPackJacksonCodec;
public abstract class BaseTest { public abstract class BaseTest {
@ -43,9 +41,9 @@ public abstract class BaseTest {
return Redisson.create(config); return Redisson.create(config);
} }
@After @Before
public void after() { public void before() {
redisson.flushdb(); redisson.getKeys().flushall();
} }
} }

@ -1,9 +1,6 @@
package org.redisson; package org.redisson;
import static org.hamcrest.CoreMatchers.equalTo; import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -22,8 +19,47 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
import io.netty.util.concurrent.Future;
public class RedissonBlockingQueueTest extends BaseTest { public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testTakeAsyncCancel() {
Config config = createConfig();
config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
for (int i = 0; i < 10; i++) {
Future<Integer> f = queue1.takeAsync();
f.cancel(true);
}
assertThat(queue1.add(1)).isTrue();
assertThat(queue1.add(2)).isTrue();
assertThat(queue1.size()).isEqualTo(2);
redisson.shutdown();
}
@Test
public void testPollAsyncCancel() {
Config config = createConfig();
config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
for (int i = 0; i < 10; i++) {
Future<Integer> f = queue1.pollAsync(1, TimeUnit.SECONDS);
f.cancel(true);
}
assertThat(queue1.add(1)).isTrue();
assertThat(queue1.add(2)).isTrue();
assertThat(queue1.size()).isEqualTo(2);
redisson.shutdown();
}
@Test @Test
public void testPollFromAny() throws InterruptedException { public void testPollFromAny() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany"); final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
@ -225,14 +261,14 @@ public class RedissonBlockingQueueTest extends BaseTest {
try { try {
// blocking // blocking
int item = queue.take(); int item = queue.take();
assertTrue(item > 0 && item <= total); assertThat(item > 0 && item <= total).isTrue();
} catch (InterruptedException exception) { } catch (InterruptedException exception) {
fail(); Assert.fail();
} }
count++; count++;
} }
assertThat(counter.get(), equalTo(total)); assertThat(counter.get()).isEqualTo(total);
queue.delete(); queue.delete();
} }

@ -1,14 +1,12 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.*;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.core.RBucket; import org.redisson.core.RBucket;
@ -26,7 +24,7 @@ public class RedissonKeysTest extends BaseTest {
Iterator<String> iterator = redisson.getKeys().getKeysByPattern("test?").iterator(); Iterator<String> iterator = redisson.getKeys().getKeysByPattern("test?").iterator();
for (; iterator.hasNext();) { for (; iterator.hasNext();) {
String key = iterator.next(); String key = iterator.next();
MatcherAssert.assertThat(key, Matchers.isOneOf("test1", "test2")); assertThat(key).isIn("test1", "test2");
} }
} }
@ -57,7 +55,7 @@ public class RedissonKeysTest extends BaseTest {
RBucket<String> bucket2 = redisson.getBucket("test2"); RBucket<String> bucket2 = redisson.getBucket("test2");
bucket2.set("someValue2"); bucket2.set("someValue2");
MatcherAssert.assertThat(redisson.getKeys().randomKey(), Matchers.isOneOf("test1", "test2")); assertThat(redisson.getKeys().randomKey()).isIn("test1", "test2");
redisson.getKeys().delete("test1"); redisson.getKeys().delete("test1");
Assert.assertEquals(redisson.getKeys().randomKey(), "test2"); Assert.assertEquals(redisson.getKeys().randomKey(), "test2");
redisson.flushdb(); redisson.flushdb();
@ -95,10 +93,10 @@ public class RedissonKeysTest extends BaseTest {
map.fastPut("1", "2"); map.fastPut("1", "2");
Collection<String> keys = redisson.getKeys().findKeysByPattern("test?"); Collection<String> keys = redisson.getKeys().findKeysByPattern("test?");
MatcherAssert.assertThat(keys, Matchers.containsInAnyOrder("test1", "test2")); assertThat(keys).containsOnly("test1", "test2");
Collection<String> keys2 = redisson.getKeys().findKeysByPattern("test"); Collection<String> keys2 = redisson.getKeys().findKeysByPattern("test");
MatcherAssert.assertThat(keys2, Matchers.empty()); assertThat(keys2).isEmpty();
} }
@Test @Test

@ -0,0 +1,108 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisTimeoutException;
import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RedissonRemoteServiceTest extends BaseTest {
public interface RemoteInterface {
void voidMethod(String name, Long param);
Long resultMethod(Long value);
void errorMethod() throws IOException;
void errorMethodWithCause();
void timeoutMethod() throws InterruptedException;
}
public class RemoteImpl implements RemoteInterface {
@Override
public void voidMethod(String name, Long param) {
System.out.println(name + " " + param);
}
@Override
public Long resultMethod(Long value) {
return value*2;
}
@Override
public void errorMethod() throws IOException {
throw new IOException("Checking error throw");
}
@Override
public void errorMethodWithCause() {
try {
int s = 2 / 0;
} catch (Exception e) {
throw new RuntimeException("Checking error throw", e);
}
}
@Override
public void timeoutMethod() throws InterruptedException {
Thread.sleep(2000);
}
}
@Test(expected = RedisTimeoutException.class)
public void testTimeout() throws InterruptedException {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS);
try {
ri.timeoutMethod();
} finally {
r1.shutdown();
r2.shutdown();
}
}
@Test
public void testInvocations() {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class);
ri.voidMethod("someName", 100L);
assertThat(ri.resultMethod(100L)).isEqualTo(200);
try {
ri.errorMethod();
Assert.fail();
} catch (IOException e) {
assertThat(e.getMessage()).isEqualTo("Checking error throw");
}
try {
ri.errorMethodWithCause();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
r1.shutdown();
r2.shutdown();
}
}

@ -31,6 +31,15 @@ public class RedissonSetCacheTest extends BaseTest {
} }
} }
@Test
public void testDelete() {
RSetCache<Integer> set = redisson.getSetCache("set");
assertThat(set.delete()).isFalse();
set.add(1, 1, TimeUnit.SECONDS);
assertThat(set.delete()).isTrue();
assertThat(set.delete()).isFalse();
}
@Test @Test
public void testEmptyReadAll() { public void testEmptyReadAll() {

@ -1,15 +1,13 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.RMultimapCache; import org.redisson.core.RMultimapCache;
import org.redisson.core.RSetCache; import org.redisson.core.RSetMultimap;
public class RedissonSetMultimapCacheTest extends BaseTest { public class RedissonSetMultimapCacheTest extends BaseTest {
@ -137,5 +135,56 @@ public class RedissonSetMultimapCacheTest extends BaseTest {
} }
@Test
public void testExpire() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
map.put("1", "2");
map.put("2", "3");
map.expire(100, TimeUnit.MILLISECONDS);
Thread.sleep(500);
assertThat(map.size()).isZero();
}
@Test
public void testExpireAt() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
map.put("1", "2");
map.put("2", "3");
map.expireAt(System.currentTimeMillis() + 100);
Thread.sleep(500);
assertThat(map.size()).isZero();
}
@Test
public void testClearExpire() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
map.put("1", "2");
map.put("2", "3");
map.expireAt(System.currentTimeMillis() + 100);
map.clearExpire();
Thread.sleep(500);
assertThat(map.size()).isEqualTo(2);
}
@Test
public void testDelete() {
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
map.put("1", "2");
map.put("2", "3");
assertThat(map.delete()).isTrue();
RSetMultimap<String, String> map2 = redisson.getSetMultimapCache("simple1");
assertThat(map2.delete()).isFalse();
}
} }

@ -1,15 +1,17 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.redisson.core.RSetMultimap; import org.redisson.core.RSetMultimap;
import static org.assertj.core.api.Assertions.*;
public class RedissonSetMultimapTest extends BaseTest { public class RedissonSetMultimapTest extends BaseTest {
@ -273,5 +275,56 @@ public class RedissonSetMultimapTest extends BaseTest {
assertThat(allValues).containsOnlyElementsOf(values); assertThat(allValues).containsOnlyElementsOf(values);
} }
@Test
public void testExpire() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimap("simple");
map.put("1", "2");
map.put("2", "3");
map.expire(100, TimeUnit.MILLISECONDS);
Thread.sleep(500);
assertThat(map.size()).isZero();
}
@Test
public void testExpireAt() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimap("simple");
map.put("1", "2");
map.put("2", "3");
map.expireAt(System.currentTimeMillis() + 100);
Thread.sleep(500);
assertThat(map.size()).isZero();
}
@Test
public void testClearExpire() throws InterruptedException {
RSetMultimap<String, String> map = redisson.getSetMultimap("simple");
map.put("1", "2");
map.put("2", "3");
map.expireAt(System.currentTimeMillis() + 100);
map.clearExpire();
Thread.sleep(500);
assertThat(map.size()).isEqualTo(2);
}
@Test
public void testDelete() {
RSetMultimap<String, String> map = redisson.getSetMultimap("simple");
map.put("1", "2");
map.put("2", "3");
assertThat(map.delete()).isTrue();
RSetMultimap<String, String> map2 = redisson.getSetMultimap("simple1");
assertThat(map2.delete()).isFalse();
}
} }

Loading…
Cancel
Save