diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 2770026f5..bb785dce1 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -30,7 +30,7 @@ import org.redisson.api.RLock; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.pubsub.LockPubSub; import org.slf4j.Logger; @@ -63,9 +63,9 @@ public class RedissonLock extends RedissonExpirable implements RLock { protected static final LockPubSub PUBSUB = new LockPubSub(); - final CommandExecutor commandExecutor; + final CommandAsyncExecutor commandExecutor; - protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { + public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = id; @@ -417,12 +417,14 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isHeldByCurrentThread() { - return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); + RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); + return get(future); } @Override public int getHoldCount() { - Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId())); + RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId())); + Long res = get(future); if (res == null) { return 0; } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 39f380b54..4e2c18f2c 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -35,6 +35,7 @@ import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RKeysReactive; import org.redisson.api.RLexSortedSetReactive; import org.redisson.api.RListReactive; +import org.redisson.api.RLockReactive; import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; import org.redisson.api.RPatternTopicReactive; @@ -63,6 +64,7 @@ import org.redisson.reactive.RedissonHyperLogLogReactive; import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonLexSortedSetReactive; import org.redisson.reactive.RedissonListReactive; +import org.redisson.reactive.RedissonLockReactive; import org.redisson.reactive.RedissonMapCacheReactive; import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; @@ -99,6 +101,10 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getCodecProvider(); } + @Override + public RLockReactive getLock(String name) { + return new RedissonLockReactive(commandExecutor, name, id); + } @Override public RMapCacheReactive getMapCache(String name, Codec codec) { diff --git a/redisson/src/main/java/org/redisson/api/RLockReactive.java b/redisson/src/main/java/org/redisson/api/RLockReactive.java new file mode 100644 index 000000000..3f23252d7 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RLockReactive.java @@ -0,0 +1,54 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RLockReactive extends RExpirableReactive { + + Publisher forceUnlock(); + + Publisher unlock(); + + Publisher unlock(long threadId); + + Publisher tryLock(); + + Publisher lock(); + + Publisher lock(long threadId); + + Publisher lock(long leaseTime, TimeUnit unit); + + Publisher lock(long leaseTime, TimeUnit unit, long threadId); + + Publisher tryLock(long threadId); + + Publisher tryLock(long waitTime, TimeUnit unit); + + Publisher tryLock(long waitTime, long leaseTime, TimeUnit unit); + + Publisher tryLock(long waitTime, long leaseTime, TimeUnit unit, long threadId); + + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 67f0eff3e..35ffce644 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,16 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns lock instance by name. + *

+ * Implements a non-fair locking so doesn't guarantee an acquire order by threads. + * + * @param name - name of object + * @return Lock object + */ + RLockReactive getLock(String name); + /** * Returns set-based cache instance by name. * Supports value eviction with a given TTL value. diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java index 952c6efb0..8285b288a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -38,8 +38,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher reactive(Supplier> supplier); - ConnectionManager getConnectionManager(); - Publisher evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java new file mode 100644 index 000000000..be2c26793 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java @@ -0,0 +1,164 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonLock; +import org.redisson.api.RFuture; +import org.redisson.api.RLockAsync; +import org.redisson.api.RLockReactive; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive { + + private final RLockAsync instance; + + public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) { + super(connectionManager, name); + instance = new RedissonLock(connectionManager, name, id); + } + + @Override + public Publisher forceUnlock() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.forceUnlockAsync(); + } + }); + } + + @Override + public Publisher unlock() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.unlockAsync(); + } + }); + } + + @Override + public Publisher unlock(final long threadId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.unlockAsync(threadId); + } + }); + } + + @Override + public Publisher tryLock() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryLockAsync(); + } + }); + } + + @Override + public Publisher lock() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.lockAsync(); + } + }); + } + + @Override + public Publisher lock(final long threadId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.lockAsync(threadId); + } + }); + } + + @Override + public Publisher lock(final long leaseTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.lockAsync(leaseTime, unit); + } + }); + } + + @Override + public Publisher lock(final long leaseTime, final TimeUnit unit, final long threadId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.lockAsync(leaseTime, unit, threadId); + } + }); + } + + @Override + public Publisher tryLock(final long threadId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryLockAsync(threadId); + } + }); + } + + @Override + public Publisher tryLock(final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryLockAsync(waitTime, unit); + } + }); + } + + @Override + public Publisher tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryLockAsync(waitTime, leaseTime, unit); + } + }); + } + + @Override + public Publisher tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryLockAsync(waitTime, leaseTime, unit, threadId); + } + }); + } + +}