RSortedSet.removeAsync, RSortedSet.addAsync, RSet.removeAsync, RSet.addAsync implemented. #31

pull/38/head
Nikita 11 years ago
parent 3c22b60640
commit c87f728e8b

@ -498,6 +498,10 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return dispatch(MOVE, new BooleanOutput<K, V>(codec), args);
}
public boolean isMultiMode() {
return multi != null;
}
public Future<String> multi() {
Future<String> cmd = dispatch(MULTI, new StatusOutput<K, V>(codec));
multi = (multi == null ? new MultiOutput<K, V>(codec) : multi);

@ -0,0 +1,61 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import com.lambdaworks.redis.RedisAsyncConnection;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public abstract class OperationListener<V, F> implements FutureListener<F> {
private Promise<V> promise;
private RedisAsyncConnection<Object, V> async;
public OperationListener(Promise<V> promise, RedisAsyncConnection<Object, V> async) {
super();
this.promise = promise;
this.async = async;
}
@Override
public void operationComplete(Future<F> future) throws Exception {
if (isBreak(async, promise, future)) {
return;
}
onOperationComplete(future);
}
public abstract void onOperationComplete(Future<F> future) throws Exception;
protected boolean isBreak(RedisAsyncConnection<Object, V> async, Promise<V> promise, Future<F> future) {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return true;
}
if (promise.isCancelled()) {
if (async.isMultiMode()) {
async.discard();
}
return true;
}
return false;
}
}

@ -16,7 +16,6 @@
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Collection;
@ -296,61 +295,26 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private void putAsync(final K key, final V value, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async) {
async.watch(getName()).addListener(new FutureListener<String>() {
async.watch(getName()).addListener(new OperationListener<V, String>(promise, async) {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hget(getName(), key).addListener(new FutureListener<V>() {
public void onOperationComplete(Future<String> future) throws Exception {
async.hget(getName(), key).addListener(new OperationListener<V, V>(promise, async) {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
public void onOperationComplete(Future<V> future) throws Exception {
final V prev = future.get();
async.multi().addListener(new FutureListener<String>() {
async.multi().addListener(new OperationListener<V, String>(promise, async) {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hset(getName(), key, value).addListener(new FutureListener<Boolean>() {
public void onOperationComplete(Future<String> future) throws Exception {
async.hset(getName(), key, value).addListener(new OperationListener<V, Boolean>(promise, async) {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.exec().addListener(new FutureListener<List<Object>>() {
public void onOperationComplete(Future<Boolean> future) throws Exception {
async.exec().addListener(new OperationListener<V, List<Object>>(promise, async) {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
public void onOperationComplete(Future<List<Object>> future) throws Exception {
if (future.get().size() == 1) {
promise.setSuccess(prev);
@ -359,7 +323,6 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
});
}
});
}
@ -367,6 +330,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
});
}
});
}
@ -382,61 +346,26 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private void removeAsync(final K key, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async) {
async.watch(getName()).addListener(new FutureListener<String>() {
async.watch(getName()).addListener(new OperationListener<V, String>(promise, async) {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
public void onOperationComplete(Future<String> future) throws Exception {
if (promise.isCancelled()) {
return;
}
async.hget(getName(), key).addListener(new FutureListener<V>() {
async.hget(getName(), key).addListener(new OperationListener<V, V>(promise, async) {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
public void onOperationComplete(Future<V> future) throws Exception {
final V prev = future.get();
async.multi().addListener(new FutureListener<String>() {
async.multi().addListener(new OperationListener<V, String>(promise, async) {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.hdel(getName(), key).addListener(new FutureListener<Long>() {
public void onOperationComplete(Future<String> future) throws Exception {
async.hdel(getName(), key).addListener(new OperationListener<V, Long>(promise, async) {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
async.exec().addListener(new FutureListener<List<Object>>() {
public void onOperationComplete(Future<Long> future) throws Exception {
async.exec().addListener(new OperationListener<V, List<Object>>(promise, async) {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (promise.isCancelled()) {
return;
}
public void onOperationComplete(Future<List<Object>> future) throws Exception {
if (future.get().size() == 1) {
promise.setSuccess(prev);

@ -15,6 +15,10 @@
*/
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
@ -152,12 +156,50 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
connectionManager.releaseWrite(connection);
}
}
@Override
public Future<Boolean> addAsync(V e) {
final Promise<Boolean> promise = connectionManager.getGroup().next().newPromise();
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
connection.getAsync().sadd(getName(), e).addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(future.get() > 0);
} else {
promise.setFailure(future.cause());
}
}
}).addListener(connectionManager.createReleaseWriteListener(connection));
return promise;
}
@Override
public boolean remove(Object o) {
public Future<Boolean> removeAsync(V value) {
final Promise<Boolean> promise = connectionManager.getGroup().next().newPromise();
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
connection.getAsync().srem(getName(), value).addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(future.get() > 0);
} else {
promise.setFailure(future.cause());
}
}
}).addListener(connectionManager.createReleaseWriteListener(connection));
return promise;
}
@Override
public boolean remove(Object value) {
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try {
return connection.srem(getName(), o) > 0;
return connection.srem(getName(), value) > 0;
} finally {
connectionManager.releaseWrite(connection);
}

@ -15,6 +15,10 @@
*/
package org.redisson;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
@ -289,7 +293,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V next() {
if (value == null) {
if (value == null || readNext) {
if (!hasNext()) {
throw new NoSuchElementException("Exhausted iterator");
}
@ -349,6 +353,25 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
connectionManager.releaseWrite(connection);
}
}
public Future<Boolean> addAsync(final V value) {
EventLoop loop = connectionManager.getGroup().next();
final Promise<Boolean> promise = loop.newPromise();
loop.execute(new Runnable() {
@Override
public void run() {
try {
boolean result = add(value);
promise.setSuccess(result);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
boolean add(V value, RedisConnection<Object, V> connection) {
RedisConnection<Object, Object> simpleConnection = (RedisConnection<Object, Object>)connection;
@ -510,6 +533,26 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
return new NewScore(leftScore, rightScore, score);
}
@Override
public Future<Boolean> removeAsync(final V value) {
EventLoop loop = connectionManager.getGroup().next();
final Promise<Boolean> promise = loop.newPromise();
loop.execute(new Runnable() {
@Override
public void run() {
try {
boolean result = remove(value);
promise.setSuccess(result);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
@Override
public boolean remove(Object value) {
RedisConnection<Object, V> connection = connectionManager.connectionWriteOp();
@ -727,7 +770,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
return binarySearch(value, connection, 0, upperIndex);
}
public double score(V value, RedisConnection<Object, V> connection, int indexDiff, boolean tail) {
double score(V value, RedisConnection<Object, V> connection, int indexDiff, boolean tail) {
BinarySearchResult<V> res = binarySearch(value, connection);
if (res.getIndex() < 0) {
BinarySearchResult<V> element = getAtIndex(-res.getIndex() + indexDiff, connection);

@ -15,6 +15,8 @@
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.Set;
/**
@ -26,4 +28,8 @@ import java.util.Set;
*/
public interface RSet<V> extends Set<V>, RExpirable {
Future<Boolean> addAsync(V value);
Future<Boolean> removeAsync(V value);
}

@ -15,11 +15,17 @@
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.Comparator;
import java.util.SortedSet;
public interface RSortedSet<V> extends SortedSet<V>, RObject {
Future<Boolean> addAsync(V value);
Future<Boolean> removeAsync(V value);
/**
* Sets new comparator only if current set is empty
*

@ -1,5 +1,7 @@
package org.redisson;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -7,14 +9,45 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RSet;
import org.redisson.core.RSortedSet;
public class RedissonSetTest extends BaseTest {
@Test
public void testAddAsync() throws InterruptedException, ExecutionException {
RSet<Integer> set = redisson.getSet("simple");
Future<Boolean> future = set.addAsync(2);
Assert.assertTrue(future.get());
Assert.assertTrue(set.contains(2));
}
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RSet<Integer> set = redisson.getSet("simple");
set.add(1);
set.add(3);
set.add(7);
Assert.assertTrue(set.removeAsync(1).get());
Assert.assertFalse(set.contains(1));
Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7));
Assert.assertFalse(set.removeAsync(1).get());
Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7));
set.removeAsync(3).get();
Assert.assertFalse(set.contains(3));
Assert.assertThat(set, Matchers.contains(7));
}
@Test
public void testIteratorRemove() {
Set<String> list = redisson.getSet("list");

@ -1,5 +1,7 @@
package org.redisson;
import io.netty.util.concurrent.Future;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -7,6 +9,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@ -16,6 +19,46 @@ import org.redisson.core.RSortedSet;
public class RedissonSortedSetTest extends BaseTest {
@Test
public void testAddAsync() throws InterruptedException, ExecutionException {
RSortedSet<Integer> set = redisson.getSortedSet("simple");
Future<Boolean> future = set.addAsync(2);
Assert.assertTrue(future.get());
Assert.assertTrue(set.contains(2));
}
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RSortedSet<Integer> set = redisson.getSortedSet("simple");
set.add(1);
set.add(3);
set.add(7);
Assert.assertTrue(set.removeAsync(1).get());
Assert.assertFalse(set.contains(1));
Assert.assertThat(set, Matchers.contains(3, 7));
Assert.assertFalse(set.removeAsync(1).get());
Assert.assertThat(set, Matchers.contains(3, 7));
set.removeAsync(3).get();
Assert.assertFalse(set.contains(3));
Assert.assertThat(set, Matchers.contains(7));
}
@Test
public void testIteratorNextNext() {
RSortedSet<String> list = redisson.getSortedSet("simple");
list.add("1");
list.add("4");
Iterator<String> iter = list.iterator();
Assert.assertEquals("1", iter.next());
Assert.assertEquals("4", iter.next());
Assert.assertFalse(iter.hasNext());
}
@Test
public void testIteratorRemove() {
RSortedSet<String> list = redisson.getSortedSet("list");

Loading…
Cancel
Save