diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 8c4bd09cc..ae288171a 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -24,7 +24,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.SemaphorePubSub; @@ -45,13 +45,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen private final SemaphorePubSub semaphorePubSub; - final CommandExecutor commandExecutor; + final CommandAsyncExecutor commandExecutor; private final String timeoutName; private final long nonExpirableTimeout = 922337203685477L; - protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { + public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { super(commandExecutor, name); this.timeoutName = suffixName(name, "timeout"); this.commandExecutor = commandExecutor; diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index a1ae2da3b..844c9f4e4 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -41,6 +41,7 @@ import org.redisson.api.RLockReactive; import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; import org.redisson.api.RPatternTopicReactive; +import org.redisson.api.RPermitExpirableSemaphoreReactive; import org.redisson.api.RQueueReactive; import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; @@ -76,6 +77,7 @@ import org.redisson.reactive.RedissonLockReactive; import org.redisson.reactive.RedissonMapCacheReactive; import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; +import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive; import org.redisson.reactive.RedissonQueueReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; @@ -119,6 +121,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); } + @Override + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) { + return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub); + } + @Override public RReadWriteLockReactive getReadWriteLock(String name) { return new RedissonReadWriteLockReactive(commandExecutor, name); diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java new file mode 100644 index 000000000..495d56465 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java @@ -0,0 +1,224 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * Semaphore object with support of lease time parameter for each acquired permit. + * + *

Each permit identified by own id and could be released only using its id. + * Permit id is a 128-bits unique random identifier generated each time during acquiring. + * + *

Works in non-fair mode. Therefore order of acquiring is unpredictable. + * + * @author Nikita Koksharov + * + */ +public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { + + /** + * Acquires a permit from this semaphore, blocking until one is + * available, or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns its id, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

+ * + * @return permit id + */ + Publisher acquire(); + + /** + * Acquires a permit with defined lease time from this semaphore, + * blocking until one is available, + * or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns its id, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

+ * + * @param leaseTime - permit lease time + * @param unit - time unit + * @return permit id + */ + Publisher acquire(long leaseTime, TimeUnit unit); + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the permit id, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code null}. + * + * @return permit id if a permit was acquired and {@code null} + * otherwise + */ + Publisher tryAcquire(); + + /** + * Acquires a permit from this semaphore, if one becomes available + * within the given waiting time and the current thread has not + * been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * with the permit id, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

+ * + *

If a permit is acquired then the permit id is returned. + * + *

If the specified waiting time elapses then the value {@code null} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param waitTime the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return permit id if a permit was acquired and {@code null} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long waitTime, TimeUnit unit); + + /** + * Acquires a permit with defined lease time from this semaphore, + * if one becomes available + * within the given waiting time and the current thread has not + * been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * with the permit id, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

+ * + *

If a permit is acquired then the permit id is returned. + * + *

If the specified waiting time elapses then the value {@code null} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param waitTime the maximum time to wait for a permit + * @param leaseTime permit lease time + * @param unit the time unit of the {@code timeout} argument + * @return permit id if a permit was acquired and {@code null} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long waitTime, long leaseTime, TimeUnit unit); + + /** + * Releases a permit by its id, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that was just released. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permitId - permit id + * @return {@code true} if a permit has been released and {@code false} + * otherwise + */ + Publisher tryRelease(String permitId); + + /** + * Releases a permit by its id, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that was just released. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + *

Throws an exception if permit id doesn't exist or has already been release + * + * @param permitId - permit id + * @return void + */ + Publisher release(String permitId); + + /** + * Returns the current number of available permits. + * + * @return number of available permits + */ + Publisher availablePermits(); + + /** + * Sets number of permits. + * + * @param permits - number of permits + * @return true if permits has been set successfully, otherwise false. + */ + Publisher trySetPermits(int permits); + + /** + * Increases or decreases the number of available permits by defined value. + * + * @param permits - number of permits to add/remove + * @return void + */ + Publisher addPermits(int permits); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index c576c605a..2ac9b3fe4 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -38,6 +38,15 @@ public interface RedissonReactiveClient { */ RSemaphoreReactive getSemaphore(String name); + /** + * Returns semaphore instance by name. + * Supports lease time parameter for each acquired permit. + * + * @param name - name of object + * @return PermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name); + /** * Returns readWriteLock instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java new file mode 100644 index 000000000..f6047026e --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java @@ -0,0 +1,151 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonLock; +import org.redisson.RedissonPermitExpirableSemaphore; +import org.redisson.api.RFuture; +import org.redisson.api.RLockAsync; +import org.redisson.api.RPermitExpirableSemaphoreAsync; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.pubsub.SemaphorePubSub; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableReactive implements RPermitExpirableSemaphoreReactive { + + private final RPermitExpirableSemaphoreAsync instance; + + public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { + super(connectionManager, name); + instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub); + } + + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { + return new RedissonLock(commandExecutor, name); + } + + @Override + public Publisher acquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(); + } + }); + } + + @Override + public Publisher acquire(final long leaseTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(leaseTime, unit); + } + }); + } + + @Override + public Publisher tryAcquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(); + } + }); + } + + @Override + public Publisher tryAcquire(final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(waitTime, unit); + } + }); + } + + @Override + public Publisher tryAcquire(final long waitTime, final long leaseTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(waitTime, leaseTime, unit); + } + }); + } + + @Override + public Publisher tryRelease(final String permitId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryReleaseAsync(permitId); + } + }); + } + + @Override + public Publisher release(final String permitId) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.releaseAsync(permitId); + } + }); + } + + @Override + public Publisher availablePermits() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.availablePermitsAsync(); + } + }); + } + + @Override + public Publisher trySetPermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trySetPermitsAsync(permits); + } + }); + } + + @Override + public Publisher addPermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addPermitsAsync(permits); + } + }); + } + +}