Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 70cd481e96

@ -60,12 +60,12 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
URI addr = URIBuilder.create(address); URI addr = URIBuilder.create(address);
for (MasterSlaveEntry masterSlaveEntry : entries) { for (MasterSlaveEntry masterSlaveEntry : entries) {
if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { if (masterSlaveEntry.getAllEntries().isEmpty() && URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
} }
for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr) || for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) {
entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) { if (URIBuilder.compare(entry.getClient().getAddr(), addr) && entry.getFreezeReason() != FreezeReason.MANAGER) {
return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType()); return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
} }
} }
@ -78,16 +78,15 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>(); List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) { for (MasterSlaveEntry masterSlaveEntry : entries) {
if (type == NodeType.MASTER) { if (masterSlaveEntry.getAllEntries().isEmpty() && type == NodeType.MASTER) {
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) entry); result.add((N) entry);
} }
if (type == NodeType.SLAVE) {
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER && slaveEntry.getNodeType() == type) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry); result.add((N) entry);
}
} }
} }
} }
@ -100,11 +99,13 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>(); List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) { for (MasterSlaveEntry masterSlaveEntry : entries) {
RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); if (masterSlaveEntry.getAllEntries().isEmpty()) {
result.add((N) masterEntry); RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) masterEntry);
}
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) { for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) { if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry); result.add((N) entry);
} }

@ -229,6 +229,7 @@ public class CommandBatchService extends CommandAsyncService {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise; BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise(); RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleError(sentPromise, cause); super.handleError(sentPromise, cause);
super.handleError(promise, cause);
semaphore.release(); semaphore.release();
return; return;
} }
@ -462,39 +463,42 @@ public class CommandBatchService extends CommandAsyncService {
return; return;
} }
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : future.getNow().entrySet()) { try {
Entry commandEntry = commands.get(entry.getKey()); for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : future.getNow().entrySet()) {
Iterator<Object> resultIter = entry.getValue().iterator(); Entry commandEntry = commands.get(entry.getKey());
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) { Iterator<Object> resultIter = entry.getValue().iterator();
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
break; if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
promise.trySuccess(resultIter.next());
} }
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
promise.trySuccess(resultIter.next());
} }
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
List<BatchCommandData> entries = new ArrayList<BatchCommandData>(); for (Entry e : commands.values()) {
for (Entry e : commands.values()) { entries.addAll(e.getCommands());
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
} }
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
}
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R)result);
} catch (Exception e) {
resultPromise.tryFailure(e);
} }
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R)result);
commands = null; commands = null;
} }
}); });

@ -352,14 +352,8 @@ public class MasterSlaveEntry {
return slaveBalancer.getEntry(client); return slaveBalancer.getEntry(client);
} }
public Collection<ClientConnectionsEntry> getSlaveEntries() { public Collection<ClientConnectionsEntry> getAllEntries() {
List<ClientConnectionsEntry> result = new ArrayList<ClientConnectionsEntry>(); return slaveBalancer.getEntries();
for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) {
if (slaveEntry.getNodeType() == NodeType.SLAVE) {
result.add(slaveEntry);
}
}
return result;
} }
public RedisClient getClient() { public RedisClient getClient() {

@ -3,6 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -28,6 +29,7 @@ import org.redisson.api.RBatch;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RListAsync; import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync; import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheAsync; import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript; import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode; import org.redisson.api.RScript.Mode;
@ -135,15 +137,20 @@ public class RedissonBatchTest extends BaseTest {
process.shutdown(); process.shutdown();
} }
@Test @Test
public void testWriteTimeout() { public void testWriteTimeout() {
RBatch batch = redisson.createBatch(batchOptions); RBatch batch = redisson.createBatch(batchOptions);
RMapCacheAsync<String, String> map = batch.getMapCache("test");
for (int i = 0; i < 200000; i++) { for (int i = 0; i < 200000; i++) {
RMapCacheAsync<String, String> map = batch.getMapCache("test"); RFuture<String> f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES);
map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS); if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.syncUninterruptibly();
}
} }
batch.execute(); batch.execute();
assertThat(redisson.getMapCache("test").size()).isEqualTo(200000);
} }
@Test @Test

@ -32,6 +32,7 @@ import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess; import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode; import org.redisson.api.ClusterNode;
import org.redisson.api.Node; import org.redisson.api.Node;
import org.redisson.api.NodeType;
import org.redisson.api.Node.InfoSection; import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodesGroup; import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -506,6 +507,8 @@ public class RedissonTest {
public void testNode() { public void testNode() {
Node node = redisson.getNodesGroup().getNode(RedisRunner.getDefaultRedisServerBindAddressAndPort()); Node node = redisson.getNodesGroup().getNode(RedisRunner.getDefaultRedisServerBindAddressAndPort());
assertThat(node).isNotNull(); assertThat(node).isNotNull();
assertThat(node.info(InfoSection.ALL)).isNotEmpty();
} }
@Test @Test
@ -643,6 +646,38 @@ public class RedissonTest {
Assert.assertTrue(nodes.pingAll()); Assert.assertTrue(nodes.pingAll());
} }
@Test
public void testNodesInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
for (Node node : redisson.getClusterNodesGroup().getNodes()) {
assertThat(node.info(InfoSection.ALL)).isNotEmpty();
}
assertThat(redisson.getClusterNodesGroup().getNodes(NodeType.SLAVE)).hasSize(3);
assertThat(redisson.getClusterNodesGroup().getNodes(NodeType.MASTER)).hasSize(3);
assertThat(redisson.getClusterNodesGroup().getNodes()).hasSize(6);
redisson.shutdown();
process.shutdown();
}
@Test @Test
public void testMovedRedirectInCluster() throws Exception { public void testMovedRedirectInCluster() throws Exception {

Loading…
Cancel
Save