Fixed - Collection iterator is not using the same Redis node. #1447

pull/1461/head
Nikita 7 years ago
parent 3aaed2f9d1
commit fe98d0aa14

@ -572,12 +572,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
MasterSlaveEntry entry = source.getEntry();
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
}
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
}
return entry;
}
@ -592,6 +592,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (source.getRedirect() != null) {
return entry.connectionReadOp(command, source.getAddr());
}
if (source.getRedisClient() != null) {
return entry.connectionReadOp(command, source.getRedisClient());
}
return entry.connectionReadOp(command);
}

@ -498,11 +498,12 @@ public class MasterSlaveEntry {
}
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, URI addr) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
return slaveBalancer.getConnection(command, addr);
}
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, RedisClient client) {
return slaveBalancer.getConnection(command, client);
}
public RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {

@ -232,6 +232,15 @@ public class LoadBalancerManager {
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return RedissonPromise.newFailedFuture(exception);
}
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, RedisClient client) {
ClientConnectionsEntry entry = getEntry(client);
if (entry != null) {
return slaveConnectionPool.get(command, entry);
}
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + client);
return RedissonPromise.newFailedFuture(exception);
}
public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) {
return slaveConnectionPool.get(command);

@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
@ -13,12 +14,17 @@ import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonSetTest extends BaseTest {
@ -374,6 +380,48 @@ public class RedissonSetTest extends BaseTest {
Assert.assertEquals(2, set.size());
}
@Test
public void testClusteredIterator() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave4 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave5 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave6 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1, slave4)
.addNode(master2, slave2, slave5)
.addNode(master3, slave3, slave6);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
int size = 10000;
RSet<String> set = redisson.getSet("test");
for (int i = 0; i < size; i++) {
set.add("" + i);
}
Set<String> keys = new HashSet<>();
for (String key : set) {
keys.add(key);
}
assertThat(keys).hasSize(size);
redisson.shutdown();
process.shutdown();
}
@Test
public void testIteratorRemoveHighVolume() throws InterruptedException {
Set<Integer> set = redisson.getSet("set") /*new HashSet<Integer>()*/;

Loading…
Cancel
Save