LoadBalancer refactored

pull/297/head
Nikita 9 years ago
parent 7d971e9a52
commit b0c79a9fa0

@ -15,43 +15,10 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import io.netty.util.concurrent.Future;
public interface LoadBalancer {
SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy);
Future<RedisConnection> getConnection(InetSocketAddress addr);
int getAvailableClients();
void shutdownAsync();
void shutdown();
boolean unfreeze(String host, int port, FreezeReason freezeReason);
Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason);
void init(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry);
void add(SubscribesConnectionEntry entry);
Future<RedisConnection> nextConnection();
Future<RedisPubSubConnection> nextPubSubConnection();
void returnConnection(RedisConnection connection);
void returnSubscribeConnection(RedisPubSubConnection connection);
}

@ -0,0 +1,51 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import io.netty.util.concurrent.Future;
public interface LoadBalancerManager {
Future<RedisConnection> getConnection(InetSocketAddress addr);
int getAvailableClients();
void shutdownAsync();
void shutdown();
boolean unfreeze(String host, int port, FreezeReason freezeReason);
Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason);
void add(SubscribesConnectionEntry entry);
Future<RedisConnection> nextConnection();
Future<RedisPubSubConnection> nextPubSubConnection();
void returnConnection(RedisConnection connection);
void returnSubscribeConnection(RedisPubSubConnection connection);
}

@ -35,21 +35,19 @@ import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;
abstract class BaseLoadBalancer implements LoadBalancer {
public class LoadBalancerManagerImpl implements LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private ConnectionManager connectionManager;
final Map<InetSocketAddress, SubscribesConnectionEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, SubscribesConnectionEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPoll pubSubEntries;
private final ConnectionPool<RedisConnection> entries;
PubSubConnectionPoll pubSubEntries;
ConnectionPool<RedisConnection> entries;
public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
entries = new ConnectionPool<RedisConnection>(config, this, connectionManager, entry);
pubSubEntries = new PubSubConnectionPoll(config, this, connectionManager, entry);
entries = new ConnectionPool<RedisConnection>(config, connectionManager, entry);
pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry);
}
public synchronized void add(SubscribesConnectionEntry entry) {

@ -28,7 +28,7 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.MasterConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,7 +44,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
final Logger log = LoggerFactory.getLogger(getClass());
LoadBalancer slaveBalancer;
LoadBalancerManager slaveBalancer;
SubscribesConnectionEntry masterEntry;
final ConnectionListener connectListener;
@ -52,7 +52,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
final MasterSlaveServersConfig config;
final ConnectionManager connectionManager;
final ConnectionPool<RedisConnection> writeConnectionHolder;
final MasterConnectionPool writeConnectionHolder;
final Set<ClusterSlotRange> slotRanges;
final AtomicBoolean active = new AtomicBoolean(true);
@ -63,12 +63,11 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
this.config = config;
this.connectListener = connectListener;
slaveBalancer = config.getLoadBalancer();
slaveBalancer.init(config, connectionManager, this);
slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this);
initSlaveBalancer(config);
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
protected void initSlaveBalancer(MasterSlaveServersConfig config) {

@ -19,7 +19,7 @@ import java.security.SecureRandom;
import java.util.List;
import java.util.Random;
public class RandomLoadBalancer extends BaseLoadBalancer {
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new SecureRandom();

@ -18,7 +18,7 @@ package org.redisson.connection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinLoadBalancer extends BaseLoadBalancer {
public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1);

@ -35,7 +35,11 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) {
super(slotRanges, connectionManager, config, connectListener);
pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager, this);
pubSubConnectionHolder = new PubSubConnectionPoll(config, connectionManager, this) {
protected SubscribesConnectionEntry getEntry() {
return entries.get(0);
}
};
}
@Override

@ -30,7 +30,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.connection.ConnectionEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SubscribesConnectionEntry;
@ -42,7 +41,7 @@ import io.netty.util.concurrent.Promise;
public class ConnectionPool<T extends RedisConnection> {
final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>();
protected final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>();
final Deque<Promise<T>> promises = new LinkedBlockingDeque<Promise<T>>();
@ -50,14 +49,10 @@ public class ConnectionPool<T extends RedisConnection> {
final MasterSlaveServersConfig config;
final LoadBalancer loadBalancer;
final MasterSlaveEntry masterSlaveEntry;
public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer,
ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
public ConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
this.config = config;
this.loadBalancer = loadBalancer;
this.masterSlaveEntry = masterSlaveEntry;
this.connectionManager = connectionManager;
@ -117,26 +112,20 @@ public class ConnectionPool<T extends RedisConnection> {
}
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize();
// is it a master connection pool?
if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) {
minimumIdleSize = config.getMasterConnectionMinimumIdleSize();
}
return minimumIdleSize;
return config.getSlaveConnectionMinimumIdleSize();
}
public void remove(SubscribesConnectionEntry entry) {
entries.remove(entry);
}
protected SubscribesConnectionEntry getEntry() {
return config.getLoadBalancer().getEntry(entries);
}
public Future<T> get() {
for (int j = entries.size() - 1; j >= 0; j--) {
SubscribesConnectionEntry entry;
if (ConnectionPool.this.loadBalancer != null) {
entry = ConnectionPool.this.loadBalancer.getEntry(entries);
} else {
entry = entries.get(0);
}
SubscribesConnectionEntry entry = getEntry();
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise();
connect(entry, promise);

@ -0,0 +1,41 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SubscribesConnectionEntry;
public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
public MasterConnectionPool(MasterSlaveServersConfig config,
ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry);
}
@Override
protected SubscribesConnectionEntry getEntry() {
return entries.get(0);
}
@Override
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
return config.getMasterConnectionMinimumIdleSize();
}
}

@ -18,7 +18,6 @@ package org.redisson.misc;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SubscribesConnectionEntry;
@ -26,9 +25,8 @@ import io.netty.util.concurrent.Future;
public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> {
public PubSubConnectionPoll(MasterSlaveServersConfig config,
LoadBalancer loadBalancer, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, loadBalancer, connectionManager, masterSlaveEntry);
public PubSubConnectionPoll(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry);
}
@Override

Loading…
Cancel
Save