diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index d49c97a68..e8580571d 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -60,12 +60,12 @@ public class RedisNodes implements NodesGroup { Collection entries = connectionManager.getEntrySet(); URI addr = URIBuilder.create(address); for (MasterSlaveEntry masterSlaveEntry : entries) { - if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { + if (masterSlaveEntry.getAllEntries().isEmpty() && URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); } - for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) { - if (URIBuilder.compare(entry.getClient().getAddr(), addr) || - entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) { + + for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) { + if (URIBuilder.compare(entry.getClient().getAddr(), addr) && entry.getFreezeReason() != FreezeReason.MANAGER) { return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType()); } } @@ -78,16 +78,15 @@ public class RedisNodes implements NodesGroup { Collection entries = connectionManager.getEntrySet(); List result = new ArrayList(); for (MasterSlaveEntry masterSlaveEntry : entries) { - if (type == NodeType.MASTER) { + if (masterSlaveEntry.getAllEntries().isEmpty() && type == NodeType.MASTER) { RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); result.add((N) entry); } - if (type == NodeType.SLAVE) { - for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { - if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { - RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); - result.add((N) entry); - } + + for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) { + if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER && slaveEntry.getNodeType() == type) { + RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); + result.add((N) entry); } } } @@ -100,11 +99,13 @@ public class RedisNodes implements NodesGroup { Collection entries = connectionManager.getEntrySet(); List result = new ArrayList(); for (MasterSlaveEntry masterSlaveEntry : entries) { - RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); - result.add((N) masterEntry); + if (masterSlaveEntry.getAllEntries().isEmpty()) { + RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); + result.add((N) masterEntry); + } - for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { - if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { + for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) { + if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER) { RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); result.add((N) entry); } diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 4199aaf37..1c4961eb2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -229,6 +229,7 @@ public class CommandBatchService extends CommandAsyncService { BatchPromise batchPromise = (BatchPromise) promise; RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); super.handleError(sentPromise, cause); + super.handleError(promise, cause); semaphore.release(); return; } @@ -462,39 +463,42 @@ public class CommandBatchService extends CommandAsyncService { return; } - for (java.util.Map.Entry> entry : future.getNow().entrySet()) { - Entry commandEntry = commands.get(entry.getKey()); - Iterator resultIter = entry.getValue().iterator(); - for (BatchCommandData data : commandEntry.getCommands()) { - if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - break; + try { + for (java.util.Map.Entry> entry : future.getNow().entrySet()) { + Entry commandEntry = commands.get(entry.getKey()); + Iterator resultIter = entry.getValue().iterator(); + for (BatchCommandData data : commandEntry.getCommands()) { + if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + break; + } + RPromise promise = (RPromise) data.getPromise(); + promise.trySuccess(resultIter.next()); } - RPromise promise = (RPromise) data.getPromise(); - promise.trySuccess(resultIter.next()); } - } - - List entries = new ArrayList(); - for (Entry e : commands.values()) { - entries.addAll(e.getCommands()); - } - Collections.sort(entries); - List responses = new ArrayList(entries.size()); - int syncedSlaves = 0; - for (BatchCommandData commandEntry : entries) { - if (isWaitCommand(commandEntry)) { - syncedSlaves += (Integer) commandEntry.getPromise().getNow(); - } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) - && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { - Object entryResult = commandEntry.getPromise().getNow(); - entryResult = tryHandleReference(entryResult); - responses.add(entryResult); + + List entries = new ArrayList(); + for (Entry e : commands.values()) { + entries.addAll(e.getCommands()); } + Collections.sort(entries); + List responses = new ArrayList(entries.size()); + int syncedSlaves = 0; + for (BatchCommandData commandEntry : entries) { + if (isWaitCommand(commandEntry)) { + syncedSlaves += (Integer) commandEntry.getPromise().getNow(); + } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) + && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + Object entryResult = commandEntry.getPromise().getNow(); + entryResult = tryHandleReference(entryResult); + responses.add(entryResult); + } + } + BatchResult result = new BatchResult(responses, syncedSlaves); + resultPromise.trySuccess((R)result); + } catch (Exception e) { + resultPromise.tryFailure(e); } - BatchResult result = new BatchResult(responses, syncedSlaves); - resultPromise.trySuccess((R)result); - commands = null; } }); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index ae1c801cd..198fbbce5 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -352,14 +352,8 @@ public class MasterSlaveEntry { return slaveBalancer.getEntry(client); } - public Collection getSlaveEntries() { - List result = new ArrayList(); - for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) { - if (slaveEntry.getNodeType() == NodeType.SLAVE) { - result.add(slaveEntry); - } - } - return result; + public Collection getAllEntries() { + return slaveBalancer.getEntries(); } public RedisClient getClient() { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index be1530de8..58cbf99b3 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -3,6 +3,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -28,6 +29,7 @@ import org.redisson.api.RBatch; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; import org.redisson.api.RMapAsync; +import org.redisson.api.RMapCache; import org.redisson.api.RMapCacheAsync; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; @@ -135,15 +137,20 @@ public class RedissonBatchTest extends BaseTest { process.shutdown(); } - + @Test public void testWriteTimeout() { RBatch batch = redisson.createBatch(batchOptions); + RMapCacheAsync map = batch.getMapCache("test"); for (int i = 0; i < 200000; i++) { - RMapCacheAsync map = batch.getMapCache("test"); - map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS); + RFuture f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES); + if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + f.syncUninterruptibly(); + } } + batch.execute(); + assertThat(redisson.getMapCache("test").size()).isEqualTo(200000); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 81604ffda..6960659b3 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -32,6 +32,7 @@ import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.ClusterNode; import org.redisson.api.Node; +import org.redisson.api.NodeType; import org.redisson.api.Node.InfoSection; import org.redisson.api.NodesGroup; import org.redisson.api.RFuture; @@ -506,6 +507,8 @@ public class RedissonTest { public void testNode() { Node node = redisson.getNodesGroup().getNode(RedisRunner.getDefaultRedisServerBindAddressAndPort()); assertThat(node).isNotNull(); + + assertThat(node.info(InfoSection.ALL)).isNotEmpty(); } @Test @@ -643,6 +646,38 @@ public class RedissonTest { Assert.assertTrue(nodes.pingAll()); } + + @Test + public void testNodesInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slot1) + .addNode(master2, slot2) + .addNode(master3, slot3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + for (Node node : redisson.getClusterNodesGroup().getNodes()) { + assertThat(node.info(InfoSection.ALL)).isNotEmpty(); + } + assertThat(redisson.getClusterNodesGroup().getNodes(NodeType.SLAVE)).hasSize(3); + assertThat(redisson.getClusterNodesGroup().getNodes(NodeType.MASTER)).hasSize(3); + assertThat(redisson.getClusterNodesGroup().getNodes()).hasSize(6); + + redisson.shutdown(); + process.shutdown(); + } @Test public void testMovedRedirectInCluster() throws Exception {