Cancel unsuccessful subscription and remove AsyncSemaphore listener. #543

pull/574/merge
Nikita 9 years ago
parent 2c3b9337f2
commit f7e994975e

@ -266,14 +266,16 @@ public class RedissonLock extends RedissonExpirable implements RLock {
final long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
if (!await(future, time, TimeUnit.MILLISECONDS)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (future.isSuccess()) {
unsubscribe(future, threadId);
if (!future.cancel(false)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (future.isSuccess()) {
unsubscribe(future, threadId);
}
}
}
});
});
}
return false;
}
@ -639,6 +641,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void run() {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
result.trySuccess(false);
}
}

@ -65,7 +65,12 @@ public class AsyncSemaphore {
if (run) {
listener.run();
return;
}
}
public boolean remove(Runnable listener) {
synchronized (this) {
return listeners.remove(listener);
}
}

@ -16,6 +16,7 @@
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
@ -23,6 +24,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.PromiseDelegator;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
@ -57,10 +59,16 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
public Future<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
final Promise<E> newPromise = connectionManager.newPromise();
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
final Promise<E> newPromise = new PromiseDelegator<E>(connectionManager.newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
@ -86,7 +94,9 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
}
});
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}

@ -4,6 +4,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.junit.FixMethodOrder;
@ -68,5 +69,46 @@ public class RedissonLockHeavyTest extends BaseTest {
executor.awaitTermination(threads * loops, TimeUnit.SECONDS);
}
@Test
public void tryLockUnlockRLock() throws Exception {
for (int i = 0; i < threads; i++) {
Runnable worker = new Runnable() {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RLock lock = redisson.getLock("RLOCK_" + j);
try {
if (lock.tryLock(ThreadLocalRandom.current().nextInt(10), TimeUnit.MILLISECONDS)) {
try {
RBucket<String> bucket = redisson.getBucket("RBUCKET_" + j);
bucket.set("TEST", 30, TimeUnit.SECONDS);
RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j);
semaphore.release();
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
executor.execute(worker);
}
executor.shutdown();
executor.awaitTermination(threads * loops, TimeUnit.SECONDS);
}
}
Loading…
Cancel
Save