From 48dea3709e5b4f6b2389777ae4066036c08cf4f9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 5 Sep 2015 11:59:51 +0300 Subject: [PATCH 1/8] Subscription connection leak in PubSubConnectionEntry::removeListener & unsubscribe process optimization. #237 --- .../connection/PubSubConnectionEntry.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 95d891aed..59515c435 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -103,11 +103,11 @@ public class PubSubConnectionEntry { } } - public void removeListener(String channelName, RedisPubSubListener listener) { + private void removeListener(String channelName, RedisPubSubListener listener) { Queue queue = channelListeners.get(channelName); synchronized (queue) { - if (queue.remove(listener)) { - channelListeners.remove(channelName, new ConcurrentLinkedQueue()); + if (queue.remove(listener) && queue.isEmpty()) { + channelListeners.remove(channelName); } } conn.removeListener(listener); @@ -139,34 +139,36 @@ public class PubSubConnectionEntry { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { - Queue listeners = channelListeners.get(channel); - if (listeners != null) { - for (RedisPubSubListener listener : listeners) { - removeListener(channel, listener); - } - } - subscribedChannelsAmount.release(); + removeListeners(channel); return true; } return false; } + }); conn.addOneShotListener(listener); conn.unsubscribe(channel); } + private void removeListeners(String channel) { + Queue queue = channelListeners.get(channel); + if (queue != null) { + synchronized (queue) { + channelListeners.remove(channel); + } + for (RedisPubSubListener listener : queue) { + conn.removeListener(listener); + } + } + subscribedChannelsAmount.release(); + } + public void punsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) { - Queue listeners = channelListeners.get(channel); - if (listeners != null) { - for (RedisPubSubListener listener : listeners) { - removeListener(channel, listener); - } - } - subscribedChannelsAmount.release(); + removeListeners(channel); return true; } return false; From efbca96707ce89c62dd5dbc2d6f870e4136068f8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 5 Sep 2015 19:52:31 +0300 Subject: [PATCH 2/8] [maven-release-plugin] prepare release redisson-2.1.2 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 9d5385698..5ce259ed1 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.1.2-SNAPSHOT + 2.1.2 bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - HEAD + redisson-2.1.2 From abe97f75e86e925c018cec87e7739ce555ff81bd Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 5 Sep 2015 19:52:37 +0300 Subject: [PATCH 3/8] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 5ce259ed1..3706fb294 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.1.2 + 2.1.3-SNAPSHOT bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - redisson-2.1.2 + HEAD From 1000975b0b51d2384eae08f776aa06bcb7c96575 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 5 Sep 2015 20:04:25 +0300 Subject: [PATCH 4/8] Update README.md --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 3b2315e41..49d4c4a4c 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,16 @@ Recent Releases ================================ ####Please Note: trunk is current development branch. +####05-Sep-2015 - version 2.1.2 released +Fixed - possible NPE during channel reconnection +Fixed - executeAsync freezes in cluster mode +Fixed - use same node for SCAN/SSCAN/HSCAN during iteration +Fixed - possible race-condition during master change +Fixed - `BlockingQueue.peek` race-condition +Fixed - NPE with empty sentinel servers +Fixed - unable to read `clientName` config param in Master\Slave and Sentinel modes +Fixed - "Too many open files" error in cluster mode + ####15-Aug-2015 - version 2.1.1 released Feature - all keys operations extracted to `RKeys` interface Feature - `RKeys.getKeys`, `RKeys.getKeysByPattern` and `RKeys.randomKey`methods added From 043b2c2e099095ed42a5e907294c4f029f63ac20 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 7 Sep 2015 10:13:56 +0300 Subject: [PATCH 5/8] channel configuration during reconnection. #198 --- .../redisson/client/ReconnectListener.java | 7 ++++ .../org/redisson/client/RedisConnection.java | 14 +++++++- .../client/handler/ConnectionWatchdog.java | 8 +++-- .../redisson/connection/BaseLoadBalancer.java | 15 +------- .../redisson/connection/ConnectionEntry.java | 35 ++++++++++++++++--- .../org/redisson/connection/SingleEntry.java | 14 +------- .../connection/SubscribesConnectionEntry.java | 8 +++++ 7 files changed, 66 insertions(+), 35 deletions(-) create mode 100644 src/main/java/org/redisson/client/ReconnectListener.java diff --git a/src/main/java/org/redisson/client/ReconnectListener.java b/src/main/java/org/redisson/client/ReconnectListener.java new file mode 100644 index 000000000..a02e3a356 --- /dev/null +++ b/src/main/java/org/redisson/client/ReconnectListener.java @@ -0,0 +1,7 @@ +package org.redisson.client; + +public interface ReconnectListener { + + void onReconnect(RedisConnection redisConnection); + +} diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index c521ff7dd..c547cbdb0 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -38,6 +38,7 @@ public class RedisConnection implements RedisCommands { private volatile boolean closed; volatile Channel channel; + private ReconnectListener reconnectListener; public RedisConnection(RedisClient redisClient, Channel channel) { super(); @@ -46,11 +47,22 @@ public class RedisConnection implements RedisCommands { updateChannel(channel); } - public void updateChannel(Channel channel) { + public void setReconnectListener(ReconnectListener reconnectListener) { + this.reconnectListener = reconnectListener; + } + + private void updateChannel(Channel channel) { this.channel = channel; channel.attr(CONNECTION).set(this); } + public void onReconnect(Channel channel) { + updateChannel(channel); + if (reconnectListener != null) { + reconnectListener.onReconnect(this); + } + } + public RedisClient getRedisClient() { return redisClient; } diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 690fa06ba..dd6d05eb4 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -23,11 +23,11 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; -import io.netty.util.concurrent.GenericFutureListener; public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @@ -74,7 +74,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); - bootstrap.connect().addListener(new GenericFutureListener() { + bootstrap.connect().addListener(new ChannelFutureListener() { + @Override public void operationComplete(ChannelFuture future) throws Exception { if (connection.isClosed()) { @@ -83,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { if (future.isSuccess()) { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); - connection.updateChannel(future.channel()); + connection.onReconnect(future.channel()); return; } @@ -95,6 +96,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { } }, timeout, TimeUnit.MILLISECONDS); } + }); } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 5a0d8330e..20976ea83 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -28,7 +28,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommands; import org.redisson.misc.ReclosableLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,19 +148,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { if (conn != null) { return conn; } - conn = entry.getClient().connectPubSub(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - - entry.registerSubscribeConnection(conn); - return conn; + return entry.connectPubSub(config); } catch (RedisConnectionException e) { entry.getSubscribeConnectionsSemaphore().release(); // TODO connection scoring diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 5993f9782..3adbe3cad 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -20,18 +20,20 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.ReconnectListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.RedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConnectionEntry { - private final Logger log = LoggerFactory.getLogger(getClass()); + final Logger log = LoggerFactory.getLogger(getClass()); private volatile boolean freezed; - private final RedisClient client; + final RedisClient client; private final Queue connections = new ConcurrentLinkedQueue(); private final Semaphore connectionsSemaphore; @@ -61,8 +63,22 @@ public class ConnectionEntry { return connections; } - public RedisConnection connect(MasterSlaveServersConfig config) { + public RedisConnection connect(final MasterSlaveServersConfig config) { RedisConnection conn = client.connect(); + log.debug("new connection created: {}", conn); + + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); + + return conn; + } + + private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) { if (config.getPassword() != null) { conn.sync(RedisCommands.AUTH, config.getPassword()); } @@ -72,8 +88,19 @@ public class ConnectionEntry { if (config.getClientName() != null) { conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); } + } - log.debug("new connection created: {}", conn); + public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) { + RedisPubSubConnection conn = client.connectPubSub(); + log.debug("new pubsub connection created: {}", conn); + + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); return conn; } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index f4d3cc193..62159402f 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -20,7 +20,6 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommands; public class SingleEntry extends MasterSlaveEntry { @@ -54,18 +53,7 @@ public class SingleEntry extends MasterSlaveEntry { } try { - conn = masterEntry.getClient().connectPubSub(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - - return conn; + return masterEntry.connectPubSub(config); } catch (RedisConnectionException e) { ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release(); throw e; diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index b7331f4ef..9fe0f8e82 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -19,6 +19,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisPubSubConnection; @@ -53,5 +54,12 @@ public class SubscribesConnectionEntry extends ConnectionEntry { return subscribeConnectionsSemaphore; } + public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) { + RedisPubSubConnection conn = super.connectPubSub(config); + allSubscribeConnections.offer(conn); + return conn; + } + + } From 3aced8e178bde02151b70b8fccba3f3a7a6a2936 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 7 Sep 2015 10:40:38 +0300 Subject: [PATCH 6/8] license added. #198 --- .../org/redisson/client/ReconnectListener.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/org/redisson/client/ReconnectListener.java b/src/main/java/org/redisson/client/ReconnectListener.java index a02e3a356..3d81f5f66 100644 --- a/src/main/java/org/redisson/client/ReconnectListener.java +++ b/src/main/java/org/redisson/client/ReconnectListener.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.client; public interface ReconnectListener { From 429eb6691ab780be19d117a1d339067450f1b206 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 7 Sep 2015 16:34:44 +0300 Subject: [PATCH 7/8] init reconnected channel. #198 --- .../redisson/client/ReconnectListener.java | 2 +- .../org/redisson/client/RedisConnection.java | 13 +++++-------- .../client/handler/ConnectionWatchdog.java | 19 +++++++++++++++---- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/redisson/client/ReconnectListener.java b/src/main/java/org/redisson/client/ReconnectListener.java index 3d81f5f66..289e4c1ac 100644 --- a/src/main/java/org/redisson/client/ReconnectListener.java +++ b/src/main/java/org/redisson/client/ReconnectListener.java @@ -17,6 +17,6 @@ package org.redisson.client; public interface ReconnectListener { - void onReconnect(RedisConnection redisConnection); + void onReconnect(RedisConnection redisConnection) throws RedisException; } diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index c547cbdb0..05fa233d1 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -51,16 +51,13 @@ public class RedisConnection implements RedisCommands { this.reconnectListener = reconnectListener; } - private void updateChannel(Channel channel) { - this.channel = channel; - channel.attr(CONNECTION).set(this); + public ReconnectListener getReconnectListener() { + return reconnectListener; } - public void onReconnect(Channel channel) { - updateChannel(channel); - if (reconnectListener != null) { - reconnectListener.onReconnect(this); - } + public void updateChannel(Channel channel) { + this.channel = channel; + channel.attr(CONNECTION).set(this); } public RedisClient getRedisClient() { diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index dd6d05eb4..464cbdb7d 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -18,6 +18,7 @@ package org.redisson.client.handler; import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,10 +83,20 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { return; } - if (future.isSuccess()) { - log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); - connection.onReconnect(future.channel()); - return; + try { + if (future.isSuccess()) { + log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); + + if (connection.getReconnectListener() != null) { + // new connection used only to init channel + RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel()); + connection.getReconnectListener().onReconnect(rc); + } + connection.updateChannel(future.channel()); + return; + } + } catch (RedisException e) { + log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), e); } int timeout = 2 << attempts; From e0814eb73fa4e58ac80cef1a8f5c2760298330f9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 8 Sep 2015 09:57:46 +0300 Subject: [PATCH 8/8] Commands decoding during reconnection fixed. #198 --- .../client/handler/ConnectionWatchdog.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 464cbdb7d..d55b7acbf 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -78,7 +78,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { bootstrap.connect().addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(final ChannelFuture future) throws Exception { if (connection.isClosed()) { return; } @@ -88,11 +88,18 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); if (connection.getReconnectListener() != null) { - // new connection used only to init channel - RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel()); - connection.getReconnectListener().onReconnect(rc); + bootstrap.group().execute(new Runnable() { + @Override + public void run() { + // new connection used only to init channel + RedisConnection rc = new RedisConnection(connection.getRedisClient(), future.channel()); + connection.getReconnectListener().onReconnect(rc); + connection.updateChannel(future.channel()); + } + }); + } else { + connection.updateChannel(future.channel()); } - connection.updateChannel(future.channel()); return; } } catch (RedisException e) {