From b81840eb08a9b8835df158e2170608a4cc18ae83 Mon Sep 17 00:00:00 2001
From: Nikita <abracham.mitchell@gmail.com>
Date: Thu, 13 Jul 2017 15:49:17 +0300
Subject: [PATCH] Add RLockReactive #962

---
 .../main/java/org/redisson/RedissonLock.java  |  12 +-
 .../java/org/redisson/RedissonReactive.java   |   6 +
 .../java/org/redisson/api/RLockReactive.java  |  54 ++++++
 .../redisson/api/RedissonReactiveClient.java  |  10 ++
 .../command/CommandReactiveExecutor.java      |   2 -
 .../reactive/RedissonLockReactive.java        | 164 ++++++++++++++++++
 6 files changed, 241 insertions(+), 7 deletions(-)
 create mode 100644 redisson/src/main/java/org/redisson/api/RLockReactive.java
 create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java

diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java
index 2770026f5..bb785dce1 100644
--- a/redisson/src/main/java/org/redisson/RedissonLock.java
+++ b/redisson/src/main/java/org/redisson/RedissonLock.java
@@ -30,7 +30,7 @@ import org.redisson.api.RLock;
 import org.redisson.client.codec.LongCodec;
 import org.redisson.client.protocol.RedisCommands;
 import org.redisson.client.protocol.RedisStrictCommand;
-import org.redisson.command.CommandExecutor;
+import org.redisson.command.CommandAsyncExecutor;
 import org.redisson.misc.RPromise;
 import org.redisson.pubsub.LockPubSub;
 import org.slf4j.Logger;
@@ -63,9 +63,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
 
     protected static final LockPubSub PUBSUB = new LockPubSub();
 
-    final CommandExecutor commandExecutor;
+    final CommandAsyncExecutor commandExecutor;
 
-    protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
+    public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
         super(commandExecutor, name);
         this.commandExecutor = commandExecutor;
         this.id = id;
@@ -417,12 +417,14 @@ public class RedissonLock extends RedissonExpirable implements RLock {
 
     @Override
     public boolean isHeldByCurrentThread() {
-        return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
+        RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
+        return get(future);
     }
 
     @Override
     public int getHoldCount() {
-        Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
+        RFuture<Long> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
+        Long res = get(future);
         if (res == null) {
             return 0;
         }
diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java
index 39f380b54..4e2c18f2c 100644
--- a/redisson/src/main/java/org/redisson/RedissonReactive.java
+++ b/redisson/src/main/java/org/redisson/RedissonReactive.java
@@ -35,6 +35,7 @@ import org.redisson.api.RHyperLogLogReactive;
 import org.redisson.api.RKeysReactive;
 import org.redisson.api.RLexSortedSetReactive;
 import org.redisson.api.RListReactive;
+import org.redisson.api.RLockReactive;
 import org.redisson.api.RMapCacheReactive;
 import org.redisson.api.RMapReactive;
 import org.redisson.api.RPatternTopicReactive;
@@ -63,6 +64,7 @@ import org.redisson.reactive.RedissonHyperLogLogReactive;
 import org.redisson.reactive.RedissonKeysReactive;
 import org.redisson.reactive.RedissonLexSortedSetReactive;
 import org.redisson.reactive.RedissonListReactive;
+import org.redisson.reactive.RedissonLockReactive;
 import org.redisson.reactive.RedissonMapCacheReactive;
 import org.redisson.reactive.RedissonMapReactive;
 import org.redisson.reactive.RedissonPatternTopicReactive;
@@ -99,6 +101,10 @@ public class RedissonReactive implements RedissonReactiveClient {
         codecProvider = config.getCodecProvider();
     }
 
+    @Override
+    public RLockReactive getLock(String name) {
+        return new RedissonLockReactive(commandExecutor, name, id);
+    }
 
     @Override
     public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
diff --git a/redisson/src/main/java/org/redisson/api/RLockReactive.java b/redisson/src/main/java/org/redisson/api/RLockReactive.java
new file mode 100644
index 000000000..3f23252d7
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/api/RLockReactive.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * 
+ * @author Nikita Koksharov
+ *
+ */
+public interface RLockReactive extends RExpirableReactive {
+
+    Publisher<Boolean> forceUnlock();
+    
+    Publisher<Void> unlock();
+    
+    Publisher<Void> unlock(long threadId);
+    
+    Publisher<Boolean> tryLock();
+
+    Publisher<Void> lock();
+
+    Publisher<Void> lock(long threadId);
+    
+    Publisher<Void> lock(long leaseTime, TimeUnit unit);
+    
+    Publisher<Void> lock(long leaseTime, TimeUnit unit, long threadId);
+    
+    Publisher<Boolean> tryLock(long threadId);
+    
+    Publisher<Boolean> tryLock(long waitTime, TimeUnit unit);
+
+    Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
+
+    Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit, long threadId);
+
+    
+}
diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
index 67f0eff3e..35ffce644 100644
--- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
+++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
@@ -30,6 +30,16 @@ import org.redisson.config.Config;
  */
 public interface RedissonReactiveClient {
 
+    /**
+     * Returns lock instance by name.
+     * <p>
+     * Implements a <b>non-fair</b> locking so doesn't guarantee an acquire order by threads.
+     *
+     * @param name - name of object
+     * @return Lock object
+     */
+    RLockReactive getLock(String name);
+    
     /**
      * Returns set-based cache instance by <code>name</code>.
      * Supports value eviction with a given TTL value.
diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java
index 952c6efb0..8285b288a 100644
--- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java
+++ b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java
@@ -38,8 +38,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
 
     <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
 
-    ConnectionManager getConnectionManager();
-
     <T, R> Publisher<R> evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType,
             String script, List<Object> keys, Object ... params);
 
diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java
new file mode 100644
index 000000000..be2c26793
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.reactive;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.reactivestreams.Publisher;
+import org.redisson.RedissonLock;
+import org.redisson.api.RFuture;
+import org.redisson.api.RLockAsync;
+import org.redisson.api.RLockReactive;
+import org.redisson.command.CommandReactiveExecutor;
+
+import reactor.fn.Supplier;
+
+/**
+ * 
+ * @author Nikita Koksharov
+ *
+ */
+public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive {
+
+    private final RLockAsync instance;
+    
+    public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
+        super(connectionManager, name);
+        instance = new RedissonLock(connectionManager, name, id);
+    }
+
+    @Override
+    public Publisher<Boolean> forceUnlock() {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.forceUnlockAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> unlock() {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.unlockAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> unlock(final long threadId) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.unlockAsync(threadId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryLock() {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryLockAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> lock() {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.lockAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> lock(final long threadId) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.lockAsync(threadId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> lock(final long leaseTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.lockAsync(leaseTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> lock(final long leaseTime, final TimeUnit unit, final long threadId) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.lockAsync(leaseTime, unit, threadId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryLock(final long threadId) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryLockAsync(threadId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryLock(final long waitTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryLockAsync(waitTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryLockAsync(waitTime, leaseTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryLockAsync(waitTime, leaseTime, unit, threadId);
+            }
+        });
+    }
+    
+}