From c9ed11750ca89736e8ab0febb8435ff9cdef047b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 2 Jul 2014 16:14:48 +0400 Subject: [PATCH] +sdown -sdown sentinel handling. lock expire experimental support. --- .../redis/RedisAsyncConnection.java | 2 +- .../com/lambdaworks/redis/RedisClient.java | 2 +- .../redis/protocol/CommandHandler.java | 3 + .../redis/protocol/ConnectionWatchdog.java | 54 ++++++---- .../redis/pubsub/PubSubOutput.java | 23 ++-- .../redis/pubsub/RedisPubSubAdapter.java | 14 +-- .../redis/pubsub/RedisPubSubConnection.java | 36 +++---- .../redis/pubsub/RedisPubSubListener.java | 14 +-- .../RedisPubSubTopicListenerWrapper.java | 4 +- src/main/java/org/redisson/Redisson.java | 2 +- .../org/redisson/RedissonCountDownLatch.java | 39 ++++--- src/main/java/org/redisson/RedissonLock.java | 101 ++++++++++++------ src/main/java/org/redisson/RedissonTopic.java | 10 +- .../redisson/connection/BaseLoadBalancer.java | 32 +++++- .../redisson/connection/ConnectionEntry.java | 9 ++ .../connection/ConnectionManager.java | 24 +---- .../org/redisson/connection/LoadBalancer.java | 4 + .../MasterSlaveConnectionManager.java | 51 +++++---- .../connection/PubSubConnectionEntry.java | 86 ++++++++------- .../java/org/redisson/RedissonLockTest.java | 33 +++++- 20 files changed, 332 insertions(+), 211 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java index dd93d76bb..f9573bb20 100644 --- a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java @@ -561,7 +561,7 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { } public Future publish(K channel, V message) { - CommandArgs args = new CommandArgs(codec).addKey(channel).addValue(message); + CommandArgs args = new CommandArgs(codec).add(channel.toString()).addValue(message); return dispatch(PUBLISH, new IntegerOutput(codec), args); } diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index c9f87b9d0..796f24e60 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -180,7 +180,7 @@ public class RedisClient { return connection; } catch (Throwable e) { - throw new RedisConnectionException("Unable to connect", e); + throw new RedisConnectionException("Unable to connect " + addr, e); } } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 8682a18cc..1b0c4df1c 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -4,6 +4,7 @@ package com.lambdaworks.redis.protocol; import io.netty.buffer.ByteBuf; import io.netty.channel.*; +import io.netty.util.CharsetUtil; import java.util.concurrent.BlockingQueue; @@ -59,6 +60,8 @@ public class CommandHandler extends ChannelDuplexHandler { Command cmd = (Command) msg; ByteBuf buf = ctx.alloc().heapBuffer(); cmd.encode(buf); +// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); + ctx.write(buf, promise); } diff --git a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java index 16cadfec9..9cfb4165e 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java +++ b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java @@ -15,6 +15,9 @@ import io.netty.util.concurrent.GenericFutureListener; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A netty {@link ChannelHandler} responsible for monitoring the channel and * reconnecting when the connection is lost. @@ -23,6 +26,9 @@ import java.util.concurrent.TimeUnit; */ @ChannelHandler.Sharable public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{ + + private final Logger log = LoggerFactory.getLogger(getClass()); + private Bootstrap bootstrap; private Channel channel; private ChannelGroup channels; @@ -84,31 +90,35 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{ } private void doReConnect(final EventLoop loop, final CommandHandler handler, final RedisAsyncConnection connection, final int attempts) { - if (reconnect) { - ChannelFuture connect; - synchronized (bootstrap) { - connect = bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection); - } - }).connect(); - } - connect.addListener(new GenericFutureListener() { + if (!reconnect) { + return; + } + + log.debug("trying to reconnect {}", bootstrap); + + ChannelFuture connect; + synchronized (bootstrap) { + connect = bootstrap.handler(new ChannelInitializer() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - int timeout = 2 << attempts; - loop.schedule(new Runnable() { - @Override - public void run() { - doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1)); - } - }, timeout, TimeUnit.MILLISECONDS); - } + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection); } - }); + }).connect(); } + connect.addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + int timeout = 2 << attempts; + loop.schedule(new Runnable() { + @Override + public void run() { + doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1)); + } + }, timeout, TimeUnit.MILLISECONDS); + } + } + }); } @Override diff --git a/src/main/java/com/lambdaworks/redis/pubsub/PubSubOutput.java b/src/main/java/com/lambdaworks/redis/pubsub/PubSubOutput.java index f7cf4f38d..5803ea1e2 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/PubSubOutput.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/PubSubOutput.java @@ -19,8 +19,8 @@ public class PubSubOutput extends CommandOutput { enum Type { message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe } private Type type; - private K channel; - private K pattern; + private String channel; + private String pattern; private long count; public PubSubOutput(RedisCodec codec) { @@ -31,11 +31,11 @@ public class PubSubOutput extends CommandOutput { return type; } - public K channel() { + public String channel() { return channel; } - public K pattern() { + public String pattern() { return pattern; } @@ -54,23 +54,28 @@ public class PubSubOutput extends CommandOutput { switch (type) { case pmessage: if (pattern == null) { - pattern = codec.decodeKey(bytes); + pattern = decodeAscii(bytes); break; } case message: if (channel == null) { - channel = codec.decodeKey(bytes); + channel = decodeAscii(bytes); break; } - output = codec.decodeValue(bytes); + if (channel.startsWith("__keyspace@") + || channel.startsWith("__keyevent@")) { + output = (V)decodeAscii(bytes); + } else { + output = codec.decodeValue(bytes); + } break; case psubscribe: case punsubscribe: - pattern = codec.decodeKey(bytes); + pattern = decodeAscii(bytes); break; case subscribe: case unsubscribe: - channel = codec.decodeKey(bytes); + channel = decodeAscii(bytes); break; } } diff --git a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubAdapter.java b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubAdapter.java index 63512dd97..7449c8a79 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubAdapter.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubAdapter.java @@ -10,28 +10,28 @@ package com.lambdaworks.redis.pubsub; * * @author Will Glozer */ -public class RedisPubSubAdapter implements RedisPubSubListener { +public class RedisPubSubAdapter implements RedisPubSubListener { @Override - public void message(K channel, V message) { + public void message(String channel, V message) { } @Override - public void message(K pattern, K channel, V message) { + public void message(String pattern, String channel, V message) { } @Override - public void subscribed(K channel, long count) { + public void subscribed(String channel, long count) { } @Override - public void psubscribed(K pattern, long count) { + public void psubscribed(String pattern, long count) { } @Override - public void unsubscribed(K channel, long count) { + public void unsubscribed(String channel, long count) { } @Override - public void punsubscribed(K pattern, long count) { + public void punsubscribed(String pattern, long count) { } } diff --git a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java index 03751f2c5..b2360026d 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java @@ -39,9 +39,9 @@ import com.lambdaworks.redis.protocol.CommandArgs; * @author Will Glozer */ public class RedisPubSubConnection extends RedisAsyncConnection { - private final Queue> listeners = new ConcurrentLinkedQueue>(); - private Set channels; - private Set patterns; + private final Queue> listeners = new ConcurrentLinkedQueue>(); + private Set channels; + private Set patterns; /** * Initialize a new connection. @@ -54,8 +54,8 @@ public class RedisPubSubConnection extends RedisAsyncConnection { */ public RedisPubSubConnection(RedisClient client, BlockingQueue> queue, RedisCodec codec, long timeout, TimeUnit unit, EventLoopGroup eventLoopGroup) { super(client, queue, codec, timeout, unit, eventLoopGroup); - channels = new HashSet(); - patterns = new HashSet(); + channels = new HashSet(); + patterns = new HashSet(); } /** @@ -63,36 +63,32 @@ public class RedisPubSubConnection extends RedisAsyncConnection { * * @param listener Listener. */ - public void addListener(RedisPubSubListener listener) { + public void addListener(RedisPubSubListener listener) { listeners.add(listener); } - public Queue> getListeners() { - return listeners; - } - /** * Remove an existing listener. * * @param listener Listener. */ - public void removeListener(RedisPubSubListener listener) { + public void removeListener(RedisPubSubListener listener) { listeners.remove(listener); } - public void psubscribe(K... patterns) { + public void psubscribe(String... patterns) { dispatch(PSUBSCRIBE, new PubSubOutput(codec), args(patterns)); } - public void punsubscribe(K... patterns) { + public void punsubscribe(String... patterns) { dispatch(PUNSUBSCRIBE, new PubSubOutput(codec), args(patterns)); } - public void subscribe(K... channels) { + public void subscribe(String... channels) { dispatch(SUBSCRIBE, new PubSubOutput(codec), args(channels)); } - public void unsubscribe(K... channels) { + public void unsubscribe(String... channels) { dispatch(UNSUBSCRIBE, new PubSubOutput(codec), args(channels)); } @@ -101,7 +97,7 @@ public class RedisPubSubConnection extends RedisAsyncConnection { super.channelActive(ctx); if (channels.size() > 0) { - subscribe(toArray(channels)); + subscribe(channels.toArray(new String[channels.size()])); channels.clear(); } @@ -115,7 +111,7 @@ public class RedisPubSubConnection extends RedisAsyncConnection { @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { PubSubOutput output = (PubSubOutput) msg; - for (RedisPubSubListener listener : listeners) { + for (RedisPubSubListener listener : listeners) { switch (output.type()) { case message: listener.message(output.channel(), output.get()); @@ -143,9 +139,11 @@ public class RedisPubSubConnection extends RedisAsyncConnection { } } - private CommandArgs args(K... keys) { + private CommandArgs args(String... keys) { CommandArgs args = new CommandArgs(codec); - args.addKeys(keys); + for (String key : keys) { + args.add(key.toString()); + } return args; } diff --git a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubListener.java b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubListener.java index 45bbd6c24..223bc3749 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubListener.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubListener.java @@ -9,14 +9,14 @@ package com.lambdaworks.redis.pubsub; * * @author Will Glozer */ -public interface RedisPubSubListener { +public interface RedisPubSubListener { /** * Message received from a channel subscription. * * @param channel Channel. * @param message Message. */ - void message(K channel, V message); + void message(String channel, V message); /** * Message received from a pattern subscription. @@ -25,7 +25,7 @@ public interface RedisPubSubListener { * @param channel Channel. * @param message Message. */ - void message(K pattern, K channel, V message); + void message(String pattern, String channel, V message); /** * Subscribed to a channel. @@ -33,7 +33,7 @@ public interface RedisPubSubListener { * @param channel Channel * @param count Subscription count. */ - void subscribed(K channel, long count); + void subscribed(String channel, long count); /** * Subscribed to a pattern. @@ -41,7 +41,7 @@ public interface RedisPubSubListener { * @param pattern Pattern. * @param count Subscription count. */ - void psubscribed(K pattern, long count); + void psubscribed(String pattern, long count); /** * Unsubscribed from a channel. @@ -49,7 +49,7 @@ public interface RedisPubSubListener { * @param channel Channel * @param count Subscription count. */ - void unsubscribed(K channel, long count); + void unsubscribed(String channel, long count); /** * Unsubscribed from a pattern. @@ -57,5 +57,5 @@ public interface RedisPubSubListener { * @param pattern Channel * @param count Subscription count. */ - void punsubscribed(K pattern, long count); + void punsubscribed(String pattern, long count); } diff --git a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java index f48f278ce..0db424b37 100644 --- a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java +++ b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java @@ -26,7 +26,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; * @param * @param */ -public class RedisPubSubTopicListenerWrapper extends RedisPubSubAdapter { +public class RedisPubSubTopicListenerWrapper extends RedisPubSubAdapter { private final MessageListener listener; private final String name; @@ -42,7 +42,7 @@ public class RedisPubSubTopicListenerWrapper extends RedisPubSubAdapter ENTRIES = new ConcurrentHashMap(); + + private final UUID id; - private PubSubConnectionEntry pubSubEntry; - - RedissonCountDownLatch(ConnectionManager connectionManager, String name) { + RedissonCountDownLatch(ConnectionManager connectionManager, String name, UUID id) { super(connectionManager, name); + this.id = id; } private Future subscribe() { @@ -62,7 +63,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown Promise newPromise = newPromise(); final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise); value.aquire(); - RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getName(), value); + RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); if (oldValue != null) { Promise oldPromise = aquire(); if (oldPromise == null) { @@ -71,7 +72,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return oldPromise; } - RedisPubSubAdapter listener = new RedisPubSubAdapter() { + RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override public void subscribed(String channel, long count) { @@ -95,19 +96,19 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown }; - pubSubEntry = connectionManager.subscribe(listener, getChannelName()); + connectionManager.subscribe(listener, getChannelName()); return newPromise; } private void release() { while (true) { - RedissonCountDownLatchEntry entry = ENTRIES.get(getName()); + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); if (entry == null) { return; } RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry); newEntry.release(); - if (ENTRIES.replace(getName(), entry, newEntry)) { + if (ENTRIES.replace(getEntryName(), entry, newEntry)) { return; } } @@ -115,11 +116,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown private Promise aquire() { while (true) { - RedissonCountDownLatchEntry entry = ENTRIES.get(getName()); + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry); newEntry.aquire(); - if (ENTRIES.replace(getName(), entry, newEntry)) { + if (ENTRIES.replace(getEntryName(), entry, newEntry)) { return newEntry.getPromise(); } } else { @@ -135,7 +136,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown while (getCountInner() > 0) { // waiting for open state - RedissonCountDownLatchEntry entry = ENTRIES.get(getName()); + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { entry.getLatch().await(); } @@ -161,7 +162,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } long current = System.currentTimeMillis(); // waiting for open state - RedissonCountDownLatchEntry entry = ENTRIES.get(getName()); + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { entry.getLatch().await(time, TimeUnit.MILLISECONDS); } @@ -207,6 +208,10 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } } + private String getEntryName() { + return id + getName(); + } + private String getChannelName() { return groupName + getName(); } @@ -282,7 +287,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } } finally { close(); - ENTRIES.remove(getName()); + ENTRIES.remove(getEntryName()); } } @@ -292,11 +297,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown connectionManager.getGroup().schedule(new Runnable() { @Override public void run() { - RedissonCountDownLatchEntry entry = ENTRIES.get(getName()); + RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); if (entry != null && entry.isFree() - && ENTRIES.remove(getName(), entry)) { - connectionManager.unsubscribe(pubSubEntry, getChannelName()); + && ENTRIES.remove(getEntryName(), entry)) { + connectionManager.unsubscribe(getChannelName()); } } }, 15, TimeUnit.SECONDS); diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index e314b3782..8aa98c93c 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -19,6 +19,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.Serializable; +import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -26,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.RLock; import com.lambdaworks.redis.RedisConnection; @@ -110,8 +110,6 @@ public class RedissonLock extends RedissonObject implements RLock { private static final ConcurrentMap ENTRIES = new ConcurrentHashMap(); - private PubSubConnectionEntry pubSubEntry; - RedissonLock(ConnectionManager connectionManager, String name, UUID id) { super(connectionManager, name); this.id = id; @@ -119,25 +117,29 @@ public class RedissonLock extends RedissonObject implements RLock { private void release() { while (true) { - RedissonLockEntry entry = ENTRIES.get(getName()); + RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry == null) { return; } RedissonLockEntry newEntry = new RedissonLockEntry(entry); newEntry.release(); - if (ENTRIES.replace(getName(), entry, newEntry)) { + if (ENTRIES.replace(getEntryName(), entry, newEntry)) { return; } } } + + private String getEntryName() { + return id + getName(); + } private Promise aquire() { while (true) { - RedissonLockEntry entry = ENTRIES.get(getName()); + RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { RedissonLockEntry newEntry = new RedissonLockEntry(entry); newEntry.aquire(); - if (ENTRIES.replace(getName(), entry, newEntry)) { + if (ENTRIES.replace(getEntryName(), entry, newEntry)) { return newEntry.getPromise(); } } else { @@ -155,7 +157,7 @@ public class RedissonLock extends RedissonObject implements RLock { Promise newPromise = newPromise(); final RedissonLockEntry value = new RedissonLockEntry(newPromise); value.aquire(); - RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getName(), value); + RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); if (oldValue != null) { Promise oldPromise = aquire(); if (oldPromise == null) { @@ -164,9 +166,10 @@ public class RedissonLock extends RedissonObject implements RLock { return oldPromise; } +// init(); value.getLatch().acquireUninterruptibly(); - RedisPubSubAdapter listener = new RedisPubSubAdapter() { + RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override public void subscribed(String channel, long count) { @@ -176,7 +179,7 @@ public class RedissonLock extends RedissonObject implements RLock { } @Override - public void message(String channel, Integer message) { + public void message(String channel, Object message) { if (message.equals(unlockMessage) && getChannelName().equals(channel)) { value.getLatch().release(); } @@ -184,10 +187,43 @@ public class RedissonLock extends RedissonObject implements RLock { }; - pubSubEntry = connectionManager.subscribe(listener, getChannelName()); + connectionManager.subscribe(listener, getChannelName()); + + RedisPubSubAdapter expireListener = new RedisPubSubAdapter() { + + @Override + public void message(String channel, Object message) { + if (getExpireChannelName().equals(channel) + && "expired".equals(message)) { + forceUnlock(); + } + } + + }; + + connectionManager.subscribe(expireListener, getExpireChannelName()); return newPromise; } + private String getExpireChannelName() { + return "__keyspace@0__:\"" + getName() + "\""; + } + + /** + * Turning on the notify-keyspace-events for Keyevent events from Expired keys + * + */ + private void init() { + RedisConnection conn = connectionManager.connectionWriteOp(); + try { + if (!conn.configSet("notify-keyspace-events", "KEx").equals("OK")) { + throw new IllegalStateException(); + } + } finally { + connectionManager.releaseWrite(conn); + } + } + @Override public void lock() { try { @@ -198,10 +234,6 @@ public class RedissonLock extends RedissonObject implements RLock { } } - private String getKeyName() { - return "redisson__lock__" + getName(); - } - private String getChannelName() { return "redisson__lock__channel__" + getName(); } @@ -210,7 +242,7 @@ public class RedissonLock extends RedissonObject implements RLock { public void lockInterruptibly() throws InterruptedException { while (!tryLock()) { // waiting for message - RedissonLockEntry entry = ENTRIES.get(getName()); + RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { entry.getLatch().acquire(); } @@ -235,12 +267,12 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionWriteOp(); try { - Boolean res = connection.setnx(getKeyName(), currentLock); + Boolean res = connection.setnx(getName(), currentLock); if (!res) { - LockValue lock = (LockValue) connection.get(getKeyName()); + LockValue lock = (LockValue) connection.get(getName()); if (lock != null && lock.equals(currentLock)) { lock.incCounter(); - connection.set(getKeyName(), lock); + connection.set(getName(), lock); return true; } } @@ -265,7 +297,7 @@ public class RedissonLock extends RedissonObject implements RLock { } long current = System.currentTimeMillis(); // waiting for message - RedissonLockEntry entry = ENTRIES.get(getName()); + RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry != null) { entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } @@ -286,12 +318,12 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionWriteOp(); try { - LockValue lock = (LockValue) connection.get(getKeyName()); + LockValue lock = (LockValue) connection.get(getName()); LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); if (lock != null && lock.equals(currentLock)) { if (lock.getCounter() > 1) { lock.decCounter(); - connection.set(getKeyName(), lock); + connection.set(getName(), lock); } else { unlock(connection); } @@ -312,9 +344,10 @@ public class RedissonLock extends RedissonObject implements RLock { int counter = 0; while (counter < 5) { connection.multi(); - connection.del(getKeyName()); + connection.del(getName()); connection.publish(getChannelName(), unlockMessage); - if (connection.exec().size() == 2) { + List res = connection.exec(); + if (res.size() == 2) { return; } counter++; @@ -337,10 +370,7 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionWriteOp(); try { - LockValue lock = (LockValue) connection.get(getKeyName()); - if (lock != null) { - unlock(connection); - } + unlock(connection); } finally { connectionManager.releaseWrite(connection); } @@ -357,7 +387,7 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionReadOp(); try { - LockValue lock = (LockValue) connection.get(getKeyName()); + LockValue lock = (LockValue) connection.get(getName()); return lock != null; } finally { connectionManager.releaseRead(connection); @@ -375,7 +405,7 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionReadOp(); try { - LockValue lock = (LockValue) connection.get(getKeyName()); + LockValue lock = (LockValue) connection.get(getName()); LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); return lock != null && lock.equals(currentLock); } finally { @@ -394,7 +424,7 @@ public class RedissonLock extends RedissonObject implements RLock { RedisConnection connection = connectionManager.connectionReadOp(); try { - LockValue lock = (LockValue) connection.get(getKeyName()); + LockValue lock = (LockValue) connection.get(getName()); LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); if (lock != null && lock.equals(currentLock)) { return lock.getCounter(); @@ -411,7 +441,7 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void delete() { forceUnlock(); - ENTRIES.remove(getName()); + ENTRIES.remove(getEntryName()); } public void close() { @@ -420,11 +450,12 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.getGroup().schedule(new Runnable() { @Override public void run() { - RedissonLockEntry entry = ENTRIES.get(getName()); + RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry != null && entry.isFree() - && ENTRIES.remove(getName(), entry)) { - connectionManager.unsubscribe(pubSubEntry, getChannelName()); + && ENTRIES.remove(getEntryName(), entry)) { + connectionManager.unsubscribe(getChannelName()); +// connectionManager.unsubscribe(getExpireChannelName()); } } }, 15, TimeUnit.SECONDS); diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 94d1e568c..c44677493 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -50,15 +50,15 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public int addListener(MessageListener listener) { - RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); + RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); return addListener(pubSubListener); } - private int addListener(RedisPubSubTopicListenerWrapper pubSubListener) { + private int addListener(RedisPubSubTopicListenerWrapper pubSubListener) { PubSubConnectionEntry entry = connectionManager.subscribe(getName()); synchronized (entry) { if (entry.isActive()) { - entry.addListener(pubSubListener); + entry.addListener(getName(), pubSubListener); return pubSubListener.hashCode(); } } @@ -74,8 +74,8 @@ public class RedissonTopic extends RedissonObject implements RTopic { } synchronized (entry) { if (entry.isActive()) { - entry.removeListener(listenerId); - connectionManager.unsubscribe(entry, getName()); + entry.removeListener(getName(), listenerId); + connectionManager.unsubscribe(getName()); return; } } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index b1417aaa5..524cb710e 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -52,6 +52,30 @@ abstract class BaseLoadBalancer implements LoadBalancer { clientsEmpty.open(); } + public void unfreeze(String host, int port) { + InetSocketAddress addr = new InetSocketAddress(host, port); + for (ConnectionEntry connectionEntry : clients) { + if (!connectionEntry.getClient().getAddr().equals(addr)) { + continue; + } + connectionEntry.setFreezed(false); + } + throw new IllegalStateException("Can't find " + addr + " in slaves!"); + } + + public Queue freeze(String host, int port) { + InetSocketAddress addr = new InetSocketAddress(host, port); + for (ConnectionEntry connectionEntry : clients) { + if (!connectionEntry.getClient().getAddr().equals(addr)) { + continue; + } + + connectionEntry.setFreezed(true); + return connectionEntry.getSubscribeConnections(); + } + throw new IllegalStateException("Can't find " + addr + " in slaves!"); + } + public Queue remove(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); for (Iterator iterator = clients.iterator(); iterator.hasNext();) { @@ -60,8 +84,8 @@ abstract class BaseLoadBalancer implements LoadBalancer { continue; } - log.info("slave {} removed", entry.getClient().getAddr()); iterator.remove(); + log.info("slave {} removed", entry.getClient().getAddr()); if (clients.isEmpty()) { clientsEmpty.close(); } @@ -93,7 +117,8 @@ abstract class BaseLoadBalancer implements LoadBalancer { int index = getIndex(clientsCopy); ConnectionEntry entry = clientsCopy.get(index); - if (!entry.getSubscribeConnectionsSemaphore().tryAcquire()) { + if (!entry.getSubscribeConnectionsSemaphore().tryAcquire() + || entry.isFreezed()) { clientsCopy.remove(index); } else { try { @@ -134,7 +159,8 @@ abstract class BaseLoadBalancer implements LoadBalancer { int index = getIndex(clientsCopy); ConnectionEntry entry = clientsCopy.get(index); - if (!entry.getConnectionsSemaphore().tryAcquire()) { + if (!entry.getConnectionsSemaphore().tryAcquire() + || entry.isFreezed()) { clientsCopy.remove(index); } else { RedisConnection conn = entry.getConnections().poll(); diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 901a9231e..06bccf92e 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -25,6 +25,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public class ConnectionEntry { + private volatile boolean freezed; private final RedisClient client; private final Semaphore subscribeConnectionsSemaphore; @@ -47,6 +48,14 @@ public class ConnectionEntry { return client; } + public boolean isFreezed() { + return freezed; + } + + public void setFreezed(boolean freezed) { + this.freezed = freezed; + } + public void shutdown() { connectionsSemaphore.acquireUninterruptibly(poolSize); client.shutdown(); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 2ccda918b..7a18a2b39 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -16,28 +16,10 @@ package org.redisson.connection; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.FutureListener; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; - -import org.redisson.Config; -import org.redisson.codec.RedisCodecWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; -import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; -import com.lambdaworks.redis.pubsub.RedisPubSubConnection; /** * @@ -47,8 +29,6 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; //TODO ping support public interface ConnectionManager { - void changeMaster(String host, int port); - FutureListener createReleaseWriteListener(final RedisConnection conn); FutureListener createReleaseReadListener(final RedisConnection conn); @@ -61,9 +41,9 @@ public interface ConnectionManager { PubSubConnectionEntry subscribe(String channelName); - PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName); + PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName); - void unsubscribe(PubSubConnectionEntry entry, String channelName); + void unsubscribe(String channelName); void releaseWrite(RedisConnection сonnection); diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 11a9ff1c6..829c6cfc7 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -23,6 +23,10 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public interface LoadBalancer { + void unfreeze(String host, int port); + + Queue freeze(String host, int port); + void init(RedisCodec codec, String password); void add(ConnectionEntry entry); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b31007508..4d8e77613 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -21,10 +21,9 @@ import io.netty.util.concurrent.FutureListener; import java.net.URI; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; @@ -63,7 +62,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); - private LoadBalancer balancer; + protected LoadBalancer balancer; private final List slaveClients = new ArrayList(); protected volatile RedisClient masterClient; @@ -105,7 +104,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.codec = new RedisCodecWrapper(cfg.getCodec()); } - public void changeMaster(String host, int port) { + protected void slaveDown(String host, int port) { + Queue connections = balancer.freeze(host, port); + reattachListeners(connections); + } + + protected void slaveUp(String host, int port) { + balancer.unfreeze(host, port); + } + + /** + * Remove slave with host:port from slaves list. + * Re-attach pub/sub listeners from it to other slave. + * Shutdown old master client. + * + */ + protected void changeMaster(String host, int port) { RedisClient oldMaster = masterClient; masterClient = new RedisClient(group, host, port); Queue connections = balancer.remove(host, port); @@ -116,22 +130,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private void reattachListeners(Queue connections) { for (Entry mapEntry : name2PubSubConnection.entrySet()) { for (RedisPubSubConnection redisPubSubConnection : connections) { - if (!mapEntry.getValue().getConnection().equals(redisPubSubConnection)) { + PubSubConnectionEntry entry = mapEntry.getValue(); + String channelName = mapEntry.getKey(); + + if (!entry.getConnection().equals(redisPubSubConnection)) { continue; } - PubSubConnectionEntry entry = mapEntry.getValue(); - String channelName = mapEntry.getKey(); synchronized (entry) { entry.close(); - unsubscribeEntry(entry, channelName); + unsubscribe(channelName); - List listeners = entry.getListeners(channelName); + Collection listeners = entry.getListeners(channelName); if (!listeners.isEmpty()) { - PubSubConnectionEntry newEntry = subscribe(mapEntry.getKey()); + PubSubConnectionEntry newEntry = subscribe(channelName); for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(redisPubSubListener); + newEntry.addListener(channelName, redisPubSubListener); } } } @@ -224,7 +239,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName) { + public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { return сonnEntry; @@ -267,16 +282,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public void unsubscribe(PubSubConnectionEntry entry, String channelName) { - if (entry.hasListeners(channelName)) { + public void unsubscribe(String channelName) { + PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { return; } - - unsubscribeEntry(entry, channelName); - } - - private void unsubscribeEntry(PubSubConnectionEntry entry, String channelName) { - name2PubSubConnection.remove(channelName); + entry.unsubscribe(channelName); if (entry.tryClose()) { returnSubscribeConnection(entry); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index ea2071af0..b14a96bf2 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -15,12 +15,14 @@ */ package org.redisson.connection; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Collections; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; -import org.redisson.RedisPubSubTopicListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ public class PubSubConnectionEntry { private final Semaphore subscribedChannelsAmount; private final RedisPubSubConnection conn; private final int subscriptionsPerConnection; + private final ConcurrentMap> channelListeners = new ConcurrentHashMap>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); @@ -46,7 +49,32 @@ public class PubSubConnectionEntry { this.subscribedChannelsAmount = new Semaphore(subscriptionsPerConnection); } - public void addListener(RedisPubSubListener listener) { + public Collection getListeners(String channelName) { + Collection result = channelListeners.get(channelName); + if (result == null) { + return Collections.emptyList(); + } + return result; + } + + public void addListener(String channelName, RedisPubSubListener listener) { + Queue queue = channelListeners.get(channelName); + if (queue == null) { + queue = new ConcurrentLinkedQueue(); + Queue oldQueue = channelListeners.putIfAbsent(channelName, queue); + if (oldQueue != null) { + queue = oldQueue; + } + } + + synchronized (queue) { + if (channelListeners.get(channelName) != queue) { + addListener(channelName, listener); + return; + } + queue.add(listener); + } + conn.addListener(listener); } @@ -58,44 +86,24 @@ public class PubSubConnectionEntry { status = Status.INACTIVE; } - public List getListeners(String channelName) { - List result = new ArrayList(); - Queue queue = conn.getListeners(); - for (RedisPubSubListener listener : queue) { - if (!(listener instanceof RedisPubSubTopicListenerWrapper)) { - continue; - } - - RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener; - if (entry.getName().equals(channelName)) { - result.add(entry); - } - } - return result; - } - // TODO optimize - public boolean hasListeners(String channelName) { - return !getListeners(channelName).isEmpty(); - } - - // TODO optimize - public void removeListener(int listenerId) { - Queue queue = conn.getListeners(); - for (RedisPubSubListener listener : queue) { - if (!(listener instanceof RedisPubSubTopicListenerWrapper)) { - continue; - } - - RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener; - if (entry.hashCode() == listenerId) { - removeListener(entry); + public void removeListener(String channelName, int listenerId) { + Queue listeners = channelListeners.get(channelName); + for (RedisPubSubListener listener : listeners) { + if (listener.hashCode() == listenerId) { + removeListener(channelName, listener); break; } } } - public void removeListener(RedisPubSubListener listener) { + public void removeListener(String channelName, RedisPubSubListener listener) { + Queue queue = channelListeners.get(channelName); + synchronized (queue) { + if (queue.remove(listener)) { + channelListeners.remove(channelName, new ConcurrentLinkedQueue()); + } + } conn.removeListener(listener); } @@ -110,12 +118,12 @@ public class PubSubConnectionEntry { public void subscribe(final String channelName) { conn.addListener(new RedisPubSubAdapter() { @Override - public void subscribed(Object channel, long count) { + public void subscribed(String channel, long count) { log.debug("subscribed to '{}' channel", channelName); } @Override - public void unsubscribed(Object channel, long count) { + public void unsubscribed(String channel, long count) { log.debug("unsubscribed from '{}' channel", channelName); } }); @@ -123,7 +131,7 @@ public class PubSubConnectionEntry { } - public void subscribe(RedisPubSubAdapter listener, Object channel) { + public void subscribe(RedisPubSubAdapter listener, String channel) { conn.addListener(listener); conn.subscribe(channel); } diff --git a/src/test/java/org/redisson/RedissonLockTest.java b/src/test/java/org/redisson/RedissonLockTest.java index 84e1624a1..9515e390f 100644 --- a/src/test/java/org/redisson/RedissonLockTest.java +++ b/src/test/java/org/redisson/RedissonLockTest.java @@ -1,6 +1,7 @@ package org.redisson; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -21,9 +22,34 @@ public class RedissonLockTest extends BaseConcurrentTest { @After public void after() { - redisson.shutdown(); + try { + redisson.flushdb(); + } finally { + redisson.shutdown(); + } } +// @Test + public void testExpire() throws InterruptedException { + RLock lock = redisson.getLock("lock"); + lock.lock(); +// lock.expire(2, TimeUnit.SECONDS); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread() { + public void run() { + RLock lock1 = redisson.getLock("lock"); + lock1.lock(); + lock1.unlock(); + latch.countDown(); + }; + }.start(); + + latch.await(); + + lock.unlock(); + } + @Test public void testGetHoldCount() { RLock lock = redisson.getLock("lock"); @@ -156,9 +182,14 @@ public class RedissonLockTest extends BaseConcurrentTest { @Override public void run(Redisson redisson) { Lock lock = redisson.getLock("testConcurrency_SingleInstance"); + System.out.println("lock1 " + Thread.currentThread().getId()); lock.lock(); + System.out.println("lock2 "+ Thread.currentThread().getId()); lockedCounter.set(lockedCounter.get() + 1); + System.out.println("lockedCounter " + lockedCounter); + System.out.println("unlock1 "+ Thread.currentThread().getId()); lock.unlock(); + System.out.println("unlock2 "+ Thread.currentThread().getId()); } });