Merge branch 'master' into new_pubsub

pull/555/head
Nikita 9 years ago
commit 44cdadbe4e

@ -0,0 +1,28 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.ClusterNode;
import org.redisson.core.ClusterNodesGroup;
public class RedisClusterNodes extends RedisNodes<ClusterNode> implements ClusterNodesGroup {
public RedisClusterNodes(ConnectionManager connectionManager) {
super(connectionManager);
}
}

@ -38,7 +38,7 @@ import io.netty.util.concurrent.Promise;
public class RedisNodes<N extends Node> implements NodesGroup<N> {
private final ConnectionManager connectionManager;
final ConnectionManager connectionManager;
public RedisNodes(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;

@ -37,7 +37,7 @@ import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.ClusterNode;
import org.redisson.core.ClusterNodesGroup;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicDouble;
@ -563,11 +563,11 @@ public class Redisson implements RedissonClient {
}
@Override
public NodesGroup<ClusterNode> getClusterNodesGroup() {
if (!config.isClusterConfig()) {
public ClusterNodesGroup getClusterNodesGroup() {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson is not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);
return new RedisClusterNodes(connectionManager);
}
@Override

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.core.ClusterNode;
import org.redisson.core.ClusterNodesGroup;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicDouble;
@ -697,7 +698,7 @@ public interface RedissonClient {
*
* @return
*/
NodesGroup<ClusterNode> getClusterNodesGroup();
ClusterNodesGroup getClusterNodesGroup();
/**
* Returns {@code true} if this Redisson instance has been shut down.

@ -294,7 +294,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public NodesGroup<ClusterNode> getClusterNodesGroup() {
if (!config.isClusterConfig()) {
if (!connectionManager.isClusterMode()) {
throw new IllegalStateException("Redisson not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);

@ -75,16 +75,16 @@ public class RedisClient {
}
public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 3000);
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000);
hasOwnGroup = true;
}
public RedisClient(EventLoopGroup group, String host, int port) {
this(group, NioSocketChannel.class, host, port, 3000);
this(group, NioSocketChannel.class, host, port, 10000);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout) {
this(group, socketChannelClass, host, port, connectTimeout, 3000);
this(group, socketChannelClass, host, port, connectTimeout, 10000);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout, int commandTimeout) {

@ -1,3 +1,18 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import com.fasterxml.jackson.core.type.TypeReference;

@ -38,6 +38,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
@ -94,6 +95,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null
&& (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) {
makeCheckpoint = false;
}

@ -53,6 +53,7 @@ import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
@ -265,8 +266,10 @@ public interface RedisCommands {
new FlatNestedMultiDecoder(new ObjectMapReplayDecoder(), new ListResultReplayDecoder()), ValueType.OBJECT
);
RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS");
RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE");
RedisStrictCommand<Void> CLUSTER_FORGET = new RedisStrictCommand<Void>("CLUSTER", "FORGET");
RedisCommand<Object> CLUSTER_SLOTS = new RedisCommand<Object>("CLUSTER", "SLOTS", new SlotsDecoder(), ValueType.OBJECT);
RedisStrictCommand<Void> CLUSTER_RESET = new RedisStrictCommand<Void>("CLUSTER", "RESET");
RedisStrictCommand<List<String>> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand<List<String>>("CLUSTER", "GETKEYSINSLOT", new StringListReplayDecoder());
RedisStrictCommand<Void> CLUSTER_SETSLOT = new RedisStrictCommand<Void>("CLUSTER", "SETSLOT");

@ -0,0 +1,65 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.redisson.client.handler.State;
import org.redisson.cluster.ClusterSlotRange;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class SlotsDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.size() > 2 && parts.get(0) instanceof List) {
Map<ClusterSlotRange, Set<String>> result = new HashMap<ClusterSlotRange, Set<String>>();
List<List<Object>> rows = (List<List<Object>>)(Object)parts;
for (List<Object> row : rows) {
Iterator<Object> iterator = row.iterator();
Long startSlot = (Long)iterator.next();
Long endSlot = (Long)iterator.next();
ClusterSlotRange range = new ClusterSlotRange(startSlot.intValue(), endSlot.intValue());
Set<String> addresses = new HashSet<String>();
while(iterator.hasNext()) {
List<Object> addressParts = (List<Object>) iterator.next();
addresses.add(addressParts.get(0) + ":" + addressParts.get(1));
}
result.put(range, addresses);
}
return result;
}
return parts;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -64,6 +64,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Map<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private ScheduledFuture<?> monitorFuture;
private volatile URI lastClusterNode;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);
@ -87,6 +89,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
lastClusterNode = addr;
Collection<ClusterPartition> partitions = parsePartitions(nodes);
List<Future<Collection<Future<Void>>>> futures = new ArrayList<Future<Collection<Future<Void>>>>();
for (ClusterPartition partition : partitions) {
@ -321,7 +325,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
scheduleClusterChangeCheck(cfg, null);
return;
}
URI uri = iterator.next();
final URI uri = iterator.next();
Future<RedisConnection> connectionFuture = connect(cfg, uri);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
@ -333,12 +337,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
RedisConnection connection = future.getNow();
updateClusterState(cfg, connection, iterator);
updateClusterState(cfg, connection, iterator, uri);
}
});
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator) {
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) {
Future<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override
@ -350,6 +354,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return;
}
lastClusterNode = uri;
List<ClusterNodeInfo> nodes = future.getNow();
final StringBuilder nodesValue = new StringBuilder();
if (log.isDebugEnabled()) {
@ -588,14 +594,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
currentPartition.addSlots(addedSlots);
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());
for (Integer slot : addedSlots) {
entry.addSlotRange(slot);
addEntry(slot, entry);
lastPartitions.put(slot, currentPartition);
}
if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.size(), entry.getClient().getAddr());
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddr());
}
Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots());
@ -609,7 +615,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
currentPartition.removeSlots(removedSlots);
if (!removedSlots.isEmpty()) {
log.info("{} slots removed from {}", removedSlots.size(), entry.getClient().getAddr());
log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddr());
}
break;
}
@ -684,5 +690,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private HashSet<ClusterPartition> getLastPartitions() {
return new HashSet<ClusterPartition>(lastPartitions.values());
}
@Override
public URI getLastClusterNode() {
return lastClusterNode;
}
@Override
public boolean isClusterMode() {
return true;
}
}

@ -16,6 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Semaphore;
@ -42,6 +43,8 @@ import io.netty.util.concurrent.Promise;
*
*/
public interface ConnectionManager {
URI getLastClusterNode();
boolean isClusterMode();

@ -124,8 +124,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig config;
protected boolean isClusterMode;
private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
private final Promise<Boolean> shutdownPromise;
@ -169,11 +167,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
this.codec = cfg.getCodec();
this.shutdownPromise = newPromise();
this.isClusterMode = cfg.isClusterConfig();
}
public boolean isClusterMode() {
return isClusterMode;
return false;
}
public IdleConnectionWatcher getConnectionWatcher() {
@ -803,4 +800,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
Thread.currentThread().interrupt();
}
}
public URI getLastClusterNode() {
return null;
}
}

@ -0,0 +1,20 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public interface ClusterNodesGroup extends NodesGroup<ClusterNode> {
}

@ -24,6 +24,9 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -106,7 +109,7 @@ public class RedisClientTest {
@Test
public void test() throws InterruptedException {
RedisClient c = new RedisClient("localhost", 6379);
RedisClient c = new RedisClient(new NioEventLoopGroup(), NioSocketChannel.class, "localhost", 6379, 3000, 10000);
final RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0);
@ -128,7 +131,7 @@ public class RedisClientTest {
@Test
public void testPipeline() throws InterruptedException, ExecutionException {
RedisClient c = new RedisClient("localhost", 6379);
RedisClient c = new RedisClient(new NioEventLoopGroup(), NioSocketChannel.class, "localhost", 6379, 3000, 10000);
RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0);

@ -43,13 +43,6 @@ public class RedissonRedLockTest {
public void run() {
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
@ -92,13 +85,14 @@ public class RedissonRedLockTest {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
lock3.delete();
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();

Loading…
Cancel
Save