Fixed - race condition with load balancer node selection. #1337

pull/1423/head
Nikita 7 years ago
parent 60fb467012
commit 03ebf8f96c

@ -164,18 +164,16 @@ abstract class ConnectionPool<T extends RedisConnection> {
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
protected ClientConnectionsEntry getEntry() {
return config.getLoadBalancer().getEntry(entries);
}
public RFuture<T> get(RedisCommand<?> command) {
for (int j = entries.size() - 1; j >= 0; j--) {
final ClientConnectionsEntry entry = getEntry();
List<ClientConnectionsEntry> entriesCopy = new LinkedList<ClientConnectionsEntry>(entries);
while (!entriesCopy.isEmpty()) {
ClientConnectionsEntry entry = config.getLoadBalancer().getEntry(entriesCopy);
if ((!entry.isFreezed() ||
(entry.getFreezeReason() == FreezeReason.SYSTEM && config.getReadMode() == ReadMode.MASTER_SLAVE)) &&
tryAcquireConnection(entry)) {
return acquireConnection(command, entry);
}
entriesCopy.remove(entry);
}
List<InetSocketAddress> failed = new LinkedList<InetSocketAddress>();
@ -201,21 +199,14 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {
if ((!entry.isFreezed() || entry.getFreezeReason() == FreezeReason.SYSTEM) &&
tryAcquireConnection(entry)) {
return acquireConnection(command, entry);
}
RedisConnectionException exception = new RedisConnectionException(
"Can't aquire connection to " + entry);
return RedissonPromise.newFailedFuture(exception);
return acquireConnection(command, entry);
}
public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> {
}
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
protected final RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
final RPromise<T> result = new RedissonPromise<T>();
AcquireCallback<T> callback = new AcquireCallback<T>() {

@ -15,11 +15,13 @@
*/
package org.redisson.connection.pool;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry;
/**
@ -35,10 +37,10 @@ public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
}
@Override
protected ClientConnectionsEntry getEntry() {
return entries.get(0);
public RFuture<RedisConnection> get(RedisCommand<?> command) {
return acquireConnection(command, entries.get(0));
}
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}

@ -15,6 +15,9 @@
*/
package org.redisson.connection.pool;
import org.redisson.api.RFuture;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
@ -34,8 +37,8 @@ public class MasterPubSubConnectionPool extends PubSubConnectionPool {
}
@Override
protected ClientConnectionsEntry getEntry() {
return entries.get(0);
public RFuture<RedisPubSubConnection> get(RedisCommand<?> command) {
return acquireConnection(command, entries.get(0));
}
public void remove(ClientConnectionsEntry entry) {

@ -37,6 +37,7 @@ import org.redisson.api.listener.StatusListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonTopicTest {
@ -684,7 +685,7 @@ public class RedissonTopicTest {
for (int i = 0; i < 100; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync(1);
futures.add(f1);
futures.add(f2);
futures.add(f3);
@ -716,6 +717,7 @@ public class RedissonTopicTest {
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);

Loading…
Cancel
Save