Merge branch 'mrniko/master'

pull/282/head
Rui Gu 10 years ago
commit 42ec7fb9f7

@ -59,6 +59,16 @@ Recent Releases
================================ ================================
####Please Note: trunk is current development branch. ####Please Note: trunk is current development branch.
####05-Sep-2015 - version 2.1.2 released
Fixed - possible NPE during channel reconnection
Fixed - executeAsync freezes in cluster mode
Fixed - use same node for SCAN/SSCAN/HSCAN during iteration
Fixed - possible race-condition during master change
Fixed - `BlockingQueue.peek` race-condition
Fixed - NPE with empty sentinel servers
Fixed - unable to read `clientName` config param in Master\Slave and Sentinel modes
Fixed - "Too many open files" error in cluster mode
####15-Aug-2015 - version 2.1.1 released ####15-Aug-2015 - version 2.1.1 released
Feature - all keys operations extracted to `RKeys` interface Feature - all keys operations extracted to `RKeys` interface
Feature - `RKeys.getKeys`, `RKeys.getKeysByPattern` and `RKeys.randomKey`methods added Feature - `RKeys.getKeys`, `RKeys.getKeysByPattern` and `RKeys.randomKey`methods added

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>2.1.2-SNAPSHOT</version> <version>2.1.3-SNAPSHOT</version>
<packaging>bundle</packaging> <packaging>bundle</packaging>
<name>Redisson</name> <name>Redisson</name>

@ -0,0 +1,22 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
public interface ReconnectListener {
void onReconnect(RedisConnection redisConnection) throws RedisException;
}

@ -38,6 +38,7 @@ public class RedisConnection implements RedisCommands {
private volatile boolean closed; private volatile boolean closed;
volatile Channel channel; volatile Channel channel;
private ReconnectListener reconnectListener;
public RedisConnection(RedisClient redisClient, Channel channel) { public RedisConnection(RedisClient redisClient, Channel channel) {
super(); super();
@ -46,6 +47,14 @@ public class RedisConnection implements RedisCommands {
updateChannel(channel); updateChannel(channel);
} }
public void setReconnectListener(ReconnectListener reconnectListener) {
this.reconnectListener = reconnectListener;
}
public ReconnectListener getReconnectListener() {
return reconnectListener;
}
public void updateChannel(Channel channel) { public void updateChannel(Channel channel) {
this.channel = channel; this.channel = channel;
channel.attr(CONNECTION).set(this); channel.attr(CONNECTION).set(this);

@ -18,16 +18,17 @@ package org.redisson.client.handler;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GenericFutureListener;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@ -74,17 +75,35 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() { bootstrap.connect().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(final ChannelFuture future) throws Exception {
if (connection.isClosed()) { if (connection.isClosed()) {
return; return;
} }
if (future.isSuccess()) { try {
log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); if (future.isSuccess()) {
connection.updateChannel(future.channel()); log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
return;
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
@Override
public void run() {
// new connection used only to init channel
RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel());
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(future.channel());
}
});
} else {
connection.updateChannel(future.channel());
}
return;
}
} catch (RedisException e) {
log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), e);
} }
int timeout = 2 << attempts; int timeout = 2 << attempts;
@ -95,6 +114,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
} }
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
} }
}); });
} }

@ -28,7 +28,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.ReclosableLatch; import org.redisson.misc.ReclosableLatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -149,19 +148,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
if (conn != null) { if (conn != null) {
return conn; return conn;
} }
conn = entry.getClient().connectPubSub(); return entry.connectPubSub(config);
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
entry.registerSubscribeConnection(conn);
return conn;
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
entry.getSubscribeConnectionsSemaphore().release(); entry.getSubscribeConnectionsSemaphore().release();
// TODO connection scoring // TODO connection scoring

@ -20,18 +20,20 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ConnectionEntry { public class ConnectionEntry {
private final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());
private volatile boolean freezed; private volatile boolean freezed;
private final RedisClient client; final RedisClient client;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>(); private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Semaphore connectionsSemaphore; private final Semaphore connectionsSemaphore;
@ -61,8 +63,22 @@ public class ConnectionEntry {
return connections; return connections;
} }
public RedisConnection connect(MasterSlaveServersConfig config) { public RedisConnection connect(final MasterSlaveServersConfig config) {
RedisConnection conn = client.connect(); RedisConnection conn = client.connect();
log.debug("new connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
return conn;
}
private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) {
if (config.getPassword() != null) { if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword()); conn.sync(RedisCommands.AUTH, config.getPassword());
} }
@ -72,8 +88,19 @@ public class ConnectionEntry {
if (config.getClientName() != null) { if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
} }
}
log.debug("new connection created: {}", conn); public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) {
RedisPubSubConnection conn = client.connectPubSub();
log.debug("new pubsub connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
return conn; return conn;
} }

@ -103,11 +103,11 @@ public class PubSubConnectionEntry {
} }
} }
public void removeListener(String channelName, RedisPubSubListener listener) { private void removeListener(String channelName, RedisPubSubListener listener) {
Queue<RedisPubSubListener> queue = channelListeners.get(channelName); Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
synchronized (queue) { synchronized (queue) {
if (queue.remove(listener)) { if (queue.remove(listener) && queue.isEmpty()) {
channelListeners.remove(channelName, new ConcurrentLinkedQueue<RedisPubSubListener>()); channelListeners.remove(channelName);
} }
} }
conn.removeListener(listener); conn.removeListener(listener);
@ -139,34 +139,36 @@ public class PubSubConnectionEntry {
@Override @Override
public boolean onStatus(PubSubType type, String ch) { public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel); removeListeners(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
return true; return true;
} }
return false; return false;
} }
}); });
conn.addOneShotListener(listener); conn.addOneShotListener(listener);
conn.unsubscribe(channel); conn.unsubscribe(channel);
} }
private void removeListeners(String channel) {
Queue<RedisPubSubListener> queue = channelListeners.get(channel);
if (queue != null) {
synchronized (queue) {
channelListeners.remove(channel);
}
for (RedisPubSubListener listener : queue) {
conn.removeListener(listener);
}
}
subscribedChannelsAmount.release();
}
public void punsubscribe(final String channel, RedisPubSubListener listener) { public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() { conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override @Override
public boolean onStatus(PubSubType type, String ch) { public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel); removeListeners(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
return true; return true;
} }
return false; return false;

@ -20,7 +20,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
public class SingleEntry extends MasterSlaveEntry { public class SingleEntry extends MasterSlaveEntry {
@ -54,18 +53,7 @@ public class SingleEntry extends MasterSlaveEntry {
} }
try { try {
conn = masterEntry.getClient().connectPubSub(); return masterEntry.connectPubSub(config);
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
return conn;
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release(); ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
throw e; throw e;

@ -19,6 +19,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -53,5 +54,12 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
return subscribeConnectionsSemaphore; return subscribeConnectionsSemaphore;
} }
public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) {
RedisPubSubConnection conn = super.connectPubSub(config);
allSubscribeConnections.offer(conn);
return conn;
}
} }

Loading…
Cancel
Save