lettuce sources removed. #183

pull/243/head
Nikita 10 years ago
parent 16e770cd46
commit 18febb690e

@ -1,62 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.codec;
/**
* High-performance base16 (AKA hex) codec.
*
* @author Will Glozer
*/
public class Base16 {
private static final char[] upper = "0123456789ABCDEF".toCharArray();
private static final char[] lower = "0123456789abcdef".toCharArray();
private static final byte[] decode = new byte[128];
static {
for (int i = 0; i < 10; i++) {
decode['0' + i] = (byte) i;
decode['A' + i] = (byte) (10 + i);
decode['a' + i] = (byte) (10 + i);
}
}
/**
* Encode bytes to base16 chars.
*
* @param src Bytes to encode.
* @param upper Use upper or lowercase chars.
*
* @return Encoded chars.
*/
public static char[] encode(byte[] src, boolean upper) {
char[] table = upper ? Base16.upper : Base16.lower;
char[] dst = new char[src.length * 2];
for (int si = 0, di = 0; si < src.length; si++) {
byte b = src[si];
dst[di++] = table[(b & 0xf0) >>> 4];
dst[di++] = table[(b & 0x0f)];
}
return dst;
}
/**
* Decode base16 chars to bytes.
*
* @param src Chars to decode.
*
* @return Decoded bytes.
*/
public static byte[] decode(char[] src) {
byte[] dst = new byte[src.length / 2];
for (int si = 0, di = 0; di < dst.length; di++) {
byte high = decode[src[si++] & 0x7f];
byte low = decode[src[si++] & 0x7f];
dst[di] = (byte) ((high << 4) + low);
}
return dst;
}
}

@ -1,35 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
/**
* A key-value pair.
*
* @author Will Glozer
*/
public class KeyValue<K, V> {
public final K key;
public final V value;
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
KeyValue<?, ?> that = (KeyValue<?, ?>) o;
return key.equals(that.key) && value.equals(that.value);
}
@Override
public int hashCode() {
return 31 * key.hashCode() + value.hashCode();
}
@Override
public String toString() {
return String.format("(%s, %s)", key, value);
}
}

@ -1,223 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
/**
* A scalable thread-safe <a href="http://redis.io/">Redis</a> client. Multiple threads
* may share one connection provided they avoid blocking and transactional operations
* such as BLPOP and MULTI/EXEC.
*
* @author Will Glozer
*/
public class RedisClient {
private Bootstrap bootstrap;
private ChannelGroup channels;
private long timeout;
private TimeUnit unit;
private InetSocketAddress addr;
/**
* Create a new client that connects to the supplied host on the default port.
*
* @param host Server hostname.
*/
public RedisClient(EventLoopGroup group, String host) {
this(group, host, 6379, 60000);
}
/**
* Create a new client that connects to the supplied host and port. Connection
* attempts and non-blocking commands will {@link #setDefaultTimeout timeout}
* after 60 seconds.
*
* @param host Server hostname.
* @param port Server port.
*/
public RedisClient(EventLoopGroup group, String host, int port, int timeout) {
this(group, NioSocketChannel.class, host, port, timeout);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host,
int port, int timeout) {
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
setDefaultTimeout(timeout, TimeUnit.MILLISECONDS);
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
/**
* Set the default timeout for {@link RedisConnection connections} created by
* this client. The timeout applies to connection attempts and non-blocking
* commands.
*
* @param timeout Default connection timeout.
* @param unit Unit of time for the timeout.
*/
public void setDefaultTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout));
}
/**
* Open a new synchronous connection to the redis server that treats
* keys and values as UTF-8 strings.
*
* @return A new connection.
*/
public RedisConnection<String, String> connect() {
return connect(new Utf8StringCodec());
}
/**
* Open a new asynchronous connection to the redis server that treats
* keys and values as UTF-8 strings.
*
* @return A new connection.
*/
public RedisAsyncConnection<String, String> connectAsync() {
return connectAsync(new Utf8StringCodec());
}
/**
* Open a new pub/sub connection to the redis server that treats
* keys and values as UTF-8 strings.
*
* @return A new connection.
*/
public RedisPubSubConnection<String, String> connectPubSub() {
return connectPubSub(new Utf8StringCodec());
}
/**
* Open a new synchronous connection to the redis server. Use the supplied
* {@link RedisCodec codec} to encode/decode keys and values.
*
* @param codec Use this codec to encode/decode keys and values.
*
* @return A new connection.
*/
public <K, V> RedisConnection<K, V> connect(RedisCodec<K, V> codec) {
return new RedisConnection<K, V>(connectAsync(codec));
}
/**
* Open a new asynchronous connection to the redis server. Use the supplied
* {@link RedisCodec codec} to encode/decode keys and values.
*
* @param codec Use this codec to encode/decode keys and values.
*
* @return A new connection.
*/
public <K, V> RedisAsyncConnection<K, V> connectAsync(RedisCodec<K, V> codec) {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
CommandHandler<K, V> handler = new CommandHandler<K, V>(queue);
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(this, queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}
/**
* Open a new pub/sub connection to the redis server. Use the supplied
* {@link RedisCodec codec} to encode/decode keys and values.
*
* @param codec Use this codec to encode/decode keys and values.
*
* @return A new pub/sub connection.
*/
public <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(queue, codec);
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(this, queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}
private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) {
try {
final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels);
ChannelFuture connect = null;
// TODO use better concurrent workaround
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog, handler, connection);
}
}).connect();
}
connect.sync();
connection.setReconnect(true);
return connection;
} catch (Throwable e) {
throw new RedisConnectionException("Unable to connect " + addr, e);
}
}
/**
* Shutdown this client and close all open connections. The client should be
* discarded after calling shutdown.
*/
public void shutdown() {
ChannelGroupFuture future = shutdownAsync();
future.awaitUninterruptibly();
}
public ChannelGroupFuture shutdownAsync() {
bootstrap.attr(ConnectionWatchdog.SHUTDOWN_KEY, true);
for (Channel c : channels) {
ChannelPipeline pipeline = c.pipeline();
RedisAsyncConnection<?, ?> connection = pipeline.get(RedisAsyncConnection.class);
connection.close();
}
return channels.close();
}
public InetSocketAddress getAddr() {
return addr;
}
@Override
public String toString() {
return "RedisClient [addr=" + addr + "]";
}
}

@ -1,16 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
/**
* Exception thrown when the thread executing a redis command is
* interrupted.
*
* @author Will Glozer
*/
@SuppressWarnings("serial")
public class RedisCommandInterruptedException extends RedisException {
public RedisCommandInterruptedException(Throwable e) {
super("Command interrupted", e);
}
}

@ -1,871 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.util.concurrent.Future;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.output.ListScanResult;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
/**
* A synchronous thread-safe connection to a redis server. Multiple threads may
* share one {@link RedisConnection} provided they avoid blocking and transactional
* operations such as {@link #blpop} and {@link #multi()}/{@link #exec}.
*
* A {@link ConnectionWatchdog} monitors each connection and reconnects
* automatically until {@link #close} is called. All pending commands will be
* (re)sent after successful reconnection.
*
* @author Will Glozer
*/
public class RedisConnection<K, V> {
protected RedisAsyncConnection<K, V> c;
protected long timeout;
protected TimeUnit unit;
public RedisClient getRedisClient() {
return c.getRedisClient();
}
/**
* Initialize a new connection.
*
* @param c Underlying async connection.
*/
public RedisConnection(RedisAsyncConnection<K, V> c) {
this.c = c;
this.timeout = c.timeout;
this.unit = c.unit;
}
/**
* Set the command timeout for this connection.
*
* @param timeout Command timeout.
* @param unit Unit of time for the timeout.
*/
public void setTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
c.setTimeout(timeout, unit);
}
public Long append(K key, V value) {
return await(c.append(key, value));
}
public String auth(String password) {
return c.auth(password);
}
public String bgrewriteaof() {
return await(c.bgrewriteaof());
}
public String bgsave() {
return await(c.bgsave());
}
public Long bitcount(K key) {
return await(c.bitcount(key));
}
public Long bitcount(K key, long start, long end) {
return await(c.bitcount(key, start, end));
}
public Long bitopAnd(K destination, K... keys) {
return await(c.bitopAnd(destination, keys));
}
public Long bitopNot(K destination, K source) {
return await(c.bitopNot(destination, source));
}
public Long bitopOr(K destination, K... keys) {
return await(c.bitopOr(destination, keys));
}
public Long bitopXor(K destination, K... keys) {
return await(c.bitopXor(destination, keys));
}
public KeyValue<K, V> blpop(long timeout, K... keys) throws InterruptedException {
long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout)));
return awaitInterruptibly(c.blpop(timeout, keys), timeout2, SECONDS);
}
public KeyValue<K, V> brpop(long timeout, K... keys) {
long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout)));
return await(c.brpop(timeout, keys), timeout2, SECONDS);
}
public V brpoplpush(long timeout, K source, K destination) throws InterruptedException {
long timeout2 = (timeout == 0 ? Long.MAX_VALUE : max(timeout, unit.toSeconds(this.timeout)));
return awaitInterruptibly(c.brpoplpush(timeout, source, destination), timeout2, SECONDS);
}
public K clientGetname() {
return await(c.clientGetname());
}
public String clientSetname(K name) {
return await(c.clientSetname(name));
}
public String clientKill(String addr) {
return await(c.clientKill(addr));
}
public String clientList() {
return await(c.clientList());
}
public List<String> configGet(String parameter) {
return await(c.configGet(parameter));
}
public String configResetstat() {
return await(c.configResetstat());
}
public String configSet(String parameter, String value) {
return await(c.configSet(parameter, value));
}
public Long dbsize() {
return await(c.dbsize());
}
public String debugObject(K key) {
return await(c.debugObject(key));
}
public Long decr(K key) {
return await(c.decr(key));
}
public Long decrby(K key, long amount) {
return await(c.decrby(key, amount));
}
public Long del(K... keys) {
return await(c.del(keys));
}
public String discard() {
return await(c.discard());
}
public byte[] dump(K key) {
return await(c.dump(key));
}
public V echo(V msg) {
return await(c.echo(msg));
}
/**
* Eval the supplied script, which must result in the requested
* {@link ScriptOutputType type}.
*
* @param script Lua script to evaluate.
* @param type Script output type.
* @param keys Redis keys to pass to script.
*
* @param <T> Expected return type.
*
* @return The result of evaluating the script.
*/
@SuppressWarnings("unchecked")
public <T> T eval(V script, ScriptOutputType type, K... keys) {
return (T) await(c.eval(script, type, keys, (V[]) new Object[0]));
}
@SuppressWarnings("unchecked")
public <T> T eval(V script, ScriptOutputType type, K[] keys, V... values) {
return (T) await(c.eval(script, type, keys, values));
}
/**
* Eval a pre-loaded script identified by its SHA-1 digest, which must result
* in the requested {@link ScriptOutputType type}.
*
* @param digest Lowercase hex string of script's SHA-1 digest.
* @param type Script output type.
* @param keys Redis keys to pass to script.
*
* @param <T> Expected return type.
*
* @return The result of evaluating the script.
*/
@SuppressWarnings("unchecked")
public <T> T evalsha(String digest, ScriptOutputType type, K... keys) {
return (T) await(c.evalsha(digest, type, keys, (V[]) new Object[0]));
}
@SuppressWarnings("unchecked")
public <T> T evalsha(String digest, ScriptOutputType type, K[] keys, V... values) {
return (T) await(c.evalsha(digest, type, keys, values));
}
public Long pfadd(K key, V... values) {
return await(c.pfadd(key, values));
}
public Long pfcount(K key, K... keys) {
return await(c.pfcount(key, keys));
}
public Long pfmerge(K destkey, K... sourceKeys) {
return await(c.pfmerge(destkey, sourceKeys));
}
public Boolean exists(K key) {
return await(c.exists(key));
}
public Boolean expire(K key, long seconds) {
return await(c.expire(key, seconds));
}
public Boolean expireat(K key, Date timestamp) {
return await(c.expireat(key, timestamp));
}
public Boolean expireat(K key, long timestamp) {
return await(c.expireat(key, timestamp));
}
public List<Object> exec() {
return await(c.exec());
}
public String flushall() {
return await(c.flushall());
}
public String flushdb() {
return await(c.flushdb());
}
public V get(K key) {
return await(c.get(key));
}
public Long getbit(K key, long offset) {
return await(c.getbit(key, offset));
}
public V getrange(K key, long start, long end) {
return await(c.getrange(key, start, end));
}
public V getset(K key, V value) {
return await(c.getset(key, value));
}
public Long hdel(K key, K... fields) {
return await(c.hdel(key, fields));
}
public Boolean hexists(K key, K field) {
return await(c.hexists(key, field));
}
public V hget(K key, K field) {
return await(c.hget(key, field));
}
public Long hincrby(K key, K field, long amount) {
return await(c.hincrby(key, field, amount));
}
public String hincrbyfloat(K key, K field, String amount) {
return await(c.hincrbyfloat(key, field, amount));
}
public Map<K, V> hgetall(K key) {
return await(c.hgetall(key));
}
public Set<K> hkeys(K key) {
return await(c.hkeys(key));
}
public Long hlen(K key) {
return await(c.hlen(key));
}
public List<V> hmget(K key, K... fields) {
return await(c.hmget(key, fields));
}
public String hmset(K key, Map<K, V> map) {
return await(c.hmset(key, map));
}
public Boolean hset(K key, K field, V value) {
return await(c.hset(key, field, value));
}
public Boolean hsetnx(K key, K field, V value) {
return await(c.hsetnx(key, field, value));
}
public List<V> hvals(K key) {
return await(c.hvals(key));
}
public Long incr(K key) {
return await(c.incr(key));
}
public Long incrby(K key, long amount) {
return await(c.incrby(key, amount));
}
public String incrbyfloat(K key, String amount) {
return await(c.incrbyfloat(key, amount));
}
public String info() {
return await(c.info());
}
public String info(String section) {
return await(c.info(section));
}
public List<K> keys(K pattern) {
return await(c.keys(pattern));
}
public Date lastsave() {
return await(c.lastsave());
}
public V lindex(K key, long index) {
return await(c.lindex(key, index));
}
public Long linsert(K key, boolean before, V pivot, V value) {
return await(c.linsert(key, before, pivot, value));
}
public Long llen(K key) {
return await(c.llen(key));
}
public V lpop(K key) {
return await(c.lpop(key));
}
public Long lpush(K key, V... values) {
return await(c.lpush(key, values));
}
public Long lpushx(K key, V value) {
return await(c.lpushx(key, value));
}
public List<V> lrange(K key, long start, long stop) {
return await(c.lrange(key, start, stop));
}
public Long lrem(K key, long count, V value) {
return await(c.lrem(key, count, value));
}
public String lset(K key, long index, V value) {
return await(c.lset(key, index, value));
}
public String ltrim(K key, long start, long stop) {
return await(c.ltrim(key, start, stop));
}
public String migrate(String host, int port, K key, int db, long timeout) {
return await(c.migrate(host, port, key, db, timeout));
}
public List<V> mget(K... keys) {
return await(c.mget(keys));
}
public Boolean move(K key, int db) {
return await(c.move(key, db));
}
public String multi() {
return await(c.multi());
}
public String mset(Map<K, V> map) {
return await(c.mset(map));
}
public Boolean msetnx(Map<K, V> map) {
return await(c.msetnx(map));
}
public String objectEncoding(K key) {
return await(c.objectEncoding(key));
}
public Long objectIdletime(K key) {
return await(c.objectIdletime(key));
}
public Long objectRefcount(K key) {
return await(c.objectRefcount(key));
}
public Boolean persist(K key) {
return await(c.persist(key));
}
public Boolean pexpire(K key, long milliseconds) {
return await(c.pexpire(key, milliseconds));
}
public Boolean pexpireat(K key, Date timestamp) {
return await(c.pexpireat(key, timestamp));
}
public Boolean pexpireat(K key, long timestamp) {
return await(c.pexpireat(key, timestamp));
}
public String ping() {
return await(c.ping());
}
public Long pttl(K key) {
return await(c.pttl(key));
}
public Long publish(K channel, V message) {
return await(c.publish(channel, message));
}
public String quit() {
return await(c.quit());
}
public V randomkey() {
return await(c.randomkey());
}
public String rename(K key, K newKey) {
return await(c.rename(key, newKey));
}
public Boolean renamenx(K key, K newKey) {
return await(c.renamenx(key, newKey));
}
public String restore(K key, long ttl, byte[] value) {
return await(c.restore(key, ttl, value));
}
public V rpop(K key) {
return await(c.rpop(key));
}
public V rpoplpush(K source, K destination) {
return await(c.rpoplpush(source, destination));
}
public Long rpush(K key, V... values) {
return await(c.rpush(key, values));
}
public Long rpushx(K key, V value) {
return await(c.rpushx(key, value));
}
public Long sadd(K key, V... members) {
return await(c.sadd(key, members));
}
public String save() {
return await(c.save());
}
public Long scard(K key) {
return await(c.scard(key));
}
public List<Boolean> scriptExists(String... digests) {
return await(c.scriptExists(digests));
}
public String scriptFlush() {
return await(c.scriptFlush());
}
public String scriptKill() {
return await(c.scriptKill());
}
public String scriptLoad(V script) {
return await(c.scriptLoad(script));
}
public Set<V> sdiff(K... keys) {
return await(c.sdiff(keys));
}
public Long sdiffstore(K destination, K... keys) {
return await(c.sdiffstore(destination, keys));
}
public String select(int db) {
return c.select(db);
}
public String set(K key, V value) {
return await(c.set(key, value));
}
public Long setbit(K key, long offset, int value) {
return await(c.setbit(key, offset, value));
}
public String setex(K key, long seconds, V value) {
return await(c.setex(key, seconds, value));
}
public String psetex(K key, long millis, V value) {
return await(c.psetex(key, millis, value));
}
public Boolean setnx(K key, V value) {
return await(c.setnx(key, value));
}
public String setexnx(K key, V value, long millis) {
return await(c.setexnx(key, value, millis));
}
public Long setrange(K key, long offset, V value) {
return await(c.setrange(key, offset, value));
}
@Deprecated
public void shutdown() {
c.shutdown();
}
public void shutdown(boolean save) {
c.shutdown(save);
}
public Set<V> sinter(K... keys) {
return await(c.sinter(keys));
}
public Long sinterstore(K destination, K... keys) {
return await(c.sinterstore(destination, keys));
}
public Boolean sismember(K key, V member) {
return await(c.sismember(key, member));
}
public Boolean smove(K source, K destination, V member) {
return await(c.smove(source, destination, member));
}
public String slaveof(String host, int port) {
return await(c.slaveof(host, port));
}
public String slaveofNoOne() {
return await(c.slaveofNoOne());
}
public List<Object> slowlogGet() {
return await(c.slowlogGet());
}
public List<Object> slowlogGet(int count) {
return await(c.slowlogGet(count));
}
public Long slowlogLen() {
return await(c.slowlogLen());
}
public String slowlogReset() {
return await(c.slowlogReset());
}
public Set<V> smembers(K key) {
return await(c.smembers(key));
}
public List<V> sort(K key) {
return await(c.sort(key));
}
public List<V> sort(K key, SortArgs sortArgs) {
return await(c.sort(key, sortArgs));
}
public Long sortStore(K key, SortArgs sortArgs, K destination) {
return await(c.sortStore(key, sortArgs, destination));
}
public V spop(K key) {
return await(c.spop(key));
}
public V srandmember(K key) {
return await(c.srandmember(key));
}
public Set<V> srandmember(K key, long count) {
return await(c.srandmember(key, count));
}
public Long srem(K key, V... members) {
return await(c.srem(key, members));
}
public Set<V> sunion(K... keys) {
return await(c.sunion(keys));
}
public Long sunionstore(K destination, K... keys) {
return await(c.sunionstore(destination, keys));
}
public String sync() {
return await(c.sync());
}
public Long strlen(K key) {
return await(c.strlen(key));
}
public Long ttl(K key) {
return await(c.ttl(key));
}
public String type(K key) {
return await(c.type(key));
}
public String watch(K... keys) {
return await(c.watch(keys));
}
public String unwatch() {
return await(c.unwatch());
}
public Long zadd(K key, double score, V member) {
return await(c.zadd(key, score, member));
}
public Long zadd(K key, Object... scoresAndValues) {
return await(c.zadd(key, scoresAndValues));
}
public Long zcard(K key) {
return await(c.zcard(key));
}
public Long zcount(K key, double min, double max) {
return await(c.zcount(key, min, max));
}
public Long zcount(K key, String min, String max) {
return await(c.zcount(key, min, max));
}
public Double zincrby(K key, double amount, K member) {
return await(c.zincrby(key, amount, member));
}
public Long zinterstore(K destination, K... keys) {
return await(c.zinterstore(destination, keys));
}
public Long zinterstore(K destination, ZStoreArgs storeArgs, K... keys) {
return await(c.zinterstore(destination, storeArgs, keys));
}
public List<V> zrange(K key, long start, long stop) {
return await(c.zrange(key, start, stop));
}
public List<ScoredValue<V>> zrangeWithScores(K key, long start, long stop) {
return await(c.zrangeWithScores(key, start, stop));
}
public List<V> zrangebyscore(K key, double min, double max) {
return await(c.zrangebyscore(key, min, max));
}
public List<V> zrangebyscore(K key, String min, String max) {
return await(c.zrangebyscore(key, min, max));
}
public List<V> zrangebyscore(K key, double min, double max, long offset, long count) {
return await(c.zrangebyscore(key, min, max, offset, count));
}
public List<V> zrangebyscore(K key, String min, String max, long offset, long count) {
return await(c.zrangebyscore(key, min, max, offset, count));
}
public List<ScoredValue<V>> zrangebyscoreWithScores(K key, double min, double max) {
return await(c.zrangebyscoreWithScores(key, min, max));
}
public List<ScoredValue<V>> zrangebyscoreWithScores(K key, String min, String max) {
return await(c.zrangebyscoreWithScores(key, min, max));
}
public List<ScoredValue<V>> zrangebyscoreWithScores(K key, double min, double max, long offset, long count) {
return await(c.zrangebyscoreWithScores(key, min, max, offset, count));
}
public List<ScoredValue<V>> zrangebyscoreWithScores(K key, String min, String max, long offset, long count) {
return await(c.zrangebyscoreWithScores(key, min, max, offset, count));
}
public Long zrank(K key, V member) {
return await(c.zrank(key, member));
}
public Long zrem(K key, V... members) {
return await(c.zrem(key, members));
}
public Long zremrangebyrank(K key, long start, long stop) {
return await(c.zremrangebyrank(key, start, stop));
}
public Long zremrangebyscore(K key, double min, double max) {
return await(c.zremrangebyscore(key, min, max));
}
public Long zremrangebyscore(K key, String min, String max) {
return await(c.zremrangebyscore(key, min, max));
}
public List<String> time() {
return await(c.time());
}
public List<V> zrevrange(K key, long start, long stop) {
return await(c.zrevrange(key, start, stop));
}
public List<ScoredValue<V>> zrevrangeWithScores(K key, long start, long stop) {
return await(c.zrevrangeWithScores(key, start, stop));
}
public List<V> zrevrangebyscore(K key, double max, double min) {
return await(c.zrevrangebyscore(key, max, min));
}
public List<V> zrevrangebyscore(K key, String max, String min) {
return await(c.zrevrangebyscore(key, max, min));
}
public List<V> zrevrangebyscore(K key, double max, double min, long offset, long count) {
return await(c.zrevrangebyscore(key, max, min, offset, count));
}
public List<V> zrevrangebyscore(K key, String max, String min, long offset, long count) {
return await(c.zrevrangebyscore(key, max, min, offset, count));
}
public List<ScoredValue<V>> zrevrangebyscoreWithScores(K key, double max, double min) {
return await(c.zrevrangebyscoreWithScores(key, max, min));
}
public List<ScoredValue<V>> zrevrangebyscoreWithScores(K key, String max, String min) {
return await(c.zrevrangebyscoreWithScores(key, max, min));
}
public List<ScoredValue<V>> zrevrangebyscoreWithScores(K key, double max, double min, long offset, long count) {
return await(c.zrevrangebyscoreWithScores(key, max, min, offset, count));
}
public List<ScoredValue<V>> zrevrangebyscoreWithScores(K key, String max, String min, long offset, long count) {
return await(c.zrevrangebyscoreWithScores(key, max, min, offset, count));
}
public Long zrevrank(K key, V member) {
return await(c.zrevrank(key, member));
}
public Double zscore(K key, V member) {
return await(c.zscore(key, member));
}
public Long zunionstore(K destination, K... keys) {
return await(c.zunionstore(destination, keys));
}
public Long zunionstore(K destination, ZStoreArgs storeArgs, K... keys) {
return await(c.zunionstore(destination, storeArgs, keys));
}
public ListScanResult<V> sscan(K key, long startValue) {
return await(c.sscan(key, startValue));
}
public ListScanResult<V> zscan(K key, long startValue) {
return await(c.zscan(key, startValue));
}
public RedisAsyncConnection<K, V> getAsync() {
return c;
}
/**
* Close the connection.
*/
public void close() {
c.close();
}
/**
* Generate SHA-1 digest for the supplied script.
*
* @param script Lua script.
*
* @return Script digest as a lowercase hex string.
*/
public String digest(V script) {
return c.digest(script);
}
private <T> T await(Future<T> future, long timeout, TimeUnit unit) {
return c.await(future, timeout, unit);
}
private <T> T awaitInterruptibly(Future<T> future, long timeout, TimeUnit unit) throws InterruptedException {
return c.awaitInterruptibly(future, timeout, unit);
}
private <T> T await(Future<T> future) {
return c.await(future, timeout, unit);
}
}

@ -1,11 +0,0 @@
package com.lambdaworks.redis;
public class RedisConnectionClosedException extends RedisException {
private static final long serialVersionUID = 1895201562761894967L;
public RedisConnectionClosedException(String msg) {
super(msg);
}
}

@ -1,15 +0,0 @@
package com.lambdaworks.redis;
public class RedisConnectionException extends RedisException {
private static final long serialVersionUID = 4007817232147176510L;
public RedisConnectionException(String msg) {
super(msg);
}
public RedisConnectionException(String msg, Throwable e) {
super(msg, e);
}
}

@ -1,24 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
/**
* Exception thrown when redis returns an error message, or when the client
* fails for any reason.
*
* @author Will Glozer
*/
@SuppressWarnings("serial")
public class RedisException extends RuntimeException {
public RedisException() {
}
public RedisException(String msg) {
super(msg);
}
public RedisException(String msg, Throwable e) {
super(msg, e);
}
}

@ -1,17 +0,0 @@
package com.lambdaworks.redis;
public class RedisMovedException extends RedisException {
private static final long serialVersionUID = -6969734163155547631L;
private int slot;
public RedisMovedException(int slot) {
this.slot = slot;
}
public int getSlot() {
return slot;
}
}

@ -1,7 +0,0 @@
package com.lambdaworks.redis;
public class RedisTimeoutException extends RedisException {
private static final long serialVersionUID = -6969734163155547631L;
}

@ -1,37 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
/**
* A value and its associated score from a ZSET.
*
* @author Will Glozer
*/
public class ScoredValue<V> {
public final double score;
public final V value;
public ScoredValue(double score, V value) {
this.score = score;
this.value = value;
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ScoredValue<?> that = (ScoredValue<?>) o;
return Double.compare(that.score, score) == 0 && value.equals(that.value);
}
@Override
public int hashCode() {
long temp = score != +0.0d ? Double.doubleToLongBits(score) : 0L;
int result = (int) (temp ^ (temp >>> 32));
return 31 * result + (value != null ? value.hashCode() : 0);
}
@Override
public String toString() {
return String.format("(%f, %s)", score, value);
}
}

@ -1,22 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
/**
* A Lua script returns one of the following types:
*
* <ul>
* <li>{@link #BOOLEAN} boolean</li>
* <li>{@link #INTEGER} 64-bit integer</li>
* <li>{@link #STATUS} status string</li>
* <li>{@link #VALUE} value</li>
* <li>{@link #MAPVALUE} typed value</li>
* <li>{@link #MULTI} of these types</li>.
* </ul>
*
* @author Will Glozer
*/
public enum ScriptOutputType {
BOOLEAN, INTEGER, MULTI, STATUS, VALUE, MAPVALUE, MAPVALUELIST
}

@ -1,124 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandKeyword;
import java.util.ArrayList;
import java.util.List;
import static com.lambdaworks.redis.protocol.CommandKeyword.*;
import static com.lambdaworks.redis.protocol.CommandType.GET;
/**
* Argument list builder for the redis <a href="http://redis.io/commands/sort">SORT</a>
* command. Static import the methods from {@link Builder} and chain the method calls:
* <code>by("weight_*").desc().limit(0, 2)</code>.
*
* @author Will Glozer
*/
public class SortArgs {
private String by;
private Long offset, count;
private List<String> get;
private CommandKeyword order;
private boolean alpha;
/**
* Static builder methods.
*/
public static class Builder {
public static SortArgs by(String pattern) {
return new SortArgs().by(pattern);
}
public static SortArgs limit(long offset, long count) {
return new SortArgs().limit(offset, count);
}
public static SortArgs get(String pattern) {
return new SortArgs().get(pattern);
}
public static SortArgs asc() {
return new SortArgs().asc();
}
public static SortArgs desc() {
return new SortArgs().desc();
}
public static SortArgs alpha() {
return new SortArgs().alpha();
}
}
public SortArgs by(String pattern) {
by = pattern;
return this;
}
public SortArgs limit(long offset, long count) {
this.offset = offset;
this.count = count;
return this;
}
public SortArgs get(String pattern) {
if (get == null) {
get = new ArrayList<String>();
}
get.add(pattern);
return this;
}
public SortArgs asc() {
order = ASC;
return this;
}
public SortArgs desc() {
order = DESC;
return this;
}
public SortArgs alpha() {
alpha = true;
return this;
}
<K, V> void build(CommandArgs<K, V> args, K store) {
if (by != null) {
args.add(BY);
args.add(by);
}
if (get != null) {
for (String pattern : get) {
args.add(GET);
args.add(pattern);
}
}
if (offset != null) {
args.add(LIMIT);
args.add(offset);
args.add(count);
}
if (order != null) {
args.add(order);
}
if (alpha) {
args.add(ALPHA);
}
if (store != null) {
args.add(STORE);
args.addKey(store);
}
}
}

@ -1,91 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.CommandArgs;
import java.util.*;
import static com.lambdaworks.redis.protocol.CommandKeyword.*;
/**
* Argument list builder for the redis <a href="http://redis.io/commands/zunionstore">ZUNIONSTORE</a>
* and <a href="http://redis.io/commands/zinterstore">ZINTERSTORE</a> commands. Static import the
* methods from {@link Builder} and chain the method calls: <code>weights(1, 2).max()</code>.
*
* @author Will Glozer
*/
public class ZStoreArgs {
private static enum Aggregate { SUM, MIN, MAX }
private List<Long> weights;
private Aggregate aggregate;
/**
* Static builder methods.
*/
public static class Builder {
public static ZStoreArgs weights(long... weights) {
return new ZStoreArgs().weights(weights);
}
public static ZStoreArgs sum() {
return new ZStoreArgs().sum();
}
public static ZStoreArgs min() {
return new ZStoreArgs().min();
}
public static ZStoreArgs max() {
return new ZStoreArgs().max();
}
}
public ZStoreArgs weights(long... weights) {
this.weights = new ArrayList<Long>(weights.length);
for (long weight : weights) {
this.weights.add(weight);
}
return this;
}
public ZStoreArgs sum() {
aggregate = Aggregate.SUM;
return this;
}
public ZStoreArgs min() {
aggregate = Aggregate.MIN;
return this;
}
public ZStoreArgs max() {
aggregate = Aggregate.MAX;
return this;
}
<K, V> void build(CommandArgs<K, V> args) {
if (weights != null) {
args.add(WEIGHTS);
for (long weight : weights) {
args.add(weight);
}
}
if (aggregate != null) {
args.add(AGGREGATE);
switch (aggregate) {
case SUM:
args.add(SUM);
break;
case MIN:
args.add(MIN);
break;
case MAX:
args.add(MAX);
break;
}
}
}
}

@ -1,66 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.codec;
import java.nio.ByteBuffer;
/**
* A RedisCodec encodes keys and values sent to redis, and decodes keys
* and values in the command output.
*
* The encode methods will be called by multiple threads and must be thread-safe,
* however the decode methods will only be called by one thread.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public abstract class RedisCodec<K, V> {
/**
* Decode the key output by redis.
*
* @param bytes Raw bytes of the key.
*
* @return The decoded key.
*/
public abstract K decodeKey(ByteBuffer bytes);
/**
* Decode the value output by redis.
*
* @param bytes Raw bytes of the value.
*
* @return The decoded value.
*/
public abstract V decodeValue(ByteBuffer bytes);
/**
* Encode the key for output to redis.
*
* @param key Key.
*
* @return The encoded key.
*/
public abstract byte[] encodeKey(K key);
/**
* Encode the value for output to redis.
*
* @param value Value.
*
* @return The encoded value.
*/
public abstract byte[] encodeValue(V value);
public abstract byte[] encodeMapValue(V value);
public abstract byte[] encodeMapKey(K key);
public abstract V decodeMapValue(ByteBuffer bytes);
public abstract K decodeMapKey(ByteBuffer bytes);
}

@ -1,87 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.codec;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.*;
import static java.nio.charset.CoderResult.OVERFLOW;
/**
* A {@link RedisCodec} that handles UTF-8 encoded keys and values.
*
* @author Will Glozer
*/
public class Utf8StringCodec extends RedisCodec<String, String> {
private Charset charset;
private CharsetDecoder decoder;
private CharBuffer chars;
/**
* Initialize a new instance that encodes and decodes strings using
* the UTF-8 charset;
*/
public Utf8StringCodec() {
charset = Charset.forName("UTF-8");
decoder = charset.newDecoder();
chars = CharBuffer.allocate(1024);
}
@Override
public String decodeKey(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public String decodeValue(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public byte[] encodeKey(String key) {
return encode(key);
}
@Override
public byte[] encodeValue(String value) {
return encode(value);
}
private String decode(ByteBuffer bytes) {
chars.clear();
bytes.mark();
decoder.reset();
while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) {
chars = CharBuffer.allocate(chars.capacity() * 2);
bytes.reset();
}
return chars.flip().toString();
}
private byte[] encode(String string) {
return string.getBytes(charset);
}
@Override
public byte[] encodeMapValue(String value) {
return encodeValue(value);
}
@Override
public byte[] encodeMapKey(String key) {
return encodeKey(key);
}
@Override
public String decodeMapValue(ByteBuffer bytes) {
return decodeValue(bytes);
}
@Override
public String decodeMapKey(ByteBuffer bytes) {
return decodeKey(bytes);
}
}

@ -1,25 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.util.ArrayList;
import java.util.List;
/**
* {@link java.util.List} of boolean output.
*
* @author Will Glozer
*/
public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> {
public BooleanListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Boolean>());
}
@Override
public void set(long integer) {
output.add((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
}
}

@ -1,31 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Boolean output. The actual value is returned as an integer
* where 0 indicates false and 1 indicates true, or as a null
* bulk reply for script output.
*
* @author Will Glozer
*/
public class BooleanOutput<K, V> extends CommandOutput<K, V, Boolean> {
public BooleanOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(long integer) {
output = (integer == 1) ? Boolean.TRUE : Boolean.FALSE;
}
@Override
public void set(ByteBuffer bytes) {
output = (bytes != null) ? Boolean.TRUE : Boolean.FALSE;
}
}

@ -1,27 +0,0 @@
// Copyright (C) 2012 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Byte array output.
*
* @author Will Glozer
*/
public class ByteArrayOutput<K, V> extends CommandOutput<K, V, byte[]> {
public ByteArrayOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
if (bytes != null) {
output = new byte[bytes.remaining()];
bytes.get(output);
}
}
}

@ -1,24 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.util.Date;
/**
* Date output with no milliseconds.
*
* @author Will Glozer
*/
public class DateOutput<K, V> extends CommandOutput<K, V, Date> {
public DateOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(long time) {
output = new Date(time * 1000);
}
}

@ -1,26 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import static java.lang.Double.parseDouble;
/**
* Double output, may be null.
*
* @author Will Glozer
*/
public class DoubleOutput<K, V> extends CommandOutput<K, V, Double> {
public DoubleOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
output = (bytes == null) ? null : parseDouble(decodeAscii(bytes));
}
}

@ -1,31 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
* 64-bit integer output, may be null.
*
* @author Will Glozer
*/
public class IntegerOutput<K, V> extends CommandOutput<K, V, Long> {
public IntegerOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(long integer) {
output = integer;
}
@Override
public void set(ByteBuffer bytes) {
output = bytes == null ? null : new Long(decodeAscii(bytes));
}
}

@ -1,28 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* {@link List} of keys output.
*
* @param <K> Key type.
*
* @author Will Glozer
*/
public class KeyListOutput<K, V> extends CommandOutput<K, V, List<K>> {
public KeyListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<K>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(codec.decodeKey(bytes));
}
}

@ -1,26 +0,0 @@
// Copyright (C) 2013 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Key output.
*
* @param <K> Key type.
*
* @author Will Glozer
*/
public class KeyOutput<K, V> extends CommandOutput<K, V, K> {
public KeyOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
output = (bytes == null) ? null : codec.decodeKey(bytes);
}
}

@ -1,37 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.KeyValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Key-value pair output.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class KeyValueOutput<K, V> extends CommandOutput<K, V, KeyValue<K, V>> {
private K key;
public KeyValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
if (bytes != null) {
if (key == null) {
key = codec.decodeKey(bytes);
} else {
V value = codec.decodeValue(bytes);
output = new KeyValue<K, V>(key, value);
}
}
}
}

@ -1,46 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
public class ListMapOutput<K, V> extends CommandOutput<K, V, List<Map<K, V>>> {
private K key;
private int index = 0;
public ListMapOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Map<K, V>>());
}
@Override
public void set(ByteBuffer bytes) {
if (key == null) {
key = codec.decodeMapKey(bytes);
return;
}
V value = (bytes == null) ? null : codec.decodeMapValue(bytes);
if (output.isEmpty()) {
output.add(new HashMap<K, V>());
}
Map<K, V> map = output.get(index);
if (map == null) {
map = new HashMap<K, V>();
output.add(map);
}
if (map.get(key) != null) {
index++;
map = new HashMap<K, V>();
output.add(map);
}
map.put(key, value);
key = null;
}
}

@ -1,23 +0,0 @@
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
public class ListScanOutput<K, V> extends CommandOutput<K, V, ListScanResult<V>> {
public ListScanOutput(RedisCodec<K, V> codec) {
super(codec, new ListScanResult<V>());
}
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(((Number) codec.decodeValue(bytes)).longValue());
} else {
output.addValue(codec.decodeValue(bytes));
}
}
}

@ -1,25 +0,0 @@
package com.lambdaworks.redis.output;
import java.util.ArrayList;
import java.util.List;
public class ListScanResult<V> {
private Long pos;
private List<V> values = new ArrayList<V>();
public void setPos(Long pos) {
this.pos = pos;
}
public Long getPos() {
return pos;
}
public void addValue(V value) {
values.add(value);
}
public List<V> getValues() {
return values;
}
}

@ -1,29 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
/**
* {@link List} of keys output.
*
* @param <K> Key type.
*
* @author Will Glozer
*/
public class MapKeyListOutput<K, V> extends CommandOutput<K, V, Set<K>> {
public MapKeyListOutput(RedisCodec<K, V> codec) {
super(codec, new LinkedHashSet<K>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(codec.decodeMapKey(bytes));
}
}

@ -1,38 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
* {@link Map} of keys and values output.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class MapOutput<K, V> extends CommandOutput<K, V, Map<K, V>> {
private K key;
public MapOutput(RedisCodec<K, V> codec) {
super(codec, new HashMap<K, V>());
}
@Override
public void set(ByteBuffer bytes) {
if (key == null) {
key = codec.decodeMapKey(bytes);
return;
}
V value = (bytes == null) ? null : codec.decodeMapValue(bytes);
output.put(key, value);
key = null;
}
}

@ -1,28 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* {@link List} of values output.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class MapValueListOutput<K, V> extends CommandOutput<K, V, List<V>> {
public MapValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<V>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : codec.decodeMapValue(bytes));
}
}

@ -1,26 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Value output.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class MapValueOutput<K, V> extends CommandOutput<K, V, V> {
public MapValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
output = (bytes == null) ? null : codec.decodeMapValue(bytes);
}
}

@ -1,65 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.*;
/**
* Output of all commands within a MULTI block.
*
* @author Will Glozer
*/
public class MultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
private Queue<Command<K, V, ?>> queue;
public MultiOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Object>());
queue = new LinkedList<Command<K, V, ?>>();
}
public void add(Command<K, V, ?> cmd) {
queue.add(cmd);
}
public void cancel() {
for (Command<K, V, ?> c : queue) {
c.cancel();
}
}
@Override
public void set(long integer) {
queue.peek().getOutput().set(integer);
}
@Override
public void set(ByteBuffer bytes) {
queue.peek().getOutput().set(bytes);
}
@Override
public void setError(ByteBuffer error) {
CommandOutput<K, V, ?> output = queue.isEmpty() ? this : queue.peek().getOutput();
output.setError(decodeAscii(error));
}
@Override
public void complete(int depth) {
if (depth == 1) {
Command<K, V, ?> cmd = queue.remove();
CommandOutput<K, V, ?> o = cmd.getOutput();
output.add(!o.hasError() ? o.get() : new RedisException(o.getError()));
cmd.complete();
} else if (depth == 0 && !queue.isEmpty()) {
for (Command<K, V, ?> cmd : queue) {
cmd.complete();
}
}
}
}

@ -1,55 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.*;
/**
* {@link List} of command outputs, possibly deeply nested.
*
* @author Will Glozer
*/
public class NestedMultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
private LinkedList<List<Object>> stack;
private int depth;
public NestedMultiOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Object>());
stack = new LinkedList<List<Object>>();
depth = 1;
}
@Override
public void set(long integer) {
output.add(integer);
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : codec.decodeKey(bytes));
}
@Override
public void setError(ByteBuffer error) {
output.add(new RedisException(decodeAscii(error)));
}
@Override
public void complete(int depth) {
if (depth > this.depth) {
Object o = output.remove(output.size() - 1);
stack.push(output);
output = new ArrayList<Object>();
output.add(o);
} else if (depth > 0 && depth < this.depth) {
stack.peek().add(output);
output = stack.pop();
}
this.depth = depth;
}
}

@ -1,38 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.ScoredValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* {@link List} of values and their associated scores.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ScoredValueListOutput<K, V> extends CommandOutput<K, V, List<ScoredValue<V>>> {
private V value;
public ScoredValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<ScoredValue<V>>());
}
@Override
public void set(ByteBuffer bytes) {
if (value == null) {
value = codec.decodeValue(bytes);
return;
}
double score = Double.parseDouble(decodeAscii(bytes));
output.add(new ScoredValue<V>(score, value));
value = null;
}
}

@ -1,28 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import static com.lambdaworks.redis.protocol.Charsets.buffer;
/**
* Status message output.
*
* @author Will Glozer
*/
public class StatusOutput<K, V> extends CommandOutput<K, V, String> {
private static final ByteBuffer OK = buffer("OK");
public StatusOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
}
}

@ -1,26 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* {@link List} of string output.
*
* @author Will Glozer
*/
public class StringListOutput<K, V> extends CommandOutput<K, V, List<String>> {
public StringListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<String>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : decodeAscii(bytes));
}
}

@ -1,28 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* {@link List} of values output.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ValueListOutput<K, V> extends CommandOutput<K, V, List<V>> {
public ValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<V>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : codec.decodeValue(bytes));
}
}

@ -1,26 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* Value output.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ValueOutput<K, V> extends CommandOutput<K, V, V> {
public ValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
output = (bytes == null) ? null : codec.decodeValue(bytes);
}
}

@ -1,28 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
/**
* {@link Set} of value output.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ValueSetOutput<K, V> extends CommandOutput<K, V, Set<V>> {
public ValueSetOutput(RedisCodec<K, V> codec) {
super(codec, new HashSet<V>());
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : codec.decodeMapValue(bytes));
}
}

@ -1,27 +0,0 @@
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
public class ValueSetScanOutput<K, V> extends CommandOutput<K, V, ListScanResult<V>> {
public ValueSetScanOutput(RedisCodec<K, V> codec) {
super(codec, new ListScanResult<V>());
}
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(toLong(bytes));
} else {
output.addValue(codec.decodeMapValue(bytes));
}
}
private Long toLong(ByteBuffer bytes) {
return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit()));
}
}

@ -1,19 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
* {@link Charset}-related utilities.
*
* @author Will Glozer
*/
public class Charsets {
public static final Charset ASCII = Charset.forName("US-ASCII");
public static ByteBuffer buffer(String s) {
return ByteBuffer.wrap(s.getBytes(ASCII));
}
}

@ -1,133 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisMovedException;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Promise;
/**
* A redis command and its result. All successfully executed commands will
* eventually return a {@link CommandOutput} object.
*
* @param <T> Command output type.
*
* @author Will Glozer
*/
public class Command<K, V, T> {
private static final byte[] CRLF = "\r\n".getBytes(Charsets.ASCII);
private final Promise<T> promise;
public final CommandType type;
protected CommandArgs<K, V> args;
protected final CommandOutput<K, V, T> output;
protected int completeAmount;
/**
* Create a new command with the supplied type and args.
*
* @param type Command type.
* @param output Command output.
* @param args Command args, if any.
* @param multi Flag indicating if MULTI active.
*/
public Command(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args, boolean multi, Promise<T> proimse) {
this.type = type;
this.output = output;
this.args = args;
this.completeAmount = multi ? 2 : 1;
this.promise = proimse;
}
public Promise<T> getPromise() {
return promise;
}
/**
* Get the object that holds this command's output.
*
* @return The command output object.
*/
public CommandOutput<K, V, T> getOutput() {
return output;
}
public void cancel() {
promise.cancel(true);
}
public void complete() {
completeAmount--;
if (completeAmount == 0) {
Object res = output.get();
if (promise.isCancelled()) {
return;
}
if (res instanceof RedisException) {
promise.setFailure((Exception)res);
} else if (output.hasError()) {
if (output.getError().startsWith("MOVED")) {
String[] parts = output.getError().split(" ");
int slot = Integer.valueOf(parts[1]);
promise.setFailure(new RedisMovedException(slot));
} else if (output.getError().startsWith("(error) ASK")) {
String[] parts = output.getError().split(" ");
int slot = Integer.valueOf(parts[2]);
promise.setFailure(new RedisMovedException(slot));
} else {
promise.setFailure(new RedisException(output.getError()));
}
} else if (output.hasException()) {
promise.setFailure(output.getException());
} else {
promise.setSuccess((T)res);
}
}
}
/**
* Encode and write this command to the supplied buffer using the new
* <a href="http://redis.io/topics/protocol">Unified Request Protocol</a>.
*
* @param buf Buffer to write to.
*/
void encode(ByteBuf buf) {
buf.writeByte('*');
writeInt(buf, 1 + (args != null ? args.count() : 0));
buf.writeBytes(CRLF);
buf.writeByte('$');
writeInt(buf, type.bytes.length);
buf.writeBytes(CRLF);
buf.writeBytes(type.bytes);
buf.writeBytes(CRLF);
if (args != null) {
buf.writeBytes(args.buffer());
}
}
/**
* Write the textual value of a positive integer to the supplied buffer.
*
* @param buf Buffer to write to.
* @param value Value to write.
*/
protected static void writeInt(ByteBuf buf, int value) {
if (value < 10) {
buf.writeByte('0' + value);
return;
}
StringBuilder sb = new StringBuilder(8);
while (value > 0) {
int digit = value % 10;
sb.append((char) ('0' + digit));
value /= 10;
}
for (int i = sb.length() - 1; i >= 0; i--) {
buf.writeByte(sb.charAt(i));
}
}
}

@ -1,209 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.codec.RedisCodec;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import static java.lang.Math.max;
/**
* Redis command argument encoder.
*
* @author Will Glozer
*/
public class CommandArgs<K, V> {
private static final byte[] CRLF = "\r\n".getBytes(Charsets.ASCII);
private RedisCodec<K, V> codec;
private ByteBuffer buffer;
private int count;
public CommandArgs(RedisCodec<K, V> codec) {
this.codec = codec;
this.buffer = ByteBuffer.allocate(32);
}
public ByteBuffer buffer() {
buffer.flip();
return buffer;
}
public int count() {
return count;
}
public CommandArgs<K, V> addMapKeys(K... keys) {
for (K key : keys) {
addMapKey(key);
}
return this;
}
public CommandArgs<K, V> addMapKey(K key) {
return write(codec.encodeMapKey(key));
}
public CommandArgs<K, V> addKey(K key) {
return write(codec.encodeKey(key));
}
public CommandArgs<K, V> addKeys(List<K> keys) {
for (K key : keys) {
addKey(key);
}
return this;
}
public CommandArgs<K, V> addKeys(K... keys) {
for (K key : keys) {
addKey(key);
}
return this;
}
public CommandArgs<K, V> addMapValue(V value) {
return write(codec.encodeMapValue(value));
}
public CommandArgs<K, V> addValue(V value) {
return write(codec.encodeValue(value));
}
public CommandArgs<K, V> addMapValues(V... values) {
for (V value : values) {
addMapValue(value);
}
return this;
}
public CommandArgs<K, V> addValues(V... values) {
for (V value : values) {
addValue(value);
}
return this;
}
public CommandArgs<K, V> add(Map<K, V> map) {
if (map.size() > 2) {
realloc(buffer.capacity() + 16 * map.size());
}
for (Map.Entry<K, V> entry : map.entrySet()) {
write(codec.encodeMapKey(entry.getKey()));
write(codec.encodeMapValue(entry.getValue()));
}
return this;
}
public CommandArgs<K, V> add(String s) {
return write(s);
}
public CommandArgs<K, V> add(long n) {
return write(Long.toString(n));
}
public CommandArgs<K, V> add(double n) {
return write(Double.toString(n));
}
public CommandArgs<K, V> add(byte[] value) {
return write(value);
}
public CommandArgs<K, V> add(CommandKeyword keyword) {
return write(keyword.bytes);
}
public CommandArgs<K, V> add(CommandType type) {
return write(type.bytes);
}
private CommandArgs<K, V> write(byte[] arg) {
buffer.mark();
if (buffer.remaining() < arg.length) {
int estimate = buffer.remaining() + arg.length + 10;
realloc(max(buffer.capacity() * 2, estimate));
}
while (true) {
try {
buffer.put((byte) '$');
write(arg.length);
buffer.put(CRLF);
buffer.put(arg);
buffer.put(CRLF);
break;
} catch (BufferOverflowException e) {
buffer.reset();
realloc(buffer.capacity() * 2);
}
}
count++;
return this;
}
private CommandArgs<K, V> write(String arg) {
int length = arg.length();
buffer.mark();
if (buffer.remaining() < length) {
int estimate = buffer.remaining() + length + 10;
realloc(max(buffer.capacity() * 2, estimate));
}
while (true) {
try {
buffer.put((byte) '$');
write(length);
buffer.put(CRLF);
for (int i = 0; i < length; i++) {
buffer.put((byte) arg.charAt(i));
}
buffer.put(CRLF);
break;
} catch (BufferOverflowException e) {
buffer.reset();
realloc(buffer.capacity() * 2);
}
}
count++;
return this;
}
private void write(long value) {
if (value < 10) {
buffer.put((byte) ('0' + value));
return;
}
StringBuilder sb = new StringBuilder(8);
while (value > 0) {
long digit = value % 10;
sb.append((char) ('0' + digit));
value /= 10;
}
for (int i = sb.length() - 1; i >= 0; i--) {
buffer.put((byte) sb.charAt(i));
}
}
private void realloc(int size) {
ByteBuffer buffer = ByteBuffer.allocate(size);
this.buffer.flip();
buffer.put(this.buffer);
buffer.mark();
this.buffer = buffer;
}
}

@ -1,101 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;
import java.util.concurrent.BlockingQueue;
/**
* A netty {@link ChannelHandler} responsible for writing redis commands and
* reading responses from the server.
*
* @author Will Glozer
*/
@ChannelHandler.Sharable
public class CommandHandler<K, V> extends ChannelDuplexHandler {
protected BlockingQueue<Command<K, V, ?>> queue;
protected ByteBuf buffer;
protected RedisStateMachine<K, V> rsm;
/**
* Initialize a new instance that handles commands from the supplied queue.
*
* @param queue The command queue.
*/
public CommandHandler(BlockingQueue<Command<K, V, ?>> queue) {
this.queue = queue;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
buffer = ctx.alloc().heapBuffer();
rsm = new RedisStateMachine<K, V>();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
buffer.release();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
try {
if (!input.isReadable()) return;
// System.out.println("in: " + toHexString(input));
buffer.discardReadBytes();
buffer.writeBytes(input);
decode(ctx, buffer);
} finally {
input.release();
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
// System.out.println("out: " + toHexString(buf));
ctx.write(buf, promise);
}
private String toHexString(ByteBuf buf) {
final StringBuilder builder = new StringBuilder(buf.readableBytes() * 2);
buf.forEachByte(new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
char b = (char) value;
if ((b < ' ' && b != '\n' && b != '\r') || b > '~') {
builder.append("\\x").append(StringUtil.byteToHexStringPadded(value));
} else {
builder.append(b);
}
return true;
}
});
return builder.toString();
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
while (true) {
Command<K, V, ?> cmd = queue.peek();
if (cmd == null
|| !rsm.decode(buffer, cmd.getOutput())) {
break;
}
cmd = queue.take();
cmd.complete();
}
}
}

@ -1,21 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
/**
* Keyword modifiers for redis commands.
*
* @author Will Glozer
*/
public enum CommandKeyword {
AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DESC, ENCODING, FLUSH,
GETNAME, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MAX, MIN, NO, NOSAVE, NOT,
ONE, OR, REFCOUNT, RESET, RESETSTAT, SETNAME, STORE, SUM, WEIGHTS,
WITHSCORES, XOR, NODES;
public byte[] bytes;
private CommandKeyword() {
bytes = name().getBytes(Charsets.ASCII);
}
}

@ -1,147 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.codec.RedisCodec;
import java.nio.ByteBuffer;
/**
* Abstract representation of the output of a redis command.
*
* @param <T> Output type.
*
* @author Will Glozer
*/
public abstract class CommandOutput<K, V, T> {
protected RedisCodec<K, V> codec;
protected T output;
protected String error;
protected Throwable exception;
/**
* Initialize a new instance that encodes and decodes keys and
* values using the supplied codec.
*
* @param codec Codec used to encode/decode keys and values.
* @param output Initial value of output.
*/
public CommandOutput(RedisCodec<K, V> codec, T output) {
this.codec = codec;
this.output = output;
}
/**
* Get the command output.
*
* @return The command output.
*/
public T get() {
return output;
}
/**
* Set the command output to a sequence of bytes, or null. Concrete
* {@link CommandOutput} implementations must override this method
* unless they only receive an integer value which cannot be null.
*
* @param bytes The command output, or null.
*/
public void set(ByteBuffer bytes) {
throw new IllegalStateException();
}
/**
* Set the command output to a 64-bit signed integer. Concrete
* {@link CommandOutput} implementations must override this method
* unless they only receive a byte array value.
*
* @param integer The command output.
*/
public void set(long integer) {
throw new IllegalStateException();
}
/**
* Set command output to an error message from the server.
*
* @param error Error message.
*/
public void setError(ByteBuffer error) {
this.error = decodeAscii(error);
}
/**
* Set command output to an error message from the client.
*
* @param error Error message.
*/
public void setError(String error) {
this.error = error;
}
/**
* Check if the command resulted in an error.
*
* @return true if command resulted in an error.
*/
public boolean hasError() {
return this.error != null;
}
/**
* Get the error that occurred.
*
* @return The error.
*/
public String getError() {
return error;
}
/**
* Set exception that was caught while processing result in command output.
*
* @param exception Exception caught while processing command result.
*/
public void setException(Throwable exception) {
this.exception = exception;
}
/**
* Check if the processing command result resulted in an exception.
*
* @return true if processing of command result resulted in an exception.
*/
public boolean hasException() {
return this.exception != null;
}
/**
* Get the exception that occurred while processing command result.
*
* @return The exception.
*/
public Throwable getException () {
return exception;
}
/**
* Mark the command output complete.
*
* @param depth Remaining depth of output queue.
*/
public void complete(int depth) {
// nothing to do by default
}
protected String decodeAscii(ByteBuffer bytes) {
if (bytes == null) {
return null;
}
char[] chars = new char[bytes.remaining()];
for (int i = 0; i < chars.length; i++) {
chars[i] = (char) bytes.get();
}
return new String(chars);
}
}

@ -1,89 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
/**
* Redis commands.
*
* @author Will Glozer
*/
public enum CommandType {
// Connection
AUTH, ECHO, PING, QUIT, SELECT,
// Server
BGREWRITEAOF, BGSAVE, CLIENT, CONFIG, DBSIZE, DEBUG, FLUSHALL,
FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF,
SLOWLOG, SYNC,
// Keys
DEL, DUMP, EXISTS, EXPIRE, EXPIREAT, KEYS, MIGRATE, MOVE, OBJECT, PERSIST,
PEXPIRE, PEXPIREAT, PTTL, RANDOMKEY, RENAME, RENAMENX, RESTORE, TTL, TYPE,
// String
APPEND, GET, GETRANGE, GETSET, MGET, MSET, MSETNX, SET, SETEX, SETNX,
SETRANGE, STRLEN, PSETEX,
// Numeric
DECR, DECRBY, INCR, INCRBY, INCRBYFLOAT,
// List
BLPOP, BRPOP, BRPOPLPUSH,
LINDEX, LINSERT, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM,
RPOP, RPOPLPUSH, RPUSH, RPUSHX, SORT,
// Hash
HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HLEN,
HMGET, HMSET, HSET, HSETNX, HVALS,
// Transaction
DISCARD, EXEC, MULTI, UNWATCH, WATCH,
// Pub/Sub
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE,
// Sets
SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER,
SMEMBERS, SMOVE, SPOP, SRANDMEMBER, SREM, SUNION, SUNIONSTORE,
// Sorted Set
ZADD, ZCARD, ZCOUNT, ZINCRBY, ZINTERSTORE, ZRANGE, ZRANGEBYSCORE,
ZRANK, ZREM, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE,
ZREVRANGEBYSCORE, ZREVRANK, ZSCORE, ZUNIONSTORE,
TIME,
// Scripting
EVAL, EVALSHA, SCRIPT,
// Bits
BITCOUNT, BITOP, GETBIT, SETBIT,
// HyperLogLog
PFADD, PFCOUNT, PFMERGE,
SENTINEL,
SSCAN, ZSCAN, HSCAN,
ASKING, CLUSTER;
public byte[] bytes;
private CommandType() {
bytes = name().getBytes(Charsets.ASCII);
}
}

@ -1,145 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisAsyncConnection;
/**
* A netty {@link ChannelHandler} responsible for monitoring the channel and
* reconnecting when the connection is lost.
*
* @author Will Glozer
*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{
public static final AttributeKey<Boolean> SHUTDOWN_KEY = AttributeKey.valueOf("shutdown");
private final Logger log = LoggerFactory.getLogger(getClass());
private Bootstrap bootstrap;
private Channel channel;
private ChannelGroup channels;
private static final int BACKOFF_CAP = 12;
/**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup}
* and establishes a new {@link Channel} when disconnected, while reconnect is true.
*
* @param bootstrap Configuration for new channels.
*/
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
this.bootstrap = bootstrap;
this.channels = channels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
channels.add(channel);
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeLine = channel.pipeline();
CommandHandler<?, ?> handler = pipeLine.get(CommandHandler.class);
RedisAsyncConnection<?, ?> connection = pipeLine.get(RedisAsyncConnection.class);
if (connection.isReconnect()) {
EventLoop loop = ctx.channel().eventLoop();
reconnect(loop, handler, connection);
}
ctx.fireChannelInactive();
}
/**
* Reconnect to the remote address that the closed channel was connected to.
* This creates a new {@link ChannelPipeline} with the same handler instances
* contained in the old channel's pipeline.
*
* @param loop EventLoop
* @param handler Redis Command handle.
* @param connection RedisAsyncConnection
*
* @throws Exception when reconnection fails.
*/
private void reconnect(final EventLoop loop, final CommandHandler<?, ?> handler, final RedisAsyncConnection<?, ?> connection){
loop.schedule(new Runnable() {
@Override
public void run() {
doReConnect(loop, handler, connection, 1);
}
}, 100, TimeUnit.MILLISECONDS);
}
private void doReConnect(final EventLoop loop, final CommandHandler<?, ?> handler, final RedisAsyncConnection<?, ?> connection, final int attempts) {
if (!connection.isReconnect()) {
return;
}
log.debug("trying to reconnect {}", connection.getRedisClient().getAddr());
ChannelFuture connect;
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection);
}
}).connect();
}
connect.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.channel().attr(SHUTDOWN_KEY).get() != null) {
future.channel().pipeline().remove(ConnectionWatchdog.this);
return;
}
if (!future.isSuccess()) {
if (!connection.isReconnect()) {
return;
}
int timeout = 2 << attempts;
loop.schedule(new Runnable() {
@Override
public void run() {
doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
@Override
public String toString() {
return super.toString() + " - bootstrap: " + bootstrap;
}
}

@ -1,198 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.RedisException;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import static com.lambdaworks.redis.protocol.Charsets.buffer;
import static com.lambdaworks.redis.protocol.RedisStateMachine.State.Type.*;
/**
* State machine that decodes redis server responses encoded according to the
* <a href="http://redis.io/topics/protocol">Unified Request Protocol</a>.
*
* @author Will Glozer
*/
public class RedisStateMachine<K, V> {
private static final ByteBuffer QUEUED = buffer("QUEUED");
static class State {
enum Type { SINGLE, ERROR, INTEGER, BULK, MULTI, BYTES }
Type type = null;
int count = -1;
}
private LinkedList<State> stack;
/**
* Initialize a new instance.
*/
public RedisStateMachine() {
stack = new LinkedList<State>();
}
/**
* Attempt to decode a redis response and return a flag indicating whether a complete
* response was read.
*
* @param buffer Buffer containing data from the server.
* @param output Current command output.
*
* @return true if a complete response was read.
*/
public boolean decode(ByteBuf buffer, CommandOutput<K, V, ?> output) {
int length, end;
ByteBuffer bytes;
if (stack.isEmpty()) {
stack.add(new State());
}
if (output == null) {
return stack.isEmpty();
}
loop:
while (!stack.isEmpty()) {
State state = stack.peek();
if (state.type == null) {
if (!buffer.isReadable()) break;
state.type = readReplyType(buffer);
buffer.markReaderIndex();
}
switch (state.type) {
case SINGLE:
if ((bytes = readLine(buffer)) == null) break loop;
if (!QUEUED.equals(bytes)) {
setCommandOutputSafely(output, bytes);
}
break;
case ERROR:
if ((bytes = readLine(buffer)) == null) break loop;
output.setError(bytes);
break;
case INTEGER:
if ((end = findLineEnd(buffer)) == -1) break loop;
setCommandOutputSafely(output, readLong(buffer, buffer.readerIndex(), end));
break;
case BULK:
if ((end = findLineEnd(buffer)) == -1) break loop;
length = (int) readLong(buffer, buffer.readerIndex(), end);
if (length == -1) {
setCommandOutputSafely(output, null);
} else {
state.type = BYTES;
state.count = length + 2;
buffer.markReaderIndex();
continue loop;
}
break;
case MULTI:
if (state.count == -1) {
if ((end = findLineEnd(buffer)) == -1) break loop;
length = (int) readLong(buffer, buffer.readerIndex(), end);
state.count = length;
buffer.markReaderIndex();
}
if (state.count <= 0) break;
state.count--;
stack.addFirst(new State());
continue loop;
case BYTES:
if ((bytes = readBytes(buffer, state.count)) == null) break loop;
setCommandOutputSafely(output, bytes);
}
buffer.markReaderIndex();
stack.remove();
output.complete(stack.size());
}
return stack.isEmpty();
}
private int findLineEnd(ByteBuf buffer) {
int start = buffer.readerIndex();
int index = buffer.indexOf(start, buffer.writerIndex(), (byte) '\n');
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
private State.Type readReplyType(ByteBuf buffer) {
switch (buffer.readByte()) {
case '+': return SINGLE;
case '-': return ERROR;
case ':': return INTEGER;
case '$': return BULK;
case '*': return MULTI;
default: throw new RedisException("Invalid first byte");
}
}
private long readLong(ByteBuf buffer, int start, int end) {
long value = 0;
boolean negative = buffer.getByte(start) == '-';
int offset = negative ? start + 1 : start;
while (offset < end - 1) {
int digit = buffer.getByte(offset++) - '0';
value = value * 10 - digit;
}
if (!negative) value = -value;
buffer.readerIndex(end + 1);
return value;
}
private ByteBuffer readLine(ByteBuf buffer) {
ByteBuffer bytes = null;
int end = findLineEnd(buffer);
if (end > -1) {
int start = buffer.readerIndex();
bytes = buffer.nioBuffer(start, end - start - 1);
buffer.readerIndex(end + 1);
}
return bytes;
}
private ByteBuffer readBytes(ByteBuf buffer, int count) {
ByteBuffer bytes = null;
if (buffer.readableBytes() >= count) {
bytes = buffer.nioBuffer(buffer.readerIndex(), count - 2);
buffer.readerIndex(buffer.readerIndex() + count);
}
return bytes;
}
private boolean setCommandOutputSafely(CommandOutput output, ByteBuffer bytes) {
boolean success = false;
try {
output.set(bytes);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
private boolean setCommandOutputSafely(CommandOutput output, long value) {
boolean success = false;
try {
output.set(value);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
}

@ -1,56 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import java.util.concurrent.BlockingQueue;
/**
* A netty {@link ChannelHandler} responsible for writing redis pub/sub commands
* and reading the response stream from the server.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class PubSubCommandHandler<K, V> extends CommandHandler<K, V> {
private RedisCodec<K, V> codec;
private PubSubOutput<K, V> output;
/**
* Initialize a new instance.
*
* @param queue Command queue.
* @param codec Codec.
*/
public PubSubCommandHandler(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec) {
super(queue);
this.codec = codec;
this.output = new PubSubOutput<K, V>(codec);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
while (output.type() == null && !queue.isEmpty()) {
CommandOutput<K, V, ?> output = queue.peek().getOutput();
if (!rsm.decode(buffer, output)) {
return;
}
queue.take().complete();
if (output instanceof PubSubOutput && ((PubSubOutput) output).type() != null) {
ctx.fireChannelRead(output);
}
}
while (rsm.decode(buffer, output)) {
ctx.fireChannelRead(output);
output = new PubSubOutput<K, V>(codec);
}
}
}

@ -1,87 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
/**
* One element of the redis pub/sub stream. May be a message or notification
* of subscription details.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
enum Type { message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe }
private Type type;
private String channel;
private String pattern;
private long count;
public PubSubOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
public Type type() {
return type;
}
public String channel() {
return channel;
}
public String pattern() {
return pattern;
}
public long count() {
return count;
}
@Override
@SuppressWarnings("fallthrough")
public void set(ByteBuffer bytes) {
if (type == null) {
type = Type.valueOf(decodeAscii(bytes));
return;
}
switch (type) {
case pmessage:
if (pattern == null) {
pattern = decodeAscii(bytes);
break;
}
case message:
if (channel == null) {
channel = decodeAscii(bytes);
break;
}
if (channel.startsWith("__keyspace@")
|| channel.startsWith("__keyevent@")) {
output = (V)decodeAscii(bytes);
} else {
output = codec.decodeValue(bytes);
}
break;
case psubscribe:
case punsubscribe:
pattern = decodeAscii(bytes);
break;
case subscribe:
case unsubscribe:
channel = decodeAscii(bytes);
break;
}
}
@Override
public void set(long integer) {
count = integer;
}
}

@ -1,37 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.pubsub;
/**
* Convenience adapter with an empty implementation of all
* {@link RedisPubSubListener} callback methods.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public class RedisPubSubAdapter<V> implements RedisPubSubListener<V> {
@Override
public void message(String channel, V message) {
}
@Override
public void message(String pattern, String channel, V message) {
}
@Override
public void subscribed(String channel, long count) {
}
@Override
public void psubscribed(String pattern, long count) {
}
@Override
public void unsubscribed(String channel, long count) {
}
@Override
public void punsubscribed(String pattern, long count) {
}
}

@ -1,174 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.pubsub;
import static com.lambdaworks.redis.protocol.CommandType.PSUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.PUNSUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.SUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.UNSUBSCRIBE;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
/**
* An asynchronous thread-safe pub/sub connection to a redis server. After one or
* more channels are subscribed to only pub/sub related commands or {@link #quit}
* may be called.
*
* Incoming messages and results of the {@link #subscribe}/{@link #unsubscribe}
* calls will be passed to all registered {@link RedisPubSubListener}s.
*
* A {@link com.lambdaworks.redis.protocol.ConnectionWatchdog} monitors each
* connection and reconnects automatically until {@link #close} is called. Channel
* and pattern subscriptions are renewed after reconnecting.
*
* @author Will Glozer
*/
public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
private final Queue<RedisPubSubListener<V>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<V>>();
private Set<String> channels;
private Set<String> patterns;
/**
* Initialize a new connection.
*
* @param queue Command queue.
* @param codec Codec used to encode/decode keys and values.
* @param timeout Maximum time to wait for a responses.
* @param unit Unit of time for the timeout.
* @param eventLoopGroup
*/
public RedisPubSubConnection(RedisClient client, BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
super(client, queue, codec, timeout, unit, eventLoopGroup);
channels = new HashSet<String>();
patterns = new HashSet<String>();
}
/**
* Add a new listener.
*
* @param listener Listener.
*/
public void addListener(RedisPubSubListener<V> listener) {
listeners.add(listener);
}
/**
* Remove an existing listener.
*
* @param listener Listener.
*/
public void removeListener(RedisPubSubListener<V> listener) {
listeners.remove(listener);
}
public void psubscribe(String... patterns) {
dispatch(PSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void subscribe(String... channels) {
dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
public Future<V> unsubscribe(String... channels) {
return dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
public Future<V> punsubscribe(String... patterns) {
return dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
@Override
public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if (channels.size() > 0) {
subscribe(channels.toArray(new String[channels.size()]));
channels.clear();
}
if (patterns.size() > 0) {
psubscribe(toArray(patterns));
patterns.clear();
}
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PubSubOutput<K, V> output = (PubSubOutput<K, V>) msg;
// Update internal state
switch (output.type()) {
case psubscribe:
patterns.add(output.pattern());
break;
case punsubscribe:
patterns.remove(output.pattern());
break;
case subscribe:
channels.add(output.channel());
break;
case unsubscribe:
channels.remove(output.channel());
break;
default:
break;
}
// notify watchers, if any
for (RedisPubSubListener<V> listener : listeners) {
switch (output.type()) {
case message:
listener.message(output.channel(), output.get());
break;
case pmessage:
listener.message(output.pattern(), output.channel(), output.get());
break;
case psubscribe:
listener.psubscribed(output.pattern(), output.count());
break;
case punsubscribe:
listener.punsubscribed(output.pattern(), output.count());
break;
case subscribe:
listener.subscribed(output.channel(), output.count());
break;
case unsubscribe:
listener.unsubscribed(output.channel(), output.count());
break;
}
}
}
private CommandArgs<K, V> args(String... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
for (String key : keys) {
args.add(key.toString());
}
return args;
}
@SuppressWarnings("unchecked")
private <T> T[] toArray(Collection<T> c) {
Class<T> cls = (Class<T>) c.iterator().next().getClass();
T[] array = (T[]) Array.newInstance(cls, c.size());
return c.toArray(array);
}
}

@ -1,61 +0,0 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.
package com.lambdaworks.redis.pubsub;
/**
* Interface for redis pub/sub listeners.
*
* @param <V> Value type.
*
* @author Will Glozer
*/
public interface RedisPubSubListener<V> {
/**
* Message received from a channel subscription.
*
* @param channel Channel.
* @param message Message.
*/
void message(String channel, V message);
/**
* Message received from a pattern subscription.
*
* @param pattern Pattern.
* @param channel Channel.
* @param message Message.
*/
void message(String pattern, String channel, V message);
/**
* Subscribed to a channel.
*
* @param channel Channel
* @param count Subscription count.
*/
void subscribed(String channel, long count);
/**
* Subscribed to a pattern.
*
* @param pattern Pattern.
* @param count Subscription count.
*/
void psubscribed(String pattern, long count);
/**
* Unsubscribed from a channel.
*
* @param channel Channel
* @param count Subscription count.
*/
void unsubscribed(String channel, long count);
/**
* Unsubscribed from a pattern.
*
* @param pattern Channel
* @param count Subscription count.
*/
void punsubscribed(String pattern, long count);
}
Loading…
Cancel
Save