RedissonRedLock implemented. #533

pull/544/head
Nikita 9 years ago
parent fe3f3dfb0f
commit db4fba6533

@ -61,11 +61,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
return PUBSUB.getEntry(getEntryName() + ":" + threadId);
}
@Override
protected Future<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}
@Override
protected void unsubscribe(Future<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
@ -212,7 +214,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
}
@Override
Future<Boolean> forceUnlockAsync() {
public Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads

@ -371,7 +371,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
get(forceUnlockAsync());
}
Future<Boolean> forceUnlockAsync() {
@Override
public Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "

@ -115,7 +115,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
throw new UnsupportedOperationException();
}
Future<Boolean> forceUnlockAsync() {
@Override
public Future<Boolean> forceUnlockAsync() {
Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
"redis.call('del', KEYS[1]); " +

@ -117,7 +117,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
throw new UnsupportedOperationException();
}
Future<Boolean> forceUnlockAsync() {
@Override
public Future<Boolean> forceUnlockAsync() {
Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
"redis.call('del', KEYS[1]); " +

@ -114,6 +114,8 @@ public interface RLock extends Lock, RExpirable {
*/
int getHoldCount();
Future<Boolean> forceUnlockAsync();
Future<Void> unlockAsync();
Future<Boolean> tryLockAsync();

@ -17,6 +17,7 @@ package org.redisson.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -36,7 +37,7 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
/**
* Groups multiple independent locks and handles them as one lock.
* Groups multiple independent locks and manages them as one lock.
*
* @author Nikita Koksharov
*
@ -78,8 +79,9 @@ public class RedissonMultiLock implements Lock {
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
long currentThreadId = Thread.currentThread().getId();
lock(promise, 0, leaseTime, unit, locks, currentThreadId);
long currentThreadId = Thread.currentThread().getId();
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
lock(promise, 0, leaseTime, unit, locks, currentThreadId, lockedLocks);
promise.sync();
}
@ -89,7 +91,8 @@ public class RedissonMultiLock implements Lock {
lockInterruptibly(-1, null);
}
private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List<RLock> locks, final long currentThreadId) throws InterruptedException {
private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit,
final List<RLock> locks, final long currentThreadId, final Queue<RLock> lockedLocks) throws InterruptedException {
final AtomicInteger tryLockRequestsAmount = new AtomicInteger();
final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size());
@ -97,11 +100,10 @@ public class RedissonMultiLock implements Lock {
AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
@Override
public void operationComplete(final Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (isLockFailed(future)) {
failed.compareAndSet(null, future.cause());
}
@ -116,7 +118,7 @@ public class RedissonMultiLock implements Lock {
}
if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (lockedLockHolder.get() == null && failed.get() == null) {
if (isAllLocksAcquired(lockedLockHolder, failed, lockedLocks)) {
promise.setSuccess(null);
return;
}
@ -141,7 +143,8 @@ public class RedissonMultiLock implements Lock {
}
protected void tryLockAgain(final Promise<Void> promise, final long waitTime, final long leaseTime,
final TimeUnit unit, final long currentThreadId, final Map<Future<Boolean>, RLock> tryLockFutures) {
final TimeUnit unit, final long currentThreadId, final Map<Future<Boolean>, RLock> tryLockFutures) throws InterruptedException {
lockedLocks.clear();
if (failed.get() != null) {
promise.setFailure(failed.get());
} else if (lockedLockHolder.get() != null) {
@ -154,20 +157,19 @@ public class RedissonMultiLock implements Lock {
return;
}
lockedLocks.add(lockedLock);
List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values());
newLocks.remove(lockedLock);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId, lockedLocks);
}
});
} else {
lock(promise, waitTime, leaseTime, unit, locks, currentThreadId, lockedLocks);
}
}
};
for (RLock lock : locks) {
if (lock.isHeldByCurrentThread()) {
continue;
}
tryLockRequestsAmount.incrementAndGet();
Future<Boolean> future;
if (waitTime > 0 || leaseTime > 0) {
@ -185,23 +187,23 @@ public class RedissonMultiLock implements Lock {
@Override
public boolean tryLock() {
List<Future<Boolean>> tryLockFutures = new ArrayList<Future<Boolean>>(locks.size());
Map<RLock, Future<Boolean>> tryLockFutures = new HashMap<RLock, Future<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.add(lock.tryLockAsync());
tryLockFutures.put(lock, lock.tryLockAsync());
}
return sync(tryLockFutures);
}
private boolean sync(List<Future<Boolean>> tryLockFutures) {
for (Future<Boolean> future : tryLockFutures) {
protected boolean sync(Map<RLock, Future<Boolean>> tryLockFutures) {
for (Future<Boolean> future : tryLockFutures.values()) {
try {
if (!future.syncUninterruptibly().getNow()) {
unlockInner();
unlockInner(tryLockFutures.keySet());
return false;
}
} catch (RuntimeException e) {
unlockInner();
unlockInner(tryLockFutures.keySet());
throw e;
}
}
@ -209,7 +211,7 @@ public class RedissonMultiLock implements Lock {
return true;
}
private void unlockInner() {
protected void unlockInner(Collection<RLock> locks) {
List<Future<Void>> futures = new ArrayList<Future<Void>>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
@ -226,9 +228,9 @@ public class RedissonMultiLock implements Lock {
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
List<Future<Boolean>> tryLockFutures = new ArrayList<Future<Boolean>>(locks.size());
Map<RLock, Future<Boolean>> tryLockFutures = new HashMap<RLock, Future<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.add(lock.tryLockAsync(waitTime, leaseTime, unit));
tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit));
}
return sync(tryLockFutures);
@ -254,4 +256,12 @@ public class RedissonMultiLock implements Lock {
throw new UnsupportedOperationException();
}
protected boolean isLockFailed(Future<Boolean> future) {
return !future.isSuccess();
}
protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return lockedLockHolder.get() == null && failed.get() == null;
}
}

@ -0,0 +1,100 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.util.concurrent.Future;
/**
* RedLock locking algorithm implementation for multiple locks.
* It manages all locks as one.
*
* @see <a href="http://redis.io/topics/distlock">http://redis.io/topics/distlock</a>
*
* @author Nikita Koksharov
*
*/
public class RedissonRedLock extends RedissonMultiLock {
/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks
*/
public RedissonRedLock(RLock... locks) {
super(locks);
}
protected boolean sync(Map<RLock, Future<Boolean>> tryLockFutures) {
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
RuntimeException latestException = null;
for (Entry<RLock, Future<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}
if (lockedLocks.size() < minLocksAmount(locks)) {
unlock();
lockedLocks.clear();
if (latestException != null) {
throw latestException;
}
return false;
}
return true;
}
public void unlock() {
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(locks.size());
for (RLock lock : locks) {
futures.add(lock.forceUnlockAsync());
}
for (Future<Boolean> future : futures) {
future.awaitUninterruptibly();
}
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
@Override
protected boolean isLockFailed(Future<Boolean> future) {
return false;
}
@Override
protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return (lockedLockHolder.get() == null && failed.get() == null) || lockedLocks.size() >= minLocksAmount(locks);
}
}

@ -17,7 +17,7 @@ public class RedissonMultiLockTest {
@Test
public void testMultiThreads() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1();
RedisProcess redis1 = redisTestMultilockInstance(6320);
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6320");
@ -52,28 +52,15 @@ public class RedissonMultiLockTest {
@Test
public void test() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1();
RedisProcess redis2 = redisTestMultilockInstance2();
RedisProcess redis3 = redisTestMultilockInstance3();
RedisProcess redis1 = redisTestMultilockInstance(6320);
RedisProcess redis2 = redisTestMultilockInstance(6321);
RedisProcess redis3 = redisTestMultilockInstance(6322);
NioEventLoopGroup group = new NioEventLoopGroup();
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6320");
config1.setEventLoopGroup(group);
RedissonClient client1 = Redisson.create(config1);
client1.getKeys().flushdb();
Config config2 = new Config();
config2.useSingleServer().setAddress("127.0.0.1:6321");
config2.setEventLoopGroup(group);
RedissonClient client2 = Redisson.create(config2);
client2.getKeys().flushdb();
Config config3 = new Config();
config3.useSingleServer().setAddress("127.0.0.1:6322");
config3.setEventLoopGroup(group);
RedissonClient client3 = Redisson.create(config3);
client3.getKeys().flushdb();
RedissonClient client1 = createClient(group, "127.0.0.1:6320");
RedissonClient client2 = createClient(group, "127.0.0.1:6321");
RedissonClient client3 = createClient(group, "127.0.0.1:6322");
final RLock lock1 = client1.getLock("lock1");
final RLock lock2 = client2.getLock("lock2");
@ -106,28 +93,26 @@ public class RedissonMultiLockTest {
assertThat(redis3.stop()).isEqualTo(0);
}
private RedisProcess redisTestMultilockInstance1() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6320)
.run();
private RedissonClient createClient(String host) {
return createClient(null, host);
}
private RedisProcess redisTestMultilockInstance2() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6321)
.run();
private RedissonClient createClient(NioEventLoopGroup group, String host) {
Config config1 = new Config();
config1.useSingleServer().setAddress(host);
config1.setEventLoopGroup(group);
RedissonClient client1 = Redisson.create(config1);
client1.getKeys().flushdb();
return client1;
}
private RedisProcess redisTestMultilockInstance3() throws IOException, InterruptedException {
private RedisProcess redisTestMultilockInstance(int port) throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(6322)
.port(port)
.run();
}
}

@ -0,0 +1,204 @@
package org.redisson;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.redisson.core.RLock;
import org.redisson.core.RedissonMultiLock;
import org.redisson.core.RedissonRedLock;
import io.netty.channel.nio.NioEventLoopGroup;
import org.redisson.RedisRunner.RedisProcess;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRedLockTest {
@Test
public void testLockFailed() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance(6320);
RedisProcess redis2 = redisTestMultilockInstance(6321);
RedissonClient client1 = createClient("127.0.0.1:6320");
RedissonClient client2 = createClient("127.0.0.1:6321");
RLock lock1 = client1.getLock("lock1");
RLock lock2 = client1.getLock("lock2");
RLock lock3 = client2.getLock("lock3");
Thread t1 = new Thread() {
public void run() {
lock3.lock();
};
};
t1.start();
t1.join();
Thread t = new Thread() {
public void run() {
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
System.out.println("123");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
System.out.println("1234");
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
}
@Test
public void testConnectionFailed() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance(6320);
RedisProcess redis2 = redisTestMultilockInstance(6321);
RedissonClient client1 = createClient("127.0.0.1:6320");
RedissonClient client2 = createClient("127.0.0.1:6321");
RLock lock1 = client1.getLock("lock1");
RLock lock2 = client1.getLock("lock2");
assertThat(redis2.stop()).isEqualTo(0);
RLock lock3 = client2.getLock("lock3");
Thread t = new Thread() {
public void run() {
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
}
// @Test
public void testMultiThreads() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance(6320);
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6320");
RedissonClient client = Redisson.create(config1);
RLock lock1 = client.getLock("lock1");
RLock lock2 = client.getLock("lock2");
RLock lock3 = client.getLock("lock3");
Thread t = new Thread() {
public void run() {
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
}
// @Test
public void test() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance(6320);
RedisProcess redis2 = redisTestMultilockInstance(6321);
RedisProcess redis3 = redisTestMultilockInstance(6322);
NioEventLoopGroup group = new NioEventLoopGroup();
RedissonClient client1 = createClient(group, "127.0.0.1:6320");
RedissonClient client2 = createClient(group, "127.0.0.1:6321");
RedissonClient client3 = createClient(group, "127.0.0.1:6322");
final RLock lock1 = client1.getLock("lock1");
final RLock lock2 = client2.getLock("lock2");
final RLock lock3 = client3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
final AtomicBoolean executed = new AtomicBoolean();
Thread t = new Thread() {
@Override
public void run() {
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
assertThat(lock.tryLock()).isFalse();
assertThat(lock.tryLock()).isFalse();
executed.set(true);
}
};
t.start();
t.join();
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
assertThat(redis3.stop()).isEqualTo(0);
}
private RedissonClient createClient(String host) {
return createClient(null, host);
}
private RedissonClient createClient(NioEventLoopGroup group, String host) {
Config config1 = new Config();
config1.useSingleServer().setAddress(host);
config1.setEventLoopGroup(group);
RedissonClient client1 = Redisson.create(config1);
client1.getKeys().flushdb();
return client1;
}
private RedisProcess redisTestMultilockInstance(int port) throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.port(port)
.run();
}
}
Loading…
Cancel
Save