diff --git a/src/main/java/org/redisson/RedisClusterNodes.java b/src/main/java/org/redisson/RedisClusterNodes.java new file mode 100644 index 000000000..4b59c8d4c --- /dev/null +++ b/src/main/java/org/redisson/RedisClusterNodes.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.connection.ConnectionManager; +import org.redisson.core.ClusterNode; +import org.redisson.core.ClusterNodesGroup; + +public class RedisClusterNodes extends RedisNodes implements ClusterNodesGroup { + + public RedisClusterNodes(ConnectionManager connectionManager) { + super(connectionManager); + } + +} diff --git a/src/main/java/org/redisson/RedisNodes.java b/src/main/java/org/redisson/RedisNodes.java index 07fd116f9..6337fe826 100644 --- a/src/main/java/org/redisson/RedisNodes.java +++ b/src/main/java/org/redisson/RedisNodes.java @@ -38,7 +38,7 @@ import io.netty.util.concurrent.Promise; public class RedisNodes implements NodesGroup { - private final ConnectionManager connectionManager; + final ConnectionManager connectionManager; public RedisNodes(ConnectionManager connectionManager) { this.connectionManager = connectionManager; diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 2902f22f9..aec5c611b 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -37,7 +37,7 @@ import org.redisson.connection.ElasticacheConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; -import org.redisson.core.ClusterNode; +import org.redisson.core.ClusterNodesGroup; import org.redisson.core.Node; import org.redisson.core.NodesGroup; import org.redisson.core.RAtomicDouble; @@ -563,11 +563,11 @@ public class Redisson implements RedissonClient { } @Override - public NodesGroup getClusterNodesGroup() { - if (!config.isClusterConfig()) { + public ClusterNodesGroup getClusterNodesGroup() { + if (!connectionManager.isClusterMode()) { throw new IllegalStateException("Redisson is not in cluster mode!"); } - return new RedisNodes(connectionManager); + return new RedisClusterNodes(connectionManager); } @Override diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index b63d5b3a9..0f4609338 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; import org.redisson.core.ClusterNode; +import org.redisson.core.ClusterNodesGroup; import org.redisson.core.Node; import org.redisson.core.NodesGroup; import org.redisson.core.RAtomicDouble; @@ -697,7 +698,7 @@ public interface RedissonClient { * * @return */ - NodesGroup getClusterNodesGroup(); + ClusterNodesGroup getClusterNodesGroup(); /** * Returns {@code true} if this Redisson instance has been shut down. diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index d9128b3e8..d818fc9c0 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -294,7 +294,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public NodesGroup getClusterNodesGroup() { - if (!config.isClusterConfig()) { + if (!connectionManager.isClusterMode()) { throw new IllegalStateException("Redisson not in cluster mode!"); } return new RedisNodes(connectionManager); diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 1a545052e..09215a7a9 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -75,16 +75,16 @@ public class RedisClient { } public RedisClient(String host, int port) { - this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 3000); + this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000); hasOwnGroup = true; } public RedisClient(EventLoopGroup group, String host, int port) { - this(group, NioSocketChannel.class, host, port, 3000); + this(group, NioSocketChannel.class, host, port, 10000); } public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int connectTimeout) { - this(group, socketChannelClass, host, port, connectTimeout, 3000); + this(group, socketChannelClass, host, port, connectTimeout, 10000); } public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int connectTimeout, int commandTimeout) { diff --git a/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java b/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java index 33f06b086..9587564ca 100644 --- a/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java +++ b/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java @@ -1,3 +1,18 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.client.codec; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 676b42dd9..c422c1592 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -38,6 +38,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; +import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.client.protocol.pubsub.Message; import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage; @@ -94,6 +95,7 @@ public class CommandDecoder extends ReplayingDecoder { CommandData cmd = (CommandData)data; if (cmd.getCommand().getReplayMultiDecoder() != null && (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) + || SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) || ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) { makeCheckpoint = false; } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 3ebcd4b1d..210c18a0f 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -53,6 +53,7 @@ import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; +import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; @@ -265,8 +266,10 @@ public interface RedisCommands { new FlatNestedMultiDecoder(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT ); + RedisStrictCommand CLUSTER_ADDSLOTS = new RedisStrictCommand("CLUSTER", "ADDSLOTS"); RedisStrictCommand CLUSTER_REPLICATE = new RedisStrictCommand("CLUSTER", "REPLICATE"); RedisStrictCommand CLUSTER_FORGET = new RedisStrictCommand("CLUSTER", "FORGET"); + RedisCommand CLUSTER_SLOTS = new RedisCommand("CLUSTER", "SLOTS", new SlotsDecoder(), ValueType.OBJECT); RedisStrictCommand CLUSTER_RESET = new RedisStrictCommand("CLUSTER", "RESET"); RedisStrictCommand> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand>("CLUSTER", "GETKEYSINSLOT", new StringListReplayDecoder()); RedisStrictCommand CLUSTER_SETSLOT = new RedisStrictCommand("CLUSTER", "SETSLOT"); diff --git a/src/main/java/org/redisson/client/protocol/decoder/SlotsDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/SlotsDecoder.java new file mode 100644 index 000000000..cbd28efb1 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/SlotsDecoder.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.redisson.client.handler.State; +import org.redisson.cluster.ClusterSlotRange; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class SlotsDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf, State state) { + return buf.toString(CharsetUtil.UTF_8); + } + + @Override + public Object decode(List parts, State state) { + if (parts.size() > 2 && parts.get(0) instanceof List) { + Map> result = new HashMap>(); + List> rows = (List>)(Object)parts; + for (List row : rows) { + Iterator iterator = row.iterator(); + Long startSlot = (Long)iterator.next(); + Long endSlot = (Long)iterator.next(); + ClusterSlotRange range = new ClusterSlotRange(startSlot.intValue(), endSlot.intValue()); + Set addresses = new HashSet(); + while(iterator.hasNext()) { + List addressParts = (List) iterator.next(); + addresses.add(addressParts.get(0) + ":" + addressParts.get(1)); + } + result.put(range, addresses); + } + return result; + } + return parts; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return true; + } + +} diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 8bb82b33b..5b016c7dd 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -64,6 +64,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final Map lastPartitions = PlatformDependent.newConcurrentHashMap(); private ScheduledFuture monitorFuture; + + private volatile URI lastClusterNode; public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { super(config); @@ -87,6 +89,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); } + lastClusterNode = addr; + Collection partitions = parsePartitions(nodes); List>>> futures = new ArrayList>>>(); for (ClusterPartition partition : partitions) { @@ -321,7 +325,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { scheduleClusterChangeCheck(cfg, null); return; } - URI uri = iterator.next(); + final URI uri = iterator.next(); Future connectionFuture = connect(cfg, uri); connectionFuture.addListener(new FutureListener() { @Override @@ -333,12 +337,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } RedisConnection connection = future.getNow(); - updateClusterState(cfg, connection, iterator); + updateClusterState(cfg, connection, iterator, uri); } }); } - private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator iterator) { + private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator iterator, final URI uri) { Future> future = connection.async(RedisCommands.CLUSTER_NODES); future.addListener(new FutureListener>() { @Override @@ -350,6 +354,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } + lastClusterNode = uri; + List nodes = future.getNow(); final StringBuilder nodesValue = new StringBuilder(); if (log.isDebugEnabled()) { @@ -588,14 +594,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { currentPartition.addSlots(addedSlots); MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); - + for (Integer slot : addedSlots) { entry.addSlotRange(slot); addEntry(slot, entry); lastPartitions.put(slot, currentPartition); } if (!addedSlots.isEmpty()) { - log.info("{} slots added to {}", addedSlots.size(), entry.getClient().getAddr()); + log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddr()); } Set removedSlots = new HashSet(currentPartition.getSlots()); @@ -609,7 +615,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { currentPartition.removeSlots(removedSlots); if (!removedSlots.isEmpty()) { - log.info("{} slots removed from {}", removedSlots.size(), entry.getClient().getAddr()); + log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddr()); } break; } @@ -684,5 +690,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private HashSet getLastPartitions() { return new HashSet(lastPartitions.values()); } + + @Override + public URI getLastClusterNode() { + return lastClusterNode; + } + + @Override + public boolean isClusterMode() { + return true; + } + } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 01acaf266..9d375f83a 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; +import java.net.URI; import java.util.Collection; import java.util.Set; import java.util.concurrent.Semaphore; @@ -42,6 +43,8 @@ import io.netty.util.concurrent.Promise; * */ public interface ConnectionManager { + + URI getLastClusterNode(); boolean isClusterMode(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index e8fecbfaf..0a0280fe0 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -124,8 +124,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveServersConfig config; - protected boolean isClusterMode; - private final Map entries = PlatformDependent.newConcurrentHashMap(); private final Promise shutdownPromise; @@ -169,11 +167,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } this.codec = cfg.getCodec(); this.shutdownPromise = newPromise(); - this.isClusterMode = cfg.isClusterConfig(); } public boolean isClusterMode() { - return isClusterMode; + return false; } public IdleConnectionWatcher getConnectionWatcher() { @@ -803,4 +800,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { Thread.currentThread().interrupt(); } } + + public URI getLastClusterNode() { + return null; + } } diff --git a/src/main/java/org/redisson/core/ClusterNodesGroup.java b/src/main/java/org/redisson/core/ClusterNodesGroup.java new file mode 100644 index 000000000..89ca2c3bc --- /dev/null +++ b/src/main/java/org/redisson/core/ClusterNodesGroup.java @@ -0,0 +1,20 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface ClusterNodesGroup extends NodesGroup { + +} diff --git a/src/test/java/org/redisson/RedisClientTest.java b/src/test/java/org/redisson/RedisClientTest.java index 6ed5ead5b..9814f2828 100644 --- a/src/test/java/org/redisson/RedisClientTest.java +++ b/src/test/java/org/redisson/RedisClientTest.java @@ -24,6 +24,9 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; @@ -106,7 +109,7 @@ public class RedisClientTest { @Test public void test() throws InterruptedException { - RedisClient c = new RedisClient("localhost", 6379); + RedisClient c = new RedisClient(new NioEventLoopGroup(), NioSocketChannel.class, "localhost", 6379, 3000, 10000); final RedisConnection conn = c.connect(); conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); @@ -128,7 +131,7 @@ public class RedisClientTest { @Test public void testPipeline() throws InterruptedException, ExecutionException { - RedisClient c = new RedisClient("localhost", 6379); + RedisClient c = new RedisClient(new NioEventLoopGroup(), NioSocketChannel.class, "localhost", 6379, 3000, 10000); RedisConnection conn = c.connect(); conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); diff --git a/src/test/java/org/redisson/RedissonRedLockTest.java b/src/test/java/org/redisson/RedissonRedLockTest.java index 5fe975077..29060d0ba 100644 --- a/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/src/test/java/org/redisson/RedissonRedLockTest.java @@ -43,13 +43,6 @@ public class RedissonRedLockTest { public void run() { RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); lock.lock(); - - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - } - - lock.unlock(); }; }; t.start(); @@ -92,13 +85,14 @@ public class RedissonRedLockTest { Thread.sleep(3000); } catch (InterruptedException e) { } - lock.unlock(); }; }; t.start(); t.join(1000); + lock3.delete(); + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); lock.lock(); lock.unlock();