RMap.getAsync, RMap.putAsync and RMap.removeAsync methods added.

pull/25/head
Nikita 11 years ago
parent f5d8643301
commit e6f8c3cac0

@ -278,4 +278,182 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
@Override
public Future<V> getAsync(K key) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
return connection.getAsync().hget(getName(), key).addListener(connectionManager.createReleaseListener(connection));
}
@Override
public Future<V> putAsync(K key, V value) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
putAsync(key, value, promise, async);
promise.addListener(connectionManager.createReleaseListener(connection));
return promise;
}
private void putAsync(final K key, final V value, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async) {
async.watch(getName()).addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hget(getName(), key).addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
final V prev = future.get();
async.multi().addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hset(getName(), key, value).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.exec().addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
if (future.get().size() == 1) {
promise.setSuccess(prev);
} else {
putAsync(key, value, promise, async);
}
}
});
}
});
}
});
}
});
}
});
}
@Override
public Future<V> removeAsync(K key) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
removeAsync(key, promise, async);
promise.addListener(connectionManager.createReleaseListener(connection));
return promise;
}
private void removeAsync(final K key, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async) {
async.watch(getName()).addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hget(getName(), key).addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
final V prev = future.get();
async.multi().addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hdel(getName(), key).addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.exec().addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
if (future.get().size() == 1) {
promise.setSuccess(prev);
} else {
removeAsync(key, promise, async);
}
}
});
}
});
}
});
}
});
}
});
}
}

@ -15,6 +15,8 @@
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.concurrent.ConcurrentMap;
/**
@ -28,4 +30,10 @@ import java.util.concurrent.ConcurrentMap;
*/
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
Future<V> getAsync(K key);
Future<V> putAsync(K key, V value);
Future<V> removeAsync(K key);
}

@ -1,11 +1,15 @@
package org.redisson;
import io.netty.util.concurrent.Future;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RMap;
public class RedissonMapTest extends BaseTest {
@ -130,7 +134,6 @@ public class RedissonMapTest extends BaseTest {
@Test
public void testLong() {
Redisson redisson = Redisson.create();
Map<Long, Long> map = redisson.getMap("test_long");
map.put(1L, 2L);
map.put(3L, 4L);
@ -141,13 +144,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertEquals(2L, val.longValue());
Long val2 = map.get(3L);
Assert.assertEquals(4L, val2.longValue());
clear(map, redisson);
}
@Test
public void testNull() {
Redisson redisson = Redisson.create();
Map<Integer, String> map = redisson.getMap("simple12");
map.put(1, null);
map.put(2, null);
@ -161,13 +161,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertNull(val2);
String val3 = map.get(3);
Assert.assertEquals("43", val3);
clear(map, redisson);
}
@Test
public void testSimpleTypes() {
Redisson redisson = Redisson.create();
Map<Integer, String> map = redisson.getMap("simple12");
map.put(1, "12");
map.put(2, "33");
@ -175,13 +172,10 @@ public class RedissonMapTest extends BaseTest {
String val = map.get(2);
Assert.assertEquals("33", val);
clear(map, redisson);
}
@Test
public void testRemove() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -191,13 +185,10 @@ public class RedissonMapTest extends BaseTest {
map.remove(new SimpleKey("5"));
Assert.assertEquals(1, map.size());
clear(map, redisson);
}
@Test
public void testKeySet() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -205,13 +196,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertTrue(map.keySet().contains(new SimpleKey("33")));
Assert.assertFalse(map.keySet().contains(new SimpleKey("44")));
clear(map, redisson);
}
@Test
public void testContainsValue() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -220,13 +208,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertTrue(map.containsValue(new SimpleValue("2")));
Assert.assertFalse(map.containsValue(new SimpleValue("441")));
Assert.assertFalse(map.containsValue(new SimpleKey("5")));
clear(map, redisson);
}
@Test
public void testContainsKey() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -234,13 +219,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertTrue(map.containsKey(new SimpleKey("33")));
Assert.assertFalse(map.containsKey(new SimpleKey("34")));
clear(map, redisson);
}
@Test
public void testRemoveValue() {
Redisson redisson = Redisson.create();
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -251,13 +233,10 @@ public class RedissonMapTest extends BaseTest {
Assert.assertNull(val1);
Assert.assertEquals(0, map.size());
clear(map, redisson);
}
@Test
public void testRemoveValueFail() {
Redisson redisson = Redisson.create();
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -269,14 +248,11 @@ public class RedissonMapTest extends BaseTest {
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("2", val1.getValue());
clear(map, redisson);
}
@Test
public void testReplaceOldValueFail() {
Redisson redisson = Redisson.create();
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -285,13 +261,10 @@ public class RedissonMapTest extends BaseTest {
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("2", val1.getValue());
clear(map, redisson);
}
@Test
public void testReplaceOldValueSuccess() {
Redisson redisson = Redisson.create();
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -303,13 +276,10 @@ public class RedissonMapTest extends BaseTest {
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("3", val1.getValue());
clear(map, redisson);
}
@Test
public void testReplaceValue() {
Redisson redisson = Redisson.create();
ConcurrentMap<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -318,14 +288,11 @@ public class RedissonMapTest extends BaseTest {
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("3", val1.getValue());
clear(map, redisson);
}
@Test
public void testReplace() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -337,13 +304,10 @@ public class RedissonMapTest extends BaseTest {
map.put(new SimpleKey("33"), new SimpleValue("abc"));
SimpleValue val2 = map.get(new SimpleKey("33"));
Assert.assertEquals("abc", val2.getValue());
clear(map, redisson);
}
@Test
public void testPutGet() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
@ -354,13 +318,10 @@ public class RedissonMapTest extends BaseTest {
SimpleValue val2 = map.get(new SimpleKey("5"));
Assert.assertEquals("6", val2.getValue());
clear(map, redisson);
}
@Test
public void testSize() {
Redisson redisson = Redisson.create();
Map<SimpleKey, SimpleValue> map = redisson.getMap("simple");
map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -381,8 +342,33 @@ public class RedissonMapTest extends BaseTest {
map.remove(new SimpleKey("3"));
Assert.assertEquals(3, map.size());
}
clear(map, redisson);
@Test
public void testPutAsync() throws InterruptedException, ExecutionException {
RMap<Integer, Integer> map = redisson.getMap("simple");
Future<Integer> future = map.putAsync(2, 3);
Assert.assertNull(future.get());
Assert.assertEquals((Integer)3, map.get(2));
Future<Integer> future1 = map.putAsync(2, 4);
Assert.assertEquals((Integer)3, future1.get());
Assert.assertEquals((Integer)4, map.get(2));
}
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RMap<Integer, Integer> map = redisson.getMap("simple");
map.put(1, 3);
map.put(3, 5);
map.put(7, 8);
Assert.assertEquals((Integer)3, map.removeAsync(1).get());
Assert.assertEquals((Integer)5, map.removeAsync(3).get());
Assert.assertNull(map.removeAsync(10).get());
Assert.assertEquals((Integer)8, map.removeAsync(7).get());
}
}

Loading…
Cancel
Save