diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 4e2c18f2c..8f777a8a8 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -40,6 +40,7 @@ import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; import org.redisson.api.RPatternTopicReactive; import org.redisson.api.RQueueReactive; +import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; import org.redisson.api.RSetCacheReactive; @@ -69,6 +70,7 @@ import org.redisson.reactive.RedissonMapCacheReactive; import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonQueueReactive; +import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; import org.redisson.reactive.RedissonSetCacheReactive; @@ -101,6 +103,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getCodecProvider(); } + @Override + public RReadWriteLockReactive getReadWriteLock(String name) { + return new RedissonReadWriteLockReactive(commandExecutor, name, id); + } + @Override public RLockReactive getLock(String name) { return new RedissonLockReactive(commandExecutor, name, id); diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index a55885116..4bf5f770b 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -26,7 +26,7 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; import io.netty.util.concurrent.Future; @@ -40,16 +40,13 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonReadLock extends RedissonLock implements RLock { - private final CommandExecutor commandExecutor; - - protected RedissonReadLock(CommandExecutor commandExecutor, String name, UUID id) { + public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name, id); - this.commandExecutor = commandExecutor; } @Override String getChannelName() { - return "redisson_rwlock__{" + getName() + "}"; + return prefixName("redisson_rwlock", getName()); } String getWriteLockName(long threadId) { @@ -159,7 +156,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override public boolean isLocked() { - String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + RFuture future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + String res = get(future); return "read".equals(res); } diff --git a/redisson/src/main/java/org/redisson/RedissonReadWriteLock.java b/redisson/src/main/java/org/redisson/RedissonReadWriteLock.java index f6d7375f9..5bb77ed1a 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadWriteLock.java @@ -20,7 +20,7 @@ import java.util.concurrent.locks.Lock; import org.redisson.api.RLock; import org.redisson.api.RReadWriteLock; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; /** * A {@code ReadWriteLock} maintains a pair of associated {@link @@ -38,11 +38,9 @@ import org.redisson.command.CommandExecutor; public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { private final UUID id; - private final CommandExecutor commandExecutor; - RedissonReadWriteLock(CommandExecutor commandExecutor, String name, UUID id) { + public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); - this.commandExecutor = commandExecutor; this.id = id; } diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index 4a56e5807..865189236 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -26,7 +26,7 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; import io.netty.util.concurrent.Future; @@ -40,16 +40,13 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonWriteLock extends RedissonLock implements RLock { - private final CommandExecutor commandExecutor; - - protected RedissonWriteLock(CommandExecutor commandExecutor, String name, UUID id) { + protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name, id); - this.commandExecutor = commandExecutor; } @Override String getChannelName() { - return "redisson_rwlock__{" + getName() + "}"; + return prefixName("redisson_rwlock", getName()); } @Override @@ -144,7 +141,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock { @Override public boolean isLocked() { - String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + RFuture future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); + String res = get(future); return "write".equals(res); } diff --git a/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java new file mode 100644 index 000000000..02a065d28 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java @@ -0,0 +1,49 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.locks.Lock; + +/** + * A {@code ReadWriteLock} maintains a pair of associated {@link + * Lock locks}, one for read-only operations and one for writing. + * The {@link #readLock read lock} may be held simultaneously by + * multiple reader threads, so long as there are no writers. The + * {@link #writeLock write lock} is exclusive. + * + * Works in non-fair mode. Therefore order of read and write + * locking is unspecified. + * + * @author Nikita Koksharov + * + */ +public interface RReadWriteLockReactive extends RExpirableReactive { + + /** + * Returns the lock used for reading. + * + * @return the lock used for reading + */ + RLockReactive readLock(); + + /** + * Returns the lock used for writing. + * + * @return the lock used for writing + */ + RLockReactive writeLock(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 35ffce644..1105eb7a4 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,14 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns readWriteLock instance by name. + * + * @param name - name of object + * @return Lock object + */ + RReadWriteLockReactive getReadWriteLock(String name); + /** * Returns lock instance by name. *

diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java index be2c26793..d50951e1c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java @@ -23,6 +23,7 @@ import org.redisson.RedissonLock; import org.redisson.api.RFuture; import org.redisson.api.RLockAsync; import org.redisson.api.RLockReactive; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; @@ -38,9 +39,13 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) { super(connectionManager, name); - instance = new RedissonLock(connectionManager, name, id); + instance = createLock(connectionManager, name, id); } + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) { + return new RedissonLock(commandExecutor, name, id); + } + @Override public Publisher forceUnlock() { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java new file mode 100644 index 000000000..000943bdb --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.UUID; + +import org.redisson.RedissonReadWriteLock; +import org.redisson.api.RLockAsync; +import org.redisson.api.RLockReactive; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandReactiveExecutor; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive { + + private final RReadWriteLock instance; + private final UUID id; + + public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name, UUID id) { + super(commandExecutor, name); + this.id = id; + this.instance = new RedissonReadWriteLock(commandExecutor, name, id); + } + + @Override + public RLockReactive readLock() { + return new RedissonLockReactive(commandExecutor, getName(), id) { + @Override + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) { + return instance.readLock(); + } + }; + } + + @Override + public RLockReactive writeLock() { + return new RedissonLockReactive(commandExecutor, getName(), id) { + @Override + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) { + return instance.writeLock(); + } + }; + } + + +}