From 8b080e4be005b905a270b6fd500c8a2fff5eb4d8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 10 Dec 2015 17:24:20 +0300 Subject: [PATCH] RSetCache with TTL added. #319 --- src/main/java/org/redisson/Redisson.java | 34 +- src/main/java/org/redisson/RedissonBatch.java | 18 + .../java/org/redisson/RedissonClient.java | 29 +- ...er.java => RedissonEvictionScheduler.java} | 13 +- src/main/java/org/redisson/RedissonMap.java | 2 +- .../java/org/redisson/RedissonMapCache.java | 12 +- src/main/java/org/redisson/RedissonSet.java | 10 +- .../java/org/redisson/RedissonSetCache.java | 513 ++++++++++++++++++ .../java/org/redisson/api/RBatchReactive.java | 9 +- .../org/redisson/api/RMapCacheReactive.java | 6 +- .../client/handler/CommandEncoder.java | 35 +- ...Encoder.java => DefaultParamsEncoder.java} | 5 +- .../client/protocol/RedisCommand.java | 2 +- .../client/protocol/RedisCommands.java | 4 +- .../command/CommandAsyncExecutor.java | 2 + src/main/java/org/redisson/core/RBatch.java | 32 +- .../java/org/redisson/core/RMapCache.java | 4 +- .../org/redisson/core/RMapCacheAsync.java | 2 +- .../java/org/redisson/core/RSetCache.java | 46 ++ .../org/redisson/core/RSetCacheAsync.java | 33 ++ .../reactive/RedissonBatchReactive.java | 2 +- .../reactive/RedissonMapCacheReactive.java | 14 +- .../org/redisson/RedissonMapCacheTest.java | 39 +- .../org/redisson/RedissonSetCacheTest.java | 354 ++++++++++++ 24 files changed, 1126 insertions(+), 94 deletions(-) rename src/main/java/org/redisson/{RedissonCacheEvictionScheduler.java => RedissonEvictionScheduler.java} (93%) create mode 100644 src/main/java/org/redisson/RedissonSetCache.java rename src/main/java/org/redisson/client/protocol/{StringParamsEncoder.java => DefaultParamsEncoder.java} (87%) create mode 100644 src/main/java/org/redisson/core/RSetCache.java create mode 100644 src/main/java/org/redisson/core/RSetCacheAsync.java create mode 100644 src/test/java/org/redisson/RedissonSetCacheTest.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 65bc4d595..3817bb162 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.locks.ReadWriteLock; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; @@ -40,7 +39,6 @@ import org.redisson.core.RBatch; import org.redisson.core.RBitSet; import org.redisson.core.RBlockingQueue; import org.redisson.core.RBucket; -import org.redisson.core.RMapCache; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; import org.redisson.core.RHyperLogLog; @@ -49,12 +47,14 @@ import org.redisson.core.RLexSortedSet; import org.redisson.core.RList; import org.redisson.core.RLock; import org.redisson.core.RMap; +import org.redisson.core.RMapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSet; +import org.redisson.core.RSetCache; import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; @@ -143,11 +143,6 @@ public class Redisson implements RedissonClient { return new RedissonReactive(config); } - @Override - public RReadWriteLock getReadWriteLock(String name) { - return new RedissonReadWriteLock(commandExecutor, name, id); - } - @Override public RBucket getBucket(String name) { return new RedissonBucket(commandExecutor, name); @@ -196,6 +191,16 @@ public class Redisson implements RedissonClient { return new RedissonMap(commandExecutor, name); } + @Override + public RSetCache getSetCache(String name) { + return new RedissonSetCache(commandExecutor, name); + } + + @Override + public RSetCache getSetCache(String name, Codec codec) { + return new RedissonSetCache(codec, commandExecutor, name); + } + @Override public RMapCache getMapCache(String name) { return new RedissonMapCache(commandExecutor, name); @@ -216,6 +221,11 @@ public class Redisson implements RedissonClient { return new RedissonLock(commandExecutor, name, id); } + @Override + public RReadWriteLock getReadWriteLock(String name) { + return new RedissonReadWriteLock(commandExecutor, name, id); + } + @Override public RSet getSet(String name) { return new RedissonSet(commandExecutor, name); @@ -326,6 +336,11 @@ public class Redisson implements RedissonClient { return new RedissonKeys(commandExecutor); } + @Override + public RBatch createBatch() { + return new RedissonBatch(connectionManager); + } + @Override public void shutdown() { connectionManager.shutdown(); @@ -357,10 +372,5 @@ public class Redisson implements RedissonClient { commandExecutor.get(commandExecutor.writeAllAsync(RedisCommands.FLUSHALL)); } - @Override - public RBatch createBatch() { - return new RedissonBatch(connectionManager); - } - } diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java index 9de636970..57fb896ba 100644 --- a/src/main/java/org/redisson/RedissonBatch.java +++ b/src/main/java/org/redisson/RedissonBatch.java @@ -36,10 +36,17 @@ import org.redisson.core.RQueueAsync; import org.redisson.core.RScoredSortedSetAsync; import org.redisson.core.RScriptAsync; import org.redisson.core.RSetAsync; +import org.redisson.core.RSetCacheAsync; import org.redisson.core.RTopicAsync; import io.netty.util.concurrent.Future; +/** + * + * + * @author Nikita Koksharov + * + */ public class RedissonBatch implements RBatch { private final CommandBatchService executorService; @@ -183,6 +190,16 @@ public class RedissonBatch implements RBatch { return new RedissonKeys(executorService); } + @Override + public RSetCacheAsync getSetCache(String name) { + return new RedissonSetCache(executorService, name); + } + + @Override + public RSetCacheAsync getSetCache(String name, Codec codec) { + return new RedissonSetCache(codec, executorService, name); + } + @Override public List execute() { return executorService.execute(); @@ -193,4 +210,5 @@ public class RedissonBatch implements RBatch { return executorService.executeAsync(); } + } diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index da710c3e5..6eadcbcd9 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -41,6 +41,7 @@ import org.redisson.core.RReadWriteLock; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSet; +import org.redisson.core.RSetCache; import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; @@ -54,12 +55,28 @@ import org.redisson.core.RTopic; public interface RedissonClient { /** - * Returns readWriteLock instance by name. + * Returns set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

* * @param name + * @param codec * @return */ - RReadWriteLock getReadWriteLock(String name); + RSetCache getSetCache(String name); + + /** + * Returns set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCache getSetCache(String name, Codec codec); /** * Returns map-based cache instance by name @@ -170,6 +187,14 @@ public interface RedissonClient { */ RLock getLock(String name); + /** + * Returns readWriteLock instance by name. + * + * @param name + * @return + */ + RReadWriteLock getReadWriteLock(String name); + /** * Returns set instance by name. * diff --git a/src/main/java/org/redisson/RedissonCacheEvictionScheduler.java b/src/main/java/org/redisson/RedissonEvictionScheduler.java similarity index 93% rename from src/main/java/org/redisson/RedissonCacheEvictionScheduler.java rename to src/main/java/org/redisson/RedissonEvictionScheduler.java index a065936de..19e7e1d6d 100644 --- a/src/main/java/org/redisson/RedissonCacheEvictionScheduler.java +++ b/src/main/java/org/redisson/RedissonEvictionScheduler.java @@ -38,7 +38,9 @@ import io.netty.util.internal.PlatformDependent; * @author Nikita Koksharov * */ -public class RedissonCacheEvictionScheduler { +public class RedissonEvictionScheduler { + + public static final RedissonEvictionScheduler INSTANCE = new RedissonEvictionScheduler(); public static class RedissonCacheTask implements Runnable { @@ -86,7 +88,7 @@ public class RedissonCacheEvictionScheduler { if (sizeHistory.size() == 2) { if (sizeHistory.peekFirst() > sizeHistory.peekLast() && sizeHistory.peekLast() > size) { - delay = Math.min(maxDelay, delay*2); + delay = Math.min(maxDelay, (int)(delay*1.5)); } // if (sizeHistory.peekFirst() < sizeHistory.peekLast() @@ -97,10 +99,10 @@ public class RedissonCacheEvictionScheduler { if (sizeHistory.peekFirst() == sizeHistory.peekLast() && sizeHistory.peekLast() == size) { if (size == keysLimit) { - delay = Math.max(minDelay, delay/2); + delay = Math.max(minDelay, delay/4); } if (size == 0) { - delay = Math.min(maxDelay, delay*2); + delay = Math.min(maxDelay, (int)(delay*1.5)); } } @@ -115,6 +117,9 @@ public class RedissonCacheEvictionScheduler { } + private RedissonEvictionScheduler() { + } + private final ConcurrentMap tasks = PlatformDependent.newConcurrentHashMap(); public void schedule(String name, String timeoutSetName, CommandAsyncExecutor executor) { diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index c3b127a87..ac7b0b0f6 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -291,7 +291,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public Future fastRemoveAsync(K ... keys) { if (keys == null || keys.length == 0) { - return commandExecutor.getConnectionManager().getGroup().next().newSucceededFuture(0L); + return newSucceededFuture(0L); } List args = new ArrayList(keys.length + 1); diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index 8c3eed133..74a5c72d5 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -48,14 +48,14 @@ import io.netty.util.concurrent.Promise; /** *

Map-based cache with ability to set TTL for each entry via - * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} + * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} methods. * And therefore has an complex lua-scripts inside.

* - *

Current redis implementation doesnt have eviction functionality. + *

Current redis implementation doesnt have map entry eviction functionality. * Thus entries are checked for TTL expiration during any key/value/entry read operation. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * Clean task deletes removes 100 expired entries at once. - * In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * deletes expired entries in time interval between 5 seconds to 2 hours.

* *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.

@@ -77,16 +77,14 @@ public class RedissonMapCache extends RedissonMap implements RMapCac private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY); private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5); - private static final RedissonCacheEvictionScheduler SCHEDULER = new RedissonCacheEvictionScheduler(); - protected RedissonMapCache(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); - SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); } public RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); - SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); } @Override diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index f71b9a86f..831f1e2e7 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -24,9 +24,7 @@ import java.util.List; import java.util.NoSuchElementException; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RSet; @@ -42,8 +40,6 @@ import io.netty.util.concurrent.Future; */ public class RedissonSet extends RedissonExpirable implements RSet { - private static final RedisCommand EVAL_OBJECTS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4); - protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } @@ -200,7 +196,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future containsAllAsync(Collection c) { - return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS, + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local s = redis.call('smembers', KEYS[1]);" + "for i = 0, table.getn(s), 1 do " + "for j = 0, table.getn(ARGV), 1 do " @@ -236,7 +232,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future retainAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local changed = 0 " + "local s = redis.call('smembers', KEYS[1]) " + "local i = 0 " @@ -261,7 +257,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future removeAllAsync(Collection c) { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, "local v = 0 " + "for i = 0, table.getn(ARGV), 1 do " + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 " diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java new file mode 100644 index 000000000..8188e137a --- /dev/null +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -0,0 +1,513 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.Convertor; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.core.RSetCache; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import net.openhft.hashing.LongHashFunction; + +/** + *

Set-based cache with ability to set TTL for each entry via + * {@link #put(Object, Object, long, TimeUnit)} method. + * And therefore has an complex lua-scripts inside. + * Uses map (value_hash, value) to tie with expiration sorted set under the hood. + *

+ * + *

Current Redis implementation doesn't have set entry eviction functionality. + * Thus values are checked for TTL expiration during any value read operation. + * If entry expired then it doesn't returns and clean task runs asynchronous. + * Clean task deletes removes 100 expired entries at once. + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler + * deletes expired entries in time interval between 5 seconds to 2 hours.

+ * + *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.

+ * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public class RedissonSetCache extends RedissonExpirable implements RSetCache { + + private static final RedisCommand EVAL_ADD = new RedisCommand("EVAL", new BooleanReplayConvertor(), 5); + private static final RedisCommand EVAL_ADD_TTL = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7); + private static final RedisCommand EVAL_OBJECTS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4); + private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5); + private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); + private static final RedisStrictCommand HDEL = new RedisStrictCommand("HDEL", new BooleanReplayConvertor()); + + protected RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) { + super(commandExecutor, name); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); + } + + protected RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); + } + + @Override + public int size() { + return get(sizeAsync()); + } + + @Override + public Future sizeAsync() { + return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName()); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(Object o) { + return get(containsAsync(o)); + } + + private byte[] hash(Object o) { + if (o == null) { + throw new NullPointerException("Value can't be null"); + } + try { + byte[] objectState = codec.getValueEncoder().encode(o); + long h1 = LongHashFunction.farmUo().hashBytes(objectState); + long h2 = LongHashFunction.xx_r39().hashBytes(objectState); + + return ByteBuffer.allocate((2 * Long.SIZE) / Byte.SIZE) + .putLong(h1) + .putLong(h2) + .array(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + String getTimeoutSetName() { + return "redisson__timeout__set__{" + getName() + "}"; + } + + @Override + public Future containsAsync(Object o) { + Promise result = newPromise(); + + byte[] key = hash(o); + + Future> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_KEY, + "local value = redis.call('hexists', KEYS[1], KEYS[3]); " + + "local expireDate = 92233720368547758; " + + "if value == 1 then " + + "local expireDateScore = redis.call('zscore', KEYS[2], KEYS[3]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "end;" + + "return {expireDate, value}; ", + Arrays.asList(getName(), getTimeoutSetName(), key)); + + addExpireListener(result, future, new BooleanReplayConvertor(), false); + + return result; + } + + private void addExpireListener(final Promise result, Future> future, final Convertor convertor, final T nullValue) { + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.setFailure(future.cause()); + return; + } + + List res = future.getNow(); + Long expireDate = (Long) res.get(0); + long currentDate = System.currentTimeMillis(); + if (expireDate <= currentDate) { + result.setSuccess(nullValue); + expireMap(currentDate); + return; + } + + if (convertor != null) { + result.setSuccess((T) convertor.convert(res.get(1))); + } else { + result.setSuccess((T) res.get(1)); + } + } + }); + } + + private void expireMap(long currentDate) { + commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); " + + "if #expiredKeys > 0 then " + + "local s = redis.call('zrem', KEYS[2], unpack(expiredKeys)); " + + "redis.call('hdel', KEYS[1], unpack(expiredKeys)); " + + "end;", + Arrays.asList(getName(), getTimeoutSetName()), currentDate); + } + + ListScanResult scanIterator(InetSocketAddress client, long startPos) { + Future> f = commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_SSCAN, + "local result = {}; " + + "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + + "for i, value in ipairs(res[2]) do " + + "if i % 2 == 0 then " + + "local key = res[2][i-1]; " + + "local expireDate = redis.call('zscore', KEYS[2], key); " + + "if (expireDate == false) or (expireDate ~= false and expireDate > ARGV[2]) then " + + "table.insert(result, value); " + + "end; " + + "end; " + + "end;" + + "return {res[1], result};", Arrays.asList(getName(), getTimeoutSetName()), startPos, System.currentTimeMillis()); + return get(f); + } + + @Override + public Iterator iterator() { + return new Iterator() { + + private List firstValues; + private Iterator iter; + private InetSocketAddress client; + private long nextIterPos; + + private boolean currentElementRemoved; + private boolean removeExecuted; + private V value; + + @Override + public boolean hasNext() { + if (iter == null || !iter.hasNext()) { + if (nextIterPos == -1) { + return false; + } + long prevIterPos = nextIterPos; + ListScanResult res = scanIterator(client, nextIterPos); + client = res.getRedisClient(); + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + return false; + } + iter = res.getValues().iterator(); + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos && !removeExecuted) { + nextIterPos = -1; + } + } + return iter.hasNext(); + } + + @Override + public V next() { + if (!hasNext()) { + throw new NoSuchElementException("No such element at index"); + } + + value = iter.next(); + currentElementRemoved = false; + return value; + } + + @Override + public void remove() { + if (currentElementRemoved) { + throw new IllegalStateException("Element been already deleted"); + } + if (iter == null) { + throw new IllegalStateException(); + } + + iter.remove(); + RedissonSetCache.this.remove(value); + currentElementRemoved = true; + removeExecuted = true; + } + + }; + } + + private Future> readAllAsync() { + final Promise> result = newPromise(); + Future> future = commandExecutor.evalReadAsync(getName(), codec, new RedisCommand>("EVAL", new ObjectListReplayDecoder()), + "local keys = redis.call('hkeys', KEYS[1]);" + + "local expireSize = redis.call('zcard', KEYS[2]); " + + "local maxDate = ARGV[1]; " + + "local minExpireDate = 92233720368547758;" + + "if expireSize > 0 then " + + "for i, key in pairs(keys) do " + + "local expireDate = redis.call('zscore', KEYS[2], key); " + + "if expireDate ~= false and expireDate <= maxDate then " + + "minExpireDate = math.min(tonumber(expireDate), minExpireDate); " + + "table.remove(keys, i); " + + "end;" + + "end;" + + "end; " + + "return {minExpireDate, redis.call('hmget', KEYS[1], unpack(keys))};", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.setFailure(future.cause()); + return; + } + + List res = future.getNow(); + Long expireDate = (Long) res.get(0); + long currentDate = System.currentTimeMillis(); + if (expireDate <= currentDate) { + expireMap(currentDate); + } + + result.setSuccess((Collection) res.get(1)); + } + }); + + return result; + } + + @Override + public Object[] toArray() { + List res = (List) get(readAllAsync()); + return res.toArray(); + } + + @Override + public T[] toArray(T[] a) { + List res = (List) get(readAllAsync()); + return res.toArray(a); + } + + @Override + public boolean add(V e) { + return get(addAsync(e)); + } + + @Override + public boolean add(V value, long ttl, TimeUnit unit) { + return get(addAsync(value, ttl, unit)); + } + + @Override + public Future addAsync(V value, long ttl, TimeUnit unit) { + if (unit == null) { + throw new NullPointerException("TimeUnit param can't be null"); + } + + byte[] key = hash(value); + long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl); + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_ADD_TTL, + "redis.call('zadd', KEYS[2], ARGV[1], KEYS[3]); " + + "if redis.call('hexists', KEYS[1], KEYS[3]) == 0 then " + + "redis.call('hset', KEYS[1], KEYS[3], ARGV[2]); " + + "return 1; " + + "end;" + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName(), key), timeoutDate, value); + } + + + @Override + public Future addAsync(V e) { + byte[] key = hash(e); + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_ADD, + "if redis.call('hexists', KEYS[1], KEYS[2]) == 0 then " + + "redis.call('hset', KEYS[1], KEYS[2], ARGV[1]); " + + "return 1; " + + "end; " + + "return 0; ", + Arrays.asList(getName(), key), e); + } + + @Override + public Future removeAsync(Object o) { + byte[] key = hash(o); + return commandExecutor.writeAsync(getName(), codec, HDEL, getName(), key); + } + + @Override + public boolean remove(Object value) { + return get(removeAsync((V)value)); + } + + @Override + public boolean containsAll(Collection c) { + return get(containsAllAsync(c)); + } + + @Override + public Future containsAllAsync(Collection c) { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS, + "local s = redis.call('hvals', KEYS[1]);" + + "for i = 0, table.getn(s), 1 do " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == s[i] then " + + "table.remove(ARGV, j) " + + "end " + + "end; " + + "end;" + + "return table.getn(ARGV) == 0 and 1 or 0; ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public boolean addAll(Collection c) { + return get(addAllAsync(c)); + } + + @Override + public Future addAllAsync(Collection c) { + if (c.isEmpty()) { + return newSucceededFuture(false); + } + + List inParamTypes = new ArrayList(c.size()*2); + List params = new ArrayList(c.size()*2 + 1); + params.add(getName()); + for (V value : c) { + inParamTypes.add(ValueType.BINARY); + inParamTypes.add(ValueType.OBJECTS); + params.add(hash(value)); + params.add(value); + } + + RedisCommand command = new RedisCommand("HMSET", new VoidReplayConvertor(), 2, inParamTypes); + return commandExecutor.writeAsync(getName(), codec, command, params.toArray()); + } + + @Override + public boolean retainAll(Collection c) { + return get(retainAllAsync(c)); + } + + @Override + public Future retainAllAsync(Collection c) { + List hashes = new ArrayList(c.size()); + for (Object object : c) { + hashes.add(hash(object)); + } + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local keys = redis.call('hkeys', KEYS[1]); " + + "local i=1;" + + "while i <= #keys do " + + "local changed = false;" + + "local element = keys[i];" + + "for j, argElement in pairs(ARGV) do " + + "if argElement == element then " + + "changed = true;" + + "table.remove(keys, i); " + + "table.remove(ARGV, j); " + + "break; " + + "end; " + + "end; " + + "if changed == false then " + + "i = i + 1 " + + "end " + + "end " + + "if #keys > 0 then " + + "for i=1, table.getn(keys),5000 do " + + "redis.call('hdel', KEYS[1], unpack(keys, i, math.min(i+4999, table.getn(keys)))); " + + "redis.call('zrem', KEYS[2], unpack(keys, i, math.min(i+4999, table.getn(keys)))); " + + "end " + + "return 1;" + + "end; " + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName()), hashes.toArray()); + } + + @Override + public Future removeAllAsync(Collection c) { + List hashes = new ArrayList(c.size()+1); + hashes.add(getName()); + for (Object object : c) { + hashes.add(hash(object)); + } + + return commandExecutor.writeAsync(getName(), codec, HDEL, hashes.toArray()); + } + + @Override + public boolean removeAll(Collection c) { + return get(removeAllAsync(c)); + } + + @Override + public void clear() { + delete(); + } + + @Override + public Future deleteAsync() { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpire', KEYS[2], ARGV[1]); " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive)); + } + + @Override + public Future expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpireat', KEYS[2], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(getName(), getTimeoutSetName()), timestamp); + } + + @Override + public Future clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('persist', KEYS[2]); " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(getName(), getTimeoutSetName())); + } + +} diff --git a/src/main/java/org/redisson/api/RBatchReactive.java b/src/main/java/org/redisson/api/RBatchReactive.java index 316dc255d..ff0a04d53 100644 --- a/src/main/java/org/redisson/api/RBatchReactive.java +++ b/src/main/java/org/redisson/api/RBatchReactive.java @@ -23,10 +23,9 @@ import org.redisson.client.codec.Codec; /** * Interface for using pipeline feature. * - * All methods invocations via Reactive objects - * which have gotten from this interface are batched - * to separate queue and could be executed later - * with execute() or executeReactive() methods. + * All method invocations on objects + * from this interface are batched to separate queue and could be executed later + * with execute() method. * * * @author Nikita Koksharov @@ -201,6 +200,6 @@ public interface RBatchReactive { * * @return List with result object for each command */ - Publisher> executeReactive(); + Publisher> execute(); } diff --git a/src/main/java/org/redisson/api/RMapCacheReactive.java b/src/main/java/org/redisson/api/RMapCacheReactive.java index 2ddd80a86..361b1f5e1 100644 --- a/src/main/java/org/redisson/api/RMapCacheReactive.java +++ b/src/main/java/org/redisson/api/RMapCacheReactive.java @@ -21,14 +21,14 @@ import org.reactivestreams.Publisher; /** *

Map-based cache with ability to set TTL for each entry via - * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} + * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} method. * And therefore has an complex lua-scripts inside.

* - *

Current redis implementation doesnt have eviction functionality. + *

Current redis implementation doesnt have map entry eviction functionality. * Thus entries are checked for TTL expiration during any key/value/entry read operation. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * Clean task deletes removes 100 expired entries at once. - * In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * deletes expired entries in time interval between 5 seconds to 2 hours.

* *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.

diff --git a/src/main/java/org/redisson/client/handler/CommandEncoder.java b/src/main/java/org/redisson/client/handler/CommandEncoder.java index a056878a3..dacbe6911 100644 --- a/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -15,9 +15,12 @@ */ package org.redisson.client.handler; +import java.util.List; + +import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.Encoder; -import org.redisson.client.protocol.StringParamsEncoder; +import org.redisson.client.protocol.DefaultParamsEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -39,11 +42,11 @@ public class CommandEncoder extends MessageToByteEncoder msg, ByteBuf out) throws Exception { @@ -66,12 +69,12 @@ public class CommandEncoder extends MessageToByteEncoder msg, int param) { + private Encoder selectEncoder(CommandData msg, int param) { int typeIndex = 0; - if (msg.getCommand().getInParamType().size() > 1) { + List inParamType = msg.getCommand().getInParamType(); + if (inParamType.size() > 1) { typeIndex = param; } - if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP) { + if (inParamType.get(typeIndex) == ValueType.MAP) { if (param % 2 != 0) { return msg.getCodec().getMapValueEncoder(); } else { return msg.getCodec().getMapKeyEncoder(); } } - if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_KEY) { + if (inParamType.get(typeIndex) == ValueType.MAP_KEY) { return msg.getCodec().getMapKeyEncoder(); } - if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_VALUE) { + if (inParamType.get(typeIndex) == ValueType.MAP_VALUE) { return msg.getCodec().getMapValueEncoder(); } - if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.OBJECTS) { + if (inParamType.get(typeIndex) == ValueType.OBJECTS) { return msg.getCodec().getValueEncoder(); } + if (inParamType.get(typeIndex) == ValueType.BINARY) { + return ByteArrayCodec.INSTANCE.getValueEncoder(); + } throw new IllegalStateException(); } diff --git a/src/main/java/org/redisson/client/protocol/StringParamsEncoder.java b/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java similarity index 87% rename from src/main/java/org/redisson/client/protocol/StringParamsEncoder.java rename to src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java index dc746b674..394cbce12 100644 --- a/src/main/java/org/redisson/client/protocol/StringParamsEncoder.java +++ b/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java @@ -17,10 +17,13 @@ package org.redisson.client.protocol; import java.io.UnsupportedEncodingException; -public class StringParamsEncoder implements Encoder { +public class DefaultParamsEncoder implements Encoder { @Override public byte[] encode(Object in) { + if (in instanceof byte[]) { + return (byte[]) in; + } try { return in.toString().getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 88bf24ba7..f48ede62e 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -24,7 +24,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder; public class RedisCommand { - public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP} + public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY} private ValueType outParamType = ValueType.OBJECT; private List inParamType = Arrays.asList(ValueType.OBJECT); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 2ebd63a1a..15cdff6ea 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -103,6 +103,7 @@ public interface RedisCommands { RedisCommand SREM_SINGLE = new RedisCommand("SREM", new BooleanReplayConvertor(), 2); RedisCommand> SMEMBERS = new RedisCommand>("SMEMBERS", new ObjectListReplayDecoder()); RedisCommand> SSCAN = new RedisCommand>("SSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.OBJECT); + RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.OBJECT); RedisCommand SISMEMBER = new RedisCommand("SISMEMBER", new BooleanReplayConvertor(), 2); RedisStrictCommand SCARD_INT = new RedisStrictCommand("SCARD", new IntegerReplayConvertor()); RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); @@ -144,6 +145,7 @@ public interface RedisCommands { RedisStrictCommand> SCRIPT_EXISTS = new RedisStrictCommand>("SCRIPT", "EXISTS", new ObjectListReplayDecoder(), new BooleanReplayConvertor()); RedisStrictCommand EVAL_BOOLEAN = new RedisStrictCommand("EVAL", new BooleanReplayConvertor()); + RedisStrictCommand EVAL_BOOLEAN_WITH_VALUES = new RedisStrictCommand("EVAL", new BooleanReplayConvertor(), 4); RedisStrictCommand EVAL_STRING = new RedisStrictCommand("EVAL", new StringReplayDecoder()); RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL", new IntegerReplayConvertor()); RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); @@ -173,7 +175,7 @@ public interface RedisCommands { RedisCommand HEXISTS = new RedisCommand("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY); RedisStrictCommand HLEN = new RedisStrictCommand("HLEN", new IntegerReplayConvertor()); RedisCommand> HKEYS = new RedisCommand>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); - RedisCommand HMSET = new RedisCommand("HMSET", new StringReplayDecoder(), 2, ValueType.MAP); + RedisCommand HMSET = new RedisCommand("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP); RedisCommand> HMGET = new RedisCommand>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); RedisCommand HGET = new RedisCommand("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); RedisCommand HDEL = new RedisStrictCommand("HDEL", 2, ValueType.MAP_KEY); diff --git a/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 5f17a7baf..cb9d48264 100644 --- a/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -37,6 +37,8 @@ public interface CommandAsyncExecutor { V get(Future future); + Future evalScriptReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); Future readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/core/RBatch.java b/src/main/java/org/redisson/core/RBatch.java index 1b47c2c10..5670d1e13 100644 --- a/src/main/java/org/redisson/core/RBatch.java +++ b/src/main/java/org/redisson/core/RBatch.java @@ -25,9 +25,8 @@ import io.netty.util.concurrent.Future; /** * Interface for using pipeline feature. * - * All methods invocations via async objects - * which have gotten from this interface are batched - * to separate queue and could be executed later + * All method invocations on objects + * from this interface are batched to separate queue and could be executed later * with execute() or executeAsync() methods. * * @@ -36,6 +35,33 @@ import io.netty.util.concurrent.Future; */ public interface RBatch { + /** + * Returns set-based cache instance by name. + * Uses map (value_hash, value) under the hood for minimal memory consumption. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheAsync getSetCache(String name); + + /** + * Returns set-based cache instance by name + * using provided codec for values. + * Uses map (value_hash, value) under the hood for minimal memory consumption. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheAsync getSetCache(String name, Codec codec); + /** * Returns map-based cache instance by name * using provided codec for both cache keys and values. diff --git a/src/main/java/org/redisson/core/RMapCache.java b/src/main/java/org/redisson/core/RMapCache.java index 7428b1ab2..94f62663a 100644 --- a/src/main/java/org/redisson/core/RMapCache.java +++ b/src/main/java/org/redisson/core/RMapCache.java @@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit; * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} * And therefore has an complex lua-scripts inside.

* - *

Current redis implementation doesnt have eviction functionality. + *

Current redis implementation doesnt have map entry eviction functionality. * Thus entries are checked for TTL expiration during any key/value/entry read operation. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * Clean task deletes removes 100 expired entries at once. - * In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * deletes expired entries in time interval between 5 seconds to 2 hours.

* *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.

diff --git a/src/main/java/org/redisson/core/RMapCacheAsync.java b/src/main/java/org/redisson/core/RMapCacheAsync.java index fb28dbc19..e7a600492 100644 --- a/src/main/java/org/redisson/core/RMapCacheAsync.java +++ b/src/main/java/org/redisson/core/RMapCacheAsync.java @@ -28,7 +28,7 @@ import io.netty.util.concurrent.Future; * Thus entries are checked for TTL expiration during any key/value/entry read operation. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * Clean task deletes removes 100 expired entries at once. - * In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * deletes expired entries in time interval between 5 seconds to 2 hours.

* *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.

diff --git a/src/main/java/org/redisson/core/RSetCache.java b/src/main/java/org/redisson/core/RSetCache.java new file mode 100644 index 000000000..5fbce9c77 --- /dev/null +++ b/src/main/java/org/redisson/core/RSetCache.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + *

Set-based cache with ability to set TTL for each entry via + * {@link #put(Object, Object, long, TimeUnit)} method. + * And therefore has an complex lua-scripts inside. + * Uses map (value_hash, value) to tie with expiration sorted set under the hood. + *

+ * + *

Current Redis implementation doesn't have set entry eviction functionality. + * Thus values are checked for TTL expiration during any value read operation. + * If entry expired then it doesn't returns and clean task runs asynchronous. + * Clean task deletes removes 100 expired entries at once. + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler + * deletes expired entries in time interval between 5 seconds to 2 hours.

+ * + *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.

+ * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public interface RSetCache extends Set, RExpirable, RSetCacheAsync { + + boolean add(V value, long ttl, TimeUnit unit); + +} diff --git a/src/main/java/org/redisson/core/RSetCacheAsync.java b/src/main/java/org/redisson/core/RSetCacheAsync.java new file mode 100644 index 000000000..601e4a1f9 --- /dev/null +++ b/src/main/java/org/redisson/core/RSetCacheAsync.java @@ -0,0 +1,33 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.Future; + +/** + * Async set functions + * + * @author Nikita Koksharov + * + * @param value + */ +public interface RSetCacheAsync extends RCollectionAsync { + + Future addAsync(V value, long ttl, TimeUnit unit); + +} diff --git a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 4071a4a9a..0248156a1 100644 --- a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -173,7 +173,7 @@ public class RedissonBatchReactive implements RBatchReactive { } @Override - public Publisher> executeReactive() { + public Publisher> execute() { return new NettyFuturePublisher>(executorService.executeAsync()); } diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index e78d115bb..dcc6ea1e9 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; -import org.redisson.RedissonCacheEvictionScheduler; +import org.redisson.RedissonEvictionScheduler; import org.redisson.api.RMapCacheReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -51,14 +51,14 @@ import reactor.rx.action.support.DefaultSubscriber; /** *

Map-based cache with ability to set TTL for each entry via - * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} + * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} method. * And therefore has an complex lua-scripts inside.

* - *

Current redis implementation doesnt have eviction functionality. + *

Current redis implementation doesnt have map entry eviction functionality. * Thus entries are checked for TTL expiration during any key/value/entry read operation. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * Clean task deletes removes 100 expired entries at once. - * In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler + * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * deletes expired entries in time interval between 5 seconds to 2 hours.

* *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.

@@ -80,16 +80,14 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY); private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5); - private static final RedissonCacheEvictionScheduler SCHEDULER = new RedissonCacheEvictionScheduler(); - public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); - SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); } public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); - SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor); + RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); } @Override diff --git a/src/test/java/org/redisson/RedissonMapCacheTest.java b/src/test/java/org/redisson/RedissonMapCacheTest.java index 0efc821bb..9a859d36e 100644 --- a/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -22,6 +22,7 @@ import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.MsgPackJacksonCodec; import org.redisson.core.Predicate; import org.redisson.core.RMapCache; +import org.redisson.core.RSetCache; import org.redisson.core.RMap; import io.netty.util.concurrent.Future; @@ -169,27 +170,6 @@ public class RedissonMapCacheTest extends BaseTest { Assert.assertEquals(expectedMap, filtered); } -// @Test -// public void testFilterKeys() { -// RCache map = redisson.getCache("filterKeys"); -// map.put(1, 100); -// map.put(2, 200); -// map.put(3, 300); -// map.put(4, 400); -// -// Map filtered = map.filterKeys(new Predicate() { -// @Override -// public boolean apply(Integer input) { -// return input >= 2 && input <= 3; -// } -// }); -// -// Map expectedMap = new HashMap(); -// expectedMap.put(2, 200); -// expectedMap.put(3, 300); -// Assert.assertEquals(expectedMap, filtered); -// } - @Test public void testExpiredIterator() throws InterruptedException { RMapCache cache = redisson.getMapCache("simple"); @@ -632,6 +612,23 @@ public class RedissonMapCacheTest extends BaseTest { Assert.assertEquals(testMap.hashCode(), map.hashCode()); } + @Test + public void testExpireOverwrite() throws InterruptedException, ExecutionException { + RMapCache set = redisson.getMapCache("simple"); + set.put("123", 3, 1, TimeUnit.SECONDS); + + Thread.sleep(800); + + set.put("123", 3, 1, TimeUnit.SECONDS); + + Thread.sleep(800); + Assert.assertEquals(3, (int)set.get("123")); + + Thread.sleep(200); + + Assert.assertFalse(set.containsKey("123")); + } + @Test public void testFastRemoveEmpty() throws Exception { RMapCache map = redisson.getMapCache("simple"); diff --git a/src/test/java/org/redisson/RedissonSetCacheTest.java b/src/test/java/org/redisson/RedissonSetCacheTest.java new file mode 100644 index 000000000..e46b01be0 --- /dev/null +++ b/src/test/java/org/redisson/RedissonSetCacheTest.java @@ -0,0 +1,354 @@ +package org.redisson; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.codec.MsgPackJacksonCodec; +import org.redisson.core.RSetCache; + +public class RedissonSetCacheTest extends BaseTest { + + public static class SimpleBean implements Serializable { + + private Long lng; + + public Long getLng() { + return lng; + } + + public void setLng(Long lng) { + this.lng = lng; + } + + } + + @Test + public void testAddBean() throws InterruptedException, ExecutionException { + SimpleBean sb = new SimpleBean(); + sb.setLng(1L); + RSetCache set = redisson.getSetCache("simple"); + set.add(sb); + Assert.assertEquals(sb.getLng(), set.iterator().next().getLng()); + } + + @Test + public void testAddExpire() throws InterruptedException, ExecutionException { + RSetCache set = redisson.getSetCache("simple"); + set.add("123", 1, TimeUnit.SECONDS); + Assert.assertThat(set, Matchers.contains("123")); + + Thread.sleep(1000); + + Assert.assertFalse(set.contains("123")); + + Thread.sleep(50); + + Assert.assertEquals(0, set.size()); + } + + @Test + public void testExpireOverwrite() throws InterruptedException, ExecutionException { + RSetCache set = redisson.getSetCache("simple"); + set.add("123", 1, TimeUnit.SECONDS); + + Thread.sleep(800); + + set.add("123", 1, TimeUnit.SECONDS); + + Thread.sleep(800); + Assert.assertTrue(set.contains("123")); + + Thread.sleep(200); + + Assert.assertFalse(set.contains("123")); + } + + @Test + public void testRemove() throws InterruptedException, ExecutionException { + RSetCache set = redisson.getSetCache("simple"); + set.add(1, 1, TimeUnit.SECONDS); + set.add(3, 2, TimeUnit.SECONDS); + set.add(7, 3, TimeUnit.SECONDS); + + Assert.assertTrue(set.remove(1)); + Assert.assertFalse(set.contains(1)); + Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7)); + + Assert.assertFalse(set.remove(1)); + Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7)); + + Assert.assertTrue(set.remove(3)); + Assert.assertFalse(set.contains(3)); + Assert.assertThat(set, Matchers.contains(7)); + Assert.assertEquals(1, set.size()); + } + + @Test + public void testIteratorRemove() throws InterruptedException { + RSetCache set = redisson.getSetCache("list"); + set.add("1"); + set.add("4", 1, TimeUnit.SECONDS); + set.add("2"); + set.add("5", 1, TimeUnit.SECONDS); + set.add("3"); + + Thread.sleep(1000); + + for (Iterator iterator = set.iterator(); iterator.hasNext();) { + String value = iterator.next(); + if (value.equals("2")) { + iterator.remove(); + } + } + + Assert.assertThat(set, Matchers.containsInAnyOrder("1", "3")); + + int iteration = 0; + for (Iterator iterator = set.iterator(); iterator.hasNext();) { + iterator.next(); + iterator.remove(); + iteration++; + } + + Assert.assertEquals(2, iteration); + + Assert.assertFalse(set.contains("4")); + Assert.assertFalse(set.contains("5")); + + Thread.sleep(50); + + Assert.assertEquals(0, set.size()); + Assert.assertTrue(set.isEmpty()); + } + + @Test + public void testIteratorSequence() { + RSetCache set = redisson.getSetCache("set"); + for (int i = 0; i < 1000; i++) { + set.add(Long.valueOf(i)); + } + + Set setCopy = new HashSet(); + for (int i = 0; i < 1000; i++) { + setCopy.add(Long.valueOf(i)); + } + + checkIterator(set, setCopy); + } + + private void checkIterator(Set set, Set setCopy) { + for (Iterator iterator = set.iterator(); iterator.hasNext();) { + Long value = iterator.next(); + if (!setCopy.remove(value)) { + Assert.fail(); + } + } + + Assert.assertEquals(0, setCopy.size()); + } + + @Test + public void testRetainAll() { + RSetCache set = redisson.getSetCache("set"); + for (int i = 0; i < 10000; i++) { + set.add(i); + set.add(i*10, 10, TimeUnit.SECONDS); + } + + Assert.assertTrue(set.retainAll(Arrays.asList(1, 2))); + Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2)); + Assert.assertEquals(2, set.size()); + } + + @Test + public void testIteratorRemoveHighVolume() throws InterruptedException { + RSetCache set = redisson.getSetCache("set"); + for (int i = 1; i <= 5000; i++) { + set.add(i); + set.add(i*100000, 20, TimeUnit.SECONDS); + } + int cnt = 0; + + Iterator iterator = set.iterator(); + while (iterator.hasNext()) { + Integer integer = iterator.next(); + iterator.remove(); + cnt++; + } + Assert.assertEquals(10000, cnt); + Assert.assertEquals(0, set.size()); + } + + @Test + public void testContainsAll() { + RSetCache set = redisson.getSetCache("set"); + for (int i = 0; i < 200; i++) { + set.add(i); + } + + Assert.assertTrue(set.containsAll(Collections.emptyList())); + Assert.assertTrue(set.containsAll(Arrays.asList(30, 11))); + Assert.assertFalse(set.containsAll(Arrays.asList(30, 711, 11))); + } + + @Test + public void testToArray() throws InterruptedException { + RSetCache set = redisson.getSetCache("set"); + set.add("1"); + set.add("4"); + set.add("2", 1, TimeUnit.SECONDS); + set.add("5"); + set.add("3"); + + Thread.sleep(1000); + + MatcherAssert.assertThat(Arrays.asList(set.toArray()), Matchers.containsInAnyOrder("1", "4", "5", "3")); + + String[] strs = set.toArray(new String[0]); + MatcherAssert.assertThat(Arrays.asList(strs), Matchers.containsInAnyOrder("1", "4", "5", "3")); + } + + @Test + public void testContains() throws InterruptedException { + RSetCache set = redisson.getSetCache("set"); + + set.add(new TestObject("1", "2")); + set.add(new TestObject("1", "2")); + set.add(new TestObject("2", "3"), 1, TimeUnit.SECONDS); + set.add(new TestObject("3", "4")); + set.add(new TestObject("5", "6")); + + Thread.sleep(1000); + + Assert.assertFalse(set.contains(new TestObject("2", "3"))); + Assert.assertTrue(set.contains(new TestObject("1", "2"))); + Assert.assertFalse(set.contains(new TestObject("1", "9"))); + } + + @Test + public void testDuplicates() { + RSetCache set = redisson.getSetCache("set"); + + set.add(new TestObject("1", "2")); + set.add(new TestObject("1", "2")); + set.add(new TestObject("2", "3")); + set.add(new TestObject("3", "4")); + set.add(new TestObject("5", "6")); + + Assert.assertEquals(4, set.size()); + } + + @Test + public void testSize() { + RSetCache set = redisson.getSetCache("set"); + set.add(1); + set.add(2); + set.add(3); + set.add(3); + set.add(4); + set.add(5); + set.add(5); + + Assert.assertEquals(5, set.size()); + } + + + @Test + public void testRetainAllEmpty() { + RSetCache set = redisson.getSetCache("set"); + set.add(1); + set.add(2); + set.add(3); + set.add(4); + set.add(5); + + Assert.assertTrue(set.retainAll(Collections.emptyList())); + Assert.assertEquals(0, set.size()); + } + + @Test + public void testRetainAllNoModify() { + RSetCache set = redisson.getSetCache("set"); + set.add(1); + set.add(2); + + Assert.assertFalse(set.retainAll(Arrays.asList(1, 2))); // nothing changed + Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2)); + } + + @Test + public void testExpiredIterator() throws InterruptedException { + RSetCache cache = redisson.getSetCache("simple"); + cache.add("0"); + cache.add("1", 1, TimeUnit.SECONDS); + cache.add("2", 3, TimeUnit.SECONDS); + cache.add("3", 4, TimeUnit.SECONDS); + cache.add("4", 1, TimeUnit.SECONDS); + + Thread.sleep(1000); + + MatcherAssert.assertThat(cache, Matchers.contains("0", "2", "3")); + } + + @Test + public void testExpire() throws InterruptedException { + RSetCache cache = redisson.getSetCache("simple"); + cache.add("8", 1, TimeUnit.SECONDS); + + cache.expire(100, TimeUnit.MILLISECONDS); + + Thread.sleep(500); + + Assert.assertEquals(0, cache.size()); + } + + @Test + public void testExpireAt() throws InterruptedException { + RSetCache cache = redisson.getSetCache("simple"); + cache.add("8", 1, TimeUnit.SECONDS); + + cache.expireAt(System.currentTimeMillis() + 100); + + Thread.sleep(500); + + Assert.assertEquals(0, cache.size()); + } + + @Test + public void testClearExpire() throws InterruptedException { + RSetCache cache = redisson.getSetCache("simple"); + cache.add("8", 1, TimeUnit.SECONDS); + + cache.expireAt(System.currentTimeMillis() + 100); + + cache.clearExpire(); + + Thread.sleep(500); + + Assert.assertEquals(1, cache.size()); + } + + @Test + public void testScheduler() throws InterruptedException { + RSetCache cache = redisson.getSetCache("simple", new MsgPackJacksonCodec()); + Assert.assertFalse(cache.contains("33")); + + cache.add("33", 5, TimeUnit.SECONDS); + + Thread.sleep(11000); + + Assert.assertEquals(0, cache.size()); + + } + +}