RMapAsync interface added. #186

pull/243/head
Nikita 10 years ago
parent 0e0e1c1b3f
commit 68a1c014c2

@ -21,7 +21,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -30,10 +29,11 @@ import java.util.Set;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.core.Predicate; import org.redisson.core.Predicate;
@ -61,8 +61,12 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override @Override
public int size() { public int size() {
Long res = connectionManager.read(getName(), RedisCommands.HLEN, getName()); return connectionManager.get(sizeAsync());
return res.intValue(); }
@Override
public Future<Integer> sizeAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HLEN, getName());
} }
@Override @Override
@ -75,10 +79,27 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return connectionManager.read(getName(), RedisCommands.HEXISTS, getName(), key); return connectionManager.read(getName(), RedisCommands.HEXISTS, getName(), key);
} }
@Override
public Future<Boolean> containsKeyAsync(Object key) {
return connectionManager.readAsync(getName(), RedisCommands.HEXISTS, getName(), key);
}
@Override @Override
public boolean containsValue(Object value) { public boolean containsValue(Object value) {
Collection<V> list = values(); return connectionManager.get(containsValueAsync(value));
return list.contains(value); }
@Override
public Future<Boolean> containsValueAsync(Object value) {
return connectionManager.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return true "
+ "end "
+ "end;" +
"return false",
Collections.<Object>singletonList(getName()), value);
} }
@Override @Override
@ -133,13 +154,22 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override @Override
public Set<K> keySet() { public Set<K> keySet() {
List<K> keys = connectionManager.read(getName(), RedisCommands.HKEYS, getName()); return connectionManager.get(keySetAsync());
return new HashSet<K>(keys); }
@Override
public Future<Set<K>> keySetAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HKEYS, getName());
} }
@Override @Override
public Collection<V> values() { public Collection<V> values() {
return connectionManager.read(getName(), RedisCommands.HVALS, getName()); return connectionManager.get(valuesAsync());
}
@Override
public Future<Collection<V>> valuesAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HVALS, getName());
} }
@Override @Override
@ -232,7 +262,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
@Override @Override
public Future<Long> fastRemoveAsync(final K ... keys) { public Future<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) { if (keys == null || keys.length == 0) {
return connectionManager.getGroup().next().newSucceededFuture(0L); return connectionManager.getGroup().next().newSucceededFuture(0L);
} }
@ -335,31 +365,16 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
@Override @Override
public V addAndGet(K key, V value) { public V addAndGet(K key, Number value) {
String res = connectionManager.write(getName(), StringCodec.INSTANCE, return connectionManager.get(addAndGetAsync(key, value));
RedisCommands.HINCRBYFLOAT, getName(), key, new BigDecimal(value.toString()).toPlainString());
if (value instanceof Long) {
Object obj = Long.parseLong(res);
return (V)obj;
}
if (value instanceof Integer) {
Object obj = Integer.parseInt(res);
return (V)obj;
} }
if (value instanceof Float) {
Object obj = Float.parseFloat(res); @Override
return (V)obj; public Future<V> addAndGetAsync(K key, Number value) {
} return connectionManager.writeAsync(getName(), StringCodec.INSTANCE,
if (value instanceof Double) { new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
Object obj = Double.parseDouble(res); getName(), key, new BigDecimal(value.toString()).toPlainString());
return (V)obj;
}
if (value instanceof BigDecimal) {
Object obj = new BigDecimal(res);
return (V)obj;
}
throw new IllegalStateException("Wrong value type!");
} }
} }

@ -126,6 +126,10 @@ public class RedisCommand<R> {
this.inParamType = inParamTypes; this.inParamType = inParamTypes;
} }
public RedisCommand(String name, Convertor<R> convertor) {
this(name, convertor, -1);
}
public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex) { public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex) {
this(name, null, null, null, encodeParamIndex); this(name, null, null, null, encodeParamIndex);
this.convertor = convertor; this.convertor = convertor;

@ -17,6 +17,7 @@ package org.redisson.client.protocol;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
@ -30,6 +31,7 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringMapReplayDecoder;
@ -110,8 +112,8 @@ public interface RedisCommands {
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY); RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Long> HLEN = new RedisStrictCommand<Long>("HLEN"); RedisStrictCommand<Integer> HLEN = new RedisStrictCommand<Integer>("HLEN", new IntegerReplayConvertor());
RedisCommand<List<Object>> HKEYS = new RedisCommand<List<Object>>("HKEYS", new ObjectListReplayDecoder(), ValueType.MAP_KEY); RedisCommand<Set<Object>> HKEYS = new RedisCommand<Set<Object>>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 1, ValueType.MAP); RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 1, ValueType.MAP);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);

@ -0,0 +1,40 @@
package org.redisson.client.protocol.convertor;
import java.math.BigDecimal;
public class NumberConvertor extends SingleConvertor<Object> {
private Class<?> resultClass;
public NumberConvertor(Class<?> resultClass) {
super();
this.resultClass = resultClass;
}
@Override
public Object convert(Object result) {
String res = (String) result;
if (resultClass.isAssignableFrom(Long.class)) {
Object obj = Long.parseLong(res);
return obj;
}
if (resultClass.isAssignableFrom(Integer.class)) {
Object obj = Integer.parseInt(res);
return obj;
}
if (resultClass.isAssignableFrom(Float.class)) {
Object obj = Float.parseFloat(res);
return obj;
}
if (resultClass.isAssignableFrom(Double.class)) {
Object obj = Double.parseDouble(res);
return obj;
}
if (resultClass.isAssignableFrom(BigDecimal.class)) {
Object obj = new BigDecimal(res);
return obj;
}
throw new IllegalStateException("Wrong value type!");
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
public class ObjectSetReplayDecoder implements MultiDecoder<Set<Object>> {
@Override
public Object decode(ByteBuf buf) {
throw new UnsupportedOperationException();
}
@Override
public Set<Object> decode(List<Object> parts) {
return new HashSet<Object>(parts);
}
@Override
public boolean isApplicable(int paramNum) {
return false;
}
}

@ -15,8 +15,6 @@
*/ */
package org.redisson.core; package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -30,7 +28,7 @@ import java.util.concurrent.ConcurrentMap;
* @param <K> key * @param <K> key
* @param <V> value * @param <V> value
*/ */
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable { public interface RMap<K, V> extends ConcurrentMap<K, V>, RMapAsync<K, V> {
/** /**
* Atomically adds the given <code>delta</code> to the current value * Atomically adds the given <code>delta</code> to the current value
@ -42,7 +40,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
* @param delta the value to add * @param delta the value to add
* @return the updated value * @return the updated value
*/ */
V addAndGet(K key, V delta); V addAndGet(K key, Number delta);
/** /**
* Gets a map slice contains the mappings with defined <code>keys</code> * Gets a map slice contains the mappings with defined <code>keys</code>
@ -100,31 +98,6 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
*/ */
long fastRemove(K ... keys); long fastRemove(K ... keys);
/**
* Removes <code>keys</code> from map by one operation in async manner
*
* Works faster than <code>RMap.removeAsync</code> but not returning
* the value associated with <code>key</code>
*
* @param keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Future<Long> fastRemoveAsync(K ... keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
*
* Works faster than <code>RMap.putAsync</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Future<Boolean> fastPutAsync(K key, V value);
/** /**
* Associates the specified <code>value</code> with the specified <code>key</code>. * Associates the specified <code>value</code> with the specified <code>key</code>.
* *
@ -138,18 +111,4 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
*/ */
boolean fastPut(K key, V value); boolean fastPut(K key, V value);
Future<V> getAsync(K key);
Future<V> putAsync(K key, V value);
Future<V> removeAsync(K key);
Future<V> replaceAsync(K key, V value);
Future<Boolean> replaceAsync(K key, V oldValue, V newValue);
Future<Long> removeAsync(Object key, Object value);
Future<V> putIfAbsentAsync(K key, V value);
} }

@ -0,0 +1,84 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import java.util.Set;
import io.netty.util.concurrent.Future;
/**
* Async map functions
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RMapAsync<K, V> extends RExpirableAsync {
Future<V> addAndGetAsync(K key, Number value);
Future<Collection<V>> valuesAsync();
Future<Set<K>> keySetAsync();
Future<Boolean> containsValueAsync(Object value);
Future<Boolean> containsKeyAsync(Object key);
Future<Integer> sizeAsync();
/**
* Removes <code>keys</code> from map by one operation in async manner
*
* Works faster than <code>RMap.removeAsync</code> but not returning
* the value associated with <code>key</code>
*
* @param keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Future<Long> fastRemoveAsync(K ... keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
*
* Works faster than <code>RMap.putAsync</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Future<Boolean> fastPutAsync(K key, V value);
Future<V> getAsync(K key);
Future<V> putAsync(K key, V value);
Future<V> removeAsync(K key);
Future<V> replaceAsync(K key, V value);
Future<Boolean> replaceAsync(K key, V oldValue, V newValue);
Future<Long> removeAsync(Object key, Object value);
Future<V> putIfAbsentAsync(K key, V value);
}

@ -1,6 +1,7 @@
package org.redisson; package org.redisson;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -131,6 +132,14 @@ public class RedissonMapTest extends BaseTest {
Assert.assertEquals(112, (int)res); Assert.assertEquals(112, (int)res);
res = map.get(1); res = map.get(1);
Assert.assertEquals(112, (int)res); Assert.assertEquals(112, (int)res);
RMap<Integer, Double> map2 = redisson.getMap("getAll2");
map2.put(1, new Double(100.2));
Double res2 = map2.addAndGet(1, new Double(12.1));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
res2 = map2.get(1);
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
} }
@Test @Test

Loading…
Cancel
Save