Merge branch '2.2.x'

# Conflicts:
#	src/main/java/org/redisson/RedissonPatternTopic.java
#	src/main/java/org/redisson/RedissonTopic.java
#	src/main/java/org/redisson/connection/MasterSlaveEntry.java
#	src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java
#	src/main/java/org/redisson/reactive/RedissonTopicReactive.java
pull/574/merge
Nikita 9 years ago
commit 7651ceedb1

@ -266,14 +266,16 @@ public class RedissonLock extends RedissonExpirable implements RLock {
final long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
if (!await(future, time, TimeUnit.MILLISECONDS)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (future.isSuccess()) {
unsubscribe(future, threadId);
if (!future.cancel(false)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (future.isSuccess()) {
unsubscribe(future, threadId);
}
}
}
});
});
}
return false;
}
@ -639,6 +641,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void run() {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
result.trySuccess(false);
}
}

@ -17,7 +17,6 @@ package org.redisson;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.redisson.api.PatternMessageListener;
import org.redisson.api.PatternStatusListener;
@ -26,6 +25,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future;
@ -71,7 +71,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);

@ -17,7 +17,6 @@ package org.redisson;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.redisson.api.MessageListener;
import org.redisson.api.RTopic;
@ -26,8 +25,8 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future;
@ -87,7 +86,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);

@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.redisson.api.NodeType;
@ -30,6 +29,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
@ -48,7 +48,7 @@ public interface ConnectionManager {
boolean isClusterMode();
Semaphore getSemaphore(String channelName);
AsyncSemaphore getSemaphore(String channelName);
<R> Future<R> newSucceededFuture(R value);
@ -60,7 +60,7 @@ public interface ConnectionManager {
Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener);
Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener, Semaphore semaphore);
Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore);
ConnectionInitializer getConnectListener();
@ -102,15 +102,15 @@ public interface ConnectionManager {
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore);
Codec unsubscribe(final String channelName, Semaphore lock);
Codec unsubscribe(String channelName, AsyncSemaphore lock);
Codec unsubscribe(String channelName);
Codec punsubscribe(String channelName);
Codec punsubscribe(final String channelName, Semaphore lock);
Codec punsubscribe(String channelName, AsyncSemaphore lock);
void shutdown();

@ -49,6 +49,8 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -136,13 +138,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
private final Semaphore[] locks = new Semaphore[50];
private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
private final Semaphore freePubSubLock = new Semaphore(1);
{
for (int i = 0; i < locks.length; i++) {
locks[i] = new Semaphore(1);
locks[i] = new AsyncSemaphore(1);
}
}
@ -310,43 +312,58 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener) {
Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
lock.acquireUninterruptibly();
return psubscribe(channelName, codec, listener, lock);
public Future<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?> listener) {
final AsyncSemaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
final Promise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() {
@Override
public void run() {
Future<PubSubConnectionEntry> future = psubscribe(channelName, codec, listener, lock);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
}
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore) {
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore);
return promise;
}
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener) {
Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
lock.acquireUninterruptibly();
return subscribe(codec, channelName, listener, lock);
public Future<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) {
final AsyncSemaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
final Promise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() {
@Override
public void run() {
Future<PubSubConnectionEntry> future = subscribe(codec, channelName, listener, lock);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
}
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, Semaphore semaphore) {
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
return promise;
}
public Semaphore getSemaphore(String channelName) {
public AsyncSemaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type, final Semaphore lock) {
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
final Promise<PubSubConnectionEntry> promise, PubSubType type, final AsyncSemaphore lock) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.addListener(channelName, listener);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
lock.release();
promise.trySuccess(сonnEntry);
}
});
return;
@ -373,8 +390,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
lock.release();
promise.trySuccess(oldEntry);
}
});
return;
@ -389,8 +406,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry);
lock.release();
promise.trySuccess(freeEntry);
}
});
@ -402,18 +419,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,
final Promise<PubSubConnectionEntry> promise, final PubSubType type, final Semaphore lock) {
final int slot = 0;
private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
final Promise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
final int slot = calcSlot(channelName);
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
freePubSubLock.release();
lock.release();
promise.tryFailure(future.cause());
return;
}
@ -432,8 +449,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
lock.release();
promise.trySuccess(oldEntry);
}
});
return;
@ -446,8 +463,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
lock.release();
promise.trySuccess(entry);
}
});
@ -461,7 +478,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
});
}
public Codec unsubscribe(final String channelName, final Semaphore lock) {
public Codec unsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
@ -503,7 +520,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entryCodec;
}
public Codec punsubscribe(final String channelName, final Semaphore lock) {
public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();

@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.NodeType;
@ -39,6 +38,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -163,31 +163,25 @@ public class MasterSlaveEntry {
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, semaphore);
reattachPubSubListeners(channelName, listeners);
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners, semaphore);
reattachPatternPubSubListeners(channelName, listeners);
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners, Semaphore semaphore) {
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (listeners.isEmpty()) {
return;
}
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null, semaphore);
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
@ -207,10 +201,10 @@ public class MasterSlaveEntry {
}
private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener> listeners, Semaphore semaphore) {
final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null, semaphore);
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)

@ -0,0 +1,90 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
/**
*
* @author Nikita Koksharov
*
*/
public class AsyncSemaphore {
private int counter;
private final Queue<Runnable> listeners = new LinkedList<Runnable>();
public AsyncSemaphore(int permits) {
counter = permits;
}
public void acquireUninterruptibly() {
final CountDownLatch latch = new CountDownLatch(1);
acquire(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void acquire(Runnable listener) {
boolean run = false;
synchronized (this) {
if (counter == 0) {
listeners.add(listener);
return;
}
if (counter > 0) {
counter--;
run = true;
}
}
if (run) {
listener.run();
}
}
public boolean remove(Runnable listener) {
synchronized (this) {
return listeners.remove(listener);
}
}
public void release() {
Runnable runnable = null;
synchronized (this) {
counter++;
runnable = listeners.poll();
}
if (runnable != null) {
acquire(runnable);
}
}
}

@ -16,7 +16,7 @@
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
@ -24,6 +24,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.PromiseDelegator;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
@ -33,49 +34,71 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) {
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
public void unsubscribe(final E entry, final String entryName, final String channelName, final ConnectionManager connectionManager) {
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
}
connectionManager.unsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
}
connectionManager.unsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
});
}
public E getEntry(String entryName) {
return entries.get(entryName);
}
public Future<E> subscribe(String entryName, String channelName, ConnectionManager connectionManager) {
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
semaphore.release();
return entry.getPromise();
public Future<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
final Promise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
Promise<E> newPromise = connectionManager.newPromise();
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
return oldValue.getPromise();
};
Runnable listener = new Runnable() {
@Override
public void run() {
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
semaphore.release();
entry.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
return newPromise;
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
protected abstract E createEntry(Promise<E> newPromise);

@ -0,0 +1,47 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
*
* @author Nikita Koksharov
*
* @param <T>
*/
public class TransferListener<T> implements FutureListener<T> {
private Promise<T> promise;
public TransferListener(Promise<T> promise) {
super();
this.promise = promise;
}
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
}

@ -17,7 +17,6 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Publisher;
import org.redisson.PubSubPatternMessageListener;
@ -29,6 +28,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -89,7 +89,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);

@ -17,7 +17,6 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Publisher;
import org.redisson.PubSubMessageListener;
@ -30,7 +29,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.ReclosableLatch;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -100,7 +99,7 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);

@ -4,6 +4,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.junit.FixMethodOrder;
@ -68,5 +69,46 @@ public class RedissonLockHeavyTest extends BaseTest {
executor.awaitTermination(threads * loops, TimeUnit.SECONDS);
}
@Test
public void tryLockUnlockRLock() throws Exception {
for (int i = 0; i < threads; i++) {
Runnable worker = new Runnable() {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RLock lock = redisson.getLock("RLOCK_" + j);
try {
if (lock.tryLock(ThreadLocalRandom.current().nextInt(10), TimeUnit.MILLISECONDS)) {
try {
RBucket<String> bucket = redisson.getBucket("RBUCKET_" + j);
bucket.set("TEST", 30, TimeUnit.SECONDS);
RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j);
semaphore.release();
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
executor.execute(worker);
}
executor.shutdown();
executor.awaitTermination(threads * loops, TimeUnit.SECONDS);
}
}
Loading…
Cancel
Save