loadBucketValues method added. #330

pull/337/head
Nikita 9 years ago
parent 365d1fe63b
commit 4848d118f3

@ -17,7 +17,9 @@ package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.redisson.api.RedissonReactiveClient;
@ -58,8 +60,6 @@ import org.redisson.core.RSetCache;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
import io.netty.util.concurrent.Future;
/**
* Main infrastructure class allows to get access
* to all Redisson objects on top of Redis server.
@ -157,8 +157,7 @@ public class Redisson implements RedissonClient {
@Override
public <V> List<RBucket<V>> findBuckets(String pattern) {
Future<Collection<String>> r = commandExecutor.readAllAsync(RedisCommands.KEYS, pattern);
Collection<String> keys = commandExecutor.get(r);
Collection<String> keys = commandExecutor.get(commandExecutor.<List<String>, String>readAllAsync(RedisCommands.KEYS, pattern));
List<RBucket<V>> buckets = new ArrayList<RBucket<V>>(keys.size());
for (String key : keys) {
if(key == null) {
@ -169,6 +168,25 @@ public class Redisson implements RedissonClient {
return buckets;
}
public <V> Map<String, V> loadBucketValues(Collection<String> keys) {
return loadBucketValues(keys.toArray(new String[keys.size()]));
}
public <V> Map<String, V> loadBucketValues(String ... keys) {
Collection<Object> values = commandExecutor.get(commandExecutor.<List<Object>, Object>readAllAsync(RedisCommands.MGET, keys));
Map<String, V> result = new HashMap<String, V>(values.size());
int index = 0;
for (Object value : values) {
if(value == null) {
index++;
continue;
}
result.put(keys[index], (V)value);
index++;
}
return result;
}
@Override
public <V> List<RBucket<V>> getBuckets(String pattern) {
return findBuckets(pattern);

@ -15,7 +15,9 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.core.ClusterNode;
@ -137,6 +139,24 @@ public interface RedissonClient {
*/
<V> List<RBucket<V>> findBuckets(String pattern);
/**
* Returns RBucket value mapped by key. Result Map is not contains
* key-value entry for null values.
*
* @param keys
* @return
*/
<V> Map<String, V> loadBucketValues(Collection<String> keys);
/**
* Returns RBucket value mapped by key. Result Map is not contains
* key-value entry for null values.
*
* @param keys
* @return
*/
<V> Map<String, V> loadBucketValues(String ... keys);
/**
* Use {@link #findBuckets(String)}
*/

@ -168,6 +168,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> FLUSHALL = new RedisStrictCommand<Void>("FLUSHALL", new VoidReplayConvertor());
RedisStrictCommand<List<String>> KEYS = new RedisStrictCommand<List<String>>("KEYS", new StringListReplayDecoder());
RedisCommand<List<Object>> MGET = new RedisCommand<List<Object>>("MGET", new ObjectListReplayDecoder<Object>());
RedisCommand<Boolean> HSET = new RedisCommand<Boolean>("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP);
RedisStrictCommand<String> HINCRBYFLOAT = new RedisStrictCommand<String>("HINCRBYFLOAT");

@ -20,9 +20,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -57,6 +58,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -99,14 +101,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
Promise<R> promise = new DefaultPromise<R>() {
Queue<R> results = new ConcurrentLinkedQueue<R>();
List<R> results = new ArrayList<R>();
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<R> setSuccess(R result) {
if (result instanceof Collection) {
results.addAll((Collection)result);
synchronized (results) {
results.addAll((Collection)result);
}
} else {
results.add(result);
synchronized (results) {
results.add(result);
}
}
if (counter.decrementAndGet() == 0

@ -2,7 +2,9 @@ package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -11,6 +13,21 @@ import org.redisson.core.RBucket;
public class RedissonBucketTest extends BaseTest {
@Test
public void testLoadBucketValues() {
RBucket<String> bucket1 = redisson.getBucket("test1");
bucket1.set("someValue1");
RBucket<String> bucket3 = redisson.getBucket("test3");
bucket3.set("someValue3");
Map<String, String> result = redisson.loadBucketValues("test1", "test2", "test3", "test4");
Map<String, String> expected = new HashMap<String, String>();
expected.put("test1", "someValue1");
expected.put("test3", "someValue3");
Assert.assertEquals(expected, result);
}
@Test
public void testExpire() throws InterruptedException {
RBucket<String> bucket = redisson.getBucket("test1");

Loading…
Cancel
Save