diff --git a/src/main/java/org/redisson/RedissonFairLock.java b/src/main/java/org/redisson/RedissonFairLock.java index 8ec0f3f55..ea7b9f753 100644 --- a/src/main/java/org/redisson/RedissonFairLock.java +++ b/src/main/java/org/redisson/RedissonFairLock.java @@ -61,11 +61,13 @@ public class RedissonFairLock extends RedissonLock implements RLock { return PUBSUB.getEntry(getEntryName() + ":" + threadId); } + @Override protected Future subscribe(long threadId) { return PUBSUB.subscribe(getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); } + @Override protected void unsubscribe(Future future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); @@ -212,7 +214,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { } @Override - Future forceUnlockAsync() { + public Future forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index d789ce821..a688baa80 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -371,7 +371,8 @@ public class RedissonLock extends RedissonExpirable implements RLock { get(forceUnlockAsync()); } - Future forceUnlockAsync() { + @Override + public Future forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " diff --git a/src/main/java/org/redisson/RedissonReadLock.java b/src/main/java/org/redisson/RedissonReadLock.java index f986b154f..b9438d1c9 100644 --- a/src/main/java/org/redisson/RedissonReadLock.java +++ b/src/main/java/org/redisson/RedissonReadLock.java @@ -115,7 +115,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { throw new UnsupportedOperationException(); } - Future forceUnlockAsync() { + @Override + public Future forceUnlockAsync() { Future result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "redis.call('del', KEYS[1]); " + diff --git a/src/main/java/org/redisson/RedissonWriteLock.java b/src/main/java/org/redisson/RedissonWriteLock.java index ed5e9f9ec..4687c6011 100644 --- a/src/main/java/org/redisson/RedissonWriteLock.java +++ b/src/main/java/org/redisson/RedissonWriteLock.java @@ -117,7 +117,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock { throw new UnsupportedOperationException(); } - Future forceUnlockAsync() { + @Override + public Future forceUnlockAsync() { Future result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "redis.call('del', KEYS[1]); " + diff --git a/src/main/java/org/redisson/core/RLock.java b/src/main/java/org/redisson/core/RLock.java index 2ff50805d..d2579589c 100644 --- a/src/main/java/org/redisson/core/RLock.java +++ b/src/main/java/org/redisson/core/RLock.java @@ -114,6 +114,8 @@ public interface RLock extends Lock, RExpirable { */ int getHoldCount(); + Future forceUnlockAsync(); + Future unlockAsync(); Future tryLockAsync(); diff --git a/src/main/java/org/redisson/core/RedissonMultiLock.java b/src/main/java/org/redisson/core/RedissonMultiLock.java index 6f2883b62..c66d86564 100644 --- a/src/main/java/org/redisson/core/RedissonMultiLock.java +++ b/src/main/java/org/redisson/core/RedissonMultiLock.java @@ -17,6 +17,7 @@ package org.redisson.core; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,7 +37,7 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; /** - * Groups multiple independent locks and handles them as one lock. + * Groups multiple independent locks and manages them as one lock. * * @author Nikita Koksharov * @@ -78,8 +79,9 @@ public class RedissonMultiLock implements Lock { public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - long currentThreadId = Thread.currentThread().getId(); - lock(promise, 0, leaseTime, unit, locks, currentThreadId); + long currentThreadId = Thread.currentThread().getId(); + Queue lockedLocks = new ConcurrentLinkedQueue(); + lock(promise, 0, leaseTime, unit, locks, currentThreadId, lockedLocks); promise.sync(); } @@ -89,7 +91,8 @@ public class RedissonMultiLock implements Lock { lockInterruptibly(-1, null); } - private void lock(final Promise promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List locks, final long currentThreadId) throws InterruptedException { + private void lock(final Promise promise, final long waitTime, final long leaseTime, final TimeUnit unit, + final List locks, final long currentThreadId, final Queue lockedLocks) throws InterruptedException { final AtomicInteger tryLockRequestsAmount = new AtomicInteger(); final Map, RLock> tryLockFutures = new HashMap, RLock>(locks.size()); @@ -97,11 +100,10 @@ public class RedissonMultiLock implements Lock { AtomicReference lockedLockHolder = new AtomicReference(); AtomicReference failed = new AtomicReference(); - Queue lockedLocks = new ConcurrentLinkedQueue(); @Override public void operationComplete(final Future future) throws Exception { - if (!future.isSuccess()) { + if (isLockFailed(future)) { failed.compareAndSet(null, future.cause()); } @@ -116,7 +118,7 @@ public class RedissonMultiLock implements Lock { } if (tryLockRequestsAmount.decrementAndGet() == 0) { - if (lockedLockHolder.get() == null && failed.get() == null) { + if (isAllLocksAcquired(lockedLockHolder, failed, lockedLocks)) { promise.setSuccess(null); return; } @@ -141,7 +143,8 @@ public class RedissonMultiLock implements Lock { } protected void tryLockAgain(final Promise promise, final long waitTime, final long leaseTime, - final TimeUnit unit, final long currentThreadId, final Map, RLock> tryLockFutures) { + final TimeUnit unit, final long currentThreadId, final Map, RLock> tryLockFutures) throws InterruptedException { + lockedLocks.clear(); if (failed.get() != null) { promise.setFailure(failed.get()); } else if (lockedLockHolder.get() != null) { @@ -154,20 +157,19 @@ public class RedissonMultiLock implements Lock { return; } + lockedLocks.add(lockedLock); List newLocks = new ArrayList(tryLockFutures.values()); newLocks.remove(lockedLock); - lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId); + lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId, lockedLocks); } }); + } else { + lock(promise, waitTime, leaseTime, unit, locks, currentThreadId, lockedLocks); } } }; for (RLock lock : locks) { - if (lock.isHeldByCurrentThread()) { - continue; - } - tryLockRequestsAmount.incrementAndGet(); Future future; if (waitTime > 0 || leaseTime > 0) { @@ -185,23 +187,23 @@ public class RedissonMultiLock implements Lock { @Override public boolean tryLock() { - List> tryLockFutures = new ArrayList>(locks.size()); + Map> tryLockFutures = new HashMap>(locks.size()); for (RLock lock : locks) { - tryLockFutures.add(lock.tryLockAsync()); + tryLockFutures.put(lock, lock.tryLockAsync()); } return sync(tryLockFutures); } - private boolean sync(List> tryLockFutures) { - for (Future future : tryLockFutures) { + protected boolean sync(Map> tryLockFutures) { + for (Future future : tryLockFutures.values()) { try { if (!future.syncUninterruptibly().getNow()) { - unlockInner(); + unlockInner(tryLockFutures.keySet()); return false; } } catch (RuntimeException e) { - unlockInner(); + unlockInner(tryLockFutures.keySet()); throw e; } } @@ -209,7 +211,7 @@ public class RedissonMultiLock implements Lock { return true; } - private void unlockInner() { + protected void unlockInner(Collection locks) { List> futures = new ArrayList>(locks.size()); for (RLock lock : locks) { futures.add(lock.unlockAsync()); @@ -226,9 +228,9 @@ public class RedissonMultiLock implements Lock { } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { - List> tryLockFutures = new ArrayList>(locks.size()); + Map> tryLockFutures = new HashMap>(locks.size()); for (RLock lock : locks) { - tryLockFutures.add(lock.tryLockAsync(waitTime, leaseTime, unit)); + tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit)); } return sync(tryLockFutures); @@ -254,4 +256,12 @@ public class RedissonMultiLock implements Lock { throw new UnsupportedOperationException(); } + protected boolean isLockFailed(Future future) { + return !future.isSuccess(); + } + + protected boolean isAllLocksAcquired(AtomicReference lockedLockHolder, AtomicReference failed, Queue lockedLocks) { + return lockedLockHolder.get() == null && failed.get() == null; + } + } diff --git a/src/main/java/org/redisson/core/RedissonRedLock.java b/src/main/java/org/redisson/core/RedissonRedLock.java new file mode 100644 index 000000000..195e272be --- /dev/null +++ b/src/main/java/org/redisson/core/RedissonRedLock.java @@ -0,0 +1,100 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.util.concurrent.Future; + +/** + * RedLock locking algorithm implementation for multiple locks. + * It manages all locks as one. + * + * @see http://redis.io/topics/distlock + * + * @author Nikita Koksharov + * + */ +public class RedissonRedLock extends RedissonMultiLock { + + /** + * Creates instance with multiple {@link RLock} objects. + * Each RLock object could be created by own Redisson instance. + * + * @param locks + */ + public RedissonRedLock(RLock... locks) { + super(locks); + } + + protected boolean sync(Map> tryLockFutures) { + Queue lockedLocks = new ConcurrentLinkedQueue(); + RuntimeException latestException = null; + for (Entry> entry : tryLockFutures.entrySet()) { + try { + if (entry.getValue().syncUninterruptibly().getNow()) { + lockedLocks.add(entry.getKey()); + } + } catch (RuntimeException e) { + latestException = e; + } + } + + if (lockedLocks.size() < minLocksAmount(locks)) { + unlock(); + lockedLocks.clear(); + if (latestException != null) { + throw latestException; + } + return false; + } + + return true; + } + + public void unlock() { + List> futures = new ArrayList>(locks.size()); + + for (RLock lock : locks) { + futures.add(lock.forceUnlockAsync()); + } + + for (Future future : futures) { + future.awaitUninterruptibly(); + } + } + + protected int minLocksAmount(final List locks) { + return locks.size()/2 + 1; + } + + @Override + protected boolean isLockFailed(Future future) { + return false; + } + + @Override + protected boolean isAllLocksAcquired(AtomicReference lockedLockHolder, AtomicReference failed, Queue lockedLocks) { + return (lockedLockHolder.get() == null && failed.get() == null) || lockedLocks.size() >= minLocksAmount(locks); + } + +} diff --git a/src/test/java/org/redisson/RedissonMultiLockTest.java b/src/test/java/org/redisson/RedissonMultiLockTest.java index 62af6b0e8..359036c0b 100644 --- a/src/test/java/org/redisson/RedissonMultiLockTest.java +++ b/src/test/java/org/redisson/RedissonMultiLockTest.java @@ -17,7 +17,7 @@ public class RedissonMultiLockTest { @Test public void testMultiThreads() throws IOException, InterruptedException { - RedisProcess redis1 = redisTestMultilockInstance1(); + RedisProcess redis1 = redisTestMultilockInstance(6320); Config config1 = new Config(); config1.useSingleServer().setAddress("127.0.0.1:6320"); @@ -52,28 +52,15 @@ public class RedissonMultiLockTest { @Test public void test() throws IOException, InterruptedException { - RedisProcess redis1 = redisTestMultilockInstance1(); - RedisProcess redis2 = redisTestMultilockInstance2(); - RedisProcess redis3 = redisTestMultilockInstance3(); + RedisProcess redis1 = redisTestMultilockInstance(6320); + RedisProcess redis2 = redisTestMultilockInstance(6321); + RedisProcess redis3 = redisTestMultilockInstance(6322); NioEventLoopGroup group = new NioEventLoopGroup(); - Config config1 = new Config(); - config1.useSingleServer().setAddress("127.0.0.1:6320"); - config1.setEventLoopGroup(group); - RedissonClient client1 = Redisson.create(config1); - client1.getKeys().flushdb(); - - Config config2 = new Config(); - config2.useSingleServer().setAddress("127.0.0.1:6321"); - config2.setEventLoopGroup(group); - RedissonClient client2 = Redisson.create(config2); - client2.getKeys().flushdb(); - - Config config3 = new Config(); - config3.useSingleServer().setAddress("127.0.0.1:6322"); - config3.setEventLoopGroup(group); - RedissonClient client3 = Redisson.create(config3); - client3.getKeys().flushdb(); + + RedissonClient client1 = createClient(group, "127.0.0.1:6320"); + RedissonClient client2 = createClient(group, "127.0.0.1:6321"); + RedissonClient client3 = createClient(group, "127.0.0.1:6322"); final RLock lock1 = client1.getLock("lock1"); final RLock lock2 = client2.getLock("lock2"); @@ -106,28 +93,26 @@ public class RedissonMultiLockTest { assertThat(redis3.stop()).isEqualTo(0); } - - private RedisProcess redisTestMultilockInstance1() throws IOException, InterruptedException { - return new RedisRunner() - .nosave() - .randomDir() - .port(6320) - .run(); + + private RedissonClient createClient(String host) { + return createClient(null, host); } - - private RedisProcess redisTestMultilockInstance2() throws IOException, InterruptedException { - return new RedisRunner() - .nosave() - .randomDir() - .port(6321) - .run(); + + private RedissonClient createClient(NioEventLoopGroup group, String host) { + Config config1 = new Config(); + config1.useSingleServer().setAddress(host); + config1.setEventLoopGroup(group); + RedissonClient client1 = Redisson.create(config1); + client1.getKeys().flushdb(); + return client1; } - private RedisProcess redisTestMultilockInstance3() throws IOException, InterruptedException { + private RedisProcess redisTestMultilockInstance(int port) throws IOException, InterruptedException { return new RedisRunner() .nosave() .randomDir() - .port(6322) + .port(port) .run(); } + } diff --git a/src/test/java/org/redisson/RedissonRedLockTest.java b/src/test/java/org/redisson/RedissonRedLockTest.java new file mode 100644 index 000000000..459f9dc12 --- /dev/null +++ b/src/test/java/org/redisson/RedissonRedLockTest.java @@ -0,0 +1,204 @@ +package org.redisson; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.redisson.core.RLock; +import org.redisson.core.RedissonMultiLock; +import org.redisson.core.RedissonRedLock; + +import io.netty.channel.nio.NioEventLoopGroup; +import org.redisson.RedisRunner.RedisProcess; +import static com.jayway.awaitility.Awaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +public class RedissonRedLockTest { + + @Test + public void testLockFailed() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(6320); + RedisProcess redis2 = redisTestMultilockInstance(6321); + + RedissonClient client1 = createClient("127.0.0.1:6320"); + RedissonClient client2 = createClient("127.0.0.1:6321"); + + RLock lock1 = client1.getLock("lock1"); + RLock lock2 = client1.getLock("lock2"); + RLock lock3 = client2.getLock("lock3"); + + Thread t1 = new Thread() { + public void run() { + lock3.lock(); + }; + }; + t1.start(); + t1.join(); + + Thread t = new Thread() { + public void run() { + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); + lock.lock(); + + System.out.println("123"); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + + lock.unlock(); + }; + }; + t.start(); + t.join(1000); + + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); + lock.lock(); + System.out.println("1234"); + lock.unlock(); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } + + + @Test + public void testConnectionFailed() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(6320); + RedisProcess redis2 = redisTestMultilockInstance(6321); + + RedissonClient client1 = createClient("127.0.0.1:6320"); + RedissonClient client2 = createClient("127.0.0.1:6321"); + + RLock lock1 = client1.getLock("lock1"); + RLock lock2 = client1.getLock("lock2"); + assertThat(redis2.stop()).isEqualTo(0); + RLock lock3 = client2.getLock("lock3"); + + Thread t = new Thread() { + public void run() { + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); + lock.lock(); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + + lock.unlock(); + }; + }; + t.start(); + t.join(1000); + + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); + lock.lock(); + lock.unlock(); + + assertThat(redis1.stop()).isEqualTo(0); + } + + +// @Test + public void testMultiThreads() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(6320); + + Config config1 = new Config(); + config1.useSingleServer().setAddress("127.0.0.1:6320"); + RedissonClient client = Redisson.create(config1); + + RLock lock1 = client.getLock("lock1"); + RLock lock2 = client.getLock("lock2"); + RLock lock3 = client.getLock("lock3"); + + Thread t = new Thread() { + public void run() { + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + lock.lock(); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + + lock.unlock(); + }; + }; + t.start(); + t.join(1000); + + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + lock.lock(); + lock.unlock(); + + assertThat(redis1.stop()).isEqualTo(0); + } + +// @Test + public void test() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(6320); + RedisProcess redis2 = redisTestMultilockInstance(6321); + RedisProcess redis3 = redisTestMultilockInstance(6322); + + NioEventLoopGroup group = new NioEventLoopGroup(); + + RedissonClient client1 = createClient(group, "127.0.0.1:6320"); + RedissonClient client2 = createClient(group, "127.0.0.1:6321"); + RedissonClient client3 = createClient(group, "127.0.0.1:6322"); + + final RLock lock1 = client1.getLock("lock1"); + final RLock lock2 = client2.getLock("lock2"); + final RLock lock3 = client3.getLock("lock3"); + + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + lock.lock(); + + final AtomicBoolean executed = new AtomicBoolean(); + + Thread t = new Thread() { + @Override + public void run() { + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + assertThat(lock.tryLock()).isFalse(); + assertThat(lock.tryLock()).isFalse(); + executed.set(true); + } + }; + t.start(); + t.join(); + + await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + + lock.unlock(); + + assertThat(redis1.stop()).isEqualTo(0); + + assertThat(redis2.stop()).isEqualTo(0); + + assertThat(redis3.stop()).isEqualTo(0); + } + + private RedissonClient createClient(String host) { + return createClient(null, host); + } + + private RedissonClient createClient(NioEventLoopGroup group, String host) { + Config config1 = new Config(); + config1.useSingleServer().setAddress(host); + config1.setEventLoopGroup(group); + RedissonClient client1 = Redisson.create(config1); + client1.getKeys().flushdb(); + return client1; + } + + private RedisProcess redisTestMultilockInstance(int port) throws IOException, InterruptedException { + return new RedisRunner() + .nosave() + .randomDir() + .port(port) + .run(); + } + +}