From 2447a4558a1ee530cf55b8d460a89640aeccb76c Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 8 Dec 2015 18:05:05 +0300 Subject: [PATCH] ReadWriteLock implementation added. #206 --- src/main/java/org/redisson/PubSubEntry.java | 15 ++ src/main/java/org/redisson/Redisson.java | 7 + .../java/org/redisson/RedissonClient.java | 4 + .../java/org/redisson/RedissonReadLock.java | 151 +++++++++++++++++ .../org/redisson/RedissonReadWriteLock.java | 47 ++++++ .../java/org/redisson/RedissonWriteLock.java | 153 ++++++++++++++++++ .../org/redisson/core/RReadWriteLock.java | 35 ++++ .../redisson/pubsub/CountDownLatchPubSub.java | 15 ++ .../java/org/redisson/pubsub/LockPubSub.java | 15 ++ .../org/redisson/pubsub/PublishSubscribe.java | 15 ++ 10 files changed, 457 insertions(+) create mode 100644 src/main/java/org/redisson/RedissonReadLock.java create mode 100644 src/main/java/org/redisson/RedissonReadWriteLock.java create mode 100644 src/main/java/org/redisson/RedissonWriteLock.java create mode 100644 src/main/java/org/redisson/core/RReadWriteLock.java diff --git a/src/main/java/org/redisson/PubSubEntry.java b/src/main/java/org/redisson/PubSubEntry.java index 5ebcd211d..e79008eb9 100644 --- a/src/main/java/org/redisson/PubSubEntry.java +++ b/src/main/java/org/redisson/PubSubEntry.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson; import io.netty.util.concurrent.Promise; diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 5706ecbf6..f62daa69b 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.locks.ReadWriteLock; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; @@ -50,6 +51,7 @@ import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; +import org.redisson.core.RReadWriteLock; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSet; @@ -141,6 +143,11 @@ public class Redisson implements RedissonClient { return new RedissonReactive(config); } + @Override + public RReadWriteLock getReadWriteLock(String name) { + return new RedissonReadWriteLock(commandExecutor, name, id); + } + @Override public RBucket getBucket(String name) { return new RedissonBucket(commandExecutor, name); diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 735620f92..3ca113984 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -16,6 +16,7 @@ package org.redisson; import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; import org.redisson.client.codec.Codec; import org.redisson.core.ClusterNode; @@ -37,6 +38,7 @@ import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; +import org.redisson.core.RReadWriteLock; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSet; @@ -52,6 +54,8 @@ import org.redisson.core.RTopic; */ public interface RedissonClient { + RReadWriteLock getReadWriteLock(String name); + /** * Returns map-based cache instance with eviction support by name * using provided codec for both cache keys and values. diff --git a/src/main/java/org/redisson/RedissonReadLock.java b/src/main/java/org/redisson/RedissonReadLock.java new file mode 100644 index 000000000..dca65ae62 --- /dev/null +++ b/src/main/java/org/redisson/RedissonReadLock.java @@ -0,0 +1,151 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.core.RLock; + +import io.netty.util.concurrent.Future; + +/** + * Lock will be removed automatically if client disconnects. + * + * @author Nikita Koksharov + * + */ +public class RedissonReadLock extends RedissonLock implements RLock { + + private final CommandExecutor commandExecutor; + + protected RedissonReadLock(CommandExecutor commandExecutor, String name, UUID id) { + super(commandExecutor, name, id); + this.commandExecutor = commandExecutor; + } + + private String getLockName() { + return id + ":" + Thread.currentThread().getId(); + } + + String getChannelName() { + return "redisson_rwlock__{" + getName() + "}"; + } + + Long tryLockInner(final long leaseTime, final TimeUnit unit) { + internalLockLeaseTime = unit.toMillis(leaseTime); + + return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local mode = redis.call('hget', KEYS[1], 'mode'); " + + "if (mode == false) then " + + "redis.call('hset', KEYS[1], 'mode', 'read'); " + + "redis.call('hset', KEYS[1], KEYS[2], 1); " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return nil; " + + "end; " + + "if (mode == 'read') then " + + "redis.call('hincrby', KEYS[1], KEYS[2], 1); " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return nil; " + + "end;" + + "return redis.call('pttl', KEYS[1]);", + Arrays.asList(getName(), getLockName()), internalLockLeaseTime); + } + + @Override + public void unlock() { + Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local mode = redis.call('hget', KEYS[1], 'mode'); " + + "if (mode == false) then " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "return true; " + + "else if (mode == 'read') then " + + "local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + + "if (lockExists == false) then " + + "return nil;" + + "else " + + "local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + + "if (counter > 0) then " + + "redis.call('pexpire', KEYS[1], ARGV[2]); " + + "return false; " + + "else " + + "redis.call('hdel', KEYS[1], KEYS[2]); " + + "if (redis.call('hlen', KEYS[1]) == 1) then " + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "end; " + + "return true; "+ + "end; " + + "end; " + + "end; " + + "return nil; ", + Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); + if (opStatus == null) { + throw new IllegalStateException("Can't unlock lock Current id: " + + id + " thread-id: " + Thread.currentThread().getId()); + } + if (opStatus) { + stopRefreshTask(); + } + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + Future forceUnlockAsync() { + stopRefreshTask(); + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + + "if (redis.call('hlen', KEYS[1]) == 1) then " + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "end; " + + "return true; " + + "else " + + "return false; " + + "end;", + Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); + } + + @Override + public boolean isLocked() { + String res = commandExecutor.read(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + return "read".equals(res); + } + + @Override + public boolean isHeldByCurrentThread() { + return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName()); + } + + @Override + public int getHoldCount() { + Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName()); + if (res == null) { + return 0; + } + return res.intValue(); + } + +} diff --git a/src/main/java/org/redisson/RedissonReadWriteLock.java b/src/main/java/org/redisson/RedissonReadWriteLock.java new file mode 100644 index 000000000..06913f1ac --- /dev/null +++ b/src/main/java/org/redisson/RedissonReadWriteLock.java @@ -0,0 +1,47 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.UUID; + +import org.redisson.command.CommandExecutor; +import org.redisson.core.RLock; +import org.redisson.core.RReadWriteLock; + +public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { + + public static final Long unlockMessage = 0L; + + private final UUID id; + private final CommandExecutor commandExecutor; + + RedissonReadWriteLock(CommandExecutor commandExecutor, String name, UUID id) { + super(commandExecutor, name); + this.commandExecutor = commandExecutor; + this.id = id; + } + + @Override + public RLock readLock() { + return new RedissonReadLock(commandExecutor, getName(), id); + } + + @Override + public RLock writeLock() { + return new RedissonWriteLock(commandExecutor, getName(), id); + } + +} diff --git a/src/main/java/org/redisson/RedissonWriteLock.java b/src/main/java/org/redisson/RedissonWriteLock.java new file mode 100644 index 000000000..327d65842 --- /dev/null +++ b/src/main/java/org/redisson/RedissonWriteLock.java @@ -0,0 +1,153 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.core.RLock; + +import io.netty.util.concurrent.Future; + +/** + * Lock will be removed automatically if client disconnects. + * + * @author Nikita Koksharov + * + */ +public class RedissonWriteLock extends RedissonLock implements RLock { + + private final CommandExecutor commandExecutor; + + protected RedissonWriteLock(CommandExecutor commandExecutor, String name, UUID id) { + super(commandExecutor, name, id); + this.commandExecutor = commandExecutor; + } + + private String getLockName() { + return id + ":" + Thread.currentThread().getId(); + } + + String getChannelName() { + return "redisson_rwlock__{" + getName() + "}"; + } + + Long tryLockInner(final long leaseTime, final TimeUnit unit) { + internalLockLeaseTime = unit.toMillis(leaseTime); + + return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local mode = redis.call('hget', KEYS[1], 'mode'); " + + "if (mode == false) then " + + "redis.call('hset', KEYS[1], 'mode', 'write'); " + + "redis.call('hset', KEYS[1], KEYS[2], 1); " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return nil; " + + "end; " + + "if (mode == 'write') then " + + "if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then " + + "redis.call('hincrby', KEYS[1], KEYS[2], 1); " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return nil; " + + "end; " + + "end;" + + "return redis.call('pttl', KEYS[1]);", + Arrays.asList(getName(), getLockName()), internalLockLeaseTime); + } + + @Override + public void unlock() { + Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local mode = redis.call('hget', KEYS[1], 'mode'); " + + "if (mode == false) then " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "return true; " + + "else if (mode == 'write') then " + + "local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + + "if (lockExists == false) then " + + "return nil;" + + "else " + + "local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + + "if (counter > 0) then " + + "redis.call('pexpire', KEYS[1], ARGV[2]); " + + "return false; " + + "else " + + "redis.call('hdel', KEYS[1], KEYS[2]); " + + "if (redis.call('hlen', KEYS[1]) == 1) then " + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "end; " + + "return true; "+ + "end; " + + "end; " + + "end; " + + "return nil;", + Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); + if (opStatus == null) { + throw new IllegalStateException("Can't unlock lock Current id: " + + id + " thread-id: " + Thread.currentThread().getId()); + } + if (opStatus) { + stopRefreshTask(); + } + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + Future forceUnlockAsync() { + stopRefreshTask(); + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + + "if (redis.call('hlen', KEYS[1]) == 1) then " + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[3], ARGV[1]); " + + "end; " + + "return true; " + + "else " + + "return false; " + + "end;", + Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); + } + + @Override + public boolean isLocked() { + String res = commandExecutor.read(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + return "write".equals(res); + } + + @Override + public boolean isHeldByCurrentThread() { + return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName()); + } + + @Override + public int getHoldCount() { + Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName()); + if (res == null) { + return 0; + } + return res.intValue(); + } + +} diff --git a/src/main/java/org/redisson/core/RReadWriteLock.java b/src/main/java/org/redisson/core/RReadWriteLock.java new file mode 100644 index 000000000..b0fda523d --- /dev/null +++ b/src/main/java/org/redisson/core/RReadWriteLock.java @@ -0,0 +1,35 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.locks.ReadWriteLock; + +/** + * Distributed implementation of {@link java.util.concurrent.locks.Lock} + * Implements reentrant lock. + * Use {@link RReadWriteLock#getHoldCount()} to get a holds count. + * + * @author Nikita Koksharov + * + */ + +public interface RReadWriteLock extends ReadWriteLock, RExpirable { + + RLock readLock(); + + RLock writeLock(); + +} diff --git a/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java b/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java index d77d2e2a7..b0e44745e 100644 --- a/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java +++ b/src/main/java/org/redisson/pubsub/CountDownLatchPubSub.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.pubsub; import org.redisson.RedissonCountDownLatch; diff --git a/src/main/java/org/redisson/pubsub/LockPubSub.java b/src/main/java/org/redisson/pubsub/LockPubSub.java index 4b0274275..54b7a21ee 100644 --- a/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.pubsub; import org.redisson.RedissonLock; diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java index bf01d3232..f09f6e773 100644 --- a/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.pubsub; import java.util.concurrent.ConcurrentMap;