diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index cca20cd04..54975133a 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -17,7 +17,9 @@ package org.redisson; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.redisson.api.RedissonReactiveClient; @@ -58,8 +60,6 @@ import org.redisson.core.RSetCache; import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; -import io.netty.util.concurrent.Future; - /** * Main infrastructure class allows to get access * to all Redisson objects on top of Redis server. @@ -157,8 +157,7 @@ public class Redisson implements RedissonClient { @Override public List> findBuckets(String pattern) { - Future> r = commandExecutor.readAllAsync(RedisCommands.KEYS, pattern); - Collection keys = commandExecutor.get(r); + Collection keys = commandExecutor.get(commandExecutor., String>readAllAsync(RedisCommands.KEYS, pattern)); List> buckets = new ArrayList>(keys.size()); for (String key : keys) { if(key == null) { @@ -169,6 +168,25 @@ public class Redisson implements RedissonClient { return buckets; } + public Map loadBucketValues(Collection keys) { + return loadBucketValues(keys.toArray(new String[keys.size()])); + } + + public Map loadBucketValues(String ... keys) { + Collection values = commandExecutor.get(commandExecutor., Object>readAllAsync(RedisCommands.MGET, keys)); + Map result = new HashMap(values.size()); + int index = 0; + for (Object value : values) { + if(value == null) { + index++; + continue; + } + result.put(keys[index], (V)value); + index++; + } + return result; + } + @Override public List> getBuckets(String pattern) { return findBuckets(pattern); diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index a77c63972..4fbe999aa 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -15,7 +15,9 @@ */ package org.redisson; +import java.util.Collection; import java.util.List; +import java.util.Map; import org.redisson.client.codec.Codec; import org.redisson.core.ClusterNode; @@ -137,6 +139,24 @@ public interface RedissonClient { */ List> findBuckets(String pattern); + /** + * Returns RBucket value mapped by key. Result Map is not contains + * key-value entry for null values. + * + * @param keys + * @return + */ + Map loadBucketValues(Collection keys); + + /** + * Returns RBucket value mapped by key. Result Map is not contains + * key-value entry for null values. + * + * @param keys + * @return + */ + Map loadBucketValues(String ... keys); + /** * Use {@link #findBuckets(String)} */ diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index de097ac61..ab0570ab2 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -168,6 +168,7 @@ public interface RedisCommands { RedisStrictCommand FLUSHALL = new RedisStrictCommand("FLUSHALL", new VoidReplayConvertor()); RedisStrictCommand> KEYS = new RedisStrictCommand>("KEYS", new StringListReplayDecoder()); + RedisCommand> MGET = new RedisCommand>("MGET", new ObjectListReplayDecoder()); RedisCommand HSET = new RedisCommand("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP); RedisStrictCommand HINCRBYFLOAT = new RedisStrictCommand("HINCRBYFLOAT"); diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index ae6fea381..de2641185 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -20,9 +20,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -57,6 +58,7 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; /** * @@ -99,14 +101,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future> readAllAsync(RedisCommand command, Object ... params) { final Promise> mainPromise = connectionManager.newPromise(); Promise promise = new DefaultPromise() { - Queue results = new ConcurrentLinkedQueue(); + List results = new ArrayList(); AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size()); @Override public Promise setSuccess(R result) { if (result instanceof Collection) { - results.addAll((Collection)result); + synchronized (results) { + results.addAll((Collection)result); + } } else { - results.add(result); + synchronized (results) { + results.add(result); + } } if (counter.decrementAndGet() == 0 diff --git a/src/test/java/org/redisson/RedissonBucketTest.java b/src/test/java/org/redisson/RedissonBucketTest.java index 743c95508..b01c8abce 100755 --- a/src/test/java/org/redisson/RedissonBucketTest.java +++ b/src/test/java/org/redisson/RedissonBucketTest.java @@ -2,7 +2,9 @@ package org.redisson; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -11,6 +13,21 @@ import org.redisson.core.RBucket; public class RedissonBucketTest extends BaseTest { + @Test + public void testLoadBucketValues() { + RBucket bucket1 = redisson.getBucket("test1"); + bucket1.set("someValue1"); + RBucket bucket3 = redisson.getBucket("test3"); + bucket3.set("someValue3"); + + Map result = redisson.loadBucketValues("test1", "test2", "test3", "test4"); + Map expected = new HashMap(); + expected.put("test1", "someValue1"); + expected.put("test3", "someValue3"); + + Assert.assertEquals(expected, result); + } + @Test public void testExpire() throws InterruptedException { RBucket bucket = redisson.getBucket("test1");