From b0c79a9fa0d2b2c71fc7b5166d406a986fe165c2 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 20 Nov 2015 17:11:50 +0300 Subject: [PATCH] LoadBalancer refactored --- .../org/redisson/connection/LoadBalancer.java | 33 ------------ .../connection/LoadBalancerManager.java | 51 +++++++++++++++++++ ...ncer.java => LoadBalancerManagerImpl.java} | 18 +++---- .../redisson/connection/MasterSlaveEntry.java | 11 ++-- .../connection/RandomLoadBalancer.java | 2 +- .../connection/RoundRobinLoadBalancer.java | 2 +- .../org/redisson/connection/SingleEntry.java | 6 ++- .../org/redisson/misc/ConnectionPool.java | 27 +++------- .../redisson/misc/MasterConnectionPool.java | 41 +++++++++++++++ .../redisson/misc/PubSubConnectionPoll.java | 6 +-- 10 files changed, 122 insertions(+), 75 deletions(-) create mode 100644 src/main/java/org/redisson/connection/LoadBalancerManager.java rename src/main/java/org/redisson/connection/{BaseLoadBalancer.java => LoadBalancerManagerImpl.java} (89%) create mode 100644 src/main/java/org/redisson/misc/MasterConnectionPool.java diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 83ba30466..9673e05f5 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -15,43 +15,10 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; -import java.util.Collection; import java.util.List; -import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisConnection; -import org.redisson.client.RedisPubSubConnection; -import org.redisson.connection.ConnectionEntry.FreezeReason; - -import io.netty.util.concurrent.Future; - public interface LoadBalancer { SubscribesConnectionEntry getEntry(List clientsCopy); - Future getConnection(InetSocketAddress addr); - - int getAvailableClients(); - - void shutdownAsync(); - - void shutdown(); - - boolean unfreeze(String host, int port, FreezeReason freezeReason); - - Collection freeze(String host, int port, FreezeReason freezeReason); - - void init(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry); - - void add(SubscribesConnectionEntry entry); - - Future nextConnection(); - - Future nextPubSubConnection(); - - void returnConnection(RedisConnection connection); - - void returnSubscribeConnection(RedisPubSubConnection connection); - } diff --git a/src/main/java/org/redisson/connection/LoadBalancerManager.java b/src/main/java/org/redisson/connection/LoadBalancerManager.java new file mode 100644 index 000000000..0f26af099 --- /dev/null +++ b/src/main/java/org/redisson/connection/LoadBalancerManager.java @@ -0,0 +1,51 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection; + +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.connection.ConnectionEntry.FreezeReason; + +import io.netty.util.concurrent.Future; + +public interface LoadBalancerManager { + + Future getConnection(InetSocketAddress addr); + + int getAvailableClients(); + + void shutdownAsync(); + + void shutdown(); + + boolean unfreeze(String host, int port, FreezeReason freezeReason); + + Collection freeze(String host, int port, FreezeReason freezeReason); + + void add(SubscribesConnectionEntry entry); + + Future nextConnection(); + + Future nextPubSubConnection(); + + void returnConnection(RedisConnection connection); + + void returnSubscribeConnection(RedisPubSubConnection connection); + +} diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java similarity index 89% rename from src/main/java/org/redisson/connection/BaseLoadBalancer.java rename to src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java index edead0ce7..4a7ebac1a 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancerManagerImpl.java @@ -35,21 +35,19 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.internal.PlatformDependent; -abstract class BaseLoadBalancer implements LoadBalancer { +public class LoadBalancerManagerImpl implements LoadBalancerManager { private final Logger log = LoggerFactory.getLogger(getClass()); - private ConnectionManager connectionManager; - final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); + private final ConnectionManager connectionManager; + private final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); + private final PubSubConnectionPoll pubSubEntries; + private final ConnectionPool entries; - PubSubConnectionPoll pubSubEntries; - - ConnectionPool entries; - - public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { + public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { this.connectionManager = connectionManager; - entries = new ConnectionPool(config, this, connectionManager, entry); - pubSubEntries = new PubSubConnectionPoll(config, this, connectionManager, entry); + entries = new ConnectionPool(config, connectionManager, entry); + pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry); } public synchronized void add(SubscribesConnectionEntry entry) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 2b584b687..31ad2bb95 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -28,7 +28,7 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionEntry.FreezeReason; import org.redisson.connection.ConnectionEntry.NodeType; -import org.redisson.misc.ConnectionPool; +import org.redisson.misc.MasterConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ public class MasterSlaveEntry { final Logger log = LoggerFactory.getLogger(getClass()); - LoadBalancer slaveBalancer; + LoadBalancerManager slaveBalancer; SubscribesConnectionEntry masterEntry; final ConnectionListener connectListener; @@ -52,7 +52,7 @@ public class MasterSlaveEntry { final MasterSlaveServersConfig config; final ConnectionManager connectionManager; - final ConnectionPool writeConnectionHolder; + final MasterConnectionPool writeConnectionHolder; final Set slotRanges; final AtomicBoolean active = new AtomicBoolean(true); @@ -63,12 +63,11 @@ public class MasterSlaveEntry { this.config = config; this.connectListener = connectListener; - slaveBalancer = config.getLoadBalancer(); - slaveBalancer.init(config, connectionManager, this); + slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this); initSlaveBalancer(config); - writeConnectionHolder = new ConnectionPool(config, null, connectionManager, this); + writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); } protected void initSlaveBalancer(MasterSlaveServersConfig config) { diff --git a/src/main/java/org/redisson/connection/RandomLoadBalancer.java b/src/main/java/org/redisson/connection/RandomLoadBalancer.java index 47ce7c144..edb36a407 100644 --- a/src/main/java/org/redisson/connection/RandomLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RandomLoadBalancer.java @@ -19,7 +19,7 @@ import java.security.SecureRandom; import java.util.List; import java.util.Random; -public class RandomLoadBalancer extends BaseLoadBalancer { +public class RandomLoadBalancer implements LoadBalancer { private final Random random = new SecureRandom(); diff --git a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java index 6bcec102a..6a6cc6d95 100644 --- a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java @@ -18,7 +18,7 @@ package org.redisson.connection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -public class RoundRobinLoadBalancer extends BaseLoadBalancer { +public class RoundRobinLoadBalancer implements LoadBalancer { private final AtomicInteger index = new AtomicInteger(-1); diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 3bac89d68..123ae93c2 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -35,7 +35,11 @@ public class SingleEntry extends MasterSlaveEntry { public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config, ConnectionListener connectListener) { super(slotRanges, connectionManager, config, connectListener); - pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager, this); + pubSubConnectionHolder = new PubSubConnectionPoll(config, connectionManager, this) { + protected SubscribesConnectionEntry getEntry() { + return entries.get(0); + } + }; } @Override diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 35c0dfbca..2420daea5 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -30,7 +30,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionEntry.FreezeReason; import org.redisson.connection.ConnectionEntry.NodeType; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.LoadBalancer; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.SubscribesConnectionEntry; @@ -42,7 +41,7 @@ import io.netty.util.concurrent.Promise; public class ConnectionPool { - final List entries = new CopyOnWriteArrayList(); + protected final List entries = new CopyOnWriteArrayList(); final Deque> promises = new LinkedBlockingDeque>(); @@ -50,14 +49,10 @@ public class ConnectionPool { final MasterSlaveServersConfig config; - final LoadBalancer loadBalancer; - final MasterSlaveEntry masterSlaveEntry; - public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, - ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { + public ConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { this.config = config; - this.loadBalancer = loadBalancer; this.masterSlaveEntry = masterSlaveEntry; this.connectionManager = connectionManager; @@ -117,26 +112,20 @@ public class ConnectionPool { } protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { - int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize(); - // is it a master connection pool? - if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) { - minimumIdleSize = config.getMasterConnectionMinimumIdleSize(); - } - return minimumIdleSize; + return config.getSlaveConnectionMinimumIdleSize(); } public void remove(SubscribesConnectionEntry entry) { entries.remove(entry); } + protected SubscribesConnectionEntry getEntry() { + return config.getLoadBalancer().getEntry(entries); + } + public Future get() { for (int j = entries.size() - 1; j >= 0; j--) { - SubscribesConnectionEntry entry; - if (ConnectionPool.this.loadBalancer != null) { - entry = ConnectionPool.this.loadBalancer.getEntry(entries); - } else { - entry = entries.get(0); - } + SubscribesConnectionEntry entry = getEntry(); if (!entry.isFreezed() && tryAcquireConnection(entry)) { Promise promise = connectionManager.newPromise(); connect(entry, promise); diff --git a/src/main/java/org/redisson/misc/MasterConnectionPool.java b/src/main/java/org/redisson/misc/MasterConnectionPool.java new file mode 100644 index 000000000..5d2b680b9 --- /dev/null +++ b/src/main/java/org/redisson/misc/MasterConnectionPool.java @@ -0,0 +1,41 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; +import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.SubscribesConnectionEntry; + +public class MasterConnectionPool extends ConnectionPool { + + public MasterConnectionPool(MasterSlaveServersConfig config, + ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { + super(config, connectionManager, masterSlaveEntry); + } + + @Override + protected SubscribesConnectionEntry getEntry() { + return entries.get(0); + } + + @Override + protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + return config.getMasterConnectionMinimumIdleSize(); + } + +} diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java index c273086a1..804b4daa3 100644 --- a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -18,7 +18,6 @@ package org.redisson.misc; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisPubSubConnection; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.LoadBalancer; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.SubscribesConnectionEntry; @@ -26,9 +25,8 @@ import io.netty.util.concurrent.Future; public class PubSubConnectionPoll extends ConnectionPool { - public PubSubConnectionPoll(MasterSlaveServersConfig config, - LoadBalancer loadBalancer, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { - super(config, loadBalancer, connectionManager, masterSlaveEntry); + public PubSubConnectionPoll(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { + super(config, connectionManager, masterSlaveEntry); } @Override