From 2cea6711ff52afc112b6bdd49658cc3b784a959c Mon Sep 17 00:00:00 2001
From: Nikita <abracham.mitchell@gmail.com>
Date: Mon, 16 Apr 2018 15:23:01 +0300
Subject: [PATCH] RPermitExpirableSemaphoreReactive object added. #1391

---
 .../RedissonPermitExpirableSemaphore.java     |   6 +-
 .../java/org/redisson/RedissonReactive.java   |   7 +
 .../RPermitExpirableSemaphoreReactive.java    | 224 ++++++++++++++++++
 .../redisson/api/RedissonReactiveClient.java  |   9 +
 ...issonPermitExpirableSemaphoreReactive.java | 151 ++++++++++++
 5 files changed, 394 insertions(+), 3 deletions(-)
 create mode 100644 redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java
 create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java

diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
index 8c4bd09cc..ae288171a 100644
--- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
+++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
@@ -24,7 +24,7 @@ import org.redisson.api.RFuture;
 import org.redisson.api.RPermitExpirableSemaphore;
 import org.redisson.client.codec.LongCodec;
 import org.redisson.client.protocol.RedisCommands;
-import org.redisson.command.CommandExecutor;
+import org.redisson.command.CommandAsyncExecutor;
 import org.redisson.misc.RPromise;
 import org.redisson.misc.RedissonPromise;
 import org.redisson.pubsub.SemaphorePubSub;
@@ -45,13 +45,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
 
     private final SemaphorePubSub semaphorePubSub;
 
-    final CommandExecutor commandExecutor;
+    final CommandAsyncExecutor commandExecutor;
 
     private final String timeoutName;
     
     private final long nonExpirableTimeout = 922337203685477L;
 
-    protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
+    public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
         super(commandExecutor, name);
         this.timeoutName = suffixName(name, "timeout");
         this.commandExecutor = commandExecutor;
diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java
index a1ae2da3b..844c9f4e4 100644
--- a/redisson/src/main/java/org/redisson/RedissonReactive.java
+++ b/redisson/src/main/java/org/redisson/RedissonReactive.java
@@ -41,6 +41,7 @@ import org.redisson.api.RLockReactive;
 import org.redisson.api.RMapCacheReactive;
 import org.redisson.api.RMapReactive;
 import org.redisson.api.RPatternTopicReactive;
+import org.redisson.api.RPermitExpirableSemaphoreReactive;
 import org.redisson.api.RQueueReactive;
 import org.redisson.api.RReadWriteLockReactive;
 import org.redisson.api.RScoredSortedSetReactive;
@@ -76,6 +77,7 @@ import org.redisson.reactive.RedissonLockReactive;
 import org.redisson.reactive.RedissonMapCacheReactive;
 import org.redisson.reactive.RedissonMapReactive;
 import org.redisson.reactive.RedissonPatternTopicReactive;
+import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive;
 import org.redisson.reactive.RedissonQueueReactive;
 import org.redisson.reactive.RedissonReadWriteLockReactive;
 import org.redisson.reactive.RedissonScoredSortedSetReactive;
@@ -119,6 +121,11 @@ public class RedissonReactive implements RedissonReactiveClient {
         return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);
     }
     
+    @Override
+    public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) {
+        return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub);        
+    }
+    
     @Override
     public RReadWriteLockReactive getReadWriteLock(String name) {
         return new RedissonReadWriteLockReactive(commandExecutor, name);
diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java
new file mode 100644
index 000000000..495d56465
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java
@@ -0,0 +1,224 @@
+/**
+ * Copyright 2018 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.reactivestreams.Publisher;
+
+/**
+ * Semaphore object with support of lease time parameter for each acquired permit.
+ * 
+ * <p>Each permit identified by own id and could be released only using its id.
+ * Permit id is a 128-bits unique random identifier generated each time during acquiring.
+ *   
+ * <p>Works in non-fair mode. Therefore order of acquiring is unpredictable.
+ * 
+ * @author Nikita Koksharov
+ *
+ */
+public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
+    
+    /**
+     * Acquires a permit from this semaphore, blocking until one is
+     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns its id,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release(String)} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     * 
+     * @return permit id
+     */
+    Publisher<String> acquire();
+    
+    /**
+     * Acquires a permit with defined lease time from this semaphore, 
+     * blocking until one is available, 
+     * or the thread is {@linkplain Thread#interrupt interrupted}.
+     * 
+     * <p>Acquires a permit, if one is available and returns its id,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     * 
+     * @param leaseTime - permit lease time
+     * @param unit - time unit
+     * @return permit id
+     */
+    Publisher<String> acquire(long leaseTime, TimeUnit unit);
+    
+    /**
+     * Acquires a permit only if one is available at the
+     * time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the permit id,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return
+     * immediately with the value {@code null}.
+     *
+     * @return permit id if a permit was acquired and {@code null}
+     *         otherwise
+     */
+    Publisher<String> tryAcquire();
+
+    /**
+     * Acquires a permit from this semaphore, if one becomes available
+     * within the given waiting time and the current thread has not
+     * been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the permit id,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release(String)} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If a permit is acquired then the permit id is returned.
+     *
+     * <p>If the specified waiting time elapses then the value {@code null}
+     * is returned. If the time is less than or equal to zero, the method
+     * will not wait at all.
+     * 
+     * @param waitTime the maximum time to wait for a permit
+     * @param unit the time unit of the {@code timeout} argument
+     * @return permit id if a permit was acquired and {@code null}
+     *         if the waiting time elapsed before a permit was acquired
+     */
+    Publisher<String> tryAcquire(long waitTime, TimeUnit unit);
+
+    /**
+     * Acquires a permit with defined lease time from this semaphore,
+     * if one becomes available
+     * within the given waiting time and the current thread has not
+     * been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the permit id,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release(String)} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If a permit is acquired then the permit id is returned.
+     *
+     * <p>If the specified waiting time elapses then the value {@code null}
+     * is returned. If the time is less than or equal to zero, the method
+     * will not wait at all.
+     * 
+     * @param waitTime the maximum time to wait for a permit
+     * @param leaseTime permit lease time
+     * @param unit the time unit of the {@code timeout} argument
+     * @return permit id if a permit was acquired and {@code null}
+     *         if the waiting time elapsed before a permit was acquired
+     */
+    Publisher<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
+
+    /**
+     * Releases a permit by its id, returning it to the semaphore.
+     *
+     * <p>Releases a permit, increasing the number of available permits by
+     * one. If any threads of Redisson client are trying to acquire a permit,
+     * then one is selected and given the permit that was just released.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link #acquire()}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     * 
+     * @param permitId - permit id
+     * @return {@code true} if a permit has been released and {@code false}
+     *         otherwise
+     */
+    Publisher<Boolean> tryRelease(String permitId);
+
+    /**
+     * Releases a permit by its id, returning it to the semaphore.
+     *
+     * <p>Releases a permit, increasing the number of available permits by
+     * one. If any threads of Redisson client are trying to acquire a permit,
+     * then one is selected and given the permit that was just released.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link #acquire()}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     * 
+     * <p>Throws an exception if permit id doesn't exist or has already been release
+     * 
+     * @param permitId - permit id
+     * @return void
+     */
+    Publisher<Void> release(String permitId);
+
+    /**
+     * Returns the current number of available permits.
+     *
+     * @return number of available permits
+     */
+    Publisher<Integer> availablePermits();
+
+    /**
+     * Sets number of permits.
+     *
+     * @param permits - number of permits
+     * @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.  
+     */
+    Publisher<Boolean> trySetPermits(int permits);
+
+    /**
+     * Increases or decreases the number of available permits by defined value. 
+     *
+     * @param permits - number of permits to add/remove
+     * @return void
+     */
+    Publisher<Void> addPermits(int permits);
+
+}
diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
index c576c605a..2ac9b3fe4 100644
--- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
+++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
@@ -38,6 +38,15 @@ public interface RedissonReactiveClient {
      */
     RSemaphoreReactive getSemaphore(String name);
     
+    /**
+     * Returns semaphore instance by name.
+     * Supports lease time parameter for each acquired permit.
+     * 
+     * @param name - name of object
+     * @return PermitExpirableSemaphore object
+     */
+    RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name);
+    
     /**
      * Returns readWriteLock instance by name.
      *
diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java
new file mode 100644
index 000000000..f6047026e
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright 2018 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.reactive;
+
+import java.util.concurrent.TimeUnit;
+
+import org.reactivestreams.Publisher;
+import org.redisson.RedissonLock;
+import org.redisson.RedissonPermitExpirableSemaphore;
+import org.redisson.api.RFuture;
+import org.redisson.api.RLockAsync;
+import org.redisson.api.RPermitExpirableSemaphoreAsync;
+import org.redisson.api.RPermitExpirableSemaphoreReactive;
+import org.redisson.command.CommandAsyncExecutor;
+import org.redisson.command.CommandReactiveExecutor;
+import org.redisson.pubsub.SemaphorePubSub;
+
+import reactor.fn.Supplier;
+
+/**
+ * 
+ * @author Nikita Koksharov
+ *
+ */
+public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableReactive implements RPermitExpirableSemaphoreReactive {
+
+    private final RPermitExpirableSemaphoreAsync instance;
+    
+    public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
+        super(connectionManager, name);
+        instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub);
+    }
+
+    protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
+        return new RedissonLock(commandExecutor, name);
+    }
+
+    @Override
+    public Publisher<String> acquire() {
+        return reactive(new Supplier<RFuture<String>>() {
+            @Override
+            public RFuture<String> get() {
+                return instance.acquireAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<String> acquire(final long leaseTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<String>>() {
+            @Override
+            public RFuture<String> get() {
+                return instance.acquireAsync(leaseTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<String> tryAcquire() {
+        return reactive(new Supplier<RFuture<String>>() {
+            @Override
+            public RFuture<String> get() {
+                return instance.tryAcquireAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<String> tryAcquire(final long waitTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<String>>() {
+            @Override
+            public RFuture<String> get() {
+                return instance.tryAcquireAsync(waitTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<String> tryAcquire(final long waitTime, final long leaseTime, final TimeUnit unit) {
+        return reactive(new Supplier<RFuture<String>>() {
+            @Override
+            public RFuture<String> get() {
+                return instance.tryAcquireAsync(waitTime, leaseTime, unit);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> tryRelease(final String permitId) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.tryReleaseAsync(permitId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> release(final String permitId) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.releaseAsync(permitId);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Integer> availablePermits() {
+        return reactive(new Supplier<RFuture<Integer>>() {
+            @Override
+            public RFuture<Integer> get() {
+                return instance.availablePermitsAsync();
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Boolean> trySetPermits(final int permits) {
+        return reactive(new Supplier<RFuture<Boolean>>() {
+            @Override
+            public RFuture<Boolean> get() {
+                return instance.trySetPermitsAsync(permits);
+            }
+        });
+    }
+
+    @Override
+    public Publisher<Void> addPermits(final int permits) {
+        return reactive(new Supplier<RFuture<Void>>() {
+            @Override
+            public RFuture<Void> get() {
+                return instance.addPermitsAsync(permits);
+            }
+        });
+    }
+    
+}