Redisson config rework. Master/Slave support. #24

pull/38/head
Nikita 11 years ago
parent 96c7a19a03
commit ca56f63d03

@ -97,6 +97,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
private int db;
private boolean closed;
private EventLoopGroup eventLoopGroup;
private RedisClient redisClient;
/**
* Initialize a new connection.
@ -107,12 +108,13 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
* @param unit Unit of time for the timeout.
* @param eventLoopGroup
*/
public RedisAsyncConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
public RedisAsyncConnection(RedisClient redisClient, BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
this.queue = queue;
this.codec = codec;
this.timeout = timeout;
this.unit = unit;
this.eventLoopGroup = eventLoopGroup;
this.redisClient = redisClient;
}
/**
@ -1171,6 +1173,10 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return cmd.getNow();
}
public RedisClient getRedisClient() {
return redisClient;
}
@SuppressWarnings("unchecked")
protected <K, V, T> CommandOutput<K, V, T> newScriptOutput(RedisCodec<K, V> codec, ScriptOutputType type) {
switch (type) {

@ -137,7 +137,7 @@ public class RedisClient {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
CommandHandler<K, V> handler = new CommandHandler<K, V>(queue);
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(queue, codec, timeout, unit, bootstrap.group());
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(this, queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}
@ -154,7 +154,7 @@ public class RedisClient {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(queue, codec);
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(queue, codec, timeout, unit, bootstrap.group());
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(this, queue, codec, timeout, unit, bootstrap.group());
return connect(handler, connection);
}
@ -197,5 +197,7 @@ public class RedisClient {
future.awaitUninterruptibly();
bootstrap.group().shutdownGracefully().syncUninterruptibly();
}
}

@ -30,6 +30,10 @@ public class RedisConnection<K, V> {
protected long timeout;
protected TimeUnit unit;
public RedisClient getRedisClient() {
return c.getRedisClient();
}
/**
* Initialize a new connection.
*

@ -2,21 +2,27 @@
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import static com.lambdaworks.redis.protocol.CommandType.PSUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.PUNSUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.SUBSCRIBE;
import static com.lambdaworks.redis.protocol.CommandType.UNSUBSCRIBE;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.RedisPubSubTopicListenerWrapper;
import static com.lambdaworks.redis.protocol.CommandType.*;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
/**
* An asynchronous thread-safe pub/sub connection to a redis server. After one or
@ -46,8 +52,8 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
* @param unit Unit of time for the timeout.
* @param eventLoopGroup
*/
public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
super(queue, codec, timeout, unit, eventLoopGroup);
public RedisPubSubConnection(RedisClient client, BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) {
super(client, queue, codec, timeout, unit, eventLoopGroup);
channels = new HashSet<K>();
patterns = new HashSet<K>();
}

@ -0,0 +1,67 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
class BaseConfig<T extends BaseConfig<T>> {
/**
* Password for Redis authentication. Should be null if not needed
*/
private String password;
/**
* Subscriptions per Redis connection limit
*/
private int subscriptionsPerConnection = 5;
BaseConfig() {
}
BaseConfig(T config) {
setPassword(config.getPassword());
setSubscriptionsPerConnection(config.getSubscriptionsPerConnection());
}
/**
* Subscriptions per Redis connection limit
* Default is 5
*
* @param subscriptionsPerConnection
*/
public T setSubscriptionsPerConnection(int subscriptionsPerConnection) {
this.subscriptionsPerConnection = subscriptionsPerConnection;
return (T) this;
}
public int getSubscriptionsPerConnection() {
return subscriptionsPerConnection;
}
/**
* Password for Redis authentication. Should be null if not needed
* Default is <code>null</code>
*
* @param password
*/
public T setPassword(String password) {
this.password = password;
return (T) this;
}
public String getPassword() {
return password;
}
}

@ -15,15 +15,8 @@
*/
package org.redisson;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.RedissonCodec;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer;
/**
* Redisson configuration
@ -33,33 +26,15 @@ import org.redisson.connection.RoundRobinLoadBalancer;
*/
public class Config {
/**
* Сonnection load balancer to use multiple Redis servers
*/
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
private MasterSlaveConnectionConfig masterSlaveConnectionConfig;
private SingleConnectionConfig singleConnectionConfig;
/**
* Redis key/value codec. JsonJacksonCodec used by default
*/
private RedissonCodec codec;
/**
* Subscriptions per Redis connection limit
*/
private int subscriptionsPerConnection = 5;
/**
* Redis connection pool size limit
*/
private int connectionPoolSize = 100;
/**
* Password for Redis authentication. Should be null if not needed
*/
private String password;
private List<URI> addresses = new ArrayList<URI>();
public Config() {
}
@ -70,11 +45,12 @@ public class Config {
}
setCodec(oldConf.getCodec());
setConnectionPoolSize(oldConf.getConnectionPoolSize());
setPassword(oldConf.getPassword());
setSubscriptionsPerConnection(oldConf.getSubscriptionsPerConnection());
setAddresses(oldConf.getAddresses());
setLoadBalancer(oldConf.getLoadBalancer());
if (oldConf.getSingleConnectionConfig() != null) {
setSingleConnectionConfig(new SingleConnectionConfig(oldConf.getSingleConnectionConfig()));
}
if (oldConf.getMasterSlaveConnectionConfig() != null) {
setMasterSlaveConnectionConfig(new MasterSlaveConnectionConfig(oldConf.getMasterSlaveConnectionConfig()));
}
}
/**
@ -83,87 +59,44 @@ public class Config {
* @see org.redisson.codec.JsonJacksonCodec
* @see org.redisson.codec.SerializationCodec
*/
public void setCodec(RedissonCodec codec) {
public Config setCodec(RedissonCodec codec) {
this.codec = codec;
return this;
}
public RedissonCodec getCodec() {
return codec;
}
/**
* Password for Redis authentication. Should be null if not needed
* Default is <code>null</code>
*
* @param password
*/
public void setPassword(String password) {
this.password = password;
}
public String getPassword() {
return password;
}
/**
* Subscriptions per Redis connection limit
* Default is 5
*
* @param subscriptionsPerConnection
*/
public void setSubscriptionsPerConnection(int subscriptionsPerConnection) {
this.subscriptionsPerConnection = subscriptionsPerConnection;
public SingleConnectionConfig useSingleConnection() {
if (masterSlaveConnectionConfig != null) {
throw new IllegalStateException("master/slave connection already used!");
}
if (singleConnectionConfig == null) {
singleConnectionConfig = new SingleConnectionConfig();
}
return singleConnectionConfig;
}
public int getSubscriptionsPerConnection() {
return subscriptionsPerConnection;
SingleConnectionConfig getSingleConnectionConfig() {
return singleConnectionConfig;
}
/**
* Redis connection pool size limit
* Default is 100
*
* @param connectionPoolSize
*/
public void setConnectionPoolSize(int connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
}
public int getConnectionPoolSize() {
return connectionPoolSize;
void setSingleConnectionConfig(SingleConnectionConfig singleConnectionConfig) {
this.singleConnectionConfig = singleConnectionConfig;
}
/**
* Redis server address. Use follow format -- host:port
*
* @param addressesVar
*/
public void addAddress(String ... addressesVar) {
for (String address : addressesVar) {
try {
addresses.add(new URI("//" + address));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + address);
}
public MasterSlaveConnectionConfig useMasterSlaveConnection() {
if (singleConnectionConfig != null) {
throw new IllegalStateException("single connection already used!");
}
if (masterSlaveConnectionConfig == null) {
masterSlaveConnectionConfig = new MasterSlaveConnectionConfig();
}
return masterSlaveConnectionConfig;
}
public List<URI> getAddresses() {
return addresses;
}
void setAddresses(List<URI> addresses) {
this.addresses = addresses;
}
/**
* Сonnection load balancer to multiple Redis servers.
* Uses Round-robin algorithm by default
*
* @param loadBalancer
*
* @see org.redisson.connection.RoundRobinLoadBalancer
* @see org.redisson.connection.RandomLoadBalancer
*/
public void setLoadBalancer(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
MasterSlaveConnectionConfig getMasterSlaveConnectionConfig() {
return masterSlaveConnectionConfig;
}
public LoadBalancer getLoadBalancer() {
return loadBalancer;
void setMasterSlaveConnectionConfig(MasterSlaveConnectionConfig masterSlaveConnectionConfig) {
this.masterSlaveConnectionConfig = masterSlaveConnectionConfig;
}
}

@ -0,0 +1,150 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer;
public class MasterSlaveConnectionConfig extends BaseConfig<MasterSlaveConnectionConfig> {
/**
* Сonnection load balancer to use multiple Redis servers
*/
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
private List<URI> slaveAddresses = new ArrayList<URI>();
private URI masterAddress;
/**
* Redis 'slave' servers subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionPoolSize = 50;
/**
* Redis 'slave' servers connection pool size for <b>each</b> slave node
*/
private int slaveConnectionPoolSize = 50;
/**
* Redis 'master' server connection pool size limit
*/
private int masterConnectionPoolSize = 100;
public MasterSlaveConnectionConfig() {
}
MasterSlaveConnectionConfig(MasterSlaveConnectionConfig config) {
super(config);
setLoadBalancer(config.getLoadBalancer());
setMasterAddress(config.getMasterAddress());
setMasterConnectionPoolSize(config.getMasterConnectionPoolSize());
setSlaveAddresses(config.getSlaveAddresses());
setSlaveConnectionPoolSize(config.getSlaveConnectionPoolSize());
setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize());
}
/**
* Set Redis master server address. Use follow format -- host:port
*
* @param masterAddress
*/
public MasterSlaveConnectionConfig setMasterAddress(String masterAddress) {
try {
this.masterAddress = new URI("//" + masterAddress);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + masterAddress);
}
return this;
}
public URI getMasterAddress() {
return masterAddress;
}
void setMasterAddress(URI masterAddress) {
this.masterAddress = masterAddress;
}
/**
* Add Redis slave server address. Use follow format -- host:port
*
* @param addresses
* @return
*/
public MasterSlaveConnectionConfig addSlaveAddress(String ... sAddresses) {
for (String address : sAddresses) {
try {
slaveAddresses.add(new URI("//" + address));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + address);
}
}
return this;
}
public List<URI> getSlaveAddresses() {
return slaveAddresses;
}
void setSlaveAddresses(List<URI> readAddresses) {
this.slaveAddresses = readAddresses;
}
public int getSlaveConnectionPoolSize() {
return slaveConnectionPoolSize;
}
public MasterSlaveConnectionConfig setSlaveConnectionPoolSize(int slaveConnectionPoolSize) {
this.slaveConnectionPoolSize = slaveConnectionPoolSize;
return this;
}
public int getMasterConnectionPoolSize() {
return masterConnectionPoolSize;
}
public MasterSlaveConnectionConfig setMasterConnectionPoolSize(int masterConnectionPoolSize) {
this.masterConnectionPoolSize = masterConnectionPoolSize;
return this;
}
/**
* Сonnection load balancer to multiple Redis servers.
* Uses Round-robin algorithm by default
*
* @param loadBalancer
* @return
*
* @see org.redisson.connection.RoundRobinLoadBalancer
* @see org.redisson.connection.BaseLoadBalancer
*/
public MasterSlaveConnectionConfig setLoadBalancer(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
return this;
}
public LoadBalancer getLoadBalancer() {
return loadBalancer;
}
public int getSlaveSubscriptionConnectionPoolSize() {
return slaveSubscriptionConnectionPoolSize;
}
public void setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize;
}
}

@ -19,6 +19,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch;
@ -68,7 +70,11 @@ public class Redisson {
Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = new ConnectionManager(configCopy);
if (configCopy.getMasterSlaveConnectionConfig() != null) {
connectionManager = new MasterSlaveConnectionManager(configCopy.getMasterSlaveConnectionConfig(), configCopy);
} else {
connectionManager = new SingleConnectionManager(configCopy.getSingleConnectionConfig(), configCopy);
}
}
/**
@ -78,7 +84,8 @@ public class Redisson {
*/
public static Redisson create() {
Config config = new Config();
config.addAddress("127.0.0.1:6379");
config.useSingleConnection().setAddress("127.0.0.1:6379");
// config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399");
return create(config);
}
@ -241,7 +248,7 @@ public class Redisson {
try {
connection.flushdb();
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -39,7 +39,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
conn.setnx(getName(), 0);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -49,7 +49,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
return conn.incrby(getName(), delta);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -71,7 +71,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
}
}
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -81,7 +81,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
return conn.decr(getName());
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -91,7 +91,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
return ((Number) conn.get(getName())).longValue();
} finally {
connectionManager.release(conn);
connectionManager.releaseRead(conn);
}
}
@ -111,7 +111,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
return ((Number) conn.getset(getName(), newValue)).longValue();
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -121,7 +121,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
return conn.incr(getName());
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -140,7 +140,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
try {
conn.set(getName(), newValue);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}

@ -34,28 +34,18 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public V get() {
RedisConnection<String, V> conn = connectionManager.connectionReadOp();
try {
return conn.get(getName());
} finally {
connectionManager.release(conn);
}
return getAsync().awaitUninterruptibly().getNow();
}
@Override
public Future<V> getAsync() {
RedisConnection<String, V> conn = connectionManager.connectionReadOp();
return conn.getAsync().get(getName()).addListener(connectionManager.createReleaseListener(conn));
return conn.getAsync().get(getName()).addListener(connectionManager.createReleaseReadListener(conn));
}
@Override
public void set(V value) {
RedisConnection<String, V> conn = connectionManager.connectionWriteOp();
try {
conn.set(getName(), value);
} finally {
connectionManager.release(conn);
}
setAsync(value).awaitUninterruptibly().getNow();
}
@Override
@ -64,18 +54,13 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
Promise<Void> promise = connectionManager.getGroup().next().newPromise();
Future<String> f = connection.getAsync().set(getName(), value);
addListener(f, promise);
promise.addListener(connectionManager.createReleaseListener(connection));
promise.addListener(connectionManager.createReleaseWriteListener(connection));
return promise;
}
@Override
public void set(V value, long timeToLive, TimeUnit timeUnit) {
RedisConnection<String, V> conn = connectionManager.connectionWriteOp();
try {
conn.setex(getName(), timeUnit.toSeconds(timeToLive), value);
} finally {
connectionManager.release(conn);
}
setAsync(value, timeToLive, timeUnit).awaitUninterruptibly().getNow();
}
private void addListener(Future<String> future, final Promise<Void> promise) {
@ -100,7 +85,7 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
Promise<Void> promise = connectionManager.getGroup().next().newPromise();
Future<String> f = connection.getAsync().setex(getName(), timeUnit.toSeconds(timeToLive), value);
addListener(f, promise);
promise.addListener(connectionManager.createReleaseListener(connection));
promise.addListener(connectionManager.createReleaseWriteListener(connection));
return promise;
}

@ -147,7 +147,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
connection.del(getName());
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -167,7 +167,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
return val.longValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -188,7 +188,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
connection.publish(getChannelName(), newCountMessage);
return connection.exec().size() == 2;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -48,7 +48,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
try {
conn.lpush(getName(), e);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -98,7 +98,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
}
return (V) list.get(0);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -109,7 +109,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
conn.lpush(getName(), e);
return true;
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -133,7 +133,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
}
return (V) list.get(0);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -149,7 +149,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
V value = (V) connection.rpop(getName());
return value;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -178,7 +178,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
}
return value;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -35,7 +35,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
try {
return connection.expire(getName(), timeUnit.toSeconds(timeToLive));
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -45,7 +45,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
try {
return connection.expireat(getName(), timestamp);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -55,7 +55,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
try {
return connection.expireat(getName(), timestamp);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -65,7 +65,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
try {
return connection.persist(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -75,7 +75,7 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
try {
return connection.ttl(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -58,31 +58,31 @@ public class RedissonHyperLogLog<V> extends RedissonObject implements RHyperLogL
@Override
public Future<Long> addAsync(V obj) {
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().pfadd(getName(), obj).addListener(connectionManager.createReleaseListener(conn));
return conn.getAsync().pfadd(getName(), obj).addListener(connectionManager.createReleaseWriteListener(conn));
}
@Override
public Future<Long> addAllAsync(Collection<V> objects) {
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().pfadd(getName(), objects.toArray()).addListener(connectionManager.createReleaseListener(conn));
return conn.getAsync().pfadd(getName(), objects.toArray()).addListener(connectionManager.createReleaseWriteListener(conn));
}
@Override
public Future<Long> countAsync() {
RedisConnection<String, Object> conn = connectionManager.connectionReadOp();
return conn.getAsync().pfcount(getName()).addListener(connectionManager.createReleaseListener(conn));
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().pfcount(getName()).addListener(connectionManager.createReleaseWriteListener(conn));
}
@Override
public Future<Long> countWithAsync(String... otherLogNames) {
RedisConnection<String, Object> conn = connectionManager.connectionReadOp();
return conn.getAsync().pfcount(getName(), otherLogNames).addListener(connectionManager.createReleaseListener(conn));
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().pfcount(getName(), otherLogNames).addListener(connectionManager.createReleaseWriteListener(conn));
}
@Override
public Future<Long> mergeWithAsync(String... otherLogNames) {
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().pfmerge(getName(), otherLogNames).addListener(connectionManager.createReleaseListener(conn));
return conn.getAsync().pfmerge(getName(), otherLogNames).addListener(connectionManager.createReleaseWriteListener(conn));
}
}

@ -49,7 +49,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
try {
return connection.llen(getName()).intValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -95,7 +95,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
try {
return connection.lrem(getName(), count, o) > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -122,7 +122,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return copy.isEmpty();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -134,7 +134,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
conn.rpush(getName(), c.toArray());
return true;
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -157,7 +157,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
}
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
} else {
return addAll(coll);
@ -177,7 +177,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
return result;
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -201,7 +201,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
try {
conn.del(getName());
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -212,7 +212,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
try {
return (V) conn.lindex(getName(), index);
} finally {
connectionManager.release(conn);
connectionManager.releaseRead(conn);
}
}
@ -253,7 +253,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
}
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -295,7 +295,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
}
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -318,7 +318,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
} finally {
connectionManager.release(conn);
connectionManager.releaseRead(conn);
}
}
@ -343,7 +343,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return -1;
} finally {
connectionManager.release(conn);
connectionManager.releaseRead(conn);
}
}
@ -442,7 +442,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
try {
return (List<V>) conn.lrange(getName(), fromIndex, toIndex - 1);
} finally {
connectionManager.release(conn);
connectionManager.releaseRead(conn);
}
}

@ -204,7 +204,7 @@ public class RedissonLock extends RedissonObject implements RLock {
}
return res;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -249,7 +249,7 @@ public class RedissonLock extends RedissonObject implements RLock {
+ id + " thread-id: " + Thread.currentThread().getId());
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -292,7 +292,7 @@ public class RedissonLock extends RedissonObject implements RLock {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -305,7 +305,7 @@ public class RedissonLock extends RedissonObject implements RLock {
LockValue lock = (LockValue) connection.get(getKeyName());
return lock != null;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -320,7 +320,7 @@ public class RedissonLock extends RedissonObject implements RLock {
LockValue lock = (LockValue) connection.get(getKeyName());
return lock != null && lock.equals(currentLock);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -338,7 +338,7 @@ public class RedissonLock extends RedissonObject implements RLock {
}
return 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}

@ -53,7 +53,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return connection.hlen(getName()).intValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -68,7 +68,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return connection.hexists(getName(), key);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -78,7 +78,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return connection.hvals(getName()).contains(value);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -88,7 +88,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return (V) connection.hget(getName(), key);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -106,7 +106,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -124,7 +124,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -134,7 +134,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
connection.hmset(getName(), (Map<Object, Object>) map);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -144,7 +144,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
connection.del(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -154,7 +154,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return (Set<K>) connection.hkeys(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -164,7 +164,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
try {
return (Collection<V>) connection.hvals(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -179,7 +179,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
return result.entrySet();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -199,7 +199,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -228,7 +228,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -251,7 +251,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -274,14 +274,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return null;
}
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@Override
public Future<V> getAsync(K key) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
return connection.getAsync().hget(getName(), key).addListener(connectionManager.createReleaseListener(connection));
return connection.getAsync().hget(getName(), key).addListener(connectionManager.createReleaseReadListener(connection));
}
@Override
@ -290,7 +290,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
putAsync(key, value, promise, async);
promise.addListener(connectionManager.createReleaseListener(connection));
promise.addListener(connectionManager.createReleaseReadListener(connection));
return promise;
}
@ -376,7 +376,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Promise<V> promise = connectionManager.getGroup().next().newPromise();
RedisAsyncConnection<Object, V> async = connection.getAsync();
removeAsync(key, promise, async);
promise.addListener(connectionManager.createReleaseListener(connection));
promise.addListener(connectionManager.createReleaseWriteListener(connection));
return promise;
}

@ -55,7 +55,7 @@ abstract class RedissonObject implements RObject {
try {
connection.del(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -49,7 +49,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
}
return value;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -62,7 +62,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
}
return value;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -77,7 +77,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
try {
return (V) connection.lpop(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -43,7 +43,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.scard(getName()).intValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -58,7 +58,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.sismember(getName(), o);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -102,7 +102,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
};
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -113,7 +113,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.smembers(getName()).toArray();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -123,7 +123,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.smembers(getName()).toArray(a);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -133,7 +133,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.sadd(getName(), e) > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -143,7 +143,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.srem(getName(), o) > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -163,7 +163,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.sadd(getName(), c.toArray()) > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -185,7 +185,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
return connection.srem(getName(), c.toArray()) > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -195,7 +195,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
try {
connection.del(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -125,7 +125,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
conn.setnx(getCurrentVersionKey(), 0L);
} finally {
connectionManager.release(conn);
connectionManager.releaseWrite(conn);
}
}
@ -136,7 +136,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -187,7 +187,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return size(connection);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -206,7 +206,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return binarySearch((V)o, connection).getIndex() >= 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -216,7 +216,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
startScore = getScoreAtIndex(0, connection);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
return iterator(startScore, Double.MAX_VALUE);
@ -236,7 +236,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
Long remains = connection.zcount(getName(), currentScore, endScore);
return remains > 0;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -275,7 +275,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
return value;
// }
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -298,7 +298,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return connection.zrange(getName(), 0, -1).toArray();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -308,7 +308,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return connection.zrange(getName(), 0, -1).toArray(a);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -326,7 +326,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return add(value, connection);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -465,7 +465,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
return remove(value, connection);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -535,7 +535,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
try {
connection.del(getName());
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -569,12 +569,13 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
return res.getValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@Override
public V last() {
// TODO refactor to -inf +inf
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
try {
BinarySearchResult<V> res = getAtIndex(-1, connection);
@ -583,7 +584,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
return res.getValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -619,7 +620,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
return false;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}

@ -61,7 +61,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
return connection.zcount(redissonSortedSet.getName(), headScore, tailScore).intValue();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -94,7 +94,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
BinarySearchResult<V> res = redissonSortedSet.binarySearch((V)o, connection);
return res.getScore() < tailScore && res.getScore() > headScore;
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -106,7 +106,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
double tailScore = getTailScore(connection);
return redissonSortedSet.iterator(headScore, tailScore);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -118,7 +118,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
double tailScore = getTailScore(connection);
return connection.zrangebyscore(redissonSortedSet.getName(), headScore, tailScore).toArray();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -130,7 +130,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
double tailScore = getTailScore(connection);
return connection.zrangebyscore(redissonSortedSet.getName(), headScore, tailScore).toArray(a);
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -152,7 +152,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
}
return false;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -169,7 +169,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
}
return false;
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -226,7 +226,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
double tailScore = getTailScore(connection);
connection.zremrangebyscore(redissonSortedSet.getName(), headScore, tailScore);
} finally {
connectionManager.release(connection);
connectionManager.releaseWrite(connection);
}
}
@ -277,7 +277,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
}
return redissonSortedSet.first();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}
@ -301,7 +301,7 @@ class RedissonSubSortedSet<V> implements SortedSet<V> {
}
return redissonSortedSet.last();
} finally {
connectionManager.release(connection);
connectionManager.releaseRead(connection);
}
}

@ -45,7 +45,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
@Override
public Future<Long> publishAsync(M message) {
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
return conn.getAsync().publish(getName(), message).addListener(connectionManager.createReleaseListener(conn));
return conn.getAsync().publish(getName(), message).addListener(connectionManager.createReleaseWriteListener(conn));
}
@Override

@ -0,0 +1,77 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.URI;
import java.net.URISyntaxException;
public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
/**
* Redis server address
*
*/
private URI address;
/**
* Redis connection pool size limit
*/
private int connectionPoolSize = 100;
SingleConnectionConfig() {
}
public SingleConnectionConfig(SingleConnectionConfig config) {
super(config);
setAddress(config.getAddress());
setConnectionPoolSize(config.getConnectionPoolSize());
}
/**
* Redis connection pool size limit
* Default is 100
*
* @param connectionPoolSize
*/
public SingleConnectionConfig setConnectionPoolSize(int connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
return this;
}
public int getConnectionPoolSize() {
return connectionPoolSize;
}
/**
* Set server address. Use follow format -- host:port
*
* @param address
*/
public SingleConnectionConfig setAddress(String address) {
try {
this.address = new URI("//" + address);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + address);
}
return this;
}
public URI getAddress() {
return address;
}
public void setAddress(URI address) {
this.address = address;
}
}

@ -0,0 +1,117 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
abstract class BaseLoadBalancer implements LoadBalancer {
private final Logger log = LoggerFactory.getLogger(getClass());
private RedisCodec codec;
List<ConnectionEntry> clients;
public void init(List<ConnectionEntry> clients, RedisCodec codec) {
this.clients = clients;
this.codec = codec;
}
public RedisPubSubConnection nextPubSubConnection() {
List<ConnectionEntry> clientsCopy = new ArrayList<ConnectionEntry>(clients);
while (true) {
if (clientsCopy.isEmpty()) {
log.warn("Slave connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
// TODO wait
//entry.getPoolSize().acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
}
int index = getIndex(clientsCopy);
ConnectionEntry entry = clientsCopy.get(index);
if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()) {
clientsCopy.remove(index);
} else {
RedisPubSubConnection conn = entry.getSubscribeConnections().poll();
if (conn != null) {
return conn;
}
return entry.getClient().connectPubSub(codec);
}
}
}
public RedisConnection nextConnection() {
List<ConnectionEntry> clientsCopy = new ArrayList<ConnectionEntry>(clients);
while (true) {
if (clientsCopy.isEmpty()) {
log.warn("Slave connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
// TODO wait
//entry.getPoolSize().acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
}
int index = getIndex(clientsCopy);
ConnectionEntry entry = clientsCopy.get(index);
if (!entry.getConnectionsSemaphore().tryAcquire()) {
clientsCopy.remove(index);
} else {
RedisConnection conn = entry.getConnections().poll();
if (conn != null) {
return conn;
}
return entry.getClient().connect(codec);
}
}
}
abstract int getIndex(List<ConnectionEntry> clientsCopy);
public void returnSubscribeConnection(RedisPubSubConnection connection) {
for (ConnectionEntry entry : clients) {
if (entry.getClient().equals(connection.getRedisClient())) {
entry.getSubscribeConnections().add(connection);
entry.getSubscribeConnectionsSemaphore().release();
break;
}
}
}
public void returnConnection(RedisConnection connection) {
for (ConnectionEntry entry : clients) {
if (entry.getClient().equals(connection.getRedisClient())) {
entry.getConnections().add(connection);
entry.getConnectionsSemaphore().release();
break;
}
}
}
}

@ -0,0 +1,63 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class ConnectionEntry {
private final RedisClient client;
private final Semaphore subscribeConnectionsSemaphore;
private final Semaphore connectionsSemaphore;
private final Queue<RedisPubSubConnection> subscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
public ConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) {
super();
this.client = client;
this.connectionsSemaphore = new Semaphore(poolSize);
this.subscribeConnectionsSemaphore = new Semaphore(subscribePoolSize);
}
public RedisClient getClient() {
return client;
}
public Semaphore getSubscribeConnectionsSemaphore() {
return subscribeConnectionsSemaphore;
}
public Semaphore getConnectionsSemaphore() {
return connectionsSemaphore;
}
public Queue<RedisPubSubConnection> getSubscribeConnections() {
return subscribeConnections;
}
public Queue<RedisConnection> getConnections() {
return connections;
}
}

@ -45,172 +45,30 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
*
*/
//TODO ping support
public class ConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final EventLoopGroup group = new NioEventLoopGroup();
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
private final List<RedisClient> clients = new ArrayList<RedisClient>();
private final Semaphore activeConnections;
private final RedisCodec codec;
private final Config config;
private final LoadBalancer balancer;
public ConnectionManager(Config config) {
for (URI address : config.getAddresses()) {
RedisClient client = new RedisClient(group, address.getHost(), address.getPort());
clients.add(client);
}
balancer = config.getLoadBalancer();
balancer.init(clients);
codec = new RedisCodecWrapper(config.getCodec());
activeConnections = new Semaphore(config.getConnectionPoolSize());
this.config = config;
}
public <T> FutureListener<T> createReleaseListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
release(conn);
}
};
}
public <K, V> RedisConnection<K, V> connectionWriteOp() {
return connectionReadOp();
}
public <K, V> RedisConnection<K, V> connectionReadOp() {
acquireConnection();
RedisConnection<K, V> conn = connections.poll();
if (conn == null) {
conn = balancer.nextClient().connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
}
return conn;
}
public PubSubConnectionEntry getEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
public <K, V> PubSubConnectionEntry subscribe(String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(channelName);
return entry;
}
}
acquireConnection();
RedisPubSubConnection<K, V> conn = balancer.nextClient().connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(channelName);
pubSubConnections.add(entry);
return entry;
}
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(listener, channelName);
return entry;
}
}
acquireConnection();
RedisPubSubConnection<K, V> conn = balancer.nextClient().connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(listener, channelName);
pubSubConnections.add(entry);
return entry;
}
private void acquireConnection() {
if (!activeConnections.tryAcquire()) {
log.warn("Connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
activeConnections.acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
}
}
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
return;
}
name2PubSubConnection.remove(channelName);
entry.unsubscribe(channelName);
log.debug("unsubscribed from '{}' channel", channelName);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
activeConnections.release();
}
}
public void release(RedisConnection сonnection) {
connections.add(сonnection);
activeConnections.release();
}
public void shutdown() {
for (RedisClient client : clients) {
client.shutdown();
}
}
public EventLoopGroup getGroup() {
return group;
}
public interface ConnectionManager {
<T> FutureListener<T> createReleaseWriteListener(final RedisConnection conn);
<T> FutureListener<T> createReleaseReadListener(final RedisConnection conn);
<K, V> RedisConnection<K, V> connectionWriteOp();
<K, V> RedisConnection<K, V> connectionReadOp();
PubSubConnectionEntry getEntry(String channelName);
<K, V> PubSubConnectionEntry subscribe(String channelName);
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName);
void unsubscribe(PubSubConnectionEntry entry, String channelName);
void releaseWrite(RedisConnection сonnection);
void releaseRead(RedisConnection сonnection);
void shutdown();
EventLoopGroup getGroup();
}

@ -17,12 +17,20 @@ package org.redisson.connection;
import java.util.List;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public interface LoadBalancer {
void init(List<RedisClient> clients);
void init(List<ConnectionEntry> clients, RedisCodec codec);
RedisClient nextClient();
RedisConnection nextConnection();
RedisPubSubConnection nextPubSubConnection();
void returnConnection(RedisConnection connection);
void returnSubscribeConnection(RedisPubSubConnection connection);
}

@ -0,0 +1,252 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
import org.redisson.MasterSlaveConnectionConfig;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
/**
*
* @author Nikita Koksharov
*
*/
//TODO ping support
public class MasterSlaveConnectionManager implements ConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private RedisCodec codec;
private final EventLoopGroup group = new NioEventLoopGroup();
private final List<ConnectionEntry> slaveConnections = new ArrayList<ConnectionEntry>();
private final Queue<RedisConnection> masterConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
private final List<RedisClient> slaveClients = new ArrayList<RedisClient>();
private RedisClient masterClient;
private Semaphore masterConnectionsSemaphore;
private MasterSlaveConnectionConfig config;
private LoadBalancer balancer;
MasterSlaveConnectionManager() {
}
public MasterSlaveConnectionManager(MasterSlaveConnectionConfig cfg, Config config) {
init(cfg, config);
}
void init(MasterSlaveConnectionConfig config, Config cfg) {
this.config = config;
for (URI address : this.config.getSlaveAddresses()) {
RedisClient client = new RedisClient(group, address.getHost(), address.getPort());
slaveClients.add(client);
slaveConnections.add(new ConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
codec = new RedisCodecWrapper(cfg.getCodec());
balancer = config.getLoadBalancer();
balancer.init(slaveConnections, codec);
masterConnectionsSemaphore = new Semaphore(this.config.getMasterConnectionPoolSize());
}
public <T> FutureListener<T> createReleaseWriteListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
releaseWrite(conn);
}
};
}
public <T> FutureListener<T> createReleaseReadListener(final RedisConnection conn) {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
releaseRead(conn);
}
};
}
public <K, V> RedisConnection<K, V> connectionWriteOp() {
acquireMasterConnection();
RedisConnection<K, V> conn = masterConnections.poll();
if (conn == null) {
conn = masterClient.connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
}
return conn;
}
public <K, V> RedisConnection<K, V> connectionReadOp() {
return balancer.nextConnection();
}
public PubSubConnectionEntry getEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
public <K, V> PubSubConnectionEntry subscribe(String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(channelName);
return entry;
}
}
acquireMasterConnection();
RedisPubSubConnection<K, V> conn = balancer.nextPubSubConnection();
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(channelName);
pubSubConnections.add(entry);
return entry;
}
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(listener, channelName);
return entry;
}
}
acquireMasterConnection();
RedisPubSubConnection<K, V> conn = balancer.nextPubSubConnection();
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(listener, channelName);
pubSubConnections.add(entry);
return entry;
}
void acquireMasterConnection() {
if (!masterConnectionsSemaphore.tryAcquire()) {
log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
masterConnectionsSemaphore.acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Master connection acquired, time spended: {} ms", endTime);
}
}
void releaseMasterConnection() {
masterConnectionsSemaphore.release();
}
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
return;
}
name2PubSubConnection.remove(channelName);
entry.unsubscribe(channelName);
log.debug("unsubscribed from '{}' channel", channelName);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
balancer.returnSubscribeConnection(entry.getConnection());
}
}
public void releaseWrite(RedisConnection сonnection) {
masterConnections.add(сonnection);
releaseMasterConnection();
}
public void releaseRead(RedisConnection сonnection) {
balancer.returnConnection(сonnection);
}
public void shutdown() {
for (RedisClient client : slaveClients) {
client.shutdown();
}
}
public EventLoopGroup getGroup() {
return group;
}
}

@ -117,4 +117,8 @@ public class PubSubConnectionEntry {
return false;
}
public RedisPubSubConnection getConnection() {
return conn;
}
}

@ -1,37 +0,0 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.List;
import java.util.Random;
import com.lambdaworks.redis.RedisClient;
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new Random();
private List<RedisClient> clients;
public void init(List<RedisClient> clients) {
this.clients = clients;
}
public RedisClient nextClient() {
return clients.get(random.nextInt(clients.size()));
}
}

@ -18,23 +18,13 @@ package org.redisson.connection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.lambdaworks.redis.RedisClient;
public class RoundRobinLoadBalancer implements LoadBalancer {
public class RoundRobinLoadBalancer extends BaseLoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1);
private List<RedisClient> clients;
@Override
public void init(List<RedisClient> clients) {
this.clients = clients;
}
@Override
public RedisClient nextClient() {
int ind = Math.abs(index.incrementAndGet() % clients.size());
return clients.get(ind);
int getIndex(List<ConnectionEntry> clientsCopy) {
return Math.abs(index.incrementAndGet() % clients.size());
}
}

@ -0,0 +1,64 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
import org.redisson.MasterSlaveConnectionConfig;
import org.redisson.SingleConnectionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private Semaphore connections;
public SingleConnectionManager(SingleConnectionConfig cfg, Config config) {
MasterSlaveConnectionConfig newconfig = new MasterSlaveConnectionConfig();
String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();
newconfig.setMasterAddress(addr);
newconfig.addSlaveAddress(addr);
init(newconfig, config);
connections = new Semaphore(cfg.getConnectionPoolSize());
}
void acquireMasterConnection() {
if (!connections.tryAcquire()) {
log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
connections.acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
}
}
void releaseMasterConnection() {
connections.release();
}
void acquireSlaveConnection() {
acquireMasterConnection();
}
void releaseSlaveConnection() {
releaseMasterConnection();
}
}

@ -1,7 +1,5 @@
package org.redisson;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RAtomicLong;

Loading…
Cancel
Save