From 1b89afbd6f88a9c5179e23f8cb345ca32cc76452 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 20 Jul 2017 10:53:26 +0300 Subject: [PATCH] RSemaphoreReactive added #977 --- .../java/org/redisson/RedissonReactive.java | 10 + .../java/org/redisson/RedissonSemaphore.java | 12 +- .../org/redisson/api/RSemaphoreReactive.java | 179 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 8 + .../reactive/RedissonSemaphoreReactive.java | 152 +++++++++++++++ 5 files changed, 356 insertions(+), 5 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 8f777a8a8..19596e3f9 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -43,6 +43,7 @@ import org.redisson.api.RQueueReactive; import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; @@ -55,6 +56,7 @@ import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; +import org.redisson.pubsub.SemaphorePubSub; import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBitSetReactive; @@ -73,6 +75,7 @@ import org.redisson.reactive.RedissonQueueReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; +import org.redisson.reactive.RedissonSemaphoreReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonTopicReactive; @@ -91,7 +94,9 @@ public class RedissonReactive implements RedissonReactiveClient { protected final ConnectionManager connectionManager; protected final Config config; protected final CodecProvider codecProvider; + protected final UUID id = UUID.randomUUID(); + protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); protected RedissonReactive(Config config) { this.config = config; @@ -103,6 +108,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getCodecProvider(); } + @Override + public RSemaphoreReactive getSemaphore(String name) { + return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); + } + @Override public RReadWriteLockReactive getReadWriteLock(String name) { return new RedissonReadWriteLockReactive(commandExecutor, name, id); diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 14cbd956c..bea274380 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -27,7 +27,7 @@ import org.redisson.api.RSemaphore; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.pubsub.SemaphorePubSub; @@ -48,9 +48,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { private final SemaphorePubSub semaphorePubSub; - final CommandExecutor commandExecutor; + final CommandAsyncExecutor commandExecutor; - protected RedissonSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { + public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.semaphorePubSub = semaphorePubSub; @@ -478,7 +478,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public int drainPermits() { - Long res = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local value = redis.call('get', KEYS[1]); " + "if (value == false or value == 0) then " + "return 0; " + @@ -486,12 +486,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { "redis.call('set', KEYS[1], 0); " + "return value;", Collections.singletonList(getName())); + Long res = get(future); return res.intValue(); } @Override public int availablePermits() { - Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + Long res = get(future); return res.intValue(); } diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java new file mode 100644 index 000000000..f9deb34e6 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java @@ -0,0 +1,179 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RSemaphoreReactive extends RExpirableReactive { + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(int permits); + + /** + * Acquires a permit from this semaphore. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + * @return void + * + */ + Publisher acquire(); + + /** + * Acquires the given number of permits, if they are available, + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits the number of permits to acquire + * @throws IllegalArgumentException if {@code permits} is negative + * @return void + */ + Publisher acquire(int permits); + + /** + * Releases a permit, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that was just released. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @return void + */ + Publisher release(); + + /** + * Releases the given number of permits, returning them to the semaphore. + * + *

Releases the given number of permits, increasing the number of available permits by + * the given number of permits. If any threads of Redisson client are trying to + * acquire a permits, then next threads is selected and tries to acquire the permits that was just released. + * + *

There is no requirement that a thread that releases a permits must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permits amount + * @return void + */ + Publisher release(int permits); + + /** + * Sets number of permits. + * + * @param permits - number of permits + * @return true if permits has been set successfully, otherwise false. + */ + Publisher trySetPermits(int permits); + + /** + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param waitTime the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long waitTime, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param waitTime the maximum time to wait for a available permits + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(int permits, long waitTime, TimeUnit unit); + + /** + * Shrinks the number of available permits by the indicated + * reduction. This method can be useful in subclasses that use + * semaphores to track resources that become unavailable. This + * method differs from {@link #acquire()} in that it does not block + * waiting for permits to become available. + * + * @param permits - reduction the number of permits to remove + * @return void + * @throws IllegalArgumentException if {@code reduction} is negative + */ + Publisher reducePermits(int permits); + + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 1105eb7a4..31689bf14 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,14 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns semaphore instance by name + * + * @param name - name of object + * @return Semaphore object + */ + RSemaphoreReactive getSemaphore(String name); + /** * Returns readWriteLock instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java new file mode 100644 index 000000000..de60f6861 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java @@ -0,0 +1,152 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonLock; +import org.redisson.RedissonSemaphore; +import org.redisson.api.RFuture; +import org.redisson.api.RLockAsync; +import org.redisson.api.RSemaphoreAsync; +import org.redisson.api.RSemaphoreReactive; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.pubsub.SemaphorePubSub; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive { + + private final RSemaphoreAsync instance; + + public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { + super(connectionManager, name); + instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub); + } + + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) { + return new RedissonLock(commandExecutor, name, id); + } + + @Override + public Publisher tryAcquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(); + } + }); + } + + @Override + public Publisher tryAcquire(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits); + } + }); + } + + @Override + public Publisher acquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(); + } + }); + } + + @Override + public Publisher acquire(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(permits); + } + }); + } + + @Override + public Publisher release() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.releaseAsync(); + } + }); + } + + @Override + public Publisher release(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.releaseAsync(permits); + } + }); + } + + @Override + public Publisher trySetPermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trySetPermitsAsync(permits); + } + }); + } + + @Override + public Publisher tryAcquire(final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(waitTime, unit); + } + }); + } + + @Override + public Publisher tryAcquire(final int permits, final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits, waitTime, unit); + } + }); + } + + @Override + public Publisher reducePermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.reducePermitsAsync(permits); + } + }); + } + +}