diff --git a/pom.xml b/pom.xml index 8907f379a..106c3a909 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.10-SNAPSHOT + 2.2.11-SNAPSHOT bundle Redisson diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 648beb275..f1fdf2953 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -58,10 +58,10 @@ import org.redisson.core.RListMultimapCache; import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RMapCache; -import org.redisson.core.RMultimapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -369,6 +369,10 @@ public class Redisson implements RedissonClient { return new RedissonScript(commandExecutor); } + public RRemoteService getRemoteSerivce() { + return new RedissonRemoteService(this); + } + @Override public RSortedSet getSortedSet(String name) { return new RedissonSortedSet(commandExecutor, name); diff --git a/src/main/java/org/redisson/RedissonBloomFilter.java b/src/main/java/org/redisson/RedissonBloomFilter.java index 6b6913039..06fb7b6db 100644 --- a/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/src/main/java/org/redisson/RedissonBloomFilter.java @@ -200,7 +200,7 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getConfigName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getConfigName()); } private void readConfig() { diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 270877e9f..edb10dced 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -45,6 +45,7 @@ import org.redisson.core.RMapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -585,6 +586,13 @@ public interface RedissonClient { */ RScript getScript(); + /** + * Returns object for remote operations + * + * @return + */ + RRemoteService getRemoteSerivce(); + /** * Return batch object which executes group of * command in pipeline. diff --git a/src/main/java/org/redisson/RedissonListMultimapCache.java b/src/main/java/org/redisson/RedissonListMultimapCache.java index 62a4800ec..2ba95e527 100644 --- a/src/main/java/org/redisson/RedissonListMultimapCache.java +++ b/src/main/java/org/redisson/RedissonListMultimapCache.java @@ -22,10 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RListMultimapCache; @@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future; */ public class RedissonListMultimapCache extends RedissonListMultimap implements RListMultimapCache { - private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + private final RedissonMultimapCache baseCache; RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { @@ -207,20 +206,27 @@ public class RedissonListMultimapCache extends RedissonListMultimap @Override public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { - long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); - - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY, - "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " - + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + - "else " + - "redis.call('zrem', KEYS[2], ARGV[2]); " - + "end; " - + "return 1; " - + "else " - + "return 0; " - + "end", - Arrays.asList(getName(), getTimeoutSetName()), ttlTimeout, key); + return baseCache.expireKeyAsync(key, timeToLive, timeUnit); + } + + @Override + public Future deleteAsync() { + return baseCache.deleteAsync(); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return baseCache.expireAsync(timeToLive, timeUnit); + } + + @Override + public Future expireAtAsync(long timestamp) { + return baseCache.expireAtAsync(timestamp); + } + + @Override + public Future clearExpireAsync() { + return baseCache.clearExpireAsync(); } } diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index 8900fd119..d529c2b03 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -655,7 +655,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/RedissonMultimap.java b/src/main/java/org/redisson/RedissonMultimap.java index 475660b82..a072520a5 100644 --- a/src/main/java/org/redisson/RedissonMultimap.java +++ b/src/main/java/org/redisson/RedissonMultimap.java @@ -20,15 +20,18 @@ import java.net.InetSocketAddress; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Set; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -152,10 +155,12 @@ public abstract class RedissonMultimap extends RedissonExpirable implement return new EntrySet(); } + @Override public long fastRemove(K ... keys) { return get(fastRemoveAsync(keys)); } + @Override public Future fastRemoveAsync(K ... keys) { if (keys == null || keys.length == 0) { return newSucceededFuture(0L); @@ -184,6 +189,69 @@ public abstract class RedissonMultimap extends RedissonExpirable implement throw new RuntimeException(e); } } + + @Override + public Future deleteAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {KEYS[1]}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "table.insert(keys, name); " + + "end;" + + "end; " + + + "local n = 0 " + + "for i=1, #keys,5000 do " + + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " + + "end; " + + "return n;", + Arrays.asList(getName())); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpire', name, ARGV[1]); " + + "end;" + + "end; " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(getName()), timeUnit.toMillis(timeToLive)); + } + + @Override + public Future expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpireat', name, ARGV[1]); " + + "end;" + + "end; " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(getName()), timestamp); + } + + @Override + public Future clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('persist', name); " + + "end;" + + "end; " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(getName())); + } + MapScanResult scanIterator(InetSocketAddress client, long startPos) { Future> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); diff --git a/src/main/java/org/redisson/RedissonMultimapCache.java b/src/main/java/org/redisson/RedissonMultimapCache.java new file mode 100644 index 000000000..765ab8021 --- /dev/null +++ b/src/main/java/org/redisson/RedissonMultimapCache.java @@ -0,0 +1,129 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.command.CommandAsyncExecutor; + +import io.netty.util.concurrent.Future; + +public class RedissonMultimapCache { + + private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + + private final CommandAsyncExecutor commandExecutor; + private final String name; + private final Codec codec; + private final String timeoutSetName; + + public RedissonMultimapCache(CommandAsyncExecutor commandExecutor, String name, Codec codec, String timeoutSetName) { + this.commandExecutor = commandExecutor; + this.name = name; + this.codec = codec; + this.timeoutSetName = timeoutSetName; + } + + public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { + long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); + + return commandExecutor.evalWriteAsync(name, codec, EVAL_EXPIRE_KEY, + "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "else " + + "redis.call('zrem', KEYS[2], ARGV[2]); " + + "end; " + + "return 1; " + + "else " + + "return 0; " + + "end", + Arrays.asList(name, timeoutSetName), ttlTimeout, key); + } + + public Future deleteAsync() { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {KEYS[1], KEYS[2]}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "table.insert(keys, name); " + + "end;" + + "end; " + + + "local n = 0 " + + "for i=1, #keys,5000 do " + + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " + + "end; " + + "return n;", + Arrays.asList(name, timeoutSetName)); + } + + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag'); " + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpire', name, ARGV[1]); " + + "end;" + + "end; " + + "redis.call('pexpire', KEYS[2], ARGV[1]); " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(name, timeoutSetName), timeUnit.toMillis(timeToLive)); + } + + public Future expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpireat', name, ARGV[1]); " + + "end;" + + "end; " + + "redis.call('pexpireat', KEYS[2], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(name, timeoutSetName), timestamp); + } + + public Future clearExpireAsync() { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('persist', name); " + + "end;" + + "end; " + + "redis.call('persist', KEYS[2]); " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(name, timeoutSetName)); + } + + +} diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java new file mode 100644 index 000000000..b80124260 --- /dev/null +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -0,0 +1,165 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.client.RedisException; +import org.redisson.client.RedisTimeoutException; +import org.redisson.core.MessageListener; +import org.redisson.core.RBlockingQueue; +import org.redisson.core.RRemoteService; +import org.redisson.core.RTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBufUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ThreadLocalRandom; + +public class RedissonRemoteService implements RRemoteService { + + private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); + + private final Map beans = PlatformDependent.newConcurrentHashMap(); + + private final Redisson redisson; + + public RedissonRemoteService(Redisson redisson) { + this.redisson = redisson; + } + + @Override + public void register(Class remoteInterface, T object) { + register(remoteInterface, object, 1); + } + + @Override + public void register(Class remoteInterface, T object, int executorsAmount) { + if (executorsAmount < 1) { + throw new IllegalArgumentException("executorsAmount can't be lower than 1"); + } + for (Method method : remoteInterface.getMethods()) { + RemoteServiceMethod value = new RemoteServiceMethod(method, object); + RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName()); + if (beans.put(key, value) != null) { + return; + } + } + + for (int i = 0; i < executorsAmount; i++) { + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + subscribe(remoteInterface, requestQueue); + } + } + + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { + Future take = requestQueue.takeAsync(); + take.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + RemoteServiceRequest request = future.getNow(); + RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId(); + RTopic topic = redisson.getTopic(responseName); + RemoteServiceResponse response; + try { + Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); + response = new RemoteServiceResponse(result); + } catch (Exception e) { + response = new RemoteServiceResponse(e.getCause()); + log.error("Can't execute: " + request, e); + } + + long clients = topic.publish(response); + if (clients == 0) { + log.error("None of clients has not received a response: {} for request: {}", response, request); + } + + subscribe(remoteInterface, requestQueue); + } + }); + } + + @Override + public T get(Class remoteInterface) { + return get(remoteInterface, -1, null); + } + + @Override + public T get(final Class remoteInterface, final int timeout, final TimeUnit timeUnit) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String requestId = generateRequestId(); + + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args); + requestQueue.add(request); + + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; + final RTopic topic = redisson.getTopic(responseName); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference response = new AtomicReference(); + int listenerId = topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, RemoteServiceResponse msg) { + response.set(msg); + latch.countDown(); + } + }); + + if (timeout == -1) { + latch.await(); + } else { + if (!latch.await(timeout, timeUnit)) { + topic.removeListener(listenerId); + throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request); + } + } + topic.removeListener(listenerId); + RemoteServiceResponse msg = response.get(); + if (msg.getError() != null) { + throw msg.getError(); + } + return msg.getResult(); + } + }; + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); + } + + private String generateRequestId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + ThreadLocalRandom.current().nextBytes(id); + return ByteBufUtil.hexDump(id); + } + +} diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java index 8a9083c75..97f8526a7 100644 --- a/src/main/java/org/redisson/RedissonSetCache.java +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -474,7 +474,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/RedissonSetMultimapCache.java b/src/main/java/org/redisson/RedissonSetMultimapCache.java index 9b7dee226..4254d3ed6 100644 --- a/src/main/java/org/redisson/RedissonSetMultimapCache.java +++ b/src/main/java/org/redisson/RedissonSetMultimapCache.java @@ -22,10 +22,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RSetMultimapCache; @@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future; */ public class RedissonSetMultimapCache extends RedissonSetMultimap implements RSetMultimapCache { - private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + private final RedissonMultimapCache baseCache; RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { @@ -199,20 +198,27 @@ public class RedissonSetMultimapCache extends RedissonSetMultimap im @Override public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { - long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); - - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY, - "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " - + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + - "else " + - "redis.call('zrem', KEYS[2], ARGV[2]); " - + "end; " - + "return 1; " - + "else " - + "return 0; " - + "end", - Arrays.asList(getName(), getTimeoutSetName()), ttlTimeout, key); + return baseCache.expireKeyAsync(key, timeToLive, timeUnit); + } + + @Override + public Future deleteAsync() { + return baseCache.deleteAsync(); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return baseCache.expireAsync(timeToLive, timeUnit); + } + + @Override + public Future expireAtAsync(long timestamp) { + return baseCache.expireAtAsync(timestamp); + } + + @Override + public Future clearExpireAsync() { + return baseCache.clearExpireAsync(); } } diff --git a/src/main/java/org/redisson/RemoteServiceKey.java b/src/main/java/org/redisson/RemoteServiceKey.java new file mode 100644 index 000000000..5cadb3a5a --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceKey.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceKey { + + private final Class serviceInterface; + private final String methodName; + + public RemoteServiceKey(Class serviceInterface, String methodName) { + super(); + this.serviceInterface = serviceInterface; + this.methodName = methodName; + } + + public String getMethodName() { + return methodName; + } + + public Class getServiceInterface() { + return serviceInterface; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((methodName == null) ? 0 : methodName.hashCode()); + result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RemoteServiceKey other = (RemoteServiceKey) obj; + if (methodName == null) { + if (other.methodName != null) + return false; + } else if (!methodName.equals(other.methodName)) + return false; + if (serviceInterface == null) { + if (other.serviceInterface != null) + return false; + } else if (!serviceInterface.equals(other.serviceInterface)) + return false; + return true; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceMethod.java b/src/main/java/org/redisson/RemoteServiceMethod.java new file mode 100644 index 000000000..153c82d19 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceMethod.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.Method; + +public class RemoteServiceMethod { + + private final Object bean; + private final Method method; + + public RemoteServiceMethod(Method method, Object bean) { + super(); + this.method = method; + this.bean = bean; + } + + public Object getBean() { + return bean; + } + + public Method getMethod() { + return method; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceRequest.java b/src/main/java/org/redisson/RemoteServiceRequest.java new file mode 100644 index 000000000..434784456 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceRequest.java @@ -0,0 +1,54 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; + +public class RemoteServiceRequest { + + private String requestId; + private String methodName; + private Object[] args; + + public RemoteServiceRequest() { + } + + public RemoteServiceRequest(String requestId, String methodName, Object[] args) { + super(); + this.requestId = requestId; + this.methodName = methodName; + this.args = args; + } + + public String getRequestId() { + return requestId; + } + + public Object[] getArgs() { + return args; + } + + public String getMethodName() { + return methodName; + } + + @Override + public String toString() { + return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args=" + + Arrays.toString(args) + "]"; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/RemoteServiceResponse.java new file mode 100644 index 000000000..09ebc5999 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceResponse.java @@ -0,0 +1,47 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceResponse { + + private Object result; + private Throwable error; + + public RemoteServiceResponse() { + } + + public RemoteServiceResponse(Object result) { + this.result = result; + } + + public RemoteServiceResponse(Throwable error) { + this.error = error; + } + + public Throwable getError() { + return error; + } + + public Object getResult() { + return result; + } + + @Override + public String toString() { + return "RemoteServiceResponse [result=" + result + ", error=" + error + "]"; + } + +} diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 85ff35041..53f1c9383 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -174,8 +174,8 @@ public class RedisConnection implements RedisCommands { return closed; } - public void forceReconnect() { - channel.close(); + public ChannelFuture forceReconnectAsync() { + return channel.close(); } /** diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index f7b150e9e..d8ce3d733 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -84,12 +84,12 @@ public class CommandDecoder extends ReplayingDecoder { currentDecoder = StringCodec.INSTANCE.getValueDecoder(); } - if (log.isTraceEnabled()) { - log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); - } - if (state() == null) { state(new State()); + + if (log.isTraceEnabled()) { + log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); + } } state().setDecoderState(null); @@ -139,7 +139,11 @@ public class CommandDecoder extends ReplayingDecoder { cmd.getPromise().tryFailure(e); } if (!cmd.getPromise().isSuccess()) { - error = (RedisException) cmd.getPromise().cause(); + if (!(cmd.getPromise().cause() instanceof RedisMovedException + || cmd.getPromise().cause() instanceof RedisAskException + || cmd.getPromise().cause() instanceof RedisLoadingException)) { + error = (RedisException) cmd.getPromise().cause(); + } } } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 7d62ca62e..9528685ce 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -208,6 +208,7 @@ public interface RedisCommands { RedisStrictCommand DEL = new RedisStrictCommand("DEL"); RedisStrictCommand DBSIZE = new RedisStrictCommand("DBSIZE"); RedisStrictCommand DEL_BOOL = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); + RedisStrictCommand DEL_OBJECTS = new RedisStrictCommand("DEL", new BooleanAmountReplayConvertor()); RedisStrictCommand DEL_VOID = new RedisStrictCommand("DEL", new VoidReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 711e1156f..ab518b0ed 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -23,8 +23,15 @@ import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonIdentityInfo; +import com.fasterxml.jackson.annotation.JsonIdentityReference; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.ObjectIdGenerators; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.MapperFeature; @@ -32,6 +39,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder; import io.netty.buffer.ByteBuf; @@ -49,6 +57,12 @@ public class JsonJacksonCodec implements Codec { public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); + @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id") + @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY) + public static class ThrowableMixIn { + + } + private final ObjectMapper mapObjectMapper = initObjectMapper(); protected ObjectMapper initObjectMapper() { @@ -111,6 +125,7 @@ public class JsonJacksonCodec implements Codec { objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class); } @Override diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index d8c44d2fa..faba6d1fc 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -462,6 +462,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { int timeoutTime = connectionManager.getConfig().getTimeout(); if (skipTimeout.contains(details.getCommand().getName())) { Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); + handleBlockingOperations(details, connection); if (popTimeout == 0) { return; } @@ -482,6 +483,29 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setTimeout(timeout); } + private void handleBlockingOperations(final AsyncDetails details, final RedisConnection connection) { + final FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + details.getMainPromise().cancel(true); + } + }; + details.getMainPromise().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + connectionManager.getShutdownPromise().removeListener(listener); + if (!future.isCancelled()) { + return; + } + // cancel handling for commands from skipTimeout collection + if (details.getAttemptPromise().cancel(true)) { + connection.forceReconnectAsync(); + } + } + }); + connectionManager.getShutdownPromise().addListener(listener); + } + private void checkConnectionFuture(final NodeSource source, final AsyncDetails details) { if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 48b2e2c98..21a275ddb 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -107,5 +107,7 @@ public interface ConnectionManager { Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); InfinitySemaphoreLatch getShutdownLatch(); + + Future getShutdownPromise(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1fb69f386..065e66f75 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -122,6 +122,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final Map entries = PlatformDependent.newConcurrentHashMap(); + private final Promise shutdownPromise; + private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); private final Set clients = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); @@ -156,6 +158,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.socketChannelClass = NioSocketChannel.class; } this.codec = cfg.getCodec(); + this.shutdownPromise = group.next().newPromise(); this.isClusterMode = cfg.isClusterConfig(); } @@ -674,6 +677,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { + shutdownPromise.trySuccess(true); shutdownLatch.closeAndAwaitUninterruptibly(); for (MasterSlaveEntry entry : entries.values()) { entry.shutdown(); @@ -731,6 +735,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public InfinitySemaphoreLatch getShutdownLatch() { return shutdownLatch; } + + @Override + public Future getShutdownPromise() { + return shutdownPromise; + } @Override public ConnectionEventsHub getConnectionEventsHub() { diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java new file mode 100644 index 000000000..1b75c5dc3 --- /dev/null +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -0,0 +1,58 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +public interface RRemoteService { + + /** + * Register remote service with single executor + * + * @param remoteInterface + * @param object + */ + void register(Class remoteInterface, T object); + + /** + * Register remote service with custom executors amount + * + * @param remoteInterface + * @param object + * @param executorsAmount + */ + void register(Class remoteInterface, T object, int executorsAmount); + + /** + * Get remote service object for remote invocations + * + * @param remoteInterface + * @return + */ + T get(Class remoteInterface); + + /** + * Get remote service object for remote invocations + * with specified invocation timeout + * + * @param remoteInterface + * @param timeout - invocation timeout + * @param timeUnit + * @return + */ + T get(Class remoteInterface, int timeout, TimeUnit timeUnit); + +} diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 183f9e460..85acad239 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -361,7 +361,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 3bb427930..bce238a28 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -305,7 +305,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/test/java/org/redisson/BaseTest.java b/src/test/java/org/redisson/BaseTest.java index e1190e384..48c549c92 100644 --- a/src/test/java/org/redisson/BaseTest.java +++ b/src/test/java/org/redisson/BaseTest.java @@ -1,10 +1,8 @@ package org.redisson; -import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; -import org.redisson.client.codec.StringCodec; -import org.redisson.codec.MsgPackJacksonCodec; public abstract class BaseTest { @@ -43,9 +41,9 @@ public abstract class BaseTest { return Redisson.create(config); } - @After - public void after() { - redisson.flushdb(); + @Before + public void before() { + redisson.getKeys().flushall(); } } diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index d28a5d3da..5fab2294d 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -1,9 +1,6 @@ package org.redisson; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.*; import java.util.ArrayList; import java.util.HashSet; @@ -22,8 +19,47 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.core.RBlockingQueue; +import io.netty.util.concurrent.Future; + public class RedissonBlockingQueueTest extends BaseTest { + @Test + public void testTakeAsyncCancel() { + Config config = createConfig(); + config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); + + RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + for (int i = 0; i < 10; i++) { + Future f = queue1.takeAsync(); + f.cancel(true); + } + assertThat(queue1.add(1)).isTrue(); + assertThat(queue1.add(2)).isTrue(); + assertThat(queue1.size()).isEqualTo(2); + + redisson.shutdown(); + } + + @Test + public void testPollAsyncCancel() { + Config config = createConfig(); + config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); + + RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + for (int i = 0; i < 10; i++) { + Future f = queue1.pollAsync(1, TimeUnit.SECONDS); + f.cancel(true); + } + assertThat(queue1.add(1)).isTrue(); + assertThat(queue1.add(2)).isTrue(); + assertThat(queue1.size()).isEqualTo(2); + + redisson.shutdown(); + } + + @Test public void testPollFromAny() throws InterruptedException { final RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); @@ -225,14 +261,14 @@ public class RedissonBlockingQueueTest extends BaseTest { try { // blocking int item = queue.take(); - assertTrue(item > 0 && item <= total); + assertThat(item > 0 && item <= total).isTrue(); } catch (InterruptedException exception) { - fail(); + Assert.fail(); } count++; } - assertThat(counter.get(), equalTo(total)); + assertThat(counter.get()).isEqualTo(total); queue.delete(); } diff --git a/src/test/java/org/redisson/RedissonKeysTest.java b/src/test/java/org/redisson/RedissonKeysTest.java index 26f22b751..4380d26df 100644 --- a/src/test/java/org/redisson/RedissonKeysTest.java +++ b/src/test/java/org/redisson/RedissonKeysTest.java @@ -1,14 +1,12 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; import org.redisson.core.RBucket; @@ -26,7 +24,7 @@ public class RedissonKeysTest extends BaseTest { Iterator iterator = redisson.getKeys().getKeysByPattern("test?").iterator(); for (; iterator.hasNext();) { String key = iterator.next(); - MatcherAssert.assertThat(key, Matchers.isOneOf("test1", "test2")); + assertThat(key).isIn("test1", "test2"); } } @@ -57,7 +55,7 @@ public class RedissonKeysTest extends BaseTest { RBucket bucket2 = redisson.getBucket("test2"); bucket2.set("someValue2"); - MatcherAssert.assertThat(redisson.getKeys().randomKey(), Matchers.isOneOf("test1", "test2")); + assertThat(redisson.getKeys().randomKey()).isIn("test1", "test2"); redisson.getKeys().delete("test1"); Assert.assertEquals(redisson.getKeys().randomKey(), "test2"); redisson.flushdb(); @@ -95,10 +93,10 @@ public class RedissonKeysTest extends BaseTest { map.fastPut("1", "2"); Collection keys = redisson.getKeys().findKeysByPattern("test?"); - MatcherAssert.assertThat(keys, Matchers.containsInAnyOrder("test1", "test2")); + assertThat(keys).containsOnly("test1", "test2"); Collection keys2 = redisson.getKeys().findKeysByPattern("test"); - MatcherAssert.assertThat(keys2, Matchers.empty()); + assertThat(keys2).isEmpty(); } @Test diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java new file mode 100644 index 000000000..c8daedd90 --- /dev/null +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -0,0 +1,108 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.client.RedisTimeoutException; + +import static org.assertj.core.api.Assertions.*; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class RedissonRemoteServiceTest extends BaseTest { + + public interface RemoteInterface { + + void voidMethod(String name, Long param); + + Long resultMethod(Long value); + + void errorMethod() throws IOException; + + void errorMethodWithCause(); + + void timeoutMethod() throws InterruptedException; + + } + + public class RemoteImpl implements RemoteInterface { + + @Override + public void voidMethod(String name, Long param) { + System.out.println(name + " " + param); + } + + @Override + public Long resultMethod(Long value) { + return value*2; + } + + @Override + public void errorMethod() throws IOException { + throw new IOException("Checking error throw"); + } + + @Override + public void errorMethodWithCause() { + try { + int s = 2 / 0; + } catch (Exception e) { + throw new RuntimeException("Checking error throw", e); + } + } + + @Override + public void timeoutMethod() throws InterruptedException { + Thread.sleep(2000); + } + + + } + + @Test(expected = RedisTimeoutException.class) + public void testTimeout() throws InterruptedException { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS); + + try { + ri.timeoutMethod(); + } finally { + r1.shutdown(); + r2.shutdown(); + } + } + + @Test + public void testInvocations() { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class); + + ri.voidMethod("someName", 100L); + assertThat(ri.resultMethod(100L)).isEqualTo(200); + + try { + ri.errorMethod(); + Assert.fail(); + } catch (IOException e) { + assertThat(e.getMessage()).isEqualTo("Checking error throw"); + } + + try { + ri.errorMethodWithCause(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); + } + + r1.shutdown(); + r2.shutdown(); + } + +} diff --git a/src/test/java/org/redisson/RedissonSetCacheTest.java b/src/test/java/org/redisson/RedissonSetCacheTest.java index a864425fd..9d7c71188 100644 --- a/src/test/java/org/redisson/RedissonSetCacheTest.java +++ b/src/test/java/org/redisson/RedissonSetCacheTest.java @@ -31,6 +31,15 @@ public class RedissonSetCacheTest extends BaseTest { } } + + @Test + public void testDelete() { + RSetCache set = redisson.getSetCache("set"); + assertThat(set.delete()).isFalse(); + set.add(1, 1, TimeUnit.SECONDS); + assertThat(set.delete()).isTrue(); + assertThat(set.delete()).isFalse(); + } @Test public void testEmptyReadAll() { diff --git a/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java index 7fe68f557..442855b8c 100644 --- a/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java +++ b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java @@ -1,15 +1,13 @@ package org.redisson; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.junit.Assert; import org.junit.Test; -import org.redisson.codec.MsgPackJacksonCodec; import org.redisson.core.RMultimapCache; -import org.redisson.core.RSetCache; +import org.redisson.core.RSetMultimap; public class RedissonSetMultimapCacheTest extends BaseTest { @@ -137,5 +135,56 @@ public class RedissonSetMultimapCacheTest extends BaseTest { } + @Test + public void testExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expire(100, TimeUnit.MILLISECONDS); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testExpireAt() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testClearExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + map.clearExpire(); + + Thread.sleep(500); + + assertThat(map.size()).isEqualTo(2); + } + + @Test + public void testDelete() { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + assertThat(map.delete()).isTrue(); + + RSetMultimap map2 = redisson.getSetMultimapCache("simple1"); + assertThat(map2.delete()).isFalse(); + } } diff --git a/src/test/java/org/redisson/RedissonSetMultimapTest.java b/src/test/java/org/redisson/RedissonSetMultimapTest.java index ef47fd980..c30256c53 100644 --- a/src/test/java/org/redisson/RedissonSetMultimapTest.java +++ b/src/test/java/org/redisson/RedissonSetMultimapTest.java @@ -1,15 +1,17 @@ package org.redisson; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.redisson.core.RSetMultimap; -import static org.assertj.core.api.Assertions.*; public class RedissonSetMultimapTest extends BaseTest { @@ -273,5 +275,56 @@ public class RedissonSetMultimapTest extends BaseTest { assertThat(allValues).containsOnlyElementsOf(values); } + @Test + public void testExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expire(100, TimeUnit.MILLISECONDS); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testExpireAt() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testClearExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + map.clearExpire(); + + Thread.sleep(500); + + assertThat(map.size()).isEqualTo(2); + } + + @Test + public void testDelete() { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + assertThat(map.delete()).isTrue(); + + RSetMultimap map2 = redisson.getSetMultimap("simple1"); + assertThat(map2.delete()).isFalse(); + } }