Using promises instead of CountDownLatch. #6, #5

pull/25/head
Nikita 11 years ago
parent 4ffeca0ea5
commit c18df16636

@ -30,6 +30,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -39,7 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -94,6 +96,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
private String password;
private int db;
private boolean closed;
private EventLoopGroup eventLoopGroup;
/**
* Initialize a new connection.
@ -102,12 +105,14 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
* @param codec Codec used to encode/decode keys and values.
* @param timeout Maximum time to wait for a response.
* @param unit Unit of time for the timeout.
* @param eventLoopGroup
*/
public RedisAsyncConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
public RedisAsyncConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
this.queue = queue;
this.codec = codec;
this.timeout = timeout;
this.unit = unit;
this.eventLoopGroup = eventLoopGroup;
}
/**
@ -127,7 +132,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
public String auth(String password) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
Command<K, V, String> cmd = dispatch(AUTH, new StatusOutput<K, V>(codec), args);
Future<String> cmd = dispatch(AUTH, new StatusOutput<K, V>(codec), args);
String status = await(cmd, timeout, unit);
if ("OK".equals(status)) this.password = password;
return status;
@ -489,7 +494,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
}
public Future<String> multi() {
Command<K, V, String> cmd = dispatch(MULTI, new StatusOutput<K, V>(codec));
Future<String> cmd = dispatch(MULTI, new StatusOutput<K, V>(codec));
multi = (multi == null ? new MultiOutput<K, V>(codec) : multi);
return cmd;
}
@ -638,7 +643,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
public String select(int db) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
Command<K, V, String> cmd = dispatch(SELECT, new StatusOutput<K, V>(codec), args);
Future<String> cmd = dispatch(SELECT, new StatusOutput<K, V>(codec), args);
String status = await(cmd, timeout, unit);
if ("OK".equals(status)) this.db = db;
return status;
@ -984,6 +989,21 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return dispatch(ZUNIONSTORE, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> pfadd(K key, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValues(values);
return dispatch(PFADD, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> pfcount(K key, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKeys(keys);
return dispatch(PFCOUNT, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> pfmerge(K destkey, K... sourceKeys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(destkey).addKeys(sourceKeys);
return dispatch(PFADD, new IntegerOutput<K, V>(codec), args);
}
/**
* Wait until commands are complete or the connection timeout is reached.
*
@ -1059,19 +1079,19 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
if (password != null) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
tmp.add(new Command<K, V, String>(AUTH, new StatusOutput<K, V>(codec), args, false));
tmp.add(new Command<K, V, String>(AUTH, new StatusOutput<K, V>(codec), args, false, ctx.executor().<String>newPromise()));
}
if (db != 0) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
tmp.add(new Command<K, V, String>(SELECT, new StatusOutput<K, V>(codec), args, false));
tmp.add(new Command<K, V, String>(SELECT, new StatusOutput<K, V>(codec), args, false, ctx.executor().<String>newPromise()));
}
tmp.addAll(queue);
queue.clear();
for (Command<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {
if (!cmd.getProimse().isCancelled()) {
queue.add(cmd);
channel.writeAndFlush(cmd);
}
@ -1095,27 +1115,28 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
}
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output) {
public <T> Future<T> dispatch(CommandType type, CommandOutput<K, V, T> output) {
return dispatch(type, output, (CommandArgs<K, V>) null);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key) {
public <T> Future<T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return dispatch(type, output, args);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
public <T> Future<T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValue(value);
return dispatch(type, output, args);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V[] values) {
public <T> Future<T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V[] values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValues(values);
return dispatch(type, output, args);
}
public synchronized <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null);
public synchronized <T> Future<T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
Promise<T> promise = eventLoopGroup.next().<T>newPromise();
Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null, promise);
try {
if (multi != null) {
@ -1133,17 +1154,21 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
throw new RedisCommandInterruptedException(e);
}
return cmd;
if (multi != null && type != MULTI) {
return eventLoopGroup.next().newSucceededFuture(null);
}
return cmd.getProimse();
}
public <T> T await(Command<K, V, T> cmd, long timeout, TimeUnit unit) {
if (!cmd.await(timeout, unit)) {
public <T> T await(Future<T> cmd, long timeout, TimeUnit unit) {
if (!cmd.awaitUninterruptibly(timeout, unit)) {
cmd.cancel(true);
throw new RedisException("Command timed out");
}
CommandOutput<K, V, T> output = cmd.getOutput();
if (output.hasError()) throw new RedisException(output.getError());
return output.get();
if (!cmd.isSuccess()) {
throw (RedisException) cmd.cause();
}
return cmd.getNow();
}
@SuppressWarnings("unchecked")

@ -137,7 +137,7 @@ public class RedisClient {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
CommandHandler<K, V> handler = new CommandHandler<K, V>(queue);
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(queue, codec, timeout, unit);
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}
@ -154,7 +154,7 @@ public class RedisClient {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(queue, codec);
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(queue, codec, timeout, unit);
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}

@ -5,8 +5,9 @@ package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import io.netty.util.concurrent.Future;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.lambdaworks.redis.protocol.CommandType.MULTI;
@ -210,6 +211,18 @@ public class RedisConnection<K, V> {
return (T) await(c.evalsha(digest, type, keys, values));
}
public Long pfadd(K key, V... values) {
return await(c.pfadd(key, values));
}
public Long pfcount(K key, K... keys) {
return await(c.pfcount(key, keys));
}
public Long pfmerge(K destkey, K... sourceKeys) {
return await(c.pfmerge(destkey, sourceKeys));
}
public Boolean exists(K key) {
return await(c.exists(key));
}
@ -818,16 +831,11 @@ public class RedisConnection<K, V> {
return c.digest(script);
}
@SuppressWarnings("unchecked")
private <T> T await(Future<T> future, long timeout, TimeUnit unit) {
Command<K, V, T> cmd = (Command<K, V, T>) future;
return c.await(cmd, timeout, unit);
return c.await(future, timeout, unit);
}
@SuppressWarnings("unchecked")
private <T> T await(Future<T> future) {
Command<K, V, T> cmd = (Command<K, V, T>) future;
if (c.multi != null && cmd.type != MULTI) return null;
return c.await(cmd, timeout, unit);
return c.await(future, timeout, unit);
}
}

@ -2,10 +2,10 @@
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.RedisCommandInterruptedException;
import io.netty.buffer.ByteBuf;
import com.lambdaworks.redis.RedisException;
import java.util.concurrent.*;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Promise;
/**
* A redis command and its result. All successfully executed commands will
@ -15,13 +15,14 @@ import java.util.concurrent.*;
*
* @author Will Glozer
*/
public class Command<K, V, T> implements Future<T> {
public class Command<K, V, T> /*implements Future<T>*/ {
private static final byte[] CRLF = "\r\n".getBytes(Charsets.ASCII);
private final Promise<T> proimse;
public final CommandType type;
protected CommandArgs<K, V> args;
protected CommandOutput<K, V, T> output;
protected CountDownLatch latch;
protected final CommandOutput<K, V, T> output;
protected int completeAmount;
/**
* Create a new command with the supplied type and args.
@ -31,106 +32,16 @@ public class Command<K, V, T> implements Future<T> {
* @param args Command args, if any.
* @param multi Flag indicating if MULTI active.
*/
public Command(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args, boolean multi) {
public Command(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args, boolean multi, Promise<T> proimse) {
this.type = type;
this.output = output;
this.args = args;
this.latch = new CountDownLatch(multi ? 2 : 1);
this.completeAmount = multi ? 2 : 1;
this.proimse = proimse;
}
/**
* Cancel the command and notify any waiting consumers. This does
* not cause the redis server to stop executing the command.
*
* @param ignored Ignored parameter.
*
* @return true if the command was cancelled.
*/
@Override
public boolean cancel(boolean ignored) {
boolean cancelled = false;
if (latch.getCount() == 1) {
latch.countDown();
output = null;
cancelled = true;
}
return cancelled;
}
/**
* Check if the command has been cancelled.
*
* @return True if the command was cancelled.
*/
@Override
public boolean isCancelled() {
return latch.getCount() == 0 && output == null;
}
/**
* Check if the command has completed.
*
* @return true if the command has completed.
*/
@Override
public boolean isDone() {
return latch.getCount() == 0;
}
/**
* Get the command output and if the command hasn't completed
* yet, wait until it does.
*
* @return The command output.
*/
@Override
public T get() {
try {
latch.await();
return output.get();
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}
}
/**
* Get the command output and if the command hasn't completed yet,
* wait up to the specified time until it does.
*
* @param timeout Maximum time to wait for a result.
* @param unit Unit of time for the timeout.
*
* @return The command output.
*
* @throws TimeoutException if the wait timed out.
*/
@Override
public T get(long timeout, TimeUnit unit) throws TimeoutException {
try {
if (!latch.await(timeout, unit)) {
throw new TimeoutException("Command timed out");
}
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}
return output.get();
}
/**
* Wait up to the specified time for the command output to become
* available.
*
* @param timeout Maximum time to wait for a result.
* @param unit Unit of time for the timeout.
*
* @return true if the output became available.
*/
public boolean await(long timeout, TimeUnit unit) {
try {
return latch.await(timeout, unit);
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}
public Promise<T> getProimse() {
return proimse;
}
/**
@ -146,7 +57,17 @@ public class Command<K, V, T> implements Future<T> {
* Mark this command complete and notify all waiting threads.
*/
public void complete() {
latch.countDown();
completeAmount--;
if (completeAmount == 0) {
Object res = output.get();
if (res instanceof RedisException) {
proimse.setFailure((Exception)res);
} if (output.hasError()) {
proimse.setFailure(new RedisException(output.getError()));
} else {
proimse.setSuccess((T)res);
}
}
}
/**

@ -8,6 +8,7 @@ import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Array;
import java.util.*;
@ -43,9 +44,10 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
* @param codec Codec used to encode/decode keys and values.
* @param timeout Maximum time to wait for a responses.
* @param unit Unit of time for the timeout.
* @param eventLoopGroup
*/
public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
super(queue, codec, timeout, unit);
public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
super(queue, codec, timeout, unit, eventLoopGroup);
channels = new HashSet<K>();
patterns = new HashSet<K>();
}

@ -17,6 +17,7 @@ package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
@ -69,6 +70,15 @@ public class ConnectionManager {
this.config = config;
}
public <T> FutureListener<T> createListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
release(conn);
}
};
}
public <K, V> RedisConnection<K, V> connectionWriteOp() {
return connectionReadOp();
}

Loading…
Cancel
Save