diff --git a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java index b2360026d..a00338f62 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java @@ -8,6 +8,7 @@ import static com.lambdaworks.redis.protocol.CommandType.SUBSCRIBE; import static com.lambdaworks.redis.protocol.CommandType.UNSUBSCRIBE; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; import java.lang.reflect.Array; import java.util.Collection; @@ -88,8 +89,8 @@ public class RedisPubSubConnection extends RedisAsyncConnection { dispatch(SUBSCRIBE, new PubSubOutput(codec), args(channels)); } - public void unsubscribe(String... channels) { - dispatch(UNSUBSCRIBE, new PubSubOutput(codec), args(channels)); + public Future unsubscribe(String... channels) { + return dispatch(UNSUBSCRIBE, new PubSubOutput(codec), args(channels)); } @Override diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 8aa98c93c..c83224183 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.redisson.connection.ConnectionManager; import org.redisson.core.RLock; @@ -108,6 +110,7 @@ public class RedissonLock extends RedissonObject implements RLock { private static final Integer unlockMessage = 0; + private static final ReadWriteLock lock = new ReentrantReadWriteLock(); private static final ConcurrentMap ENTRIES = new ConcurrentHashMap(); RedissonLock(ConnectionManager connectionManager, String name, UUID id) { @@ -119,18 +122,34 @@ public class RedissonLock extends RedissonObject implements RLock { while (true) { RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (entry == null) { + lock.readLock().unlock(); return; } RedissonLockEntry newEntry = new RedissonLockEntry(entry); newEntry.release(); if (ENTRIES.replace(getEntryName(), entry, newEntry)) { + if (!newEntry.isFree()) { + lock.readLock().unlock(); + return; + } + lock.readLock().unlock(); + + lock.writeLock().lock(); + try { + if (ENTRIES.remove(getEntryName(), newEntry)) { + Future future = connectionManager.unsubscribe(getChannelName()); + future.awaitUninterruptibly(); + } + } finally { + lock.writeLock().unlock(); + } return; } } } private String getEntryName() { - return id + getName(); + return getName(); } private Promise aquire() { @@ -149,6 +168,7 @@ public class RedissonLock extends RedissonObject implements RLock { } private Future subscribe() { + lock.readLock().lock(); Promise promise = aquire(); if (promise != null) { return promise; @@ -161,7 +181,11 @@ public class RedissonLock extends RedissonObject implements RLock { if (oldValue != null) { Promise oldPromise = aquire(); if (oldPromise == null) { - return subscribe(); + try { + return subscribe(); + } finally { + lock.readLock().unlock(); + } } return oldPromise; } @@ -189,19 +213,19 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.subscribe(listener, getChannelName()); - RedisPubSubAdapter expireListener = new RedisPubSubAdapter() { - - @Override - public void message(String channel, Object message) { - if (getExpireChannelName().equals(channel) - && "expired".equals(message)) { - forceUnlock(); - } - } - - }; - - connectionManager.subscribe(expireListener, getExpireChannelName()); +// RedisPubSubAdapter expireListener = new RedisPubSubAdapter() { +// +// @Override +// public void message(String channel, Object message) { +// if (getExpireChannelName().equals(channel) +// && "expired".equals(message)) { +// forceUnlock(); +// } +// } +// +// }; +// +// connectionManager.subscribe(expireListener, getExpireChannelName()); return newPromise; } @@ -240,12 +264,19 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void lockInterruptibly() throws InterruptedException { - while (!tryLock()) { - // waiting for message - RedissonLockEntry entry = ENTRIES.get(getEntryName()); - if (entry != null) { - entry.getLatch().acquire(); + Future promise = subscribe(); + try { + promise.awaitUninterruptibly(); + + while (!tryLock()) { + // waiting for message + RedissonLockEntry entry = ENTRIES.get(getEntryName()); + if (entry != null) { + entry.getLatch().acquire(); + } } + } finally { + release(); } } @@ -257,7 +288,7 @@ public class RedissonLock extends RedissonObject implements RLock { return tryLockInner(); } finally { - close(); + release(); } } @@ -306,7 +337,7 @@ public class RedissonLock extends RedissonObject implements RLock { } return true; } finally { - close(); + release(); } } @@ -336,7 +367,7 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.releaseWrite(connection); } } finally { - close(); + release(); } } @@ -375,7 +406,7 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.releaseWrite(connection); } } finally { - close(); + release(); } } @@ -393,7 +424,7 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.releaseRead(connection); } } finally { - close(); + release(); } } @@ -412,7 +443,7 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.releaseRead(connection); } } finally { - close(); + release(); } } @@ -434,31 +465,15 @@ public class RedissonLock extends RedissonObject implements RLock { connectionManager.releaseRead(connection); } } finally { - close(); + release(); } } @Override public void delete() { forceUnlock(); - ENTRIES.remove(getEntryName()); - } - - public void close() { + lock.readLock().lock(); release(); - - connectionManager.getGroup().schedule(new Runnable() { - @Override - public void run() { - RedissonLockEntry entry = ENTRIES.get(getEntryName()); - if (entry != null - && entry.isFree() - && ENTRIES.remove(getEntryName(), entry)) { - connectionManager.unsubscribe(getChannelName()); -// connectionManager.unsubscribe(getExpireChannelName()); - } - } - }, 15, TimeUnit.SECONDS); } - + } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 7a18a2b39..dbcb8455a 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import com.lambdaworks.redis.RedisConnection; @@ -43,7 +44,7 @@ public interface ConnectionManager { PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName); - void unsubscribe(String channelName); + Future unsubscribe(String channelName); void releaseWrite(RedisConnection сonnection); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 7116127e0..8fd91a660 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -17,6 +17,7 @@ package org.redisson.connection; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import java.net.URI; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; +import com.lambdaworks.redis.RedisConnectionClosedException; import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; @@ -224,8 +226,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.release(); return oldEntry; } - entry.subscribe(channelName); - return entry; + + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + return subscribe(channelName); + } + entry.subscribe(channelName); + return entry; + } } } @@ -238,8 +247,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { returnSubscribeConnection(entry); return oldEntry; } - entry.subscribe(channelName); - return entry; + + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + return subscribe(channelName); + } + entry.subscribe(channelName); + return entry; + } } RedisPubSubConnection nextPubSubConnection() { @@ -253,30 +269,42 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return сonnEntry; } - Set entries = new HashSet(name2PubSubConnection.values()); - for (PubSubConnectionEntry entry : entries) { - if (entry.tryAcquire()) { - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { + Set entries = new HashSet(name2PubSubConnection.values()); + for (PubSubConnectionEntry entry : entries) { + if (entry.tryAcquire()) { + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + entry.release(); + return oldEntry; + } + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + return subscribe(listener, channelName); + } + entry.subscribe(listener, channelName); + return entry; + } + } + } + + RedisPubSubConnection conn = nextPubSubConnection(); + + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + returnSubscribeConnection(entry); + return oldEntry; + } + synchronized (entry) { + if (!entry.isActive()) { entry.release(); - return oldEntry; + return subscribe(listener, channelName); } entry.subscribe(listener, channelName); return entry; } - } - - RedisPubSubConnection conn = nextPubSubConnection(); - - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - returnSubscribeConnection(entry); - return oldEntry; - } - entry.subscribe(listener, channelName); - return entry; } void acquireMasterConnection() { @@ -290,16 +318,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public void unsubscribe(String channelName) { - PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + public Future unsubscribe(String channelName) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { - return; + return group.next().newSucceededFuture(null); } - entry.unsubscribe(channelName); - if (entry.tryClose()) { - returnSubscribeConnection(entry); - } + Future future = entry.unsubscribe(channelName); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + synchronized (entry) { + if (entry.tryClose()) { + returnSubscribeConnection(entry); + } + } + } + }); + return future; } protected void returnSubscribeConnection(PubSubConnectionEntry entry) { diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index b14a96bf2..afca5446b 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -15,6 +15,9 @@ */ package org.redisson.connection; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + import java.util.Collection; import java.util.Collections; import java.util.Queue; @@ -26,6 +29,7 @@ import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.lambdaworks.redis.RedisConnectionClosedException; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubListener; @@ -132,18 +136,31 @@ public class PubSubConnectionEntry { public void subscribe(RedisPubSubAdapter listener, String channel) { - conn.addListener(listener); + addListener(channel, listener); conn.subscribe(channel); } - public void unsubscribe(String channel) { - conn.unsubscribe(channel); - subscribedChannelsAmount.release(); + public Future unsubscribe(final String channel) { + Queue listeners = channelListeners.get(channel); + if (listeners != null) { + for (RedisPubSubListener listener : listeners) { + removeListener(channel, listener); + } + } + + Future future = conn.unsubscribe(channel); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + subscribedChannelsAmount.release(); + } + }); + return future; } public boolean tryClose() { if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) { - conn.close(); + close(); return true; } return false; diff --git a/src/test/java/org/redisson/RedissonLockTest.java b/src/test/java/org/redisson/RedissonLockTest.java index 9515e390f..b2d82cafc 100644 --- a/src/test/java/org/redisson/RedissonLockTest.java +++ b/src/test/java/org/redisson/RedissonLockTest.java @@ -206,6 +206,11 @@ public class RedissonLockTest extends BaseConcurrentTest { public void run(Redisson redisson) { for (int i = 0; i < iterations; i++) { redisson.getLock("testConcurrency_MultiInstance1").lock(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } lockedCounter.set(lockedCounter.get() + 1); redisson.getLock("testConcurrency_MultiInstance1").unlock(); }