diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cf579841..77b46d7f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ Redisson Releases History Try __ULTRA-FAST__ [Redisson PRO](https://redisson.pro) edition. -### 10-Apr-2017 - versions 2.9.2 and 3.4.2 released +### 10-May-2017 - versions 2.9.2 and 3.4.2 released Feature - __Dropwizard metrics integration__ More details [here](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks#147-dropwizard-metrics) Feature - `RLocalCachedMap.preloadCache` method added (thanks to Steve Draper) diff --git a/README.md b/README.md index 56dde5c7b..9a1b5be20 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Redisson: Redis based In-Memory Data Grid for Java. ==== -[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.4.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [Support chat](https://gitter.im/mrniko/redisson) | [Ultra-fast version](https://redisson.pro) +[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.4.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [Support chat](https://gitter.im/mrniko/redisson) | **[Ultra-fast version](https://redisson.pro)** Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. ## Please take part in [Redisson survey](https://www.surveymonkey.com/r/QXQZH5D) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index fc9151f8d..4410270cf 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -26,6 +26,12 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; import io.netty.buffer.ByteBuf; +/** + * + * @author Nikita Koksharov + * + * @param value type + */ abstract class RedissonBaseIterator implements Iterator { private List firstValues; @@ -36,7 +42,6 @@ abstract class RedissonBaseIterator implements Iterator { private boolean finished; private boolean currentElementRemoved; - private boolean removeExecuted; private V value; @Override @@ -47,7 +52,6 @@ abstract class RedissonBaseIterator implements Iterator { free(lastValues); currentElementRemoved = false; - removeExecuted = false; client = null; firstValues = null; lastValues = null; @@ -58,9 +62,7 @@ abstract class RedissonBaseIterator implements Iterator { } finished = false; } - long prevIterPos; do { - prevIterPos = nextIterPos; ListScanResult res = iterator(client, nextIterPos); if (lastValues != null) { free(lastValues); @@ -76,7 +78,6 @@ abstract class RedissonBaseIterator implements Iterator { client = null; firstValues = null; nextIterPos = 0; - prevIterPos = -1; } } else { if (firstValues.isEmpty()) { @@ -87,38 +88,38 @@ abstract class RedissonBaseIterator implements Iterator { client = null; firstValues = null; nextIterPos = 0; - prevIterPos = -1; continue; } if (res.getPos() == 0) { + free(firstValues); + free(lastValues); + finished = true; return false; } } - } else if (lastValues.removeAll(firstValues)) { + } else if (lastValues.removeAll(firstValues) + || (lastValues.isEmpty() && nextIterPos == 0)) { free(firstValues); free(lastValues); currentElementRemoved = false; - removeExecuted = false; + client = null; firstValues = null; lastValues = null; nextIterPos = 0; - prevIterPos = -1; if (tryAgain()) { continue; } + finished = true; return false; } } lastIter = res.getValues().iterator(); nextIterPos = res.getPos(); - } while (!lastIter.hasNext() && nextIterPos != prevIterPos); - if (prevIterPos == nextIterPos && !removeExecuted) { - finished = true; - } + } while (!lastIter.hasNext()); } return lastIter.hasNext(); } @@ -170,7 +171,6 @@ abstract class RedissonBaseIterator implements Iterator { lastIter.remove(); remove(value); currentElementRemoved = true; - removeExecuted = true; } abstract void remove(V value); diff --git a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java index f2924e205..de2363299 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -28,6 +28,14 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; import io.netty.buffer.ByteBuf; +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + * @param loaded value type + */ public abstract class RedissonBaseMapIterator implements Iterator { private Map firstValues; @@ -38,7 +46,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { private boolean finished; private boolean currentElementRemoved; - private boolean removeExecuted; protected Map.Entry entry; @Override @@ -49,7 +56,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { free(lastValues); currentElementRemoved = false; - removeExecuted = false; client = null; firstValues = null; lastValues = null; @@ -60,15 +66,15 @@ public abstract class RedissonBaseMapIterator implements Iterator { } finished = false; } - long prevIterPos; do { - prevIterPos = nextIterPos; MapScanResult res = iterator(); if (lastValues != null) { free(lastValues); } + lastValues = convert(res.getMap()); client = res.getRedisClient(); + if (nextIterPos == 0 && firstValues == null) { firstValues = lastValues; lastValues = null; @@ -76,7 +82,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { client = null; firstValues = null; nextIterPos = 0; - prevIterPos = -1; } } else { if (firstValues.isEmpty()) { @@ -87,38 +92,38 @@ public abstract class RedissonBaseMapIterator implements Iterator { client = null; firstValues = null; nextIterPos = 0; - prevIterPos = -1; continue; } if (res.getPos() == 0) { + free(firstValues); + free(lastValues); + finished = true; return false; } } - } else if (lastValues.keySet().removeAll(firstValues.keySet())) { + } else if (lastValues.keySet().removeAll(firstValues.keySet()) + || (lastValues.isEmpty() && nextIterPos == 0)) { free(firstValues); free(lastValues); currentElementRemoved = false; - removeExecuted = false; + client = null; firstValues = null; lastValues = null; nextIterPos = 0; - prevIterPos = -1; if (tryAgain()) { continue; } + finished = true; return false; } } lastIter = res.getMap().entrySet().iterator(); nextIterPos = res.getPos(); - } while (!lastIter.hasNext() && nextIterPos != prevIterPos); - if (prevIterPos == nextIterPos && !removeExecuted) { - finished = true; - } + } while (!lastIter.hasNext()); } return lastIter.hasNext(); @@ -184,7 +189,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { lastIter.remove(); removeKey(); currentElementRemoved = true; - removeExecuted = true; entry = null; } diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index 5699208b0..b82bb0761 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -123,6 +123,17 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit); } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } @Override public int remainingCapacity() { diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 46d8b6e0f..4f518e681 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -133,6 +133,17 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock RFuture res = pollLastAndOfferFirstToAsync(queueName, timeout, unit); return res.await().getNow(); } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } @Override public int remainingCapacity() { diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index f349774aa..ac309c5a9 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -33,6 +33,7 @@ import org.redisson.command.CommandExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.SemaphorePubSub; import io.netty.util.concurrent.Future; @@ -136,7 +137,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements } private RPromise wrapTakeFuture(final RFuture takeFuture) { - final RPromise result = new PromiseDelegator(commandExecutor.getConnectionManager().newPromise()) { + final RPromise result = new RedissonPromise() { @Override public boolean cancel(boolean mayInterruptIfRunning) { super.cancel(mayInterruptIfRunning); @@ -253,6 +254,17 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return wrapTakeFuture(takeFuture); } + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { RFuture takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index cc1c8f684..e545ba1bc 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RDelayedQueue; import org.redisson.api.RFuture; -import org.redisson.api.RQueue; import org.redisson.api.RTopic; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -417,7 +416,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay public RFuture peekAsync() { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], 0); " - + "if v ~= nil then " + + "if v ~= false then " + "local randomId, value = struct.unpack('dLc0', v);" + "return value; " + "end " @@ -429,7 +428,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay public RFuture pollAsync() { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('lpop', KEYS[1]); " - + "if v ~= nil then " + + "if v ~= false then " + "redis.call('zrem', KEYS[2], v); " + "local randomId, value = struct.unpack('dLc0', v);" + "return value; " @@ -447,7 +446,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay public RFuture pollLastAndOfferFirstToAsync(String queueName) { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('rpop', KEYS[1]); " - + "if v ~= nil then " + + "if v ~= false then " + "redis.call('zrem', KEYS[2], v); " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('lpush', KEYS[3], value); " diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 863a787d2..8cc9d8e1d 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -230,27 +231,15 @@ public class RedissonKeys implements RKeys { }; for (MasterSlaveEntry entry : entries) { - RFuture> findFuture = commandExecutor.readAsync(entry, null, RedisCommands.KEYS, pattern); - findFuture.addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - if (!future.isSuccess()) { - failed.set(future.cause()); - checkExecution(result, failed, count, executed); - return; - } - - Collection keys = future.getNow(); - if (keys.isEmpty()) { - checkExecution(result, failed, count, executed); - return; - } - - RFuture deleteFuture = deleteAsync(keys.toArray(new String[keys.size()])); - deleteFuture.addListener(listener); - } - }); - } + Iterator keysIterator = createKeysIterator(entry, pattern, 10); + Collection keys = new HashSet(); + while (keysIterator.hasNext()) { + String key = keysIterator.next(); + keys.add(key); + } + RFuture deleteFuture = deleteAsync(keys.toArray(new String[keys.size()])); + deleteFuture.addListener(listener); + } return result; } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 7bf4a1521..01a6cc955 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -143,6 +143,27 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc public RFuture lastAsync() { return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1); } + + @Override + public Double firstScore() { + return get(firstScoreAsync()); + } + + @Override + public RFuture firstScoreAsync() { + return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE_SCORE, getName(), 0, 0, "WITHSCORES"); + } + + @Override + public Double lastScore() { + return get(lastScoreAsync()); + } + + @Override + public RFuture lastScoreAsync() { + return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE_SCORE, getName(), -1, -1, "WITHSCORES"); + } + @Override public RFuture addAsync(double score, V object) { diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index b39c79c5d..d259c316a 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -43,5 +43,7 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException; + + V takeLastAndOfferFirstTo(String queueName) throws InterruptedException; } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java index d52047bce..3e09054e6 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java @@ -93,6 +93,8 @@ public interface RBlockingQueueAsync extends RQueueAsync { RFuture drainToAsync(Collection c); RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit); + + RFuture takeLastAndOfferFirstToAsync(String queueName); /** * Retrieves and removes the head of this queue in async mode, waiting up to the diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 72b548972..869ab8dcb 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -52,6 +52,10 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< V first(); V last(); + + Double firstScore(); + + Double lastScore(); Long addAll(Map objects); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 25bcfe5e4..2a98c42d0 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -37,6 +37,10 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn RFuture firstAsync(); RFuture lastAsync(); + + RFuture firstScoreAsync(); + + RFuture lastScoreAsync(); RFuture addAllAsync(Map objects); diff --git a/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java b/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java index 2fd0953ef..63daa0a71 100644 --- a/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java +++ b/redisson/src/main/java/org/redisson/client/handler/BaseConnectionHandler.java @@ -73,7 +73,7 @@ public abstract class BaseConnectionHandler extends C futures.add(future); } if (config.getClientName() != null) { - RFuture future = connection.async(RedisCommands.CLIENT_SETNAME, config.getDatabase()); + RFuture future = connection.async(RedisCommands.CLIENT_SETNAME, config.getClientName()); futures.add(future); } if (config.isReadOnly()) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index a91c4dda6..b46ded991 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -46,11 +46,11 @@ import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.Long2MultiDecoder; -import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; @@ -115,6 +115,7 @@ public interface RedisCommands { RedisCommand ZREVRANK_INT = new RedisCommand("ZREVRANK", new IntegerReplayConvertor(), 2); RedisStrictCommand ZRANK = new RedisStrictCommand("ZRANK", 2); RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ObjectFirstResultReplayDecoder()); + RedisStrictCommand ZRANGE_SINGLE_SCORE = new RedisStrictCommand("ZRANGE", new ObjectFirstScoreReplayDecoder()); RedisCommand> ZRANGE = new RedisCommand>("ZRANGE", new ObjectListReplayDecoder()); RedisStrictCommand ZREMRANGEBYRANK = new RedisStrictCommand("ZREMRANGEBYRANK", new IntegerReplayConvertor()); RedisStrictCommand ZREMRANGEBYSCORE = new RedisStrictCommand("ZREMRANGEBYSCORE", new IntegerReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java new file mode 100644 index 000000000..19a0bf0c5 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java @@ -0,0 +1,49 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.math.BigDecimal; +import java.util.List; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +/** + * + * @author Nikita Koksharov + * + * + */ +public class ObjectFirstScoreReplayDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf, State state) { + return new BigDecimal(buf.toString(CharsetUtil.UTF_8)).doubleValue(); + } + + @Override + public Double decode(List parts, State state) { + return (Double) parts.get(1); + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return paramNum % 2 != 0; + } + +} diff --git a/redisson/src/main/java/org/redisson/config/ConfigSupport.java b/redisson/src/main/java/org/redisson/config/ConfigSupport.java index 05a8cddbe..b61437af0 100644 --- a/redisson/src/main/java/org/redisson/config/ConfigSupport.java +++ b/redisson/src/main/java/org/redisson/config/ConfigSupport.java @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.net.URI; import java.net.URL; import java.util.List; @@ -121,62 +124,101 @@ public class ConfigSupport { private ObjectMapper jsonMapper = createMapper(null, null); private ObjectMapper yamlMapper = createMapper(new YAMLFactory(), null); + private void patchUriObject() throws IOException { + patchUriField("lowMask", "L_DASH"); + patchUriField("highMask", "H_DASH"); + } + + private void patchUriField(String methodName, String fieldName) + throws IOException { + try { + Method lowMask = URI.class.getDeclaredMethod(methodName, String.class); + lowMask.setAccessible(true); + Long lowMaskValue = (Long) lowMask.invoke(null, "-_"); + + Field lowDash = URI.class.getDeclaredField(fieldName); + + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(lowDash, lowDash.getModifiers() & ~Modifier.FINAL); + + lowDash.setAccessible(true); + lowDash.setLong(null, lowMaskValue); + } catch (Exception e) { + throw new IOException(e); + } + } + public T fromJSON(String content, Class configType) throws IOException { + patchUriObject(); return jsonMapper.readValue(content, configType); } public T fromJSON(File file, Class configType) throws IOException { + patchUriObject(); return fromJSON(file, configType, null); } public T fromJSON(File file, Class configType, ClassLoader classLoader) throws IOException { + patchUriObject(); jsonMapper = createMapper(null, classLoader); return jsonMapper.readValue(file, configType); } public T fromJSON(URL url, Class configType) throws IOException { + patchUriObject(); return jsonMapper.readValue(url, configType); } public T fromJSON(Reader reader, Class configType) throws IOException { + patchUriObject(); return jsonMapper.readValue(reader, configType); } public T fromJSON(InputStream inputStream, Class configType) throws IOException { + patchUriObject(); return jsonMapper.readValue(inputStream, configType); } public String toJSON(Config config) throws IOException { + patchUriObject(); return jsonMapper.writeValueAsString(config); } public T fromYAML(String content, Class configType) throws IOException { + patchUriObject(); return yamlMapper.readValue(content, configType); } public T fromYAML(File file, Class configType) throws IOException { + patchUriObject(); return yamlMapper.readValue(file, configType); } public T fromYAML(File file, Class configType, ClassLoader classLoader) throws IOException { + patchUriObject(); yamlMapper = createMapper(new YAMLFactory(), classLoader); return yamlMapper.readValue(file, configType); } public T fromYAML(URL url, Class configType) throws IOException { + patchUriObject(); return yamlMapper.readValue(url, configType); } public T fromYAML(Reader reader, Class configType) throws IOException { + patchUriObject(); return yamlMapper.readValue(reader, configType); } public T fromYAML(InputStream inputStream, Class configType) throws IOException { + patchUriObject(); return yamlMapper.readValue(inputStream, configType); } public String toYAML(Config config) throws IOException { + patchUriObject(); return yamlMapper.writeValueAsString(config); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index e2d31ac0a..29f6d0128 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -81,7 +81,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { // TODO async List master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); - String masterHost = "redis://" + master.get(0) + ":" + master.get(1); + String masterHost = createAddress(master.get(0), master.get(1)); this.config.setMasterAddress(masterHost); currentMaster.set(masterHost); log.info("master: {} added", masterHost); @@ -97,7 +97,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = map.get("port"); String flags = map.get("flags"); - String host = "redis://" + ip + ":" + port; + String host = createAddress(ip, port); this.config.addSlaveAddress(host); slaves.put(host, true); @@ -133,6 +133,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { future.awaitUninterruptibly(); } } + + private String createAddress(String host, Object port) { + if (host.contains(":")) { + host = "[" + host + "]"; + } + return "redis://" + host + ":" + port; + } @Override protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, @@ -208,7 +215,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - String addr = "redis://" + ip + ":" + port; + String addr = createAddress(ip, port); URI uri = URIBuilder.create(addr); registerSentinel(cfg, uri, c); } @@ -222,7 +229,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final String ip = parts[2]; final String port = parts[3]; - final String slaveAddr = "redis://" + ip + ":" + port; + final String slaveAddr = createAddress(ip, port); if (!isUseSameMaster(parts)) { return; @@ -310,7 +317,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String slaveAddr = ip + ":" + port; String master = currentMaster.get(); - String slaveMaster = "redis://" + parts[6] + ":" + parts[7]; + String slaveMaster = createAddress(parts[6], parts[7]); if (!master.equals(slaveMaster)) { log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master); return false; @@ -370,7 +377,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = parts[4]; String current = currentMaster.get(); - String newMaster = "redis://" + ip + ":" + port; + String newMaster = createAddress(ip, port); if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); diff --git a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java index bc648735d..5ae73e626 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java +++ b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java @@ -39,18 +39,22 @@ public class RedissonCache implements Cache { private CacheConfig config; + private final boolean allowNullValues; + private final AtomicLong hits = new AtomicLong(); private final AtomicLong misses = new AtomicLong(); - public RedissonCache(RMapCache mapCache, CacheConfig config) { + public RedissonCache(RMapCache mapCache, CacheConfig config, boolean allowNullValues) { this.mapCache = mapCache; this.map = mapCache; this.config = config; + this.allowNullValues = allowNullValues; } - public RedissonCache(RMap map) { + public RedissonCache(RMap map, boolean allowNullValues) { this.map = map; + this.allowNullValues = allowNullValues; } @Override @@ -92,6 +96,15 @@ public class RedissonCache implements Cache { @Override public void put(Object key, Object value) { + if (!allowNullValues && value == null) { + if (mapCache != null) { + mapCache.remove(key); + } else { + map.remove(key); + } + return; + } + value = toStoreValue(value); if (mapCache != null) { mapCache.fastPut(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS); @@ -101,13 +114,22 @@ public class RedissonCache implements Cache { } public ValueWrapper putIfAbsent(Object key, Object value) { - value = toStoreValue(value); Object prevValue; - if (mapCache != null) { - prevValue = mapCache.putIfAbsent(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS); + if (!allowNullValues && value == null) { + if (mapCache != null) { + prevValue = mapCache.get(key); + } else { + prevValue = map.get(key); + } } else { - prevValue = map.putIfAbsent(key, value); + value = toStoreValue(value); + if (mapCache != null) { + prevValue = mapCache.putIfAbsent(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS); + } else { + prevValue = map.putIfAbsent(key, value); + } } + return toValueWrapper(prevValue); } diff --git a/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java b/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java index fea4663fa..c8a31de97 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java +++ b/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java @@ -45,6 +45,10 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA private ResourceLoader resourceLoader; + private boolean dynamic = true; + + private boolean allowNullValues = true; + private Codec codec; private RedissonClient redisson; @@ -122,7 +126,37 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA this.configLocation = configLocation; this.codec = codec; } + + /** + * Defines possibility of storing {@code null} values. + *

+ * Default is true + * + * @param allowNullValues - stores if true + */ + public void setAllowNullValues(boolean allowNullValues) { + this.allowNullValues = allowNullValues; + } + /** + * Defines 'fixed' cache names. + * A new cache instance will not be created in dynamic for non-defined names. + *

+ * `null` parameter setups dynamic mode + * + * @param names of caches + */ + public void setCacheNames(Collection names) { + if (names != null) { + for (String name : names) { + getCache(name); + } + dynamic = false; + } else { + dynamic = true; + } + } + /** * Set cache config location * @@ -165,6 +199,9 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA if (cache != null) { return cache; } + if (!dynamic) { + return cache; + } CacheConfig config = configMap.get(name); if (config == null) { @@ -189,7 +226,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA map = redisson.getMap(name); } - Cache cache = new RedissonCache(map); + Cache cache = new RedissonCache(map, allowNullValues); Cache oldCache = instanceMap.putIfAbsent(name, cache); if (oldCache != null) { cache = oldCache; @@ -205,7 +242,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA map = redisson.getMapCache(name); } - Cache cache = new RedissonCache(map, config); + Cache cache = new RedissonCache(map, config, allowNullValues); Cache oldCache = instanceMap.putIfAbsent(name, cache); if (oldCache != null) { cache = oldCache; diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 54e4192ed..4dab4f004 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -296,16 +296,42 @@ public class RedissonBlockingQueueTest extends BaseTest { // TODO Auto-generated catch block e.printStackTrace(); } - }, 10, TimeUnit.SECONDS); + }, 5, TimeUnit.SECONDS); RBlockingQueue queue2 = redisson.getBlockingQueue("{queue}2"); queue2.put(4); queue2.put(5); queue2.put(6); - queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 5, TimeUnit.SECONDS); + assertThat(value).isEqualTo(3); assertThat(queue2).containsExactly(3, 4, 5, 6); } + + @Test + public void testTakeLastAndOfferFirstTo() throws InterruptedException { + final RBlockingQueue queue1 = redisson.getBlockingQueue("{queue}1"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + try { + queue1.put(3); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }, 3, TimeUnit.SECONDS); + + RBlockingQueue queue2 = redisson.getBlockingQueue("{queue}2"); + queue2.put(4); + queue2.put(5); + queue2.put(6); + + long startTime = System.currentTimeMillis(); + Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); + assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L); + assertThat(value).isEqualTo(3); + assertThat(queue2).containsExactly(3, 4, 5, 6); + } + @Test public void testAddOfferOrigin() { diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index 2e9cab970..3d533f5d4 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -509,10 +509,38 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { queue2.put(5); queue2.put(6); - queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + assertThat(value).isEqualTo(3); assertThat(queue2).containsExactly(3, 4, 5, 6); } + @Test + public void testTakeLastAndOfferFirstTo() throws InterruptedException { + final RBoundedBlockingQueue queue1 = redisson.getBoundedBlockingQueue("{queue}1"); + queue1.trySetCapacity(10); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + try { + queue1.put(3); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }, 3, TimeUnit.SECONDS); + + RBoundedBlockingQueue queue2 = redisson.getBoundedBlockingQueue("{queue}2"); + queue2.trySetCapacity(10); + queue2.put(4); + queue2.put(5); + queue2.put(6); + + long startTime = System.currentTimeMillis(); + Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); + assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L); + assertThat(value).isEqualTo(3); + assertThat(queue2).containsExactly(3, 4, 5, 6); + } + + @Test public void testOffer() { RBoundedBlockingQueue queue = redisson.getBoundedBlockingQueue("blocking:queue"); diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index c19d167c5..6c13be4d2 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -66,6 +66,7 @@ public class RedissonKeysTest extends BaseTest { for (int i = 0; i < 115; i++) { String key = "key" + Math.random(); RBucket bucket = redisson.getBucket(key); + keys.add(key); bucket.set("someValue"); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index cf7d0d3c6..e90447e83 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -274,7 +274,18 @@ public class RedissonScoredSortedSetTest extends BaseTest { Assert.assertEquals("a", set.first()); Assert.assertEquals("d", set.last()); } + + @Test + public void testFirstLastScore() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + set.add(0.4, "d"); + assertThat(set.firstScore()).isEqualTo(0.1); + assertThat(set.lastScore()).isEqualTo(0.4); + } @Test public void testRemoveRangeByScore() { diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index c6be7a759..38843b04a 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -6,6 +6,7 @@ import static org.redisson.BaseTest.createInstance; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -33,10 +34,13 @@ import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.codec.SerializationCodec; import org.redisson.config.Config; import org.redisson.connection.ConnectionListener; +import io.netty.buffer.Unpooled; + public class RedissonTest { protected RedissonClient redisson; @@ -76,7 +80,7 @@ public class RedissonTest { } @Test - public void testIterator() { + public void testIteratorNotLooped() { RedissonBaseIterator iter = new RedissonBaseIterator() { int i; @Override @@ -101,6 +105,41 @@ public class RedissonTest { Assert.assertFalse(iter.hasNext()); } + @Test + public void testIteratorNotLooped2() { + RedissonBaseIterator iter = new RedissonBaseIterator() { + int i; + @Override + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + i++; + if (i == 1) { + return new ListScanResult(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1))); + } + if (i == 2) { + return new ListScanResult(7L, Collections.emptyList()); + } + if (i == 3) { + return new ListScanResult(0L, Collections.emptyList()); + } + if (i == 4) { + return new ListScanResult(14L, Collections.emptyList()); + } + Assert.fail(); + return null; + } + + @Override + void remove(Integer value) { + } + + }; + + Assert.assertTrue(iter.hasNext()); + assertThat(iter.next()).isEqualTo(1); + Assert.assertFalse(iter.hasNext()); + } + + @BeforeClass public static void beforeClass() throws IOException, InterruptedException { if (!RedissonRuntimeEnvironment.isTravis) { @@ -250,7 +289,7 @@ public class RedissonTest { Assert.assertEquals(0, pp.stop()); - await().atMost(1, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1)); + await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1)); await().until(() -> assertThat(disconnectCounter.get()).isEqualTo(1)); }