diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index efa713a6e..f864b25de 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -266,14 +266,16 @@ public class RedissonLock extends RedissonExpirable implements RLock { final long threadId = Thread.currentThread().getId(); Future future = subscribe(threadId); if (!await(future, time, TimeUnit.MILLISECONDS)) { - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - unsubscribe(future, threadId); + if (!future.cancel(false)) { + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + unsubscribe(future, threadId); + } } - } - }); + }); + } return false; } @@ -639,6 +641,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void run() { if (!subscribeFuture.isDone()) { + subscribeFuture.cancel(false); result.trySuccess(false); } } diff --git a/src/main/java/org/redisson/RedissonPatternTopic.java b/src/main/java/org/redisson/RedissonPatternTopic.java index 5c70f5a8a..1a4901bb5 100644 --- a/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/src/main/java/org/redisson/RedissonPatternTopic.java @@ -17,7 +17,6 @@ package org.redisson; import java.util.Collections; import java.util.List; -import java.util.concurrent.Semaphore; import org.redisson.api.PatternMessageListener; import org.redisson.api.PatternStatusListener; @@ -26,6 +25,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.command.CommandExecutor; import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.util.concurrent.Future; @@ -71,7 +71,7 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(int listenerId) { - Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); semaphore.acquireUninterruptibly(); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index b0376c1e9..18ce36baf 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -17,7 +17,6 @@ package org.redisson; import java.util.Collections; import java.util.List; -import java.util.concurrent.Semaphore; import org.redisson.api.MessageListener; import org.redisson.api.RTopic; @@ -26,8 +25,8 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.util.concurrent.Future; @@ -87,7 +86,7 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { - Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); semaphore.acquireUninterruptibly(); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 13ced9a9b..2410a32f3 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -19,7 +19,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; import java.util.Set; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.redisson.api.NodeType; @@ -30,6 +29,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.InfinitySemaphoreLatch; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.channel.EventLoopGroup; import io.netty.util.Timeout; @@ -48,7 +48,7 @@ public interface ConnectionManager { boolean isClusterMode(); - Semaphore getSemaphore(String channelName); + AsyncSemaphore getSemaphore(String channelName); Future newSucceededFuture(R value); @@ -60,7 +60,7 @@ public interface ConnectionManager { Future subscribe(Codec codec, String channelName, RedisPubSubListener listener); - Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener, Semaphore semaphore); + Future subscribe(Codec codec, String channelName, RedisPubSubListener listener, AsyncSemaphore semaphore); ConnectionInitializer getConnectListener(); @@ -102,15 +102,15 @@ public interface ConnectionManager { Future psubscribe(String pattern, Codec codec, RedisPubSubListener listener); - Future psubscribe(String pattern, Codec codec, RedisPubSubListener listener, Semaphore semaphore); + Future psubscribe(String pattern, Codec codec, RedisPubSubListener listener, AsyncSemaphore semaphore); - Codec unsubscribe(final String channelName, Semaphore lock); + Codec unsubscribe(String channelName, AsyncSemaphore lock); Codec unsubscribe(String channelName); Codec punsubscribe(String channelName); - Codec punsubscribe(final String channelName, Semaphore lock); + Codec punsubscribe(String channelName, AsyncSemaphore lock); void shutdown(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1ebcf75fa..53c83dd8b 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -49,6 +49,8 @@ import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; +import org.redisson.pubsub.AsyncSemaphore; +import org.redisson.pubsub.TransferListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,13 +138,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub(); - private final Semaphore[] locks = new Semaphore[50]; + private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; private final Semaphore freePubSubLock = new Semaphore(1); { for (int i = 0; i < locks.length; i++) { - locks[i] = new Semaphore(1); + locks[i] = new AsyncSemaphore(1); } } @@ -310,43 +312,58 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener) { - Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; - lock.acquireUninterruptibly(); - return psubscribe(channelName, codec, listener, lock); + public Future psubscribe(final String channelName, final Codec codec, final RedisPubSubListener listener) { + final AsyncSemaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; + final Promise result = newPromise(); + lock.acquire(new Runnable() { + @Override + public void run() { + Future future = psubscribe(channelName, codec, listener, lock); + future.addListener(new TransferListener(result)); + } + }); + return result; } - public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener, Semaphore semaphore) { + public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener, AsyncSemaphore semaphore) { Promise promise = newPromise(); subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore); return promise; } - public Future subscribe(Codec codec, String channelName, RedisPubSubListener listener) { - Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; - lock.acquireUninterruptibly(); - return subscribe(codec, channelName, listener, lock); + public Future subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) { + final AsyncSemaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; + final Promise result = newPromise(); + lock.acquire(new Runnable() { + @Override + public void run() { + Future future = subscribe(codec, channelName, listener, lock); + future.addListener(new TransferListener(result)); + } + }); + return result; } - public Future subscribe(Codec codec, String channelName, RedisPubSubListener listener, Semaphore semaphore) { + public Future subscribe(Codec codec, String channelName, RedisPubSubListener listener, AsyncSemaphore semaphore) { Promise promise = newPromise(); subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); return promise; } - public Semaphore getSemaphore(String channelName) { + public AsyncSemaphore getSemaphore(String channelName) { return locks[Math.abs(channelName.hashCode() % locks.length)]; } - private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type, final Semaphore lock) { + private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, + final Promise promise, PubSubType type, final AsyncSemaphore lock) { final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { сonnEntry.addListener(channelName, listener); сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(сonnEntry); lock.release(); + promise.trySuccess(сonnEntry); } }); return; @@ -373,8 +390,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); lock.release(); + promise.trySuccess(oldEntry); } }); return; @@ -389,8 +406,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(freeEntry); lock.release(); + promise.trySuccess(freeEntry); } }); @@ -402,18 +419,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, - final Promise promise, final PubSubType type, final Semaphore lock) { - final int slot = 0; + private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, + final Promise promise, final PubSubType type, final AsyncSemaphore lock) { + final int slot = calcSlot(channelName); Future connFuture = nextPubSubConnection(slot); connFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - promise.setFailure(future.cause()); freePubSubLock.release(); lock.release(); + promise.tryFailure(future.cause()); return; } @@ -432,8 +449,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); lock.release(); + promise.trySuccess(oldEntry); } }); return; @@ -446,8 +463,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(entry); lock.release(); + promise.trySuccess(entry); } }); @@ -461,7 +478,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } - public Codec unsubscribe(final String channelName, final Semaphore lock) { + public Codec unsubscribe(final String channelName, final AsyncSemaphore lock) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { lock.release(); @@ -503,7 +520,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return entryCodec; } - public Codec punsubscribe(final String channelName, final Semaphore lock) { + public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { lock.release(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 394d1409e..96e87073d 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.api.NodeType; @@ -39,6 +38,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManagerImpl; import org.redisson.connection.pool.MasterConnectionPool; +import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,31 +163,25 @@ public class MasterSlaveEntry { private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { for (String channelName : redisPubSubConnection.getChannels().keySet()) { - Semaphore semaphore = connectionManager.getSemaphore(channelName); - semaphore.acquireUninterruptibly(); - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners, semaphore); + reattachPubSubListeners(channelName, listeners); } for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { - Semaphore semaphore = connectionManager.getSemaphore(channelName); - semaphore.acquireUninterruptibly(); - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection listeners = pubSubEntry.getListeners(channelName); - reattachPatternPubSubListeners(channelName, listeners, semaphore); + reattachPatternPubSubListeners(channelName, listeners); } } - private void reattachPubSubListeners(final String channelName, final Collection listeners, Semaphore semaphore) { + private void reattachPubSubListeners(final String channelName, final Collection listeners) { Codec subscribeCodec = connectionManager.unsubscribe(channelName); if (listeners.isEmpty()) { return; } - Future subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null, semaphore); + Future subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); subscribeFuture.addListener(new FutureListener() { @Override @@ -207,10 +201,10 @@ public class MasterSlaveEntry { } private void reattachPatternPubSubListeners(final String channelName, - final Collection listeners, Semaphore semaphore) { + final Collection listeners) { Codec subscribeCodec = connectionManager.punsubscribe(channelName); if (!listeners.isEmpty()) { - Future future = connectionManager.psubscribe(channelName, subscribeCodec, null, semaphore); + Future future = connectionManager.psubscribe(channelName, subscribeCodec, null); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) diff --git a/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java new file mode 100644 index 000000000..49f3d29b2 --- /dev/null +++ b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -0,0 +1,90 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.pubsub; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; + +/** + * + * @author Nikita Koksharov + * + */ +public class AsyncSemaphore { + + private int counter; + private final Queue listeners = new LinkedList(); + + public AsyncSemaphore(int permits) { + counter = permits; + } + + public void acquireUninterruptibly() { + final CountDownLatch latch = new CountDownLatch(1); + acquire(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public void acquire(Runnable listener) { + boolean run = false; + + synchronized (this) { + if (counter == 0) { + listeners.add(listener); + return; + } + if (counter > 0) { + counter--; + run = true; + } + } + + if (run) { + listener.run(); + } + } + + public boolean remove(Runnable listener) { + synchronized (this) { + return listeners.remove(listener); + } + } + + public void release() { + Runnable runnable = null; + + synchronized (this) { + counter++; + runnable = listeners.poll(); + } + + if (runnable != null) { + acquire(runnable); + } + } + +} diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java index bcb7f3444..eeb60fd75 100644 --- a/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -16,7 +16,7 @@ package org.redisson.pubsub; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.PubSubEntry; import org.redisson.client.BaseRedisPubSubListener; @@ -24,6 +24,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.connection.ConnectionManager; +import org.redisson.misc.PromiseDelegator; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -33,49 +34,71 @@ abstract class PublishSubscribe> { private final ConcurrentMap entries = PlatformDependent.newConcurrentHashMap(); - public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) { - Semaphore semaphore = connectionManager.getSemaphore(channelName); - semaphore.acquireUninterruptibly(); - if (entry.release() == 0) { - // just an assertion - boolean removed = entries.remove(entryName) == entry; - if (!removed) { - throw new IllegalStateException(); + public void unsubscribe(final E entry, final String entryName, final String channelName, final ConnectionManager connectionManager) { + final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); + semaphore.acquire(new Runnable() { + @Override + public void run() { + if (entry.release() == 0) { + // just an assertion + boolean removed = entries.remove(entryName) == entry; + if (!removed) { + throw new IllegalStateException(); + } + connectionManager.unsubscribe(channelName, semaphore); + } else { + semaphore.release(); + } } - connectionManager.unsubscribe(channelName, semaphore); - } else { - semaphore.release(); - } + }); + } public E getEntry(String entryName) { return entries.get(entryName); } - public Future subscribe(String entryName, String channelName, ConnectionManager connectionManager) { - Semaphore semaphore = connectionManager.getSemaphore(channelName); - semaphore.acquireUninterruptibly(); - E entry = entries.get(entryName); - if (entry != null) { - entry.aquire(); - semaphore.release(); - return entry.getPromise(); + public Future subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { + final AtomicReference listenerHolder = new AtomicReference(); + final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); + final Promise newPromise = new PromiseDelegator(connectionManager.newPromise()) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return semaphore.remove(listenerHolder.get()); } - - Promise newPromise = connectionManager.newPromise(); - E value = createEntry(newPromise); - value.aquire(); - - E oldValue = entries.putIfAbsent(entryName, value); - if (oldValue != null) { - oldValue.aquire(); - semaphore.release(); - return oldValue.getPromise(); + }; + + Runnable listener = new Runnable() { + + @Override + public void run() { + E entry = entries.get(entryName); + if (entry != null) { + entry.aquire(); + semaphore.release(); + entry.getPromise().addListener(new TransferListener(newPromise)); + return; + } + + E value = createEntry(newPromise); + value.aquire(); + + E oldValue = entries.putIfAbsent(entryName, value); + if (oldValue != null) { + oldValue.aquire(); + semaphore.release(); + oldValue.getPromise().addListener(new TransferListener(newPromise)); + return; + } + + RedisPubSubListener listener = createListener(channelName, value); + connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); } - - RedisPubSubListener listener = createListener(channelName, value); - connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); - return newPromise; + }; + semaphore.acquire(listener); + listenerHolder.set(listener); + + return newPromise; } protected abstract E createEntry(Promise newPromise); diff --git a/src/main/java/org/redisson/pubsub/TransferListener.java b/src/main/java/org/redisson/pubsub/TransferListener.java new file mode 100644 index 000000000..d1cfb0031 --- /dev/null +++ b/src/main/java/org/redisson/pubsub/TransferListener.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.pubsub; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +/** + * + * @author Nikita Koksharov + * + * @param + */ +public class TransferListener implements FutureListener { + + private Promise promise; + + public TransferListener(Promise promise) { + super(); + this.promise = promise; + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(future.getNow()); + } + +} diff --git a/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java index ed534ce74..e10269ece 100644 --- a/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java @@ -17,7 +17,6 @@ package org.redisson.reactive; import java.util.Collections; import java.util.List; -import java.util.concurrent.Semaphore; import org.reactivestreams.Publisher; import org.redisson.PubSubPatternMessageListener; @@ -29,6 +28,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.command.CommandReactiveExecutor; import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -89,7 +89,7 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive @Override public void removeListener(int listenerId) { - Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); semaphore.acquireUninterruptibly(); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); diff --git a/src/main/java/org/redisson/reactive/RedissonTopicReactive.java b/src/main/java/org/redisson/reactive/RedissonTopicReactive.java index 63f71b0a0..e9ef927e0 100644 --- a/src/main/java/org/redisson/reactive/RedissonTopicReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonTopicReactive.java @@ -17,7 +17,6 @@ package org.redisson.reactive; import java.util.Collections; import java.util.List; -import java.util.concurrent.Semaphore; import org.reactivestreams.Publisher; import org.redisson.PubSubMessageListener; @@ -30,7 +29,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; import org.redisson.connection.PubSubConnectionEntry; -import org.redisson.misc.ReclosableLatch; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -100,7 +99,7 @@ public class RedissonTopicReactive implements RTopicReactive { @Override public void removeListener(int listenerId) { - Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); semaphore.acquireUninterruptibly(); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); diff --git a/src/test/java/org/redisson/RedissonLockHeavyTest.java b/src/test/java/org/redisson/RedissonLockHeavyTest.java index 708f0eaad..f64c9eccd 100644 --- a/src/test/java/org/redisson/RedissonLockHeavyTest.java +++ b/src/test/java/org/redisson/RedissonLockHeavyTest.java @@ -4,6 +4,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.junit.FixMethodOrder; @@ -68,5 +69,46 @@ public class RedissonLockHeavyTest extends BaseTest { executor.awaitTermination(threads * loops, TimeUnit.SECONDS); } + + @Test + public void tryLockUnlockRLock() throws Exception { + for (int i = 0; i < threads; i++) { + + Runnable worker = new Runnable() { + + @Override + public void run() { + for (int j = 0; j < loops; j++) { + RLock lock = redisson.getLock("RLOCK_" + j); + try { + if (lock.tryLock(ThreadLocalRandom.current().nextInt(10), TimeUnit.MILLISECONDS)) { + try { + RBucket bucket = redisson.getBucket("RBUCKET_" + j); + bucket.set("TEST", 30, TimeUnit.SECONDS); + RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j); + semaphore.release(); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + lock.unlock(); + } + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + executor.execute(worker); + } + executor.shutdown(); + executor.awaitTermination(threads * loops, TimeUnit.SECONDS); + + } + } \ No newline at end of file