diff --git a/src/main/java/org/redisson/PubSubEntry.java b/src/main/java/org/redisson/PubSubEntry.java new file mode 100644 index 000000000..5ebcd211d --- /dev/null +++ b/src/main/java/org/redisson/PubSubEntry.java @@ -0,0 +1,13 @@ +package org.redisson; + +import io.netty.util.concurrent.Promise; + +public interface PubSubEntry { + + void aquire(); + + int release(); + + Promise getPromise(); + +} diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 6588440c4..6534f32d4 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -15,22 +15,17 @@ */ package org.redisson; -import java.util.Collections; +import java.util.Arrays; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.redisson.client.BaseRedisPubSubListener; -import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RCountDownLatch; +import org.redisson.pubsub.CountDownLatchPubSub; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.PlatformDependent; /** * Distributed alternative to the {@link java.util.concurrent.CountDownLatch} @@ -43,10 +38,10 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch { - private static final Long zeroCountMessage = 0L; - private static final Long newCountMessage = 1L; + public static final Long zeroCountMessage = 0L; + public static final Long newCountMessage = 1L; - private static final ConcurrentMap ENTRIES = PlatformDependent.newConcurrentHashMap(); + private static final CountDownLatchPubSub PUBSUB = new CountDownLatchPubSub(); private final UUID id; @@ -55,72 +50,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown this.id = id; } - private Future subscribe() { - synchronized (ENTRIES) { - RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); - if (entry != null) { - entry.aquire(); - return entry.getPromise(); - } - - Promise newPromise = newPromise(); - final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise); - value.aquire(); - - RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); - if (oldValue != null) { - oldValue.aquire(); - return oldValue.getPromise(); - } - - RedisPubSubListener listener = createListener(value); - commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener); - return newPromise; - } - } - - private RedisPubSubListener createListener(final RedissonCountDownLatchEntry value) { - RedisPubSubListener listener = new BaseRedisPubSubListener() { - - @Override - public void onMessage(String channel, Long message) { - if (!getChannelName().equals(channel)) { - return; - } - if (message.equals(zeroCountMessage)) { - value.getLatch().open(); - } - if (message.equals(newCountMessage)) { - value.getLatch().close(); - } - } - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (channel.equals(getChannelName()) - && type == PubSubType.SUBSCRIBE) { - value.getPromise().trySuccess(value); - return true; - } - return false; - } - - }; - return listener; - } - - private void unsubscribe(RedissonCountDownLatchEntry entry) { - synchronized (ENTRIES) { - if (entry.release() == 0) { - // just an assertion - boolean removed = ENTRIES.remove(getEntryName()) == entry; - if (removed) { - commandExecutor.getConnectionManager().unsubscribe(getChannelName()); - } - } - } - } - public void await() throws InterruptedException { Future promise = subscribe(); try { @@ -128,17 +57,16 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown while (getCountInner() > 0) { // waiting for open state - RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); + RedissonCountDownLatchEntry entry = getEntry(); if (entry != null) { entry.getLatch().await(); } } } finally { - unsubscribe(promise.getNow()); + unsubscribe(promise); } } - @Override public boolean await(long time, TimeUnit unit) throws InterruptedException { Future promise = subscribe(); @@ -154,7 +82,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } long current = System.currentTimeMillis(); // waiting for open state - RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName()); + RedissonCountDownLatchEntry entry = getEntry(); if (entry != null) { entry.getLatch().await(time, TimeUnit.MILLISECONDS); } @@ -165,18 +93,30 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return true; } finally { - unsubscribe(promise.getNow()); + unsubscribe(promise); } } + private RedissonCountDownLatchEntry getEntry() { + return PUBSUB.getEntry(getEntryName()); + } + + private Future subscribe() { + return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + } + + private void unsubscribe(Future future) { + PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + } + @Override public void countDown() { - Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, + Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + - "if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" + + "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" + "return true", - Collections.singletonList(getName()), zeroCountMessage, getChannelName()); + Arrays.asList(getName(), getChannelName()), zeroCountMessage); get(f); } @@ -185,7 +125,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown } private String getChannelName() { - return "redisson_countdownlatch_{" + getName() + "}"; + return "redisson_countdownlatch__channel__{" + getName() + "}"; } @Override @@ -204,28 +144,28 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public boolean trySetCount(long count) { - Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, + Future f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " - + "redis.call('publish', ARGV[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "return true " + "else " + "return false " + "end", - Collections.singletonList(getName()), newCountMessage, count, getChannelName()); + Arrays.asList(getName(), getChannelName()), newCountMessage, count); return get(f); } @Override public Future deleteAsync() { - return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('del', KEYS[1]) == 1 then " - + "redis.call('publish', ARGV[2], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "return true " + "else " + "return false " + "end", - Collections.singletonList(getName()), newCountMessage, getChannelName()); + Arrays.asList(getName(), getChannelName()), newCountMessage); } } diff --git a/src/main/java/org/redisson/RedissonCountDownLatchEntry.java b/src/main/java/org/redisson/RedissonCountDownLatchEntry.java index 181d173f1..3f6197098 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatchEntry.java +++ b/src/main/java/org/redisson/RedissonCountDownLatchEntry.java @@ -19,7 +19,7 @@ import org.redisson.misc.ReclosableLatch; import io.netty.util.concurrent.Promise; -public class RedissonCountDownLatchEntry { +public class RedissonCountDownLatchEntry implements PubSubEntry { private int counter; diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 86d586f51..f8cee1a00 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -15,24 +15,22 @@ */ package org.redisson; +import java.util.Arrays; import java.util.Collections; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; -import org.redisson.client.BaseRedisPubSubListener; -import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandExecutor; import org.redisson.core.RLock; +import org.redisson.pubsub.LockPubSub; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; /** @@ -49,13 +47,13 @@ public class RedissonLock extends RedissonExpirable implements RLock { private static final ConcurrentMap refreshTaskMap = PlatformDependent.newConcurrentHashMap(); protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS); - private final UUID id; + final UUID id; - private static final Long unlockMessage = 0L; + public static final Long unlockMessage = 0L; - private static final ConcurrentMap ENTRIES = PlatformDependent.newConcurrentHashMap(); + private static final LockPubSub PUBSUB = new LockPubSub(); - private final CommandExecutor commandExecutor; + final CommandExecutor commandExecutor; protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); @@ -63,68 +61,12 @@ public class RedissonLock extends RedissonExpirable implements RLock { this.id = id; } - private void unsubscribe(RedissonLockEntry entry) { - synchronized (ENTRIES) { - if (entry.release() == 0) { - // just an assertion - boolean removed = ENTRIES.remove(getEntryName()) == entry; - if (removed) { - commandExecutor.getConnectionManager().unsubscribe(getChannelName()); - } - } - } - } - private String getEntryName() { return id + ":" + getName(); } - private Future subscribe() { - synchronized (ENTRIES) { - RedissonLockEntry entry = ENTRIES.get(getEntryName()); - if (entry != null) { - entry.aquire(); - return entry.getPromise(); - } - - Promise newPromise = newPromise(); - final RedissonLockEntry value = new RedissonLockEntry(newPromise); - value.aquire(); - - RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); - if (oldValue != null) { - oldValue.aquire(); - return oldValue.getPromise(); - } - - RedisPubSubListener listener = new BaseRedisPubSubListener() { - - @Override - public void onMessage(String channel, Long message) { - if (message.equals(unlockMessage) && getChannelName().equals(channel)) { - value.getLatch().release(); - } - } - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (channel.equals(getChannelName()) - && type == PubSubType.SUBSCRIBE) { - value.getPromise().trySuccess(value); - return true; - } - return false; - } - - }; - - commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener); - return newPromise; - } - } - - private String getChannelName() { - return "redisson__lock__channel__{" + getName() + "}"; + String getChannelName() { + return "redisson_lock__channel__{" + getName() + "}"; } @Override @@ -180,7 +122,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } // waiting for message - RedissonLockEntry entry = ENTRIES.get(getEntryName()); + RedissonLockEntry entry = getEntry(); if (ttl >= 0) { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { @@ -188,7 +130,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } } } finally { - unsubscribe(future.getNow()); + unsubscribe(future); } } @@ -229,7 +171,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { * Stop refresh timer * @return true if timer was stopped successfully */ - private void stopRefreshTask() { + void stopRefreshTask() { Timeout task = refreshTaskMap.remove(getName()); if (task != null) { task.cancel(); @@ -237,7 +179,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } - private Long tryLockInner(final long leaseTime, final TimeUnit unit) { + Long tryLockInner(final long leaseTime, final TimeUnit unit) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, @@ -248,7 +190,8 @@ public class RedissonLock extends RedissonExpirable implements RLock { "else " + " local o = cjson.decode(v); " + " if (o['o'] == ARGV[1]) then " + - " o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " + + " o['c'] = o['c'] + 1; " + + " redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " + " return nil; " + " end;" + " return redis.call('pttl', KEYS[1]); " + @@ -292,7 +235,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { // waiting for message long current = System.currentTimeMillis(); - RedissonLockEntry entry = ENTRIES.get(getEntryName()); + RedissonLockEntry entry = getEntry(); if (ttl >= 0 && ttl < time) { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); @@ -305,10 +248,22 @@ public class RedissonLock extends RedissonExpirable implements RLock { } return true; } finally { - unsubscribe(future.getNow()); + unsubscribe(future); } } + private RedissonLockEntry getEntry() { + return PUBSUB.getEntry(getEntryName()); + } + + private Future subscribe() { + return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + } + + private void unsubscribe(Future future) { + PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager()); + } + @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryLock(time, -1, unit); @@ -316,10 +271,10 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void unlock() { - Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R2, + Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + - " redis.call('publish', ARGV[4], ARGV[2]); " + + " redis.call('publish', KEYS[2], ARGV[2]); " + " return true; " + "else " + " local o = cjson.decode(v); " + @@ -330,13 +285,13 @@ public class RedissonLock extends RedissonExpirable implements RLock { " return false;"+ " else " + " redis.call('del', KEYS[1]);" + - " redis.call('publish', ARGV[4], ARGV[2]); " + + " redis.call('publish', KEYS[2], ARGV[2]); " + " return true;"+ " end" + " end;" + " return nil; " + "end", - Collections.singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime, getChannelName()); + Arrays.asList(getName(), getChannelName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime); if (opStatus == null) { throw new IllegalStateException("Can't unlock lock Current id: " + id + " thread-id: " + Thread.currentThread().getId()); @@ -357,21 +312,21 @@ public class RedissonLock extends RedissonExpirable implements RLock { get(forceUnlockAsync()); } - private Future forceUnlockAsync() { + Future forceUnlockAsync() { stopRefreshTask(); - return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1, + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " - + "redis.call('publish', ARGV[2], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "return true " + "else " + "return false " + "end", - Collections.singletonList(getName()), unlockMessage, getChannelName()); + Arrays.asList(getName(), getChannelName()), unlockMessage); } @Override public boolean isLocked() { - return commandExecutor.read(getName(), RedisCommands.EXISTS, getName()); + return isExists(); } @Override diff --git a/src/main/java/org/redisson/RedissonLockEntry.java b/src/main/java/org/redisson/RedissonLockEntry.java index ca56333ab..bee4080e9 100644 --- a/src/main/java/org/redisson/RedissonLockEntry.java +++ b/src/main/java/org/redisson/RedissonLockEntry.java @@ -19,19 +19,13 @@ import io.netty.util.concurrent.Promise; import java.util.concurrent.Semaphore; -public class RedissonLockEntry { +public class RedissonLockEntry implements PubSubEntry { private int counter; private final Semaphore latch; private final Promise promise; - public RedissonLockEntry(RedissonLockEntry source) { - counter = source.counter; - latch = source.latch; - promise = source.promise; - } - public RedissonLockEntry(Promise promise) { super(); this.latch = new Semaphore(0); @@ -54,26 +48,4 @@ public class RedissonLockEntry { return latch; } - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + counter; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - RedissonLockEntry other = (RedissonLockEntry) obj; - if (counter != other.counter) - return false; - return true; - } - } diff --git a/src/main/java/org/redisson/core/RLock.java b/src/main/java/org/redisson/core/RLock.java index c4a725ed8..e3acd7ce4 100644 --- a/src/main/java/org/redisson/core/RLock.java +++ b/src/main/java/org/redisson/core/RLock.java @@ -91,7 +91,7 @@ public interface RLock extends Lock, RExpirable { void forceUnlock(); /** - * Checks if this lock locked by any thread in Redisson cluster + * Checks if this lock locked by any thread * * @return true if locked otherwise false */ diff --git a/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java b/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java new file mode 100644 index 000000000..d77d2e2a7 --- /dev/null +++ b/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java @@ -0,0 +1,54 @@ +package org.redisson.pubsub; + +import org.redisson.RedissonCountDownLatch; +import org.redisson.RedissonCountDownLatchEntry; +import org.redisson.client.BaseRedisPubSubListener; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.protocol.pubsub.PubSubType; + +import io.netty.util.concurrent.Promise; + +public class CountDownLatchPubSub extends PublishSubscribe { + + @Override + protected RedissonCountDownLatchEntry createEntry(Promise newPromise) { + return new RedissonCountDownLatchEntry(newPromise); + } + + @Override + protected RedisPubSubListener createListener(final String channelName, final RedissonCountDownLatchEntry value) { + RedisPubSubListener listener = new BaseRedisPubSubListener() { + + @Override + public void onMessage(String channel, Long message) { + if (!channelName.equals(channel)) { + return; + } + + if (message.equals(RedissonCountDownLatch.zeroCountMessage)) { + value.getLatch().open(); + } + if (message.equals(RedissonCountDownLatch.newCountMessage)) { + value.getLatch().close(); + } + } + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (!channelName.equals(channel)) { + return false; + } + + if (type == PubSubType.SUBSCRIBE) { + value.getPromise().trySuccess(value); + return true; + } + return false; + } + + }; + return listener; + } + + +} diff --git a/src/main/java/org/redisson/pubsub/LockPubSub.java b/src/main/java/org/redisson/pubsub/LockPubSub.java new file mode 100644 index 000000000..4b0274275 --- /dev/null +++ b/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -0,0 +1,51 @@ +package org.redisson.pubsub; + +import org.redisson.RedissonLock; +import org.redisson.RedissonLockEntry; +import org.redisson.client.BaseRedisPubSubListener; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.protocol.pubsub.PubSubType; + +import io.netty.util.concurrent.Promise; + +public class LockPubSub extends PublishSubscribe { + + @Override + protected RedissonLockEntry createEntry(Promise newPromise) { + return new RedissonLockEntry(newPromise); + } + + @Override + protected RedisPubSubListener createListener(final String channelName, final RedissonLockEntry value) { + RedisPubSubListener listener = new BaseRedisPubSubListener() { + + @Override + public void onMessage(String channel, Long message) { + if (!channelName.equals(channel)) { + return; + } + + if (message.equals(RedissonLock.unlockMessage)) { + value.getLatch().release(); + } + } + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (!channelName.equals(channel)) { + return false; + } + + if (type == PubSubType.SUBSCRIBE) { + value.getPromise().trySuccess(value); + return true; + } + return false; + } + + }; + return listener; + } + + +} diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java new file mode 100644 index 000000000..bf01d3232 --- /dev/null +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -0,0 +1,61 @@ +package org.redisson.pubsub; + +import java.util.concurrent.ConcurrentMap; + +import org.redisson.PubSubEntry; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.LongCodec; +import org.redisson.connection.ConnectionManager; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; + +abstract class PublishSubscribe> { + + private final ConcurrentMap entries = PlatformDependent.newConcurrentHashMap(); + + public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) { + synchronized (this) { + if (entry.release() == 0) { + // just an assertion + boolean removed = entries.remove(entryName) == entry; + if (removed) { + connectionManager.unsubscribe(channelName); + } + } + } + } + + public E getEntry(String entryName) { + return entries.get(entryName); + } + + public Future subscribe(String entryName, String channelName, ConnectionManager connectionManager) { + synchronized (this) { + E entry = entries.get(entryName); + if (entry != null) { + entry.aquire(); + return entry.getPromise(); + } + + Promise newPromise = connectionManager.newPromise(); + E value = createEntry(newPromise); + value.aquire(); + + E oldValue = entries.putIfAbsent(entryName, value); + if (oldValue != null) { + oldValue.aquire(); + return oldValue.getPromise(); + } + + RedisPubSubListener listener = createListener(channelName, value); + connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener); + return newPromise; + } + } + + protected abstract E createEntry(Promise newPromise); + + protected abstract RedisPubSubListener createListener(String channelName, E value); +}